MQ消息队列知识小结
1. MQ解决的问题
- 解耦
- 不用MQ:要针对各业务方开发,要考虑中断、超时、重试
- 使用MQ:消息中间件类似一个代理,各业务方的通讯协调均由MQ负责,MQ来实现超时、重试机制
- 异步
- 不用MQ:长活同步事务,高延时,体验极差
- 使用MQ:分步异步执行,防止同步阻塞
- 削峰
- 不用MQ:请求并发,服务器或数据库压力过大,导致宕机
- 使用MQ:请求并发转串行,MQ来承担压力,平缓输出给服务器或数据库
2. 引入MQ产生的问题
- 可用性
- MQ挂了,整个系统都不能用
- 复杂性
- 消息重复:多次生产、多次消费
- 消息乱序:新数据被旧数据覆盖
- 消息丢失:漏数据、磁盘满了丢数据
- 一致性
- 分布式一致性问题,异步的多个事务没有最终都执行或都不执行
3. 常用MQ对比
4. RabbitMQ
4.1 架构模型:AMQP基本概念
- Server:又称 broker,接受客户端连接,实现 AMQP 实体服务。
- Connection:连接和具体 broker 网络连接。
- Channel:网络信道,几乎所有操作都在 channel 中进行,channel 是消息读写的通道。客户端可以建立多个 channel,每个 channel 表示一个会话任务。
- message:消息,服务器和应用程序之间传递的数据,由 properties 和 body 组成。properties 可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body 是消息实体内容。
- Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个 Virtual host 可以若干个 Exchange 和 Queue,同一个 Virtual host不能有同名的 Exchange 或 Queue。
- Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。
- Banding:Exchange 和 Queue 之间的虚拟连接,binding 中可以包括 routing key。
- Routing key:一个路由规则,虚拟机根据他来确定如何路由 一条消息。
- Queue:消息队列,用来存放消息的队列。
4.2 消息模型:Exchange
- Direct Exchange,所有发送到 Direct Exchange 的消息被转发到 Routing Key中指定的 Queue, Direct Exchange 可以使用默认的默认的 Exchange (default Exchange),默认的 Exchange 会绑定所有的队列,所以 Direct 可以直接使用 Queue 名(作为routing key )绑定。或者消费者和生产者的 routing key 完全匹配。
- Topic Exchange,是指发送到 Topic Exchange 的消息被转发到所有关心的 Routing key 中指定 topic 的 Queue 上。Exchange 将 routing key 和某 Topic 进行模糊匹配,此时队列需要绑定一个 Topic。所谓模糊匹配就是可以使用通配符,“#”可以匹配一个或多个词,“ ”只匹配一个词。比如“log.#”可以匹配“log.info.test”,”log. “就只能匹配 log.error。
- Fanout Exchange,不处理路由键,只需简单的将队列绑定到交换机上。发送到该交换机上的消息都会被发送到与该交换机绑定的队列上。Fanout 转发是最快的。
4.3 高可用
4.3.1 Cluster集群模式
https://www.rabbitmq.com/cluster-formation.html#peer-discovery-dns
- Peer Discovery 对等发现:要形成集群,新的(“空白”)节点需要能够发现它们的对等节点。 这可以使用各种机制(后端)来完成。 一些机制假设所有集群成员都是提前知道的(例如,在配置文件中列出),其他机制是动态的(节点可以来来去去)。
- 配置文件
- DNS
- AWS
- K8S
- Consul
- etcd
4.3.2 普通集群模式
某一个 Queue 是在集群中的某一个 Broker 上,各个 Broker 会同步元数据,但不会同步 Queue 的消息数据
扩充 Broker 可以容纳更多的 Queue,提高吞吐量
没有达到高可用,扩展性较好
4.3.3 镜像集群模式
一个 Broker 中 Queue 的元数据和消息数据都会同步到其他 Broker 上,就是做了全量备份,所以称为 “镜像模式”
一个 Queue 的数据是全量存在 Broker 中的,所以 Queue 的消息容量、消息处理能力,都受限于 Broker
实现了高可用,但扩展性差
4.3.4 Federation联邦插件
http://linyishui.top/2020101001.html
Federation 插件的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群
- Federation插件能够在不同管理域(可能设置了不同的用户和 vhost ,也可能运行在不同版本的 RabbitMQ Erlang 上)中的 Broker 或者集群之间传递消息
- Federation 插件基于 AMQP 0-9-1 协议在不同的 Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况
- 一个 Broker节点中可以同时存在联邦交换器(或队列)或者本地交换器(或队列),只需要对特定的交换器(或队列)创建 Federation 连接(Federation link )
- Federation 需要在 Broker 节点之间创建 O(N^2^)个连接(尽管这是最简单的使用方式),这也就意味着 Federation 在使用时更容易扩展
Federation 插件可以让多个交换器或者多个队列进行联邦。一个联邦交换器(federated exchange)或者一个联邦队列(federated queue)接收上游(upstream)的消息,这里的上游是指位于其他 Broker 上的交换器或者队列
- 联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中
- 联邦队列则允许一个本地消费者接收到来自上游队列(upstream queue)的消息
联邦交换器
Federation优化服务器通信网络延迟问题
联邦队列
可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求
当有消费者 ClientA 连接 broker2 并通过 Basic.Consume 消费队列 queue1 (或 queue2 )中的消息时
如果此时队列已有消息堆积就可以直接被消费,且 broker2 的队列不会拉取 broker1 的队列中的消息
- 如果此时队列没有消息,会通过 Federation link 拉取在 broker1 的上游队列中的消息,然后存储到本地,再被 ClientA 消费
互为联邦队列
既可以消费联邦队列,又可以消费上游队列,这种分布式队列的部署可以提高单个队列的容量。如果上游一端部署的消费者来不及消费上游队列的消息,下游的消费者可以帮其分担消费,有一定的负载均衡的效果。
与联邦交换器不同的是,一条消息可以在联邦队列间转发无限次,因为队列可以互为联邦队列:消息会转向有多余消费能力的一方,所以可能会导致消费在队列间来回转发。
4.3.5 Shovel铲子插件
- Shovel 能够可靠、持续地从一个 Broker 中的队列拉取数据并转发至另一个 Broker 中的交换器
- 作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker ,也可以位于不同的 Broker 上
- 优点
松耦合: Shovel 可以移动位于不同管理域中的 Broker(或者集群)上的消息,这些 Broker(或者集群)可以包含不同的用户和 vhost ,也可以使用不同的 RabbitMQ Erlang 版本。
支持广域网:Shovel 插件同样基于 AMQP 协议在 Broker 之间进行通信,被设计成可以容忍时断时续的连通情形,并且能够保证消息的可靠性。
高度定制:当 Shovel 成功连接后,可以对其进行配置以执行相关的 AMQP 命令。
4.3.6 对比
Cluster:通常使用集群的部署方式来提高可靠性和吞吐量,不过集群只能部署在局域网内。
Federation:Federation 可以通过 AMQP 协议(可配置 SSL)让原本发送到某个 Broker (或集群)中的交换器(或队列)上的消息能够转发到另一个 Broker (或集群)中的交换器(或队列)上,两方的交换器(或队列)看起来是以一种“联邦”的形式在运作。当然必须要确保这些“联邦”的交换器或者队列都具备合适的用户和权限。一般用于异地多活。
Shovel:概念上 Federation 的情形类似,不过 Shovel 工作在更低一层。鉴于 Federation 从一个交换器中转发消息到另一个交换器(如果必要可以确认消息是否被转发), Shovel 只是简单地从某个 Broker 上的队列中消费消息,然后转发消息到 Broker 上的交换器而已。
5. Kafka
5.1 架构模型
kafka通过Zookeeper管理集群配置、选举leader、consumer group发生变化时进行rebalance
5.2 消息模型
Apache Kafka 不是消息中间件的一种实现。相反,它只是一种分布式流式系统。
不同于基于队列和交换器的 RabbitMQ,Kafka 的存储层是使用分区事务日志来实现的。
Kafka 没有实现队列这种东西。相应的,Kafka 按照类别存储记录集,并且把这种类别称为主题。
Kafka 为每个主题维护一个消息分区日志。每个分区都是由有序的不可变的记录序列组成,并且消息都是连续的被追加在尾部。
Kafka 消费者使用pull模式
- push模式下,当broker的推送速度大于消费者的消费速度,消费者可能崩溃。如果推送速度慢,又导致消费效率低
- pull模式可以自主决定是否批量从broker拉取数据。但如果broker没有数据时,导致消费者不停的在空轮询
- Kafka可以设置参数,让消费者知道阻塞消息的数量,达到阈值后开始消费
5.3 高可用
Kafka 把 Topic(主题/队列)分为了多个 Partition(分区),Topic 只是逻辑概念,Partition 才是实际的消息存储单元。
一个 Topic 的多个 Partition 分散在多个 Broker 中,每个 Partition 存放 Topic 的一部分数据。
Partition 的多个副本分为两种角色,Leader 和 Follower。
- Leader、Follower是逻辑概念,一台物理机器可以同时具备两种角色。
- Leader 是由 Kafka 选举出来的,负责处理消息的读写。Leader 收到新消息后,会同步给 Follower。
- Follower 的作用是候选人,当 Leader 出事儿之后,Kafka 会从 Follower 中选举出新的 Leader。
可以配置消息写入完成的标准
写入 Leader 既可:速度快,但可能会有消息丢失,例如在同步到 Follower 之前 Broker 故障了,则消息丢失。
写入 Follower 成功之后才算写入成功 :消息可靠性极高,但影响写入速度。
6. RocketMQ
6.1 架构模型
NameServer 集群:存放元数据。是一个几乎无状态节点,可集群部署,在消息队列RocketMQ版中提供命名服务,更新和发现Broker服务。
Broker 集群:存放队列数据。消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息。是物理概念每台机器要么是Master,要么是Slave。
生产者:与Name Server集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。
消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。
6.2 消息模型
Topic:消息主题,一级消息类型,生产者向其发送消息。每个Topic包含一个或多个分区。
Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。
一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic。
6.3 高可用
NameServer 集群独立运行
多主从
当 Master 故障之后,可以用 Slave 顶上去,数据和服务都不影响,但会有短暂的停顿,需要修改配置并重启才能完成切换动作。
数据同步的方式分为
异步:Master 写入完成即可,异步同步给 Slave。写入速度快,但同步会有延迟,可能会丢数据。
同步:Master 与 Slave 都写入之后才算成功。不会丢消息,但写入速度降低。
Dledger Group
是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用
RocketMQ 可以从组内选出一个新的 Master,完成自动切换
7. 消息丢失
7.1 生产者
- 思路
- 保证消息的成功发出
- 保证 MQ 节点的成功接收
- 发送端 MQ 节点(broker)收到消息确认应答
- 完善消息进行补偿机制
- RabbitMQ
- 消息事务:事务机制是同步的,你提交一个事务之后会阻塞,但是 confirm 机制是异步的。
- Confirm 机制:指生产者收到投递消息后,如果 Broker 收到消息就会给我们 的生产者一个应答,生产者接受应答来确认 Broker 是否收到消息。
- Return 机制:处理一些不可路由的消息。如果我们在发送消息的时候当 Exchange 不存在或者指定的路由 key 路由找不到,这个时候如果我们需要监听这种不可到达的消息。
- Kafka
- 在 producer 端设置 acks =all,这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。否则会生产者会一直重试,此时设置 retries = MAX(很大的重试的值),要求一旦写入失败,就卡在这里(避免消息丢失)
- RocketMQ
- 消息事务:半消息机制
- Broker返回状态
- SEND_OK:消息发送成功,但不一定可靠,还需同步到master(SYNC_Master)或同步刷盘(SYNC_Flush)
- FLUSH_DISK_TIMEOUT:消息发送成功,但刷盘超时,如果服务器宕机,则可能丢失数据
- FLUSH_SLAVE_TIMEOUT:消息发送成功,但同步Slave超时,如果Master服务器宕机,则可能丢失数据
- SLAVE_NOT_AVAILABLE:消息发送成功,但没有Slave服务器
7.2 消息队列
- 思路
- 持久化
- RabbitMQ
- Queue持久化
- 消息持久化,deliveryMode = 2
- Kafka
- kafka 某个 broker 宕机,然后重新选举 partition 的 leader时,此时其他的 follower 刚好还有一些数据没有同步,结果此时 leader挂了,然后选举某个 follower成 leader之后,就丢掉了之前leader里未同步的数据。
- 给 topic设置 replication.factor ,这个值必须大于 1,保证每个 partition 必须至少有 2 个副本。
- 在 kafka 服务端设置 min.insync.replicas 参数,这个值必须大于 1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保 leader挂了还有一个follower,保证至少一个 follower能和leader保持正常的数据同步。
- 刷盘策略
- 条件
- 主动调用 sync 或 fsync 函数。
- 可用内存低于阈值。
- dirty data 时间达到阈值。dirty 是 Page Cache 的一个标识位,当有数据写入到 Page Cache 时,Page Cache 被标注为 dirty,数据刷盘以后,dirty 标志清除。
- 步骤
- 数据从 Page Cache 被刷盘到 disk。因为只有 disk 中的数据才能被同步到 replica。数据同步到 replica,并且 replica 成功将数据写入 Page Cache。在 Producer 得到 ack 后,哪怕是所有机器都停电,数据也至少会存在于 leader 的磁盘内。
- 条件
- RocketMQ
- 刷盘策略默认是异步,需改成同步刷盘。
- 集群部署,默认同步策略是写入Master成功后就返回成功,然后异步写Slave。需改成同步模式,Master和Slave都写完再响应。
7.3 消费者
- 思路
- 消费者收到消息后autoACK,但还没来得及执行完就挂了
- RabbitMQ
- 关闭AutoACK,使用手动ACK
- Kafka
- 关闭自动 offset,使用手动提交
8. 消息乱序
RabbitMQ
乱序
每个queue一个consumer,每个consumer单线程消费,效率慢
一个queue,只对应一个consumer,consumer内部用内存队列做排队,然后多线程消费
Kafka
写入一个 partition中的数据一定是有顺序的,比如订单id作为key,那么订单相关的数据,一定会被分发到一个 partition中区,此时这个 partition中的数据一定是有顺序的。消费者从partition中取出数据的时候 ,一定是有顺序的。
RocketMQ
9. 消息重复
- 没消费完挂了,再次消费
- 消息幂等性(也可用于高并发场景得消息重复处理)
- 强校验
- 常用于金融相关
- 如消费者是一个打款服务,在付款成功后都加一条流水记录。再次消费的时候就去流水表查一下有没有这条纪录,如果有表示已经消费过了,直接返回。(类似本地消息表,要结合事务使用,保证流水记录和业务执行在一个事务中)
- 简单场景也可用唯一 id +加指纹码,利用数据库主键去重
- 弱校验
- 利用Redis原子性操作
- 强校验
10. MQTT、CoAP
10.1 常用物联网通讯协议
10.2 CoAP
- CoAP是受限制的应用协议(Constrained Application Protocol)的代名词。由于目前物联网中的很多设备都是资源受限型的,所以只有少量的内存空间和有限的计算能力,传统的HTTP协议在物联网应用中就会显得过于庞大而不适用。
- 采用UDP而不是TCP。这省去了TCP建立连接的成本及协议栈的开销。
- 将数据包头部都采用二进制压缩,减小数据量以适应低网络速率场景。
- 发送和接收数据可以异步进行,这样提升了设备响应速度。
- CoAP采用与HTTP协议相同的请求响应工作模式。
- CoAP主要是一个点对点协议,用于在客户端与服务器之间传输状态信息。虽然支持观察资源,但CoAP最好适合状态传输模型,不是完全基于事件。
- 4种消息类型
- CON——需要被确认的请求,如果CON请求被发送,那么对方必须做出响应。
- NON——不需要被确认的请求,如果NON请求被发送,那么对方不必做出回应。
- ACK——应答消息,接受到CON消息的响应。
- RST——复位消息,当接收者接受到的消息包含一个错误,接受者解析消息或者不再关心发送者发送的内容,那么复位消息将会被发送。
- 内容协商
- CoAP支持内容协商,客户端使用Accept选项表达倾向的资源表示,服务器回复Content-Type选项告知客户端它们接收的东西。
- CoAP请求也许会使用查询字符串形式。如:?a=b&c=d
- 观察:实现简单的订阅发布功能
- 主题Subject: 代表CoAP服务器上的某个资源resource,该资源状态随时可能发生变化
- 观察者Observer:代表对某个coap资源最新状态感兴趣的客户端CoAP Client
- 登记Registration: 观察者需要向服务器CoAP server登记感兴趣的主题Subject
- 通知Notification:当CoAP观察到某个主题发生状态变化时,CoAP服务器会主动向该主题下的已登记的观察者列表里面的每个观察者发送其订阅的主题最新状态数据
- 安全
- 因为CoAP建立在UDP而不是TCP之上,SSL/TLS不可用于提供安全性。DTLS数据报传输层安全提供了与TLS同样的保证机制,但是针对UDP之上数据传输。通常来说,具备DTLS能力的CoAP设备支持RSA、AES或者ECC、AES。
10.3 MQTT
MQTT协议采用发布/订阅模式,所有的物联网终端都通过TCP连接到云端,云端通过主题的方式管理各个设备关注的通讯内容,负责将设备与设备之间消息的转发。
- MQTT客户端建立长连接TCP。
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
- 对负载内容屏蔽的消息传输。
- 使用 TCP/IP 提供网络连接。
- 三种QoS消息发布服务质量:
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
主题
- MQTT中,主题是层次结构的,像文件系统(例如:kitchen/oven/temperature)。当注册订阅时允许通配符,但不是发布时,允许整个层次结构被客户端观察。
持久化
- MQTT支持在代理上存储持久化消息,当发布消息时,客户端也许会要求代理能够持久化消息。只有最近的持久消息才会被存储。当客户端订阅一个主题时,持久化消息会被发送至客户端。
- 虽然MQTT支持一些持久化,但最好还是作为实时数据通讯总线使用。
安全
- MQTT代理也许会要求用户名、密码认证,为确保隐私,TCP连接也许会用SSL/TLS加密。
MQTT与RocketMQ对比
MQ消息队列知识小结