一、为什么使用RabbitMQ
在分布式系统中,各个组件之间的通信是一个关键问题。RabbitMQ作为一个基于 AMQP 标准开发的消息中间件,可以很好地解决这个问题。它可以帮助我们实现应用程序的解耦、异步通信、流量削峰等。
二、RabbitMQ的基本概念
- AMQP:即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。
- Broker(RabbitMQ Server):接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
- Virtual host(虚拟主机):出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等,每一个虚拟主机都有AMQP的全套基础组件,并且可以针对每个虚拟主机进行权限以及数据分配,并且不同虚拟主机之间是完全隔离的。
- Connection(连接):客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接。
- Channel(信道):客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。对于操作系统来说,建立和销毁TCP连接是非常大的开销,在系统访问流量高峰时,会严重影响系统性能,RabbitMQ为了减少性能开销,会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,提高性能。
- Producer(生产者):发送消息的一方。
- Consumer(消费者):接收消息的一方。
- Queue(队列):用于存储消息的缓冲区,队列是实际保存数据的最小单位。队列结构天生就具有FIFO的顺序。
- Exchange(交换机):用于接收生产者发送的消息,并根据路由规则将消息发送到相应的队列。消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略。
- Direct Exchange
- Topic Exchange
- Fanout Exchange
- Headers Exchange
- Binding(绑定):定义了交换机和队列之间的关联关系,以及消息如何路由到队列的规则。
- RoutingKey(路由键):生产者发送消息时指定的一个键,用于交换机根据这个键来决定如何路由消息。
三、RabbitMQ的基本使用
-
创建连接和通道:首先,我们需要带上认证信息(用户名、密码、VirtualHost)创建一个到RabbitMQ服务器的连接,并在这个连接上创建一个通道。通道是进行消息发送和接收的主要接口。
-
声明交换机和队列:在生产者发送消息之前,需要先声明一个交换机和队列,并定义它们之间的绑定关系。RabbitMQ支持多种类型的交换机,如直连交换机、主题交换机等。队列也需要声明,以便RabbitMQ能够为其分配资源。在 Rabbit MQ 中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个 MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
- 消费者是无法订阅或者获取不存在的 MessageQueue 中信息。
- 消息被 Exchange 接受以后,如果没有匹配的 Queue,则会被丢弃。
在明白了上述两点以后,就容易理解如果是消费者去声明 Queue,就有可能会出现在声明 Queue 之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ 不会做任何事情,并返回客户端建立成功的。
如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ 是不允许该消费者在同一个 channel 去声明其他队列的。Rabbit MQ 中,可以通过 queue.declare 命令声明一个队列,可以设置该队列以下属性:
- Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
- Auto-delete: 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
- Durable: 持久化,这个会在后面作为专门一个章节讨论。
- 其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用 queue.declare,只不过需要将参数 passive 设为 true,传给 queue.declare,如果该队列已存在,则会返回 true;如果不存在,则会返回 Error,但是不会创建新的队列。
-
发送消息:生产者通过通道发送消息到交换机,并指定一个RoutingKey。交换机根据这个Key和绑定关系来决定将消息发送到哪个队列。 生产者在发送消息时,都需要指定一个 RoutingKey 和 Exchange,Exchange 在接到该 RoutingKey 以后,会判断该 ExchangeType:
- 如果是 Direct 类型,则会将消息中的 RoutingKey 与该 Exchange 关联的所有 Binding 中的 BindingKey 进行比较,如果相等,则发送到该 Binding 对应的 Queue 中。
- 如果是 Fanout 类型,则会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为。
- 如果是 Topic 类型,则会按照正则表达式,对 RoutingKey 与 BindingKey 进行匹配,如果匹配成功,则发送到对应的 Queue 中。
-
接收消息:消费者通过订阅队列来接收消息。当队列中有消息时,RabbitMQ会将消息推送给消费者。消费者处理完消息后,需要向RabbitMQ发送一个确认消息,表示这条消息已经被正确处理。在 RabbitMQ 中消费者有 2 种方式获取队列中的消息:
- 一种是通过 basic.consume 命令,订阅某一个队列中的消息,channel 会自动在处理完上一条消息之后,接收下一条消息。(同一个 channel 消息处理是串行的)。除非关闭 channel 或者取消订阅,否则客户端将会一直接收队列的消息。
- 另外一种方式是通过 basic.get 命令主动获取队列中的消息,但是绝对不可以通过循环调用 basic.get 来代替 basic.consume,这是因为 basic.get RabbitMQ 在实际执行的时候,是首先 consume 某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用 basic.consume。
如果有多个消费者同时订阅同一个队列的话,RabbitMQ 是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列 Queue,其中 ClientA 和 ClientB 都 Consume 了该队列,MessageA 到达队列后,被分派到 ClientA,ClientA 服务器收到响应,服务器删除 MessageA;再有一条消息 MessageB 抵达队列,服务器根据 “循环推送” 原则,将消息会发给 ClientB,然后收到 ClientB 的确认后,删除 MessageB;等到再下一条消息时,服务器会再将消息发送给 ClientA。
这里我们可以看出,消费者再接到消息以后,都需要给服务器发送一条确认命令,这个即可以在 handleDelivery 里显示的调用 basic.ack 实现,也可以在 Consume 某个队列的时候,设置 autoACK 属性为 true 实现。这个 ACK 仅仅是通知服务器可以安全的删除该消息,而不是通知生产者,与 RPC 不同。 如果消费者在接到消息以后还没来得及返回 ACK 就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。
既然 RabbitMQ 提供了 ACK 某一个消息的命令,当然也提供了 Reject 某一个消息的命令。当客户端发生错误,调用 basic.reject 命令拒绝某一个消息时,可以设置一个 requeue 的属性,如果为 true,则消息服务器会重传该消息给下一个订阅者;如果为 false,则会直接删除该消息。当然,也可以通过 ack,让消息服务器直接删除该消息并且不会重传。