总音信队列RabbitMQ的要旨用法

一、RabbitMQ是什么?

AMQP,即Advanced Message
Queuing
Protocol,高级信息队列协议,是应用层协议的一个绽放标准,为面向音讯的中间件设计。音信中间件首要用于组件之间的解耦,音信的发送者无需明白音信使用者的存在,反之亦然。
AMQP的首要特点是面向信息、队列、路由于(包括点对碰和宣布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,匡助多客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,帮助AJAX。用于在分布式系统中贮存转发信息,在易用性、扩充性、高可用性等地点表现不俗。

次、音讯队列的特征

解耦:信息之劳动者和顾客都基于AMQP协议(相同的接口和正规)举行发送和吸纳信息,相互不抱倚重;

冗余:信息只有处理了才会吃剔除,除非明确同意四只顾客可以采取一模一样音讯之差不多单副本,否则每个信息才会叫单个消费者接受并拍卖;

扩展性:可长或者收缩五只音讯的劳动者和买主,两者的变更均未会面潜移默化到双方;

世故 &
峰值处理能力
:因为有美的增加性,所以可视服务器的处理意况【可称为:消费者】(比如:高并发负载过好)动态的增减服务器,以提取提升处理能力(可称:负载均衡);

不过恢复生机性:音讯的劳动者和消费者不论哪一样方出现问题,均不谋面影响音讯的正常化产生和吸收(当然单一的生产者与顾客除了,假诺是如此吗即不曾必要接纳分布式音信队列);

送达保证:唯有音信给确认成功拍卖后才会师被去除,否则会再一次分发给此外的消费者举办拍卖,直到确认处理成截止;

排序保证:先进先出是班的中央特性;

缓冲:同一时间有差不两只音讯进入音信队列,不过同一时间可以指定一个差不多独信息给音信者接收并处理,其余的信息处理等状态,这样好落服务器的压力,起及缓冲的来意;

接头数据流:传递的信息内容因字节数组为主,但足以将目的体系化后成字节数组,然后以消费者收到到信息后,可反系列化成对象并展开有关的处理,应用场景:CQRS;

异步通信:允许用一个或者多单信息放入音讯队列,但并无及时处理它,而是在方便的早晚又夺由一个要么多单顾客分别接收并拍卖它们;

如上是自个儿之个人通晓,也可参看《行使信息队列的 10
只理由

用场景:针对高并发且无需及时赶回处理结果的早晚,可以设想用消息队列,要是拍卖要立刻再次来到结果虽不吻合;

其三、RabbitMQ环境之设置

1.服务器端:

A.需要先安装Erlang环境,下载地址:http://www.erlang.org/download.html,可能有时候无法正常访问,可以透过VPN代理来做客该网站或者在外网站及下载(比如:CSDN)

B.安装RabbitMQ
Server(有指向多单操作系统的下载,我当即边坐WINDOWS平台为主),下载地址:http://www.rabbitmq.com/download.html

表达:最新版本的Erlang及abbitMQ
Server安装后,一般WINDOWS环境变量及劳动均都早已不足为奇安装及并正常启动,可不是风靡版本或没有设置好,则可尽以下命令:

Setx ERLANG_HOME “C:\Program Files\erl7.1″
-Erlang的-安装目录,也只是通过系统性能–>高级–>环境变量来手动设置;

cd C:\Program Files (x86)\RabbitMQ
Server\rabbitmq_server-3.5.6\sbin –切换至RabbitMQ
Server的sbin目录下,然后实施如下命令:

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

安并设置OK后,可以经过:rabbitmqctl status查看运行境况、rabbitmqctl
list_users查看时用户、以下命令扩充一个初用户:

rabbitmqctl add_user username password
rabbitmqctl set_permissions username “.*” “.*” “.*”
rabbitmqctl set_user_tags username administrator

改密码:rabbitmqctl change_password username newpassowrd

删去指定的用户:rabbitmqctl delete_user username 

列有装有queue:rabbitmqctl list_queues

列有指定queue的音信:rabbitmqctl list_queues [the queue name]
messages_ready messages_unacknowledged

列有所有exchange:rabbitmqctl list_exchanges

排有所有binding:rabbitmqctl list_bindings

安装基于web的保管插件:rabbitmq-plugins.bat enable rabbitmq_management

自还生另外的通令,大家可去查看官网以及另素材,但自我当领悟以上的吩咐充裕用了

季、RabbitMQ的焦点用法

使RabbitMQ客户端就必将需要在列蒙援引其连带的机件,那里可以透过NuGet安装或由官网下载再引用均只是,方法很简短,不再重述;

1.常备用法:接纳默认的exchange(交流机,或如路由器)+默认的exchange类型:direct+noAck(自动答,接收就应)

    /// <summary>
    /// 消息发送者,一般用在客户端
    /// </summary>
    class RabbitMQPublish
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel()) //创建一个通道
                {
                    channel.QueueDeclare("hello", false, false, false, null);//创建一个队列

                    string message = "";
                    while (message!="exit")
                    {
                        Console.Write("Please enter the message to be sent:");
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", "hello", null, body); //发送消息
                        Console.WriteLine("set message: {0}", message);
                    }
                }
            }
        }
    }



    /// <summary>
    /// 消费者,一般用在服务端
    /// </summary>
    class RabbitMQConsume
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel())//创建一个通道
                {
                    channel.QueueDeclare("hello", false, false, false, null);//创建一个队列

                    var consumer = new QueueingBasicConsumer(channel);//创建一个消费者
                    channel.BasicConsume("hello", true, consumer);//开启消息者与通道、队列关联

                    Console.WriteLine(" waiting for message.");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息并出列

                        var body = ea.Body;//消息主体
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received {0}", message);
                        if (message == "exit")
                        {
                            Console.WriteLine("exit!");
                            break;
                        }

                    }
                }
            }

        }
    }

2.载重均衡处理格局:拔取默认的exchange(交流机)+智能分发+默认的exchange类型:direct+手动应答

信息生产者/发表者代码和方一样;

以下是顾客代码:

    /// <summary>
    /// 消费者,一般用在服务端
    /// </summary>
    class RabbitMQConsume
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel())//创建一个通道
                {
                    channel.QueueDeclare("hello", false, false, false, null);//创建一个队列
                    channel.BasicQos(0, 1, false);//在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

                    var consumer = new QueueingBasicConsumer(channel);//创建一个消费者
                    channel.BasicConsume("hello", false, consumer);//开启消息者与通道、队列关联

                    Console.WriteLine(" waiting for message.");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息并出列

                        var body = ea.Body;//消息主体
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                        if (message == "exit")
                        {
                            Console.WriteLine("exit!");
                            break;
                        }
                        Thread.Sleep(1000);
                    }
                }
            }

        }
    }

3.音讯持久化形式:在2的底子及长持久化,这样即便生产者或消费者或服务端断开,消息都未会晤少

    /// <summary>
    /// 消息发送者,一般用在客户端
    /// </summary>
    class RabbitMQPublish
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel()) //创建一个通道
                {
                    channel.QueueDeclare("hello", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    var properties = channel.CreateBasicProperties();
                    //properties.SetPersistent(true);这个方法提示过时,不建议使用
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
                    string message = "";
                    while (message!="exit")
                    {
                        Console.Write("Please enter the message to be sent:");
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", "hello", properties, body); //发送消息
                        Console.WriteLine("set message: {0}", message);
                    }
                }
            }
        }
    }

    /// <summary>
    /// 消费者,一般用在服务端
    /// </summary>
    class RabbitMQConsume
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel())//创建一个通道
                {
                    channel.QueueDeclare("hello", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    channel.BasicQos(0, 1, false);//在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

                    var consumer = new QueueingBasicConsumer(channel);//创建一个消费者
                    channel.BasicConsume("hello", false, consumer);//开启消息者与通道、队列关联

                    Console.WriteLine(" waiting for message.");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息并出列

                        var body = ea.Body;//消息主体
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                        if (message == "exit")
                        {
                            Console.WriteLine("exit!");
                            break;
                        }
                        Thread.Sleep(1000);
                    }
                }
            }

        }
    }

4.播订阅格局:定义一个交流机,其连串设为广播类型,发送音信时指定这一个交换机,消费者的音讯队列绑定到拖欠互换机实现消息之订阅,订阅后虽只是接收信息,未订阅则无从收到信

    /// <summary>
    /// 消息发送者/生产者,一般用在客户端
    /// </summary>
    class RabbitMQPublish
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel()) //创建一个通道
                {
                    channel.ExchangeDeclare("publish", "fanout",true);//定义一个交换机,且采用广播类型,并设为持久化
                    string queueName = channel.QueueDeclare("hello", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列,这里将结果隐式转换成string
                    var properties = channel.CreateBasicProperties();
                    //properties.SetPersistent(true);这个方法提示过时,不建议使用
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
                    string message = "";
                    while (message!="exit")
                    {
                        Console.Write("Please enter the message to be sent:");
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("publish", "hello", properties, body); //发送消息,这里指定了交换机名称,且routeKey会被忽略
                        Console.WriteLine("set message: {0}", message);
                    }
                }
            }
        }
    }

    /// <summary>
    /// 消费者,一般用在服务端
    /// </summary>
    class RabbitMQConsume
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel())//创建一个通道
                {
                    channel.ExchangeDeclare("publish", "fanout", true);//定义一个交换机,且采用广播类型,并持久化该交换机,并设为持久化
                    string queueName = channel.QueueDeclare("hello", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    channel.QueueBind(queueName, "publish", "");//将队列绑定到名publish的交换机上,实现消息订阅
                    channel.BasicQos(0, 1, false);//在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

                    var consumer = new QueueingBasicConsumer(channel);//创建一个消费者
                    channel.BasicConsume(queueName, false, consumer);//开启消息者与通道、队列关联

                    Console.WriteLine(" waiting for message.");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息并出列

                        var body = ea.Body;//消息主体
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);//应答
                        if (message == "exit")
                        {
                            Console.WriteLine("exit!");
                            break;
                        }
                        Thread.Sleep(1000);
                    }
                }
            }

        }
    }

5.主旨订阅形式:定义一个交流机,其色设为主旨订阅类型,发送信息时指定这些交流机及RoutingKey,消费者的音讯队列绑定到该互换机并配合到RoutingKey实现信息之订阅,订阅后虽然只是接纳音信,未订阅则无从吸纳信

    /// <summary>
    /// 消息发送者/生产者,一般用在客户端
    /// </summary>
    class RabbitMQPublish
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel()) //创建一个通道
                {
                    channel.ExchangeDeclare("publish-topic", "topic", true);//定义一个交换机,且采用广播类型,并持久化该交换机
                   channel.QueueDeclare("hello-mq", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    var properties = channel.CreateBasicProperties();
                    //properties.SetPersistent(true);这个方法提示过时,不建议使用
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
                    string message = "";
                    while (message!="exit")
                    {
                        Console.Write("Please enter the message to be sent:");
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("publish-topic", "hello.test", properties, body); //发送消息,这里指定了交换机名称,且routeKey会被忽略
                        Console.WriteLine("set message: {0}", message);
                    }
                }
            }
        }
    }


    /// <summary>
    /// 消费者,一般用在服务端
    /// </summary>
    class RabbitMQConsume
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();//创建连接工厂并初始连接
            factory.HostName = "localhost";
            factory.UserName = "zwj";
            factory.Password = "www.zuowenjun.cn";

            using (var connection = factory.CreateConnection())//创建一个连接
            {
                using (var channel = connection.CreateModel())//创建一个通道
                {
                    channel.ExchangeDeclare("publish-topic", "topic",true);//定义一个交换机,且采用广播类型,并持久化该交换机
                    string queueName = channel.QueueDeclare("hello-mq", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    channel.QueueBind(queueName, "publish-topic", "*.test");//将队列绑定到路由上,实现消息订阅
                    channel.BasicQos(0, 1, false);//在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

                    var consumer = new QueueingBasicConsumer(channel);//创建一个消费者
                    channel.BasicConsume(queueName, false, consumer);//开启消息者与通道、队列关联

                    Console.WriteLine(" waiting for message.");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//接收消息并出列

                        var body = ea.Body;//消息主体
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);//应答
                        if (message == "exit")
                        {
                            Console.WriteLine("exit!");
                            break;
                        }
                        Thread.Sleep(1000);
                    }
                }
            }

        }
    }

  

换成机路由种如下:

Direct
Exchange
:直接匹配,通过Exchange名称+RoutingKey来发送和接收音讯;

Fanout
Exchange
:广播订阅,向有顾客发表音信,但无非生消费者将行绑定到该路由才可以收音信,忽小RoutingKey;

Topic
Exchange
:主旨匹配订阅,这里的核心指的凡RoutingKey,RoutingKey可以下通配符,如:*或#,RoutingKey命名选拔.来分隔六只词,唯有消费者将行绑定到该路由且指定的RoutingKey符合匹配规则时才能够接收音信;

Headers
Exchange
:音信头订阅,音信发布前,为信定义一个仍旧多独键值对的音讯头,然后消费者接受音讯时一致用定义类似之键值对要求头,里面要多包含一个配合形式(有:x-mactch=all,或者x-mactch=any),唯有告求头与音讯头相匹配,才会选拔到新闻,忽小RoutingKey;

本文内容参考了以下散文:

.NET
环境被使RabbitMQ

.Net下RabbitMQ的运用系列小说

相关文章