RabbitMQ是流行的开源新闻队列系统,RabbitMQ是风靡的开源音信队列系统

简介

RabbitMQ是风靡的开源音信队列系统,用erlang语言开采。RabbitMQ是AMQP(高等消息队列协议)的职业兑现。

简介

RabbitMQ是风靡的开源音信队列系统,用erlang语言开辟。RabbitMQ是AMQP(高端音讯队列协议)的正规得以落成。

安装

先是安装erlang境况。

官网:http://www.erlang.org/

Windows版下载地址:http://erlang.org/download/otp\_win64\_20.0.exe

Linux版:yum安装

安装

第一安装erlang情形。

官网:http://www.erlang.org/

Windows版下载地址:http://erlang.org/download/otp\_win64\_20.0.exe

Linux版:yum安装

Windows安装步骤

先是步运维

图片 1

第二步

图片 2

第三步

图片 3

第四步

图片 4

第五步

图片 5

Erlang安装完毕。

接下来安装RabbitMQ,首先下载RabbitMQ的Windows版本。

官网:http://www.rabbitmq.com/

Windows版下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10.exe

开采安装程序,依据上边步骤安装。

图片 6

图片 7

图片 8

图片 9

图片 10

RabbitMQ安装实现。

初叶菜单中进入管理工科具。

图片 11

图片 12

运维命令

  1. rabbitmq-plugins enable
    rabbitmq_management

图片 13

查看RabbitMQ服务是还是不是运转。

图片 14

图片 15

由来全体装置达成。

Windows安装步骤

先是步运转

图片 16

第二步

图片 17

第三步

图片 18

第四步

图片 19

第五步

图片 20

Erlang安装落成。

然后安装RabbitMQ,首先下载RabbitMQ的Windows版本。

官网:http://www.rabbitmq.com/

Windows版下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10.exe

开垦安装程序,根据上边步骤安装。

图片 21

图片 22

图片 23

图片 24

图片 25

RabbitMQ安装到位。

伊始菜单中跻身管理工科具。

图片 26

图片 27

运营命令

  1. rabbitmq-plugins enable
    rabbitmq_management

图片 28

查看RabbitMQ服务是还是不是运维。

图片 29

图片 30

至此全体设置到位。

Linux安装步骤

安装erlang。

  1. yum -y install erlang

安装RabbitMQ。

  1. wget https://github.com/rabbitmq/rabbitmq-server/archive/rabbitmq\_v3\_6\_10.tar.gz
  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

RabbitMQ安装失利,报错如下。

  1. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  2. error: Failed dependencies:

  3.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

  4.         socat is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

由来是yum安装的erlang版本太低,那里提供的RabbitMQ是流行版三.六.拾,所需的erlang版本最低为汉兰达1六B-0三,不然编写翻译时将失利,也正是上述荒唐。

再也安装erlang。

  1. wget http://erlang.org/download/otp\_src\_20.0.tar.gz
  1. tar xvzf otp_src_20.0.tar.gz

  2. cd otp_src_20.0

  3. ./configure

  4. make && make install

再一次安装erlang实现。

运行erlang。

  1. erl

  2. Erlang/OTP 20 [erts-9.0] [source]
    [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe]
    [kernel-poll:false]

  3.  

  4. Eshell V9.0 (abort with ^G)

安装socat。

  1. yum install -y socat

重复安装RabbitMQ。

  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

  2. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  3. error: Failed dependencies:

  4.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

上述错误新闻展现安装退步,因为rabbitMQ的借助关系所产生,所以要不经意重视,实行以下命令。

  1. rpm -ivh –nodeps
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

安装成功。

启动、停止RabbitMQ。

  1. rabbitmq-server start     #启动
  1. rabbitmq-server stop     #停止
  1. rabbitmq-server restart    #重启

 

Linux安装步骤

安装erlang。

  1. yum -y install erlang

安装RabbitMQ。

  1. wget https://github.com/rabbitmq/rabbitmq-server/archive/rabbitmq\_v3\_6\_10.tar.gz
  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

RabbitMQ安装失败,报错如下。

  1. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  2. error: Failed dependencies:

  3.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

  4.         socat is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

案由是yum安装的erlang版本太低,那里提供的RabbitMQ是流行版三.六.10,所需的erlang版本最低为PRADO1陆B-03,不然编写翻译时将倒闭,也正是上述失实。

再次安装erlang。

  1. wget http://erlang.org/download/otp\_src\_20.0.tar.gz
  1. tar xvzf otp_src_20.0.tar.gz

  2. cd otp_src_20.0

  3. ./configure

  4. make && make install

重新安装erlang落成。

运行erlang。

  1. erl

  2. Erlang/OTP 20 [erts-9.0] [source]
    [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe]
    [kernel-poll:false]

  3.  

  4. Eshell V9.0 (abort with ^G)

安装socat。

  1. yum install -y socat

双重安装RabbitMQ。

  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

  2. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  3. error: Failed dependencies:

  4.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

上述错误讯息呈现安装失败,因为rabbitMQ的依赖关系所导致,所以要不经意注重,实践以下命令。

  1. rpm -ivh –nodeps
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

安装成功。

启动、停止RabbitMQ。

  1. rabbitmq-server start     #启动
  1. rabbitmq-server stop     #停止
  1. rabbitmq-server restart    #重启

 

RabbitMQ使用

福如东海最简便易行的行列通讯

图片 31

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’)

  2. print(“[x] Sent ‘hello word!'”)

  3. connection.close()

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.  

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=True
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

RabbitMQ使用

兑现最简易的系列通讯

图片 32

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’)

  2. print(“[x] Sent ‘hello word!'”)

  3. connection.close()

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.  

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=True
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

no_ack分析

no_ack属性是在调用Basic.Consume方法时方可设置的三个关键参数。no_ack的用途是承接保险message被consumer成功管理了。那里成功的意识是,在安装了no_ack=false的事态下,只要consumer手动应答了Basic.Ack,固然其成功拍卖了。

no_ack分析

no_ack属性是在调用Basic.Consume方法时得以设置的多少个最重要参数。no_ack的用途是确定保证message被consumer成功管理了。那里成功的意识是,在设置了no_ack=false的意况下,只要consumer手动应答了Basic.Ack,纵然其成功拍卖了。

no_ack=true(此时为全自动回复)

在这种情景下,consumer会在吸收接纳到Basic.Deliver+Content-Header+Content-Body之后,马上回复Ack,而以此Ack是TCP协议中的Ack。此Ack的死灰复燃不关注consumer是不是对收到到的数据实行了拍卖,当然也不爱护管理数据所急需的耗费时间。

no_ack=true(此时为全自动回复)

在这种状态下,consumer会在吸收接纳到Basic.Deliver+Content-Header+Content-Body之后,立时回复Ack,而以此Ack是TCP协议中的Ack。此Ack的死灰复燃不关心consumer是还是不是对接收到的数量举行了拍卖,当然也不吝惜管理数量所急需的耗费时间。

no_ack=False(此时为手动应答)

在这种情景下,供给consumer在管理完接收到的Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而那些Ack是AMQP协议中的Basic.Ack。此Ack的东山再起与事务管理有关,所以实际的复苏时间应该要取决于业务管理的耗费时间。

no_ack=False(此时为手动应答)

在那种情景下,须求consumer在处理完接收到的Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而以此Ack是AMQP协议中的Basic.Ack。此Ack的回复与事务管理相关,所以具体的死灰复燃时间应当要在于业务管理的耗费时间。

总结

Basic.Ack发给RabbitMQ以报告,能够将相应message从RabbitMQ的音讯从缓存中移除。

Basic.Ack未被consumer发给RabbitMQ前出现了老大,RabbitMQ发掘与该consumer对应的连接被断开,将该该message以轮询格局发送给别的consumer(必要存在多个consumer订阅同三个queue)。

在no_ack=true的情状下,RabbitMQ以为message壹旦被deliver出去后就已被认可了,所以会立马将缓存中的message删除,由此在consumer至极时会导致新闻丢失。

来自consumer的Basic.Ack与发送给Producer的Basic.Ack没有一贯关联。

总结

Basic.Ack发给RabbitMQ以报告,能够将相应message从RabbitMQ的新闻从缓存中移除。

Basic.Ack未被consumer发给RabbitMQ前出现了格外,RabbitMQ发掘与该consumer对应的连接被断开,将该该message以轮询情势发送给别的consumer(必要存在四个consumer订阅同四个queue)。

在no_ack=true的意况下,RabbitMQ感到message一旦被deliver出去后就已被料定了,所以会立时将缓存中的message删除,由此在consumer卓殊时会导致消息丢失。

源于consumer的Basic.Ack与发送给Producer的Basic.Ack未有一直关联。

音信持久化

音讯持久化

acknowledgment音信持久化

no-ack=False,假如consumer挂掉了,那么RabbitMQ会重新将该职分增多到行列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. # 定义回调函数

  3. def
    callback(ch,method,properties,body):

  4.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. #
    no_ack=False表示费用完未来不主动把状态布告RabbitMQ

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

acknowledgment新闻持久化

no-ack=False,假诺consumer挂掉了,那么RabbitMQ会重新将该职责增多到行列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. # 定义回调函数

  3. def
    callback(ch,method,properties,body):

  4.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. #
    no_ack=False表示开销完之后不积极把情状布告RabbitMQ

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

durable音讯持久化

producer发送音信时挂掉了,consumer接收音讯时挂掉了,以下方法会让RabbitMQ重新将该音讯增添到队列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

basic_publish中增添参数

  1. properties=pika.BasicProperties(delivery_mode=2)

channel.queue_declare中加多参数

  1. channel.queue_declare(queue=’hello’,durable=True)

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’,durable=True)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’,

  2.                       properties=pika.BasicProperties(delivery_mode=2))

  1. print(“[x] Sent ‘hello word!'”)

  2. connection.close()

receive端(consumer)与acknowledgment音信持久化中receive端(consumer)一样。

durable消息持久化

producer发送音信时挂掉了,consumer接收音讯时挂掉了,以下措施会让RabbitMQ重新将该音讯加多到队列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

basic_publish中加多参数

  1. properties=pika.BasicProperties(delivery_mode=2)

channel.queue_declare中增添参数

  1. channel.queue_declare(queue=’hello’,durable=True)

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’,durable=True)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’,

  2.                       properties=pika.BasicProperties(delivery_mode=2))

  1. print(“[x] Sent ‘hello word!'”)

  2. connection.close()

receive端(consumer)与acknowledgment新闻持久化中receive端(consumer)一样。

消息分发

暗中同意新闻队列里的数码是鲁人持竿顺序分发到各样消费者,但是很多动静下,音信队列后端的主顾服务器的管理本事是不雷同的,那就会见世界形势部服务器闲置时间较长,财富浪费的图景。那么,我们就必要改造暗中同意的新闻队列获取顺序。可以在相继消费者端配置prefetch_count=1,意思就是告诉RabbitMQ在那几个消费者当前新闻还从未处理完的时候就不要再发新音讯了。

图片 33

消费者端

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4. __author__ = ‘Golden’

  5. #!/usr/bin/env python3

  6. # -*- coding:utf-8 -*-

  7.  

  8. import pika,time

  9.  

  10. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello2′,durable=True)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     time.sleep(30)

  3.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(callback,
  1.                       queue=’hello2′,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

劳动者端不改变。

音讯分发

暗许消息队列里的数据是安分守己顺序分发到各样消费者,可是大多意况下,音讯队列后端的消费者服务器的管理技艺是不均等的,那就会晤世一些服务器闲置时间较长,能源浪费的意况。那么,大家就须求改动暗中认可的新闻队列获取顺序。可以在相继消费者端配置prefetch_count=一,意思正是报告RabbitMQ在这么些消费者当前音讯还尚未管理完的时候就毫无再发新音讯了。

图片 34

消费者端

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4. __author__ = ‘Golden’

  5. #!/usr/bin/env python3

  6. # -*- coding:utf-8 -*-

  7.  

  8. import pika,time

  9.  

  10. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello2′,durable=True)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     time.sleep(30)

  3.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(callback,
  1.                       queue=’hello2′,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

生产者端不改变。

新闻发布和订阅(publish\subscribe)

颁发和订阅与简短的消息队列不相同在于,发表和订阅会将音讯发送给全体的订阅者,而音信队列中的数据被消费一遍便没有。所以,RabbitMQ达成公布和订阅时,会为每三个订阅者创造3个连串,而揭橥者发布音信时,会将音讯放置在享有有关队列中。类似广播的功能,这时候就要用到exchange。Exchange在概念的时候是有项目标,以决定到底是怎样Queue符合条件,能够接到音信。

fanout:全体bind到此exchange的queue都能够吸收接纳音讯。

direct:通过routingKey和exchange决定的哪位唯一的queue能够接到信息。

topic:全体符合routingKey(可以是一个表明式)的routingKey所bind的queue能够吸收新闻。

表达式符号表明

#:2个或几个字符

*:任何字符

例如:#.a会匹配a.a,aa.a,aaa.a等。

*.a会匹配a.a,b.a,c.a等。

注意:使用RoutingKey为#,Exchange Type为topic的时候相对于接纳fanout。

heaers:通过headers来决定把音讯发给哪些queue。

图片 35

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1.  

  2. message = ”.join(sys.argv[1:]) or
    ‘info:Hello World!’

  3. channel.basic_publish(exchange=’logs’,

  1.                       routing_key=”,
  1.                       body=message)
  1.  

  2. print(‘[x] Send %r’ % message)

  1. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1. #
    不点名queue名字,rabbit会随机分配一个名字,exclusive=True会在行使此queue的主顾断开后,自动将queue删除
  1. result =
    channel.queue_declare(exclusive=True)

  2. queue_name = result.method.queue

  1. channel.queue_bind(exchange=’logs’,queue=queue_name)
  1. print(‘[*]Waiting for logs.To exit press CTRL+C’)

  2. def
    callback(ch,method,properties,body):

  3.     print(‘[*] %s’%body)

  4.  

  5. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

新闻发布和订阅(publish\subscribe)

公告和订阅与简便的音讯队列不一样在于,公布和订阅会将音讯发送给全部的订阅者,而音讯队列中的数据被消费叁遍便收敛。所以,RabbitMQ达成公布和订阅时,会为每贰个订阅者成立1个行列,而公布者发表消息时,会将消息放置在具备相关队列中。类似广播的效果,那时候将在用到exchange。Exchange在概念的时候是有品种的,以决定到底是什么Queue符合条件,能够收到音讯。

fanout:全数bind到此exchange的queue都得以采用音信。

direct:通过routingKey和exchange决定的哪些唯一的queue能够收到消息。

topic:全数符合routingKey(能够是二个表明式)的routingKey所bind的queue尚可新闻。

表明式符号表达

#:1个或五个字符

*:任何字符

例如:#.a会匹配a.a,aa.a,aaa.a等。

*.a会匹配a.a,b.a,c.a等。

注意:使用RoutingKey为#,Exchange Type为topic的时候相对于采用fanout。

heaers:通过headers来决定把音信发给哪些queue。

图片 36

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1.  

  2. message = ”.join(sys.argv[1:]) or
    ‘info:Hello World!’

  3. channel.basic_publish(exchange=’logs’,

  1.                       routing_key=”,
  1.                       body=message)
  1.  

  2. print(‘[x] Send %r’ % message)

  1. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1. #
    不点名queue名字,rabbit会随机分配2个名字,exclusive=True会在应用此queue的买主断开后,自动将queue删除
  1. result =
    channel.queue_declare(exclusive=True)

  2. queue_name = result.method.queue

  1. channel.queue_bind(exchange=’logs’,queue=queue_name)
  1. print(‘[*]Waiting for logs.To exit press CTRL+C’)

  2. def
    callback(ch,method,properties,body):

  3.     print(‘[*] %s’%body)

  4.  

  5. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

入眼字发送(echange type=direct)

出殡音信时鲜明内定有个别队列并向里面发送消息,RabbitMQ还援救遵照主要字发送,即队列绑定关键字,发送者将数据依赖入眼字发送到音讯exchange,exchange依据重大字判断应该将数据发送至哪个队列。

图片 37

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. # severity = ‘error’

  3. severity = sys.argv[1] if len(sys.argv) > 1 else ‘info’

  4. # message = ‘Hello World!’

  5. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  6.  

  7. channel.basic_publish(exchange=’direct_logs’,

  1.                       routing_key=severity,
  1.                       body=message)
  1. print(‘[x] Send %r:%r’ %
    (severity,message))

  2. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. severities = sys.argv[1:]

  3. if not
    severities:

  4.     sys.stderr.write(‘Usage: %s
    [info] [warning] [error]\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    severity in severities:

  8.     channel.queue_bind(exchange=’direct_logs’,

  1.                        queue=queue_name,
  1.                        routing_key=severity)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[*] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

启动subscriber1

  1. python3 direct_subscriber.py warning

启动subscriber2

  1. python3 direct_subscriber.py error

启动publisher1

  1. python3 direct_publisher.py info

启动publisher2

  1. python3 direct_publisher.py warning

启动publisher3

  1. python3 direct_publisher.py error

结果

图片 38

要害字发送(echange type=direct)

发送音信时明显钦赐有些队列并向里面发送音讯,RabbitMQ还辅助依照重大字发送,即队列绑定关键字,发送者将数据依照重大字发送到新闻exchange,exchange依据重视字剖断应该将数据发送至哪个队列。

图片 39

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. # severity = ‘error’

  3. severity = sys.argv[1] if len(sys.argv) > 1 else ‘info’

  4. # message = ‘Hello World!’

  5. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  6.  

  7. channel.basic_publish(exchange=’direct_logs’,

  1.                       routing_key=severity,
  1.                       body=message)
  1. print(‘[x] Send %r:%r’ %
    (severity,message))

  2. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. severities = sys.argv[1:]

  3. if not
    severities:

  4.     sys.stderr.write(‘Usage: %s
    [info] [warning] [error]\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    severity in severities:

  8.     channel.queue_bind(exchange=’direct_logs’,

  1.                        queue=queue_name,
  1.                        routing_key=severity)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[*] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

启动subscriber1

  1. python3 direct_subscriber.py warning

启动subscriber2

  1. python3 direct_subscriber.py error

启动publisher1

  1. python3 direct_publisher.py info

启动publisher2

  1. python3 direct_publisher.py warning

启动publisher3

  1. python3 direct_publisher.py error

结果

图片 40

漏洞格外多相配(exchange type=topic)

在topic类型下,可以让队列绑定多少个模糊的首要字,发送者将数据发送到exchange,exchange将盛传”路由值”和”关键字”进行相配,相配成功则将数据发送到钦定队列。

图片 41

*:相配放肆二个字符

#:相配任性个字符

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info’

  3. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  4. channel.basic_publish(exchange=’topic_logs’,

  1.                       routing_key=routing_key,
  1.                       body=message)
  1.  

  2. print(‘[x] Sent %r:%r’ %
    (routing_key,message))

  3. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. binding_keys = sys.argv[1:]

  3. if not
    binding_keys:

  4.     sys.stderr.write(‘Usage: %s
    [binding_key]…\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    binding_key in binding_keys:

  1.     channel.queue_bind(exchange=’topic_logs’,
  1.                        queue=queue_name,
  1.                        routing_key=binding_key)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[x] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

测试

图片 42

混淆相称(exchange type=topic)

在topic类型下,能够让队列绑定多少个模糊的关键字,发送者将数据发送到exchange,exchange将盛传”路由值”和”关键字”举办匹配,相称成功则将数据发送到钦命队列。

图片 43

*:匹配大肆二个字符

#:相称任性个字符

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info’

  3. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  4. channel.basic_publish(exchange=’topic_logs’,

  1.                       routing_key=routing_key,
  1.                       body=message)
  1.  

  2. print(‘[x] Sent %r:%r’ %
    (routing_key,message))

  3. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. binding_keys = sys.argv[1:]

  3. if not
    binding_keys:

  4.     sys.stderr.write(‘Usage: %s
    [binding_key]…\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    binding_key in binding_keys:

  1.     channel.queue_bind(exchange=’topic_logs’,
  1.                        queue=queue_name,
  1.                        routing_key=binding_key)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[x] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

测试

图片 44

长途进度调用(RPC)

RPC(Remote Procedure Call
Protocol)远程进程调用协议。在二个特大型的商店,系统由大大小小的服务组合,不相同的集体维护区别的代码,陈设在不一致的服务器。不过在做开垦的时候屡次要用到此外团队的艺术,因为早已有了达成。不过那几个劳动配置在分歧的服务器,想要调用就供给互连网通讯,那个代码繁琐且复杂,一相当大心就会很没用。PENCOREC协议定义了统一计划,其它的协作社都交由了分裂的落成。比方微软的wcf,以及WebApi。

在RabbitMQ中RPC的得以完毕是很简短快捷的,今后客户端、服务端都以音讯发表者与音信接受者。

图片 45

先是客户端通过RPC向服务端发生请求。correlation_id:请求标志,erply_to:结果重返队列。(小编那边有一些数码要求你给本人管理一下,correlation_id是本身伸手标识,你管理完了未来把结果回到到erply_to队列)

服务端获得请求,开头拍卖并赶回。correlation_id:客户端请求标志。(correlation_id那是你的乞请标记,还给您。那时候客户端用本身的correlation_id与服务端重临的correlation_id实行自己检查自纠,一样则抽出。)

rpc_server

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’rpc_queue’)

  1. def fib(n):

  2.     if
    n == 0:

  3.         return 0

  4.     elif n == 1:

  5.         return 1

  6.     else:

  7.         return fib(n-1) + fib(n-2)

  8.  

  9. def on_request(ch,method,props,body):

  1.     n = int(body)

  2.     print(‘[.] fib(%s)’ % n)

  3.     response = fib(n)

  4.     ch.basic_publish(exchange=”,

  1.                      routing_key=props.reply_to,
  1.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
  1.                      body =
    str(response))

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(on_request,queue=’rpc_queue’)
  1.  

  2. print(‘[x] Awaiting RPC requests’)

  1. channel.start_consuming()

rpc_client

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,uuid

  6.  

  7. class
    FibonacciRpcClient(object):

  8.     def __init__(self):

  9.         self.connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1.         self.channel =
    self.connection.channel()

  2.         result =
    self.channel.queue_declare(exclusive=True)

  3.         self.callback_queue =
    result.method.queue

  4.         self.channel.basic_consume(self.on_response,no_ack=True,

  1.                                    queue=self.callback_queue)
  1.  

  2.     def
    on_response(self,ch,method,props,body):

  3.         if self.corr_id ==
    props.correlation_id:

  4.             self.response = body

  1.  

  2.     def call(self,n):

  3.         self.response = None

  4.         self.corr_id =
    str(uuid.uuid4())

  5.         self.channel.basic_publish(exchange=”,

  1.                                    routing_key=’rpc_queue’,
  1.                                    properties=pika.BasicProperties(
  1.                                        reply_to=self.callback_queue,
  1.                                        correlation_id=self.corr_id,),
  1.                                    body=str(n))
  1.         while self.response is None:

  2.             self.connection.process_data_events()

  1.         return int(self.response)

  2.  

  3. fibonacci_rpc = FibonacciRpcClient()

  1.  

  2. print(‘[x] Requesting fib(10)’)

  1. response = fibonacci_rpc.call(10)
  1. print(‘[.] Got %r ‘ % response)

 

长距离进度调用(RPC)

RPC(Remote Procedure Call
Protocol)远程进程调用协议。在贰个巨型的信用合作社,系统由大大小小的服务组合,不相同的公司维护区别的代码,安排在区别的服务器。可是在做开采的时候屡次要用到其余团队的法子,因为早已有了得以完结。可是那一个劳动配置在不相同的服务器,想要调用就须求网络通信,那个代码繁琐且复杂,壹极大心就会很没用。PMuranoC协议定义了统筹,此外的公司都交由了不相同的落到实处。比方微软的wcf,以及WebApi。

在RabbitMQ中RPC的落到实处是相当粗略火速的,现在客户端、服务端都以音讯发表者与音讯接受者。

图片 46

首先客户端通过RPC向服务端发生请求。correlation_id:请求标记,erply_to:结果重临队列。(小编那边有一对数码要求你给小编处理一下,correlation_id是自家伸手标志,你管理完成以往把结果再次来到到erply_to队列)

服务端获得请求,伊始拍卖并赶回。correlation_id:客户端请求标志。(correlation_id那是你的央求标记,还给您。这时候客户端用自身的correlation_id与服务端重临的correlation_id实行比较,一样则接受。)

rpc_server

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’rpc_queue’)

  1. def fib(n):

  2.     if
    n == 0:

  3.         return 0

  4.     elif n == 1:

  5.         return 1

  6.     else:

  7.         return fib(n-1) + fib(n-2)

  8.  

  9. def on_request(ch,method,props,body):

  1.     n = int(body)

  2.     print(‘[.] fib(%s)’ % n)

  3.     response = fib(n)

  4.     ch.basic_publish(exchange=”,

  1.                      routing_key=props.reply_to,
  1.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
  1.                      body =
    str(response))

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(on_request,queue=’rpc_queue’)
  1.  

  2. print(‘[x] Awaiting RPC requests’)

  1. channel.start_consuming()

rpc_client

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,uuid

  6.  

  7. class
    FibonacciRpcClient(object):

  8.     def __init__(self):

  9.         self.connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1.         self.channel =
    self.connection.channel()

  2.         result =
    self.channel.queue_declare(exclusive=True)

  3.         self.callback_queue =
    result.method.queue

  4.         self.channel.basic_consume(self.on_response,no_ack=True,

  1.                                    queue=self.callback_queue)
  1.  

  2.     def
    on_response(self,ch,method,props,body):

  3.         if self.corr_id ==
    props.correlation_id:

  4.             self.response = body

  1.  

  2.     def call(self,n):

  3.         self.response = None

  4.         self.corr_id =
    str(uuid.uuid4())

  5.         self.channel.basic_publish(exchange=”,

  1.                                    routing_key=’rpc_queue’,
  1.                                    properties=pika.BasicProperties(
  1.                                        reply_to=self.callback_queue,
  1.                                        correlation_id=self.corr_id,),
  1.                                    body=str(n))
  1.         while self.response is None:

  2.             self.connection.process_data_events()

  1.         return int(self.response)

  2.  

  3. fibonacci_rpc = FibonacciRpcClient()

  1.  

  2. print(‘[x] Requesting fib(10)’)

  1. response = fibonacci_rpc.call(10)
  1. print(‘[.] Got %r ‘ % response)

 

相关文章