RabbitMQ 基础介绍

一、什么是RabbitMQ

RabbitMQ是一个消息代理。

RabbitMQ基于AMQP协议用Erlang编写。

什么是AMQP?

  • AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

简单来说: MQ是邮递过程,寄信人去邮局把信件放入邮箱,邮递员就会把信件投递到收件人

二、 RabbitMQ简介

2.1 术语

术语 类比
生产者Producing 寄信人,信件的来源
消息Message 信件
交换机Exchange 邮局,信件分发的场所
队列Queue 邮箱,存储邮件
绑定Binding 邮局信件和邮箱之间的关系,邮局信件按收件省市划分后归纳到不同邮箱
消费者Consuming 收件人,信件的归宿

1

2.2 基本特性

  • 消息只存储在Quene中

    • 只有邮箱,也就是Queue具有存储功能,Exchange不能存储消息
  • RoutingKey

    • RoutingKey就是消息的收件地址
    • RoutingKey可以直接写成XXX格式,也可以写成XXX.XXX.XXX格式,也可以为空
  • 交换机有4种类型

    • Direct 直连交换机

    • Fanout 扇形交换机

    • Topic 主题交换机

    • Headers 头交换机

三、 官方Demo与思考

3.1 Hello World

2

3.1.1 操作

生产者:

1
2
3
4
5
6
7
8
- 连接MQ
- 申明Queue
MQ.Queue.Name = "hello"
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = "hello"
Body = "hello wolrd"

消费者:

1
2
3
4
5
6
7
- 连接MQ
- 申明Queue
MQ.Queue.Name = "hello"
- 消费信息
MQ.Consume
Queue.Name= "hello"
Auto ack = true //注意自动ack

3.1.2 思考

  • 为何没有给Exchange命名?Exchange是什么类型?
  • 为何在生产者和消费者都申明了Queue?
  • RoutingKey和Queue.Name 有什么关系?

3.2 工作队列

3

3.2.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Queue
MQ.Queue.Name = "task_queue"
MQ.Queue.Durable = true //注意Queue持久化
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = "task_queue"
Body = "hello wolrd"
delivery_mode = persistent or 2 //注意消息持久化

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
- 连接MQ
- 申明Queue
MQ.Queue.Name = "task_queue"
MQ.Queue.Durable = true
- QoS
MQ.Qos
prefetch_count = 1 //注意QoS
- 消费信息
MQ.Consume
Queue.Name= "task_queue"
Auto ack = false //注意手动ack
- Sleep模拟处理事务
- 手动ack

3.2.2 思考

  • 多个消费者在Direct交换机,相同Queue下,如何接收消息?
  • 如何让Queue和消息持久化?
  • 可以直接给3.1中的hello队列赋予持久优设置吗?
  • 持久化有什么用?
  • 默认交换机是持久化的吗?
  • 手动ack和自动ack的区别?
  • 忘记ack怎么办?
  • QoS(Quality of Service)如何实现?

3.3 发布/订阅

4

3.3.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "logs"
MQ.Exchange.Type = "fanout"
MQ.Exchange.Durable = true
- 发送信息
MQ.Publish
Exchange = "logs"
RoutingKey = "" //注意RoutingKey
Body = "hello wolrd"

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "logs"
MQ.Exchange.Type = "fanout"
MQ.Exchange.Durable = true
- 申明临时Queue
MQ.Queue.exclusive= true //注意临时Queue
- 绑定
Binding //注意绑定
Exchange = "logs"
Queue.Name = MQ.Queue.Name
RoutingKey = ""
- 消费信息
MQ.Consume
Queue.Name = MQ.Queue.Name
Auto ack = false
- Sleep模拟处理事务
- 手动ack

3.3.2 思考

  • 扇形交换机的RoutingKey是如何配置的?
  • 扇形交换机的主要用途?
  • 什么是临时队列?
  • 临时队列与持久化可以同时设置吗?
  • 扇形交换机可以在生产者设置持久化的队列吗?

3.4 路由

5

6

3.4.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "direct_logs"
MQ.Exchange.Type = "direct" //注意Exchange类型
MQ.Exchange.Durable = true
- 发送信息
MQ.Publish
Exchange = "direct_logs"
RoutingKey = "info" or "error" or "warn" //注意RoutingKey
Body = "hello wolrd"

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "direct_logs"
MQ.Exchange.Type = "direct"
MQ.Exchange.Durable = true
- 申明临时Queue
MQ.Queue.exclusive= true
- 绑定
Binding
Exchange = "direct_logs"
Queue.Name = MQ.Queue.Name
RoutingKey = "info" or "error" or "warn"
- 消费信息
MQ.Consume
Queue.Name = MQ.Queue.Name
Auto ack = false
- Sleep模拟处理事务
- 手动ack

3.4.2 思考

  • 直连交换机多重绑定和扇形交换机有什么区别?
  • 直连交换机RoutingKey绑定有什么限制?

3.5 主题交换机

7

3.5.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "topic_logs"
MQ.Exchange.Type = "topic"
MQ.Exchange.Durable = true
- 发送信息
MQ.Publish
Exchange = "topic_logs"
RoutingKey = "this.abc" or "that.abc.xyz" or "abc" //注意RoutingKey写法
Body = "hello wolrd"

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "topic_logs"
MQ.Exchange.Type = "topic"
MQ.Exchange.Durable = true
- 申明临时Queue
MQ.Queue.exclusive= true
- 绑定
Binding
Exchange = "topic_logs"
Queue.Name = MQ.Queue.Name
RoutingKey = "*.abc" or "#.abc" or "*.abc.*" //注意binding写法
- 消费信息
MQ.Consume
Queue.Name = MQ.Queue.Name
Auto ack = false
- Sleep模拟处理事务
- 手动ack

3.5.2 思考

  • *# 的区别?
  • 绑定键为 * 的队列会取到一个路由键为空的消息吗?
  • a.*.#a.#的区别在哪儿?

3.6 远程过程调用RPC

8

3.6.1 操作

调用端(生产者):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 调用本机某方法
调用另一个服务的接口
- 连接MQ
- 申明临时Queue
MQ.Queue.exclusive= true //注意临时Queue,用于接收回传信息
- 消费信息
MQ.Consume
Queue.Name= MQ.Queue.Name
Auto ack = true
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = "rpc_queue" //注意发送信息的队列,是服务端的队列,不是上面的临时队列
CorrelationId: corrId, //唯一id,用以与回调结果匹配
ReplyTo: q.Name, //回传地址,用以服务端将结果返回
Body = "123456" //传输id
- 接收结果
匹配CorrelationId 和 Server.CorrelationId
获得信息

服务端(消费者):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Queue
MQ.Queue.Name= "rpc_queue" //注意此队列和调用者的临时队列不同
- QoS
MQ.Qos
prefetch_count = 1 //注意QoS
- 消费信息
MQ.Consume
Queue.Name= MQ.Queue.Name
Auto ack = false
- 调用接口
查询id相关信息
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = Client.ReplyTo
CorrelationId = Client.CorrelationId
Body = id相关信息

3.6.2 思考

  • 上面例子中使用了临时队列和非持久化队列,会出现什么问题?
  • 如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?
  • 当没有服务器运行时,客户端如何作出反映?

四、 思考总结

  • Q:为何没有给Exchange命名?Exchange是什么类型?

    • 没有命名的交换机是默认(匿名)交换机
    • 默认交换机是Direct类型
  • Q:为何在生产者和消费者都申明了Queue?

    • 声明交换机和队列只能生效一次
    • 由于生产者和消费者无法确定启动顺序,所以两处申明可以防止招不到交换机或队列造成消息丢失
  • Q:RoutingKey和Queue.Name 有什么关系?

    • 默认或者匿名交换机的消息将会根据指定的routing_key分发到指定的队列
  • Q:多个消费者在Direct交换机,相同Queue下,如何接收消息?

    • 轮询分发
  • Q:如何让Queue和消息持久化?

    • 对于交换机和队列,设置其Durable属性
    • 对于消息,设置其发送模式为Persistent(部分语言用编码2表示)
  • Q:可以直接给3.1中的hello队列赋予持久优设置吗?

    • 不可以,队列只可申明一次,改动属性会报异常
  • Q:持久化有什么用?

    • 持久化会使MQ服务退出后,以申明的交换机、队列不仍存在,队列内的数据仍存在
    • 但持久化不能保证所有的数据都不会丢失
  • Q:默认交换机是持久化的吗?

  • Q:手动ack和自动ack的区别?

    • 自动ack会在消费者收到消息时就自动发送确认
    • 手动ack需要消费者自己手动发送确认
    • 消息没有超时的概念
    • 当消息被RabbitMQ发送给消费者之后,马上就会在内存中移除
    • 自动ack场景下,当消费者执行一个费时任务时,MQ崩溃,会导致消息丢失
    • 手动ack场景下,当消费者执行一个费时任务时,MQ崩溃,消息会被重新推送
  • Q:忘记ack怎么办?

    • 忘记ack会让MQ不断重复发送信息,导致MQ内存增加
    • 可以在MQ中查询messages_unacknowledged字段,手动处理
  • Q:QoS(Quality of Service)如何实现?

    • 设置MQ的QoS相关配置
    • 设置prefetch_count = 1
  • Q:扇形交换机的RoutingKey是如何配置的?

    • 填写为空
  • Q:扇形交换机的主要用途?

    • 发送广播
  • Q:什么是临时队列?

    • 临时队列随机生成队列名称
    • 当与消费者断开连接的时候,这个队列应当被立即删除
  • Q:临时队列与持久化可以同时设置吗?

    • 不可以,消费者断开后随机队列就删除,所以一旦MQ退出,消费者就与MQ断开连接,随机队列就会删除,不能设置持久化
  • Q:扇形交换机可以在生产者设置持久化的队列吗?

    • 可以,以保证所有消息不丢失。

    • 但是要注意,一般扇形交换机每个消费者一般独占一条队列,如果多个消费者共用一条队列,会跟直连交换机一样进行轮询分发

    • 如果在生产者设置10个持久化队列,但有20个消费者,每个消费者独占一条队列,那么只有其中的10个可以获取全部信息,其他10个消费者无法使用

    • 如果在生产者设置10个持久化队列,但有20个消费者,每2个消费者共用一条队列,那么20个消费者将轮询消费信息,不能收到完整的全部的信息

    • 所以扇形交换机一般不按照上面的方式使用

  • Q:直连交换机多重绑定和扇形交换机有什么区别?

    • 扇形交换机不能通过RoutingKey过滤消息
  • Q:直连交换机RoutingKey多重绑定有什么限制?

    • 不能用通配符方式模糊过滤消息
  • Q: *# 的区别?

    • (星号) 用来表示一个单词
    • (井号) 用来表示任意数量(零个或多个)单词
  • Q:绑定键为 * 的队列会取到一个路由键为空的消息吗?

    • 不能,星号表示至少一个单词
  • Q:a.*.#a.#的区别在哪儿?

    • 前者不可以匹配 a
    • 后者可以匹配 a
  • Q:上面例子中使用了临时队列和非持久化队列,会出现什么问题?

    • 没启动服务端时,客户机消息丢失
    • 客户端发送给服务端后,客户端断开连接,会导致回传信息丢失
  • Q:如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?

    • 一般来说不需要
  • Q:当没有服务器运行时,客户端如何作出反映?

    • 对于不重要的信息,采用临时队列,丢失消息即可
    • 对于重要信息,采用持久化队列,等待服务器处理,并根据实际情况定时处理(清除或消息补偿)

五、 场景模拟

  • 账号注册后一系列短信邮件通知
    • 使用扇形交换机,短信和邮件服务作为消费者申明临时队列
    • 使用Auto ack
  • 秒杀下单
    • 使用直连交换机,秒杀服务作为生产者,订单中心作为消费者,都需要申明持久化队列
    • 使用手动ack
  • 下单后支付并返回结果
    • 使用直连交换机,下单服务作为生产者,支付服务作为消费者,都需要申明持久化队列
    • 使用手动ack
    • 消息安全性要求很高,需要消息补偿机制
  • 用户下单(订单服务接收下单请求,异步调用仓库服务查询库存是否足够)
    • 使用RPC
    • 仓库服务申明异步调用队列 A
    • 订单服务申明回传队列 B
    • 订单服务将待查询的产品id和B的地址发送到A
    • 仓库服务从A中消费信息,处理订单服务请求,并把结果发送到B
    • 订单服务从B中消费信息,接收仓库服务的查询结果

RabbitMQ 基础介绍

https://wurang.net/rabbitmq/

作者

Wu Rang

发布于

2018-02-08

更新于

2024-06-13

许可协议

评论