RabbitMQ是AMQP(高级音讯队列协议)的正统兑现,音讯队列(MQ)是一种应用程序对应用程序的通讯格局ca88亚洲城网站

简介

RabbitMQ是流行的开源新闻队列系统,用erlang语言开发。RabbitMQ是AMQP(高级音讯队列协议)的正式落实。

简介

RabbitMQ是风靡的开源音信队列系统,用erlang语言开发。RabbitMQ是AMQP(高级音信队列协议)的科班落实。

python之RabbitMQ,pythonrabbitmq

RabbitMQ是一个在AMQP基础上完全的,可复用的商家音讯系统。他依据Mozilla
Public License开源协议。 MQ全称为Message Queue,
音讯队列(MQ)是一种应用程序对应用程序的通信格局。应用程序通过读写出入队列的新闻(针对应用程序的数目)来通讯,而无需专用连接来链接它们。音讯传递指的是先后之间通过在新闻中发送数据进行通讯,而不是经过一向调用相互来通讯,直接调用常常是用来诸如远程进程调用的技术。排队指的是应用程序通过
队列来通讯。队列的应用除去了吸收和殡葬应用程序同时施行的需要。  
安装RabbitMQ:

安装配置epel源:(详见http://www.cnblogs.com/ernest-zhang/p/5714434.html)
安装erlang:
yum -y install erlang
注:安装erlang的时候碰到
    Error: Package: erlang-erts-R14B-04.3.el6.i686 (epel)
           Requires: libz.so.1(ZLIB_1.2.2)
[[email protected] ~]# yum whatprovides libz.so.1
Loaded plugins: rhnplugin
This system is not registered with RHN.
RHN support will be disabled.
zlib-1.2.3-25.el6.i686 : The zlib compression and decompression library #提供压缩与解压缩库
Repo        : local
Matched from:
Other       : libz.so.1
检查发现应该是zlib的版本太老了,从网上下载最新的zlib-1.2.8-10.fc24.i686,然后使用RPM安装后解决。
下载地址:http://www.zlib.net/  #zlib官网
http://rpmfind.net/linux/rpm2html/search.php?query=zlib    #zlib下载网站
安装rabbitMQ:
yum -y install rabbitmq-server

service rabbitmq-server start/stop 启动和截止rabbitmq
安装API,然后可以根据API操作rabbitmq

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

Python 操作RabbitMQ 发布端:

import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))    #服务器地址
channel=connection.channel()
channel.queue_declare(queue='Hi')   #如果有队列,略过;如果没有,创建队列
channel.basic_publish(exchange='',routing_key='cc',body='hello!world!!!')
print("[x] sent 'hello,world!'")
connection.close()

接收端:

import pika
#创建一个连接对象,绑定rabbitmq的IP
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
#创建一个频道对象
channel=connection.channel()
#频道中声明指定queue,如果MQ中没有指定queue就创建,如果有,则略过
channel.queue_declare(queue='Hi')
#定义回调函数
def callback(ch,method,properties,body):
    print('[x] Recieved %r'%body)
    # channel.close()
#no_ack=Fales:表示消费完以后不主动把状态通知rabbitmq,callback:回调函数,queue:指定队列
channel.basic_consume(callback,queue='Hi',no_ack=True)
# channel.basic_consume(callback,queue='cc')
print('[*] Waiting for msg')
channel.start_consuming()

1、acknowledgment 音信不丢掉

no-ack = False,若是顾客遇到景况(its channel is closed, connection is
closed, or TCP connection is
lost)挂掉了,那么,RabbitMQ会重新将该任务添加到行列中。

  • 回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False

ca88亚洲城网站 1import
pika connection =
pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel() channel.queue_declare(queue=’Hi’) #
定义回调函数 def callback(ch, method, properties, body): print(‘[x]
Recieved %r’ % body) # channel.close()
ch.basic_ack(delivery_tag=method.delivery_tag) #
no_ack=Fales:表示开支完事后不积极把情状通告rabbitmq
channel.basic_consume(callback, queue=’Hi’, no_ack=False)
print(‘[*] Waiting for msg’) channel.start_consuming() View Code

安装

第一安装erlang环境。

官网:http://www.erlang.org/

Windows版下载地址:http://erlang.org/download/otp\_win64\_20.0.exe

Linux版:yum安装

安装

率先安装erlang环境。

官网:http://www.erlang.org/

Windows版下载地址:http://erlang.org/download/otp\_win64\_20.0.exe

Linux版:yum安装

durable 音信不丢掉

新闻生产者端发送音讯时挂掉了,消费者接音讯时挂掉了,以下办法会让RabbitMQ重新将该音信添加到队列中:

  • 回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag),消费端须求做的
  • basic_comsume中的no_ack=False,消费端需求做的
  • 发布音讯端的basic_publish添加参数properties=pika.BasicProperties(delivery_mode=2),劳动者端必要做的

ca88亚洲城网站 2import
pika connection =
pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel() channel.queue_declare(queue=’Hi’) #
假如有,略过;借使没有,创立队列 channel.basic_publish(exchange=”,
routing_key=’Hi’, body=’hello!world!!!’,
properties=pika.BasicProperties(delivery_mode=2)) #信息持久化
print(“[x] sent ‘hello,world!'”) connection.close() 生产者
ca88亚洲城网站 3import
pika connection =
pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel() channel.queue_declare(queue=’Hi’) #
定义回调函数 def callback(ch, method, properties, body): print(‘[x]
Recieved %r’ % body) # channel.close()
ch.basic_ack(delivery_tag=method.delivery_tag) #
no_ack=Fales:表示开支完事后不主动把境况通告rabbitmq
channel.basic_consume(callback, queue=’Hi’, no_ack=True) print(‘[*]
Waiting for msg’) channel.start_consuming() 消费者

Windows安装步骤

率先步运行

ca88亚洲城网站 4

第二步

ca88亚洲城网站 5

第三步

ca88亚洲城网站 6

第四步

ca88亚洲城网站 7

第五步

ca88亚洲城网站 8

Erlang安装完结。

下一场安装RabbitMQ,首先下载RabbitMQ的Windows版本。

官网:http://www.rabbitmq.com/

Windows版下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10.exe

开辟安装程序,按照上边步骤安装。

ca88亚洲城网站 9

ca88亚洲城网站 10

ca88亚洲城网站 11

ca88亚洲城网站 12

ca88亚洲城网站 13

RabbitMQ安装到位。

开始菜单中跻身管理工具。

ca88亚洲城网站 14

ca88亚洲城网站 15

运作命令

  1. rabbitmq-plugins enable
    rabbitmq_management

ca88亚洲城网站 16

查看RabbitMQ服务是不是启动。

ca88亚洲城网站 17

ca88亚洲城网站 18

从那之后全体设置到位。

Windows安装步骤

第一步运行

ca88亚洲城网站 19

第二步

ca88亚洲城网站 20

第三步

ca88亚洲城网站 21

第四步

ca88亚洲城网站 22

第五步

ca88亚洲城网站 23

Erlang安装落成。

然后安装RabbitMQ,首先下载RabbitMQ的Windows版本。

官网:http://www.rabbitmq.com/

Windows版下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10.exe

开辟安装程序,按照下边步骤安装。

ca88亚洲城网站 24

ca88亚洲城网站 25

ca88亚洲城网站 26

ca88亚洲城网站 27

ca88亚洲城网站 28

RabbitMQ安装到位。

开始菜单中跻身管理工具。

ca88亚洲城网站 29

ca88亚洲城网站 30

运作命令

  1. rabbitmq-plugins enable
    rabbitmq_management

ca88亚洲城网站 31

查看RabbitMQ服务是还是不是启动。

ca88亚洲城网站 32

ca88亚洲城网站 33

由来全体设置到位。

信息得到顺序

默许音信队列里的多寡是比照顺序被消费者拿走,例如:消费者1去队列中拿走
奇数 系列的天职,消费者2去队列中取得 偶数
种类的职务。但有半数以上情状下,新闻队列后端的买主服务器的拍卖能力是分化的,那就会并发部分服务器闲置时间较长,资源浪费的意况,那么,我们就要求变更默许的信息队列获取顺序!

channel.basic_qos(prefetch_count=1) 表示哪个人来什么人取,不再依据奇偶数排列,那是消费者端需求做的

ca88亚洲城网站 34import
pika connection =
pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel() channel.queue_declare(queue=’Hi’) #
定义回调函数 def callback(ch, method, properties, body): print(‘[x]
Recieved %r’ % body) # channel.close()
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) #更改默许获取顺序,什么人来何人取 #
no_ack=Fales:表示开销完之后不主动把状态文告rabbitmq
channel.basic_consume(callback, queue=’Hi’, no_ack=True) print(‘[*]
Waiting for msg’) channel.start_consuming() 消费者

Linux安装步骤

安装erlang。

  1. yum -y install erlang

安装RabbitMQ。

  1. wget https://github.com/rabbitmq/rabbitmq-server/archive/rabbitmq\_v3\_6\_10.tar.gz
  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

RabbitMQ安装战败,报错如下。

  1. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  2. error: Failed dependencies:

  3.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

  4.         socat is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

原因是yum安装的erlang版本太低,那里提供的RabbitMQ是流行版3.6.10,所需的erlang版本最低为R16B-03,否则编译时将破产,也就是上述失实。

再一次安装erlang。

  1. wget http://erlang.org/download/otp\_src\_20.0.tar.gz
  1. tar xvzf otp_src_20.0.tar.gz

  2. cd otp_src_20.0

  3. ./configure

  4. make && make install

重复安装erlang已毕。

运行erlang。

  1. erl

  2. Erlang/OTP 20 [erts-9.0] [source]
    [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe]
    [kernel-poll:false]

  3.  

  4. Eshell V9.0 (abort with ^G)

安装socat。

  1. yum install -y socat

双重安装RabbitMQ。

  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

  2. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  3. error: Failed dependencies:

  4.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

上述错误新闻呈现安装失利,因为rabbitMQ的器重关系所造成,所以要不经意保养,执行以下命令。

  1. rpm -ivh –nodeps
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

安装成功。

启动、停止RabbitMQ。

  1. rabbitmq-server start     #启动
  1. rabbitmq-server stop     #停止
  1. rabbitmq-server restart    #重启

 

Linux安装步骤

安装erlang。

  1. yum -y install erlang

安装RabbitMQ。

  1. wget https://github.com/rabbitmq/rabbitmq-server/archive/rabbitmq\_v3\_6\_10.tar.gz
  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

RabbitMQ安装失利,报错如下。

  1. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  2. error: Failed dependencies:

  3.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

  4.         socat is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

原因是yum安装的erlang版本太低,那里提供的RabbitMQ是前卫版3.6.10,所需的erlang版本最低为R16B-03,否则编译时将破产,也就是上述失实。

双重安装erlang。

  1. wget http://erlang.org/download/otp\_src\_20.0.tar.gz
  1. tar xvzf otp_src_20.0.tar.gz

  2. cd otp_src_20.0

  3. ./configure

  4. make && make install

再也安装erlang达成。

运行erlang。

  1. erl

  2. Erlang/OTP 20 [erts-9.0] [source]
    [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe]
    [kernel-poll:false]

  3.  

  4. Eshell V9.0 (abort with ^G)

安装socat。

  1. yum install -y socat

再度安装RabbitMQ。

  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

  2. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  3. error: Failed dependencies:

  4.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

上述错误音讯彰显安装失利,因为rabbitMQ的依赖关系所导致,所以要不经意依赖,执行以下命令。

  1. rpm -ivh –nodeps
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

设置成功。

启动、停止RabbitMQ。

  1. rabbitmq-server start     #启动
  1. rabbitmq-server stop     #停止
  1. rabbitmq-server restart    #重启

 

公布和订阅

颁发订阅和简易的新闻队列差异在于,发表订阅会将新闻发送给所有的订阅者,而新闻队列中的数据被消费五遍便没有。所以,RabbitMQ完成公布和订阅时,会为每一个订阅者创立一个行列,而公布者公布音信时,会将音讯放置在具备相关队列中。

RabbitMQ中,所有生产者提交的讯息都由Exchange来接受,然后Exchange根据一定的国策转向到Queue进行仓储 。RabbitMQ提供了四种Exchange:fanout,direct,topic,header
header形式在骨子里运用中较少,只对前三种格局展开比较。

exchange type = fanout

ca88亚洲城网站 35

其余发送到Fanout
Exchange的新闻都会被转载到与该
Exchange绑定(Binding)的所有Queue上。

1.足以知晓为路由表的形式

2.那种格局不必要RouteKey

3.这种情势须求超前将Exchange与Queue举行绑定,一个Exchange可以绑定四个Queue,一个Queue可以同多少个Exchange进行绑定。

4.假如接受到信息的Exchange没有与其他Queue绑定,则音信会被打消。

ca88亚洲城网站 36import
pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’logs_fanout’,type=’fanout’)
msg=’456′
channel.basic_publish(exchange=’logs_fanout’,routing_key=”,body=msg)
print(‘最头阵送:%s’%msg) connection.close() 生产者
ca88亚洲城网站 37import
pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’logs_fanout’,type=’fanout’)
#肆意创立队列 result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue #绑定相关队列名称
channel.queue_bind(exchange=’logs_fanout’,queue=queue_name) def
callback(ch,method,properties,body): print(‘[x] %r’%body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming() 消费者

 关键字

ca88亚洲城网站 38

任何发送到Direct Exchange的音讯都会被转化到RouteKey中指定的Queue。
1.貌似情况可以利用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串,下文称其为default
Exchange)。  2.那种情势下不须要将Exchange举行其余绑定(binding)操作 
3.信息传递时须要一个“RouteKey”,可以省略的了然为要发送到的队列名字。 
4.若是vhost中不设有RouteKey中指定的连串名,则该新闻会被扬弃。
ca88亚洲城网站 39import
pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’logs_direct_test1′,type=’direct’)
serverity=’error’ msg=’123′
channel.basic_publish(exchange=’logs_direct_test1′,routing_key=serverity,body=msg)
print(‘初始发送:%r:%r’%(serverity,msg)) connection.close() 生产者
ca88亚洲城网站 40import
pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’logs_direct_test1′,type=’direct’)
#随便创设队列 result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=[‘error’,’info’,’warning’,] for serverity in serverities:
channel.queue_bind(exchange=’logs_direct_test1′,queue=queue_name,routing_key=serverity)
print(‘[***] 先河收受信息!’) def
callback(ch,method,properties,body): print(‘[x]
%r:%r’%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming() 消费者1
ca88亚洲城网站 41import
pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’logs_direct_test1′,type=’direct’)
#肆意成立队列 result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue serverities=[‘error’,] for serverity
in serverities:
channel.queue_bind(exchange=’logs_direct_test1′,queue=queue_name,routing_key=serverity)
print(‘[***] 初阶接受新闻!’) def
callback(ch,method,properties,body): print(‘[x]
%r:%r’%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming() 消费者2

张冠李戴订阅

ca88亚洲城网站 42

其它发送到Topic
Exchange的音信都会被转接到具有关切RouteKey中指定话题的Queue上
1.那种格局相比较复杂,简单的话,就是每个队列都有其关注的主旨,所有的音信都包罗一个“标题”(RouteKey),Exchange会将新闻转载到具有关注宗旨能与RouteKey模糊匹配的队列。 
2.那种方式需求RouteKey,也许要超前绑定Exchange与Queue。 
3.在展开绑定时,要提供一个该队列关切的大旨,如“#.log.#”表示该队列关注所有涉嫌log的新闻(一个RouteKey为”MQ.log.error”的新闻会被转正到该队列)。 
4.“#”表示0个或若干个重点字,“*”表示一个敬服字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述二者合营。 
5.同等,假诺Exchange没有察觉可以与RouteKey匹配的Queue,则会丢掉此新闻。
ca88亚洲城网站 43#!/usr/bin/env
python import pika import sys connection =
pika.BlockingConnection(pika.ConnectionParameters( host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’topic_logs’, type=’topic’)
routing_key = sys.argv[1] if len(sys.argv) > 1 else
‘anonymous.info’ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!’
channel.basic_publish(exchange=’topic_logs’,
routing_key=routing_key, body=message) print(” [x] Sent %r:%r” %
(routing_key, message)) connection.close() 生产者
ca88亚洲城网站 44#!/usr/bin/env
python import pika import sys connection =
pika.BlockingConnection(pika.ConnectionParameters( host=’192.168.0.74′))
channel = connection.channel()
channel.exchange_declare(exchange=’topic_logs’, type=’topic’) result =
channel.queue_declare(exclusive=True) queue_name = result.method.queue
binding_keys = sys.argv[1:] if not binding_keys:
sys.stderr.write(“Usage: %s [binding_key]…\n” % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange=’topic_logs’, queue=queue_name,
routing_key=binding_key) print(‘ [*] Waiting for logs. To exit
press CTRL+C’) def callback(ch, method, properties, body): print(” [x]
%r:%r” % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name, no_ack=True) channel.start_consuming() 消费者 好文推荐:
http://hwcrazy.com/b5fce358672411e3baa0000d601c5586/group/free\_open\_source\_project/

 

 

  

  

 

 

 

 

  

http://www.bkjia.com/Pythonjc/1146540.htmlwww.bkjia.comtruehttp://www.bkjia.com/Pythonjc/1146540.htmlTechArticlepython之RabbitMQ,pythonrabbitmq
RabbitMQ是一个在AMQP基础上全部的,可复用的店铺音信系统。他听从Mozilla
Public License开源协议。MQ全称为Message Qu…

RabbitMQ使用

落成最简便易行的种类通讯

ca88亚洲城网站 45

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’)

  2. print(“[x] Sent ‘hello word!'”)

  3. connection.close()

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.  

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=True
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

RabbitMQ使用

兑现最简易的队列通讯

ca88亚洲城网站 46

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’)

  2. print(“[x] Sent ‘hello word!'”)

  3. connection.close()

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.  

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=True
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

no_ack分析

no_ack属性是在调用Basic.Consume方法时能够安装的一个首要参数。no_ack的用途是确保message被consumer成功处理了。那里成功的发现是,在设置了no_ack=false的意况下,只要consumer手动应答了Basic.Ack,即便其成功拍卖了。

no_ack分析

no_ack属性是在调用Basic.Consume方法时可以安装的一个至关主要参数。no_ack的用途是保障message被consumer成功处理了。那里成功的发现是,在装置了no_ack=false的境况下,只要consumer手动应答了Basic.Ack,即便其成功拍卖了。

no_ack=true(此时为机关回复)

在那种景况下,consumer会在收取到Basic.Deliver+Content-Header+Content-Body之后,立即回复Ack,而那么些Ack是TCP协议中的Ack。此Ack的回复不保护consumer是或不是对接受到的数码开展了处理,当然也不尊敬处理数据所急需的耗时。

no_ack=true(此时为电动回复)

在那种气象下,consumer会在收到到Basic.Deliver+Content-Header+Content-Body之后,马上回复Ack,而那一个Ack是TCP协议中的Ack。此Ack的还原不关切consumer是不是对接受到的数额开展了拍卖,当然也不关怀处理数据所急需的耗时。

no_ack=False(此时为手动应答)

在那种情景下,要求consumer在处理完接收到的Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而那些Ack是AMQP协议中的Basic.Ack。此Ack的东山再起与事务处理有关,所以具体的苏醒时间应该要在于业务处理的耗时。

no_ack=False(此时为手动应答)

在这种景色下,必要consumer在拍卖完接收到的Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而以此Ack是AMQP协议中的Basic.Ack。此Ack的复原与业务处理相关,所以实际的回复时间应当要取决于业务处理的耗时。

总结

Basic.Ack发给RabbitMQ以报告,可以将相应message从RabbitMQ的信息从缓存中移除。

Basic.Ack未被consumer发给RabbitMQ前出现了充足,RabbitMQ发现与该consumer对应的连接被断开,将该该message以轮询格局发送给其他consumer(须要存在多个consumer订阅同一个queue)。

在no_ack=true的情况下,RabbitMQ认为message一旦被deliver出去后就已被认同了,所以会霎时将缓存中的message删除,由此在consumer格外时会导致音信丢失。

源于consumer的Basic.Ack与发送给Producer的Basic.Ack没有一贯关乎。

总结

Basic.Ack发给RabbitMQ以告知,可以将相应message从RabbitMQ的消息从缓存中移除。

Basic.Ack未被consumer发给RabbitMQ前现身了要命,RabbitMQ发现与该consumer对应的总是被断开,将该该message以轮询方式发送给其他consumer(要求存在七个consumer订阅同一个queue)。

在no_ack=true的事态下,RabbitMQ认为message一旦被deliver出去后就已被认同了,所以会应声将缓存中的message删除,因而在consumer相当时会导致信息丢失。

发源consumer的Basic.Ack与发送给Producer的Basic.Ack没有直接关系。

音信持久化

音讯持久化

acknowledgment音讯持久化

no-ack=False,如若consumer挂掉了,那么RabbitMQ会重新将该职责添加到行列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. # 定义回调函数

  3. def
    callback(ch,method,properties,body):

  4.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. #
    no_ack=False表示开销完未来不主动把状态布告RabbitMQ

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

acknowledgment音信持久化

no-ack=False,假诺consumer挂掉了,那么RabbitMQ会重新将该职分添加到行列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. # 定义回调函数

  3. def
    callback(ch,method,properties,body):

  4.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. #
    no_ack=False表示费用完事后不主动把情形文告RabbitMQ

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

durable音讯持久化

producer发送音信时挂掉了,consumer接收音讯时挂掉了,以下模式会让RabbitMQ重新将该音讯添加到队列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

basic_publish中添加参数

  1. properties=pika.BasicProperties(delivery_mode=2)

channel.queue_declare中添加参数

  1. channel.queue_declare(queue=’hello’,durable=True)

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’,durable=True)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’,

  2.                       properties=pika.BasicProperties(delivery_mode=2))

  1. print(“[x] Sent ‘hello word!'”)

  2. connection.close()

receive端(consumer)与acknowledgment信息持久化中receive端(consumer)相同。

durable音信持久化

producer发送音讯时挂掉了,consumer接收新闻时挂掉了,以下方式会让RabbitMQ重新将该音信添加到队列中。

回调函数中

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

basic_publish中添加参数

  1. properties=pika.BasicProperties(delivery_mode=2)

channel.queue_declare中添加参数

  1. channel.queue_declare(queue=’hello’,durable=True)

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’,durable=True)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’,

  2.                       properties=pika.BasicProperties(delivery_mode=2))

  1. print(“[x] Sent ‘hello word!'”)

  2. connection.close()

receive端(consumer)与acknowledgment音讯持久化中receive端(consumer)相同。

信息分发

默许音信队列里的数目是依据顺序分发到各种消费者,不过多数场地下,信息队列后端的买主服务器的处理能力是不同的,这就会合世部分服务器闲置时间较长,资源浪费的图景。那么,大家就要求改变默许的信息队列获取顺序。可以在相继消费者端配置prefetch_count=1,意思就是报告RabbitMQ在这些消费者当前信息还尚未拍卖完的时候就绝不再发新信息了。

ca88亚洲城网站 47

顾客端

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4. __author__ = ‘Golden’

  5. #!/usr/bin/env python3

  6. # -*- coding:utf-8 -*-

  7.  

  8. import pika,time

  9.  

  10. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello2′,durable=True)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     time.sleep(30)

  3.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(callback,
  1.                       queue=’hello2′,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

劳动者端不变。

新闻分发

默许新闻队列里的数额是安份守己顺序分发到种种消费者,然而多数意况下,信息队列后端的主顾服务器的处理能力是区其余,那就会油然则生局地服务器闲置时间较长,资源浪费的情形。那么,我们就必要变更默许的音信队列获取顺序。可以在逐一消费者端配置prefetch_count=1,意思就是报告RabbitMQ在那些消费者当前音讯还不曾处理完的时候就无须再发新新闻了。

ca88亚洲城网站 48

买主端

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4. __author__ = ‘Golden’

  5. #!/usr/bin/env python3

  6. # -*- coding:utf-8 -*-

  7.  

  8. import pika,time

  9.  

  10. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello2′,durable=True)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     time.sleep(30)

  3.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(callback,
  1.                       queue=’hello2′,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

生产者端不变。

信息宣布和订阅(publish\subscribe)

公布和订阅与简短的音信队列分化在于,发布和订阅会将新闻发送给所有的订阅者,而信息队列中的数据被消费两次便没有。所以,RabbitMQ完成发表和订阅时,会为每一个订阅者制造一个行列,而公布者发表音讯时,会将音讯放置在所有相关队列中。类似广播的功用,那时候就要用到exchange。Exchange在概念的时候是有项目标,以控制到底是何许Queue符合条件,可以吸收新闻。

fanout:所有bind到此exchange的queue都足以接过音讯。

direct:通过routingKey和exchange决定的哪些唯一的queue可以选拔新闻。

topic:所有符合routingKey(可以是一个表明式)的routingKey所bind的queue可以收起消息。

表明式符号表达

#:一个或多个字符

*:任何字符

例如:#.a会匹配a.a,aa.a,aaa.a等。

*.a会匹配a.a,b.a,c.a等。

注意:使用RoutingKey为#,Exchange Type为topic的时候相对于采纳fanout。

heaers:通过headers来支配把新闻发给哪些queue。

ca88亚洲城网站 49

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1.  

  2. message = ”.join(sys.argv[1:]) or
    ‘info:Hello World!’

  3. channel.basic_publish(exchange=’logs’,

  1.                       routing_key=”,
  1.                       body=message)
  1.  

  2. print(‘[x] Send %r’ % message)

  1. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1. #
    不点名queue名字,rabbit会随机分配一个名字,exclusive=True会在选择此queue的消费者断开后,自动将queue删除
  1. result =
    channel.queue_declare(exclusive=True)

  2. queue_name = result.method.queue

  1. channel.queue_bind(exchange=’logs’,queue=queue_name)
  1. print(‘[*]Waiting for logs.To exit press CTRL+C’)

  2. def
    callback(ch,method,properties,body):

  3.     print(‘[*] %s’%body)

  4.  

  5. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

新闻宣布和订阅(publish\subscribe)

公布和订阅与简单的新闻队列分歧在于,公布和订阅会将音讯发送给所有的订阅者,而新闻队列中的数据被消费三遍便消失。所以,RabbitMQ完结公布和订阅时,会为每一个订阅者创制一个连串,而公布者公布音讯时,会将信息放置在有着有关队列中。类似广播的法力,那时候就要用到exchange。Exchange在概念的时候是有档次的,以控制到底是怎么着Queue符合条件,可以接收新闻。

fanout:所有bind到此exchange的queue都得以吸收音信。

direct:通过routingKey和exchange决定的哪位唯一的queue可以接过音讯。

topic:所有符合routingKey(可以是一个表明式)的routingKey所bind的queue可以接纳音信。

表明式符号表明

#:一个或七个字符

*:任何字符

例如:#.a会匹配a.a,aa.a,aaa.a等。

*.a会匹配a.a,b.a,c.a等。

注意:使用RoutingKey为#,Exchange Type为topic的时候相对于采纳fanout。

heaers:通过headers来控制把新闻发给哪些queue。

ca88亚洲城网站 50

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1.  

  2. message = ”.join(sys.argv[1:]) or
    ‘info:Hello World!’

  3. channel.basic_publish(exchange=’logs’,

  1.                       routing_key=”,
  1.                       body=message)
  1.  

  2. print(‘[x] Send %r’ % message)

  1. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1. #
    不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的主顾断开后,自动将queue删除
  1. result =
    channel.queue_declare(exclusive=True)

  2. queue_name = result.method.queue

  1. channel.queue_bind(exchange=’logs’,queue=queue_name)
  1. print(‘[*]Waiting for logs.To exit press CTRL+C’)

  2. def
    callback(ch,method,properties,body):

  3.     print(‘[*] %s’%body)

  4.  

  5. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

重大字发送(echange type=direct)

出殡音信时肯定指定某个队列并向里面发送音讯,RabbitMQ还援助根据重点字发送,即队列绑定关键字,发送者将数据按照紧要字发送到音讯exchange,exchange根据重大字判定应该将数据发送至哪个队列。

ca88亚洲城网站 51

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. # severity = ‘error’

  3. severity = sys.argv[1] if len(sys.argv) > 1 else ‘info’

  4. # message = ‘Hello World!’

  5. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  6.  

  7. channel.basic_publish(exchange=’direct_logs’,

  1.                       routing_key=severity,
  1.                       body=message)
  1. print(‘[x] Send %r:%r’ %
    (severity,message))

  2. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. severities = sys.argv[1:]

  3. if not
    severities:

  4.     sys.stderr.write(‘Usage: %s
    [info] [warning] [error]\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    severity in severities:

  8.     channel.queue_bind(exchange=’direct_logs’,

  1.                        queue=queue_name,
  1.                        routing_key=severity)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[*] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

启动subscriber1

  1. python3 direct_subscriber.py warning

启动subscriber2

  1. python3 direct_subscriber.py error

启动publisher1

  1. python3 direct_publisher.py info

启动publisher2

  1. python3 direct_publisher.py warning

启动publisher3

  1. python3 direct_publisher.py error

结果

ca88亚洲城网站 52

主要字发送(echange type=direct)

发送音讯时明确指定某个队列并向其中发送音讯,RabbitMQ还帮助依照重大字发送,即队列绑定关键字,发送者将数据根据重点字发送到信息exchange,exchange根据紧要字判定应该将数据发送至哪个队列。

ca88亚洲城网站 53

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. # severity = ‘error’

  3. severity = sys.argv[1] if len(sys.argv) > 1 else ‘info’

  4. # message = ‘Hello World!’

  5. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  6.  

  7. channel.basic_publish(exchange=’direct_logs’,

  1.                       routing_key=severity,
  1.                       body=message)
  1. print(‘[x] Send %r:%r’ %
    (severity,message))

  2. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. severities = sys.argv[1:]

  3. if not
    severities:

  4.     sys.stderr.write(‘Usage: %s
    [info] [warning] [error]\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    severity in severities:

  8.     channel.queue_bind(exchange=’direct_logs’,

  1.                        queue=queue_name,
  1.                        routing_key=severity)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[*] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

启动subscriber1

  1. python3 direct_subscriber.py warning

启动subscriber2

  1. python3 direct_subscriber.py error

启动publisher1

  1. python3 direct_publisher.py info

启动publisher2

  1. python3 direct_publisher.py warning

启动publisher3

  1. python3 direct_publisher.py error

结果

ca88亚洲城网站 54

混淆匹配(exchange type=topic)

在topic类型下,可以让队列绑定多少个模糊的重大字,发送者将数据发送到exchange,exchange将盛传”路由值”和”关键字”举行匹配,匹配成功则将数据发送到指定队列。

ca88亚洲城网站 55

*:匹配任意一个字符

#:匹配任意个字符

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info’

  3. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  4. channel.basic_publish(exchange=’topic_logs’,

  1.                       routing_key=routing_key,
  1.                       body=message)
  1.  

  2. print(‘[x] Sent %r:%r’ %
    (routing_key,message))

  3. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. binding_keys = sys.argv[1:]

  3. if not
    binding_keys:

  4.     sys.stderr.write(‘Usage: %s
    [binding_key]…\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    binding_key in binding_keys:

  1.     channel.queue_bind(exchange=’topic_logs’,
  1.                        queue=queue_name,
  1.                        routing_key=binding_key)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[x] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

测试

ca88亚洲城网站 56

混淆匹配(exchange type=topic)

在topic类型下,可以让队列绑定多少个模糊的第一字,发送者将数据发送到exchange,exchange将盛传”路由值”和”关键字”进行匹配,匹配成功则将数据发送到指定队列。

ca88亚洲城网站 57

*:匹配任意一个字符

#:匹配任意个字符

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info’

  3. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  4. channel.basic_publish(exchange=’topic_logs’,

  1.                       routing_key=routing_key,
  1.                       body=message)
  1.  

  2. print(‘[x] Sent %r:%r’ %
    (routing_key,message))

  3. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. binding_keys = sys.argv[1:]

  3. if not
    binding_keys:

  4.     sys.stderr.write(‘Usage: %s
    [binding_key]…\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    binding_key in binding_keys:

  1.     channel.queue_bind(exchange=’topic_logs’,
  1.                        queue=queue_name,
  1.                        routing_key=binding_key)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[x] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

测试

ca88亚洲城网站 58

长距离进度调用(RPC)

RPC(Remote Procedure Call
Protocol)远程进程调用协议。在一个巨型的商店,系统由大大小小的服务组合,区其余社团维护分化的代码,安排在不一样的服务器。可是在做开发的时候屡次要用到其余团伙的方式,因为早已有了落到实处。但是这一个劳动配置在分歧的服务器,想要调用就要求网络通讯,那些代码繁琐且复杂,一不小心就会很没用。PRC协议定义了规划,其余的铺面都付出了分歧的贯彻。比如微软的wcf,以及WebApi。

在RabbitMQ中RPC的落到实处是很简单快速的,现在客户端、服务端都是新闻发表者与音讯接受者。

ca88亚洲城网站 59

先是客户端通过RPC向服务端爆发请求。correlation_id:请求标识,erply_to:结果再次来到队列。(我那里有部分数目必要您给自家处理一下,correlation_id是自己呼吁标识,你处理到位之后把结果回到到erply_to队列)

服务端获得请求,起始拍卖并回到。correlation_id:客户端请求标识。(correlation_id那是你的呼吁标识,还给您。那时候客户端用自己的correlation_id与服务端重回的correlation_id举办自查自纠,相同则吸纳。)

rpc_server

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’rpc_queue’)

  1. def fib(n):

  2.     if
    n == 0:

  3.         return 0

  4.     elif n == 1:

  5.         return 1

  6.     else:

  7.         return fib(n-1) + fib(n-2)

  8.  

  9. def on_request(ch,method,props,body):

  1.     n = int(body)

  2.     print(‘[.] fib(%s)’ % n)

  3.     response = fib(n)

  4.     ch.basic_publish(exchange=”,

  1.                      routing_key=props.reply_to,
  1.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
  1.                      body =
    str(response))

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(on_request,queue=’rpc_queue’)
  1.  

  2. print(‘[x] Awaiting RPC requests’)

  1. channel.start_consuming()

rpc_client

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,uuid

  6.  

  7. class
    FibonacciRpcClient(object):

  8.     def __init__(self):

  9.         self.connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1.         self.channel =
    self.connection.channel()

  2.         result =
    self.channel.queue_declare(exclusive=True)

  3.         self.callback_queue =
    result.method.queue

  4.         self.channel.basic_consume(self.on_response,no_ack=True,

  1.                                    queue=self.callback_queue)
  1.  

  2.     def
    on_response(self,ch,method,props,body):

  3.         if self.corr_id ==
    props.correlation_id:

  4.             self.response = body

  1.  

  2.     def call(self,n):

  3.         self.response = None

  4.         self.corr_id =
    str(uuid.uuid4())

  5.         self.channel.basic_publish(exchange=”,

  1.                                    routing_key=’rpc_queue’,
  1.                                    properties=pika.BasicProperties(
  1.                                        reply_to=self.callback_queue,
  1.                                        correlation_id=self.corr_id,),
  1.                                    body=str(n))
  1.         while self.response is None:

  2.             self.connection.process_data_events()

  1.         return int(self.response)

  2.  

  3. fibonacci_rpc = FibonacciRpcClient()

  1.  

  2. print(‘[x] Requesting fib(10)’)

  1. response = fibonacci_rpc.call(10)
  1. print(‘[.] Got %r ‘ % response)

 

长途过程调用(RPC)

RPC(Remote Procedure Call
Protocol)远程进度调用协议。在一个重型的商家,系统由大大小小的服务组合,不一样的集体维护差其余代码,安排在不一致的服务器。然则在做开发的时候屡次要用到任何团队的法门,因为早已有了落到实处。可是那个劳务配置在差其他服务器,想要调用就要求网络通讯,那些代码繁琐且复杂,一不小心就会很没用。PRC协议定义了设计,别的的信用社都提交了差其他兑现。比如微软的wcf,以及WebApi。

在RabbitMQ中RPC的贯彻是很不难高效的,现在客户端、服务端都是音信宣布者与新闻接受者。

ca88亚洲城网站 60

率先客户端通过RPC向服务端暴发请求。correlation_id:请求标识,erply_to:结果再次回到队列。(我那边有局地数额要求你给自身处理一下,correlation_id是自个儿请求标识,你处理完了之后把结果重返到erply_to队列)

服务端得到请求,开首拍卖并赶回。correlation_id:客户端请求标识。(correlation_id那是你的央浼标识,还给您。那时候客户端用自己的correlation_id与服务端重回的correlation_id举办相比,相同则接受。)

rpc_server

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’rpc_queue’)

  1. def fib(n):

  2.     if
    n == 0:

  3.         return 0

  4.     elif n == 1:

  5.         return 1

  6.     else:

  7.         return fib(n-1) + fib(n-2)

  8.  

  9. def on_request(ch,method,props,body):

  1.     n = int(body)

  2.     print(‘[.] fib(%s)’ % n)

  3.     response = fib(n)

  4.     ch.basic_publish(exchange=”,

  1.                      routing_key=props.reply_to,
  1.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
  1.                      body =
    str(response))

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(on_request,queue=’rpc_queue’)
  1.  

  2. print(‘[x] Awaiting RPC requests’)

  1. channel.start_consuming()

rpc_client

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,uuid

  6.  

  7. class
    FibonacciRpcClient(object):

  8.     def __init__(self):

  9.         self.connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1.         self.channel =
    self.connection.channel()

  2.         result =
    self.channel.queue_declare(exclusive=True)

  3.         self.callback_queue =
    result.method.queue

  4.         self.channel.basic_consume(self.on_response,no_ack=True,

  1.                                    queue=self.callback_queue)
  1.  

  2.     def
    on_response(self,ch,method,props,body):

  3.         if self.corr_id ==
    props.correlation_id:

  4.             self.response = body

  1.  

  2.     def call(self,n):

  3.         self.response = None

  4.         self.corr_id =
    str(uuid.uuid4())

  5.         self.channel.basic_publish(exchange=”,

  1.                                    routing_key=’rpc_queue’,
  1.                                    properties=pika.BasicProperties(
  1.                                        reply_to=self.callback_queue,
  1.                                        correlation_id=self.corr_id,),
  1.                                    body=str(n))
  1.         while self.response is None:

  2.             self.connection.process_data_events()

  1.         return int(self.response)

  2.  

  3. fibonacci_rpc = FibonacciRpcClient()

  1.  

  2. print(‘[x] Requesting fib(10)’)

  1. response = fibonacci_rpc.call(10)
  1. print(‘[.] Got %r ‘ % response)

 

相关文章