首页 ¦ 归档

OpenStack源码分析之RabbitMQ(二)

作者:ZZR

fanout exchange

在上一篇中,task_queue所解决的问题是,一个message只能被一种consumer所接收。现在我们有了新的需求,我们需要一个日志系统,我们希望有两种consumer,一种consumer将日志输出到屏幕,另一种consumer写到disk。为了实现这个目的,我们希望message被投到两个queue中,交给不同的consumer进行处理。如下所示:

two-consumer

对于producer,不再指定哪一个queue来接收消息,而是哪一个exchange来接收消息。exchange不保存信息,如果没有queue绑定在这个exchange上的话,消息就丢失了。代码如下:

# filename: emit_log.py
# !/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

对于consumer,自己创建临时queue(exclusive=True,当这个consumer终止,这个queue就销毁),将这个queue接到exchange上,然后通过这个queue接收exchange发出的消息:

# filename: receive_logs.py
# !/usr/bin/env python
import pika

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

如下图所示,每一个consumer都建了自己的临时queue,并与exchange进行了绑定: fanout-exchange-list-queues

direct exchange

上面的例子中,queue_bind()的时候我们没有指定routing_key(为了避免混淆,后续将其称为binding_key)。binding_key的功能与exchange的类型有关。对于fanout exchange而言,binding_key没有意义。对于direct exchange而言,如下图所示,只有消息的routing_type与queue的binding_key相同时才会发送到该queue: direct-exchange

可以指定相同的binding_key,如下图所示: direct-exchange-multiple

借此,我们可以将日志系统改造为以下模式,不同的consumer只接收特定类型的日志信息: python-four

完整代码如下:

# filename: emit_log.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# filename: receiver_logs.py
#!/usr/bin/env python
import pika
import sys

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

topic exchange

topic exchange与direct exchange类似,只是它允许binding_key使用特殊字符(注意,特殊字符代表的是单词,不是字母): - *:代表一个单词 - #:代表零个或多个单词

比如下面这个例子。routing_key定义为"<celerity>.<colour>.<species>"。举几个routing_key匹配的例子: - quick.orange.rabbit:一,二 - lazy.orange.elephant:一,二 - quick.orange.fox:一 - lazy.brown.fox:三 - lazy.pink.rabbit:二,三(但只发一次,因为二和三是同一个队列) - quick.brown.fox:无,舍弃 - orange:无,舍弃 - quick.orange.male.rabbit:无,舍弃 - lazy.orange.male.rabbit:三

python-five

当binding_key不使用这些特殊字符时,topic exchange其实就是direct exchange;当binding_key使用#时,topic exchange其实就是fanout exchange,也就是所有消息都接收。

一些极端的例子: binding_key:*,routing_key:空串。不匹配。 binding_key:#.*,routing_key:..。匹配。 binding_key:#.*,routing_key:apple。匹配。

完整代码:

# filename: emit_log_topic.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
# filename: receive_logs_topic.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

总结

在上一篇中:

再回顾一下上面的代码。首先明确,这种情况使用的是默认exchange。对于producer,它将消息交给exchange,然后exchange通过routing key来判断要将消息交到哪个queue。实际上相当于将消息直接发送到queue中。而consumer直接指定queue的名字,也就是它直接绑定到这个queue。这个过程中exchange其实没什么存在感。

而这一篇,实际上producer只认exchange。它只负责将消息交给exchange。consumer自己搭建queue,将queue绑定到它感兴趣的exchange上。exchange决定它以什么样的形式提供服务,是fanous还是direct。

© PyCN技术评论. Built using Pelican. Theme by Giulio Fidente on github.