AMQP(Advanced Message Queuing Protocol, 高级消息队列协议) 是一种网络协议,它支持符合要求的客户端应用(如publisher, consumer)与消息中间件代理(brokers)进行通信。
如下图所示,Broker中的exchange接受 Publisher的消息,然后通过定义好的路由规则将消息分发给绑定的队列queue,最后broker将消息投递给订阅者或者消费者按照需要自行获取。
发布者(publisher)发布消息时可以给消费者指定各种消息属性(message meta-data)。有些属性可能被broker使用,有些则对broker透明,只能被consumer使用。
从可靠性考虑,consumers可能在处理messages时失效,基于此,AMQP中包含一个消息确认的概念(message ack):当一个message从队列中投递给consumer后,consumer会通知broker,这个可以是自动地也可以由处理消息的应用开发者执行。当“消息确认”被启动后,broker不会将messages从队列中删除,除非其收到了来自consumer的ack。
在某些情况变,例如当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果broker执行延期操作,消息会被放入一个所谓的死信队列中。
exchange, queue, binding 统称为AMQP的实体(AMQP entities)。应用程序通过声明AMQP entities,定义路由来完成其需要的功能。
Exchange(交换机)
exchange从publisher那拿到message后,将其路由给零个或者多个队列。其使用哪种路由算法由exchange的类型与bindings规则决定。
exchange的类型有四种:Direct exchange(直连交换机),Fanout exchange(扇型交换机),Topic exchange(主题交换机), Header exchange(头交换机)
除了交换机的类型外,在声明交换机时还可以附带其他一些属性,如下:
1、Name (名称) 2、Durability (持久性,broker重启后,exchange是否还存在) 3、Auto-delete(当所有与之绑定的消息队列都完成后,删除exchange) 4、Arguments(依赖broker)
Direct exchange(直接交换机)
Direct exchange 是根据messages携带的路由键routing key将messages投递到对应的队列中。
Direct exchange一般用于处理messages的 单播路由(unicast routing)(也可以处理多播路由),其工作流程如下:
1、将一个队列绑定到某个exchange上,同时赋予该绑定(binding)一个路由键(routing key)(这个操作可由publisher或consumer完成)。 2、publisher将一个路由键为R的message发送给direct exchange,exchange将它路由到绑定值同为R的队列。
直连交换机经常用来循环分发任务给多个工作者(worker)。当这个时候,我们需要明白一点:在AMQP0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。一个队列可以注册多个消费者,或者注册一个独享消费者。
直连交换机的图例,如下图所示:
publisher的示例代码,如下
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.99.66')) channel = connection.channel() channel.exchange_declare(exchange='images', type='direct') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'archive' routing_keys = routing_key.split(' ') routing_keys = [elem for elem in routing_keys if elem] message = ' '.join(sys.argv[2:]) or "info: Hello World!" for routing_key in routing_keys: channel.basic_publish(exchange='images', routing_key=routing_key, body=message) print " [x] Sent %r to %s" % (message,routing_key) connection.close()
consumer的示例代码如下:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.99.66')) channel = connection.channel() channel.exchange_declare(exchange='images', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, "Usage: %s [binding_keys]..." % (sys.argv[0],) exit(0) for binding_key in binding_keys: channel.queue_bind(exchange='images', queue=queue_name, routing_key=binding_key) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
Funout exchange(扇型交换机)
Funout exchange 将message分发给binding到它身上的所有queues,而不理会routing keys。Funout exchange处理消息的广播路由(broadcast routing)。
Topic exchange(主题交换机)
Topic exchange通过对消息的routing_keys和队列绑定的routing_key进行 模式匹配,将messages路由给一个或者多个队列。
发送到Topic exchange 的消息的routing_key具有一定的格式,必须由.号将各词语分割开,这些单词是什么无所谓,但最好是与发送的messages相关的词。message的routing_key示例如下:
“stack.usd.nyse”,“nyse.vmw”, “vedio.av”, “images.png”。
单词的个数可以随意,但是不能超过255字节。
队列绑定的routing_key也必须拥有同样的格式。但是其可以使用两个通配符 * 与 #:
*(星号)代表一个单词 #(井号)代表任意数量的单词(零个及零个以上)
下面用图说明:
Topic exchange 背后的逻辑与Direct exchange 很像,在上述Direct exchange 的示例代码中,把交换机类型从‘direct’改为‘topic’,就是Topic exchange的示例代码了。
在Topic exchange中,若*(星号)与#(井号)者两个特殊字符没有在绑定键中出现时,此时Topic exchange就拥有 Direct exchange的行为。
Topic exchange通过用于实现message的多播路由(multicast routing)。
Headers exchange(头交换机)
有的时候路由操作会涉及多个属性,此时使用Header exchange 比routing keys更容易表达。 Headers exchange 使用多个消息属性代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确定路由。
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
Default exchange(默认交换机)
Default exchange是一个由broker事先声明好的,没有名字(名字为空字符串)的direct exchange。他有一个特殊的属性:每个新建的队列都会自动绑定到default exchange上,绑定的routing_key与队列的名称相同。
Queue(队列)
Queue存储着即将被应用消费掉的消息,其与exchange共享某些属性,但其也有一些自己的属性:
Name (队列名称) Durable (持久性,broker重启后,队列依然存在) Exclusive (排他性,只被一个连接使用,连接关闭,队列删除) Auto-delete (当最后一个consumer退订后即删除) Argument (一些brokers用其来完成类似TTL的功能)
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
Queue Name(队列名称)
队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个。队列的名字可以是最多255字节的一个utf-8字符串。若希望AMQP消息代理生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,我们可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称。
以”amq.”开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。
Queue Durable(队列持久化)
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复
Binding (绑定)
Binding 是exchange 将message路由到queue的规则。
如果exchange ”E“将message路由给队列Q,那么Q就需要与E进行binding。binding操作需要定义一个routing_key属性给某种型号的交换机。routing_key的意义就是从发给exchange的众多messages 中选择出某些messages,将其路由给绑定的队列。
Consumer (消费者)
在AMQP 0-9-1模型中,消费者获得消息有两种方法:
1、broker将消息投递给aplication(“push API”) 2、aplication根据需要主动获取消息(“pull API”)
使用push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
每个消费者都有一个叫做消费者标签的标识符(一个字符串),它被用于退订消息。
消息确认
消费者应用确认消息有两种模式:
自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用AMQP方法:basic.deliver或basic.get-ok) 显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)
如果一个消费者在尚未发送确认回执的情况下挂掉了,那AMQP代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
预取消息
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。
Message (消息)
消息属性
消息带有属性,有些常见属性,AMQP 0-9-1定义了他们,如下:
Content type(内容类型) Content encoding(内容编码) Routing key(路由键) Delivery mode (persistent or not) 投递模式(持久化 或 非持久化) Message priority(消息优先权) Message publishing timestamp(消息发布的时间戳) Expiration period(消息有效期) Publisher application id(发布应用的ID)
有些属性被broker使用,但大多数都是开发给开发者使用。有些属性可选也被称为消息头(Headers),其与HTTP协议的X-Headers很相似。
Payload(有效载荷)
Messages除了属性外,还含有有效载荷-Paylaod,其被broker当做不透明的字节数组来处理。message可以只包含属性而不含有有效载荷。它通常采用Json格式数据,但为了节省,协议缓冲器和MessagePack将数据序列化。有效载荷通常以“Content-type”和“content-encoding”标示。
Messages能够以“持久化的方式”发布,但简单地将消息发送给持久化的exchange或者持久化的队列,并不会使消息具有持久化的性质。
Messages的持久化完全取决于Messages本身的持久化模式(Persistence mode)。
当然,将messages持久化发布会带来一定的性能下降。