消息队列进阶知识
一、消息队列的选择
1.1 主要的消息队列产品
RabbitMQ
RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,这个 Exchange 模块的作用和交换机也非常相似,根据配置的路由规则将生产者发出的消息分发到不同的队列中。
结合erlang语言本身的并发优势,支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。性能较好,但是不利于做二次开发和维护。
问题:
RabbitMQ 对消息堆积的支持并不好
RabbitMQ 的性能比较差
RabbitMQ 使用的编程语言 Erlang,小众并且学习曲线非常陡峭。
RocketMQ
阿里巴巴的MQ中间件,在其多个产品下使用,并能够撑住双十一的大流量,他并没有实现JMS规范,使用起来很简单。部署由一个 命名服务(nameserver)和一个代理(broker)组成,nameserver和broker以及producer都支持集群,队列的容量受机器硬盘的限制,队列满后可以支持持久化到硬盘(也可以自己适配代码,将其持久化到NOSQL数据库中),队列满后会影响吞吐量,可以采用主备来保证稳定性,支持回溯消费,可以在broker端进行消息过滤。
问题:
作为国产的消息队列,相比国外的比较流行的同类产品,在国际上还没有那么流行,与周边生态系统的集成和兼容程度要略逊一筹。
Kafka
快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以 达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持 Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过 Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。
问题:
Kafka 异步批量的设计导致它的同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
1.2 消息队列的选择
如果说,消息队列并不是你将要构建系统的主角之一,你对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,我建议你使用 RabbitMQ。
如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是你需要的。
如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列。
二、消息模型
2.1 主题和队列
队列模型
如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。
发布订阅模型
在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。
2.2 不同产品的策略
RabbitMQ 的消息模型
通过Exchange 将消息发送到多个队列,实现发布订阅模型。
RocketMQ 的消息模型
几乎所有的消息队列产品都使用一种非常朴素的“请求 - 确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。
每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ 在主题下面增加了队列的概念。
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
Kafka 的消息模型
Kafka 的消息模型和 RocketMQ 是完全一样的,我刚刚讲的所有 RocketMQ 中对应的概念,和生产消费过程中的确认机制,都完全适用于 Kafka。唯一的区别是,在 Kafka 中,队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。
三、分布式事务
3.1 消息队列为什么需要事务
很多场景下,我们“发消息”这个过程,目的往往是通知另外一个系统或者模块去更新数据,消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。
订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤,并不是用户下单支付这个主要流程中必需的步骤,使用消息队列来异步清理购物车是更加合理的设计。
订单系统需要做的内容:
- 在订单库中插入一条订单数据,创建订单;
- 发消息给消息队列,消息的内容就是刚刚创建的订单。
在分布式系统中,上面提到的这些步骤,任何一个步骤都有可能失败,如果不做任何处理,那就有可能出现订单数据与购物车数据不一致的情况,比如说:
- 创建了订单,没有清理购物车;
- 订单没创建成功,购物车里面的商品却被清掉了。
事务要解决的问题:
创建订单和发送消息这两个步骤要么都操作成功,要么都操作失败,不允许一个成功而另一个失败的情况出现。
3.2 什么是分布式事务
应该具有 4 个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为 ACID 特性。
大部分传统的单体关系型数据库都完整的实现了 ACID,但是,对于分布式系统来说,严格的实现 ACID 这四个特性几乎是不可能的,或者说实现的代价太大,大到我们无法接受。
目前大家所说的分布式事务,更多情况下,是在分布式系统中事务的不完整实现。在不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题。
用消息队列来实现分布式事务的例子:
订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。
但是要是在提交事务的时候发生异常怎么办?
Kafka:直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
RocketMQ:增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
四、消息丢失问题
4.1 检测消息丢失
我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
4.2 确保消息可靠传递
- 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
- 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
1、生产阶段:编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
// 同步发送时,只要注意捕获异常即可。 try { RecordMetadata metadata = producer.send(record).get(); System.out.println(" 消息发送成功。"); } catch (Throwable e) { System.out.println(" 消息发送失败!"); System.out.println(e); } // 异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。 producer.send(record, (metadata, exception) -> { if (metadata != null) { System.out.println(" 消息发送成功。"); } else { System.out.println(" 消息发送失败!"); System.out.println(exception); } }); |
2、存储阶段
在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
通过配置 Broker 参数来避免因为宕机丢消息:
单节点Broker,收到消息将消息写入磁盘再给Producer返回确认响应。
多节点Broker,将消息发送到2个以上的节点再给客户端回复发送确认响应。
3、消费阶段
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
总结:
- 在生产阶段,你需要捕获消息发送的错误,并重发消息。
- 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
- 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。
五、消息重复问题
5.1 服务质量标准
- At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
我们现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
5.2 幂等性解决重复
一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。
如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。
一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。
将账户 X 的余额设置为 100 元(幂等)
将账户 X 的余额增加 100 元(不幂等)
At least once + 幂等消费 = Exactly once。
5.3 幂等性设计方案
1. 利用数据库的唯一约束实现幂等
将账户 X 的余额加 100 元。在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。
限定每个转账单每个账户只可以执行一次变更操作:在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。
消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。
2. 为更新的数据设置前置条件
给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
将账户 X 的余额增加 100 元 —— 如果账户 X 当前的余额为 500 元,将余额加 100 元(幂等)
更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
3. 记录并检查操作
在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。
比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:
- t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”;
- t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A 还未来得及更新消息执行状态。
这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。
六、消息积压问题
6.1 优化性能避免消息积压
消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力。而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。
发送端性能优化
如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
Producer 发送消息的过程,Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是 1ms,我们把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:
- 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
- 发送消息和返回响应在网络传输中的耗时;
- Broker 处理消息的时延。
如果之前是单线程发送,每次发送一条消息。无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。
消息发送端是一个微服务:所有 RPC 框架都是多线程支持多并发的,自然也就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响 RPC 服务的时延。这种情况,比较明智的方式就是通过并发来提升发送性能。
统是一个离线分析系统:它不关心时延,更注重整个系统的吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,同样用少量的并发就可以获得非常高的吞吐量。
消费端性能优化
大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。
在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。
6.2 优化性能避免消息积压
能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
要是短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
若是你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。
七、异步设计
异步是一种程序设计的思想,使用异步模式设计的程序可以显著减少线程等待,从而在高吞吐量的场景中,极大提升系统的整体性能,显著降低时延。
7.1 同步实现的问题
实现一个转账的微服务 Transfer( accountFrom, accountTo, amount),这个服务有三个参数:分别是转出账户、转入账户和转账金额。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Transfer(accountFrom, accountTo, amount) { // 先从 accountFrom 的账户中减去相应的钱数 Add(accountFrom, -1 * amount) // 再把减去的钱数加到 accountTo 的账户中 Add(accountTo, amount) return OK } |
假设微服务 Add 的平均响应时延是 50ms,微服务 Transfer 的平均响应时延100ms。每处理一个请求需要耗时 100ms,并在这 100ms 过程中是需要独占一个线程的,那么可以得出这样一个结论:每个线程每秒钟最多可以处理 10 个请求。每台服务器的线程数不是无限的,如果请求速度超过这个值,那么请求就不能被马上处理,只能阻塞或者排队,这时候 Transfer 服务的响应时延由 100ms 延长到了:排队的等待时延 + 处理时延 (100ms)。
但是服务器的CPU/RAM/IO并没有极限,绝大部分线程都在等待 Add 服务返回结果。采用同步实现的方式,整个服务器的所有线程大部分时间都没有在工作,而是都在等待。
7.2 采用异步实现解决等待问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
TransferAsync(accountFrom, accountTo, amount, OnComplete()) { // 异步从 accountFrom 的账户中减去相应的钱数,然后调用 OnDebit 方法。 AddAsync(accountFrom, -1 * amount, OnDebit(accountTo, amount, OnAllDone(OnComplete()))) } // 扣减账户 accountFrom 完成后调用 OnDebit(accountTo, amount, OnAllDone(OnComplete())) { // 再异步把减去的钱数加到 accountTo 的账户中,然后执行 OnAllDone 方法 AddAsync(accountTo, amount, OnAllDone(OnComplete())) } // 转入账户 accountTo 完成后调用 OnAllDone(OnComplete()) { OnComplete() } |
- OnDebit():扣减账户 accountFrom 完成后调用的回调方法;
- OnAllDone():转入账户 accountTo 完成后调用的回调方法。
整个异步实现的语义相当于:
- 异步从 accountFrom 的账户中减去相应的钱数,然后调用 OnDebit 方法;
- 在 OnDebit 方法中,异步把减去的钱数加到 accountTo 的账户中,然后执行 OnAllDone 方法;
- 在 OnAllDone 方法中,调用 OnComplete 方法。
和同步调用的区别只是在线程模型上由同步顺序调用改为了异步调用和回调的机制。
由于流程的时序和同步实现是一样,在低请求数量的场景下,平均响应时延一样是 100ms。在超高请求数量场景下,异步的实现不再需要线程等待执行结果,只需要个位数量的线程,即可实现同步场景大量线程一样的吞吐量。
由于没有了线程的数量的限制,总体吞吐量上限会大大超过同步实现,并且在服务器 CPU、网络带宽资源达到极限之前,响应时延不会随着请求数量增加而显著升高,几乎可以一直保持约 100ms 的平均响应时延。
异步思想就是,当我们要执行一项比较耗时的操作时,不去等待操作结束,而是给这个操作一个命令:“当操作完成后,接下来去执行什么。”