作者:ZZR
fanout exchange
在上一篇中,task_queue所解决的问题是,一个message只能被一种consumer所接收。现在我们有了新的需求,我们需要一个日志系统,我们希望有两种consumer,一种consumer将日志输出到屏幕,另一种consumer写到disk。为了实现这个目的,我们希望message被投到两个queue中,交给不同的consumer进行处理。如下所示:
对于producer,不再指定哪一个queue来接收消息,而是哪一个exchange来接收消息。exchange不保存信息,如果没有queue绑定在这个exchange上的话,消息就丢失了。代码如下:
# filename: emit_log.py
# !/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
对于consumer,自己创建临时queue(exclusive=True
,当这个consumer终止,这个queue就销毁),将这个queue接到exchange上,然后通过这个queue接收exchange发出的消息:
# filename: receive_logs.py
# !/usr/bin/env python
import pika
def callback(ch, method, properties, body):
print(" [x] %r" % body)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
如下图所示,每一个consumer都建了自己的临时queue,并与exchange进行了绑定:
direct exchange
上面的例子中,queue_bind()的时候我们没有指定routing_key(为了避免混淆,后续将其称为binding_key)。binding_key的功能与exchange的类型有关。对于fanout exchange而言,binding_key没有意义。对于direct exchange而言,如下图所示,只有消息的routing_type与queue的binding_key相同时才会发送到该queue:
可以指定相同的binding_key,如下图所示:
借此,我们可以将日志系统改造为以下模式,不同的consumer只接收特定类型的日志信息:
完整代码如下:
# filename: emit_log.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# filename: receiver_logs.py
#!/usr/bin/env python
import pika
import sys
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
topic exchange
topic exchange与direct exchange类似,只是它允许binding_key使用特殊字符(注意,特殊字符代表的是单词,不是字母):
- *
:代表一个单词
- #
:代表零个或多个单词
比如下面这个例子。routing_key定义为"<celerity>.<colour>.<species>"
。举几个routing_key匹配的例子:
- quick.orange.rabbit:一,二
- lazy.orange.elephant:一,二
- quick.orange.fox:一
- lazy.brown.fox:三
- lazy.pink.rabbit:二,三(但只发一次,因为二和三是同一个队列)
- quick.brown.fox:无,舍弃
- orange:无,舍弃
- quick.orange.male.rabbit:无,舍弃
- lazy.orange.male.rabbit:三
当binding_key不使用这些特殊字符时,topic exchange其实就是direct exchange;当binding_key使用#
时,topic exchange其实就是fanout exchange,也就是所有消息都接收。
一些极端的例子:
binding_key:*
,routing_key:空串。不匹配。
binding_key:#.*
,routing_key:..
。匹配。
binding_key:#.*
,routing_key:apple。匹配。
完整代码:
# filename: emit_log_topic.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
# filename: receive_logs_topic.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
总结
在上一篇中:
再回顾一下上面的代码。首先明确,这种情况使用的是默认exchange。对于producer,它将消息交给exchange,然后exchange通过routing key来判断要将消息交到哪个queue。实际上相当于将消息直接发送到queue中。而consumer直接指定queue的名字,也就是它直接绑定到这个queue。这个过程中exchange其实没什么存在感。
而这一篇,实际上producer只认exchange。它只负责将消息交给exchange。consumer自己搭建queue,将queue绑定到它感兴趣的exchange上。exchange决定它以什么样的形式提供服务,是fanous还是direct。