跳到主要内容

rabbitmq介绍

端口作用
4369epmd,RabbitMQ 节点和 CLI 工具使用的对等发现服务
5672、5671由不带和带 TLS 的 AMQP 0-9-1 和 AMQP 1.0 客户端使用
25672用于节点间和 CLI 工具通信(Erlang 分发服务器端口)
15672、15671HTTP API客户端、管理 UI,不带和带 TLS(仅在启用管理插件的情况下)
1883如果启用了MQTT 插件,则不带和带 TLS 的MQTT 客户端
15675MQTT-over-WebSockets 客户端(仅当启用Web MQTT 插件时)
15692Prometheus 指标(仅在启用Prometheus 插件的情况下)
  • 虚拟机主机(Virtual host)

    1. 每一个 vhost 拥有自己独立的队列、交换器及绑定关系等,井且它拥有自己独立的权限。

    2. 可以限制最大连接数和最大队列数,并且可以设置vhost下的用户资源权限和Topic权限。

    3. rbbitmq会默认创建一个 "/" vhost。

  • 交换机(Exchanges)

    1. Direct Exchange(精确匹配):将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。

    2. Fanout Exchange(广播):直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key

    3. Headers Exchange(headers属性匹配):将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。

    4. Topic Exchange(模糊匹配):将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。

  • 队列(Queues)

    1. Classic(经典队列):支持持久(发送的消息也必须设置properties持久才能消息也持久)与非持久、排他性(客户端断开会自动删除即使声明了持久,只对首次声明它的连接可见,从同一个连接创建的不同的通道可以同时访问某一个排他性的队列)、惰性队列(消费者请求时才加载到内存)、自动删除(最后一个消费者断开连接后自动删除队列)。

    2. Quorum(仲裁队列):始终持久、复制、数据安全和一致性导向的队列类型,主从同步基于Raft协议,强一致,Classic镜像队列的替代方案。对于大于 5 的仲裁队列节点大小,性能会下降很多。我们不建议在超过 7 个 RabbitMQ 节点上运行仲裁队列。默认仲裁队列大小为 3,并且可以使用 x-quorum-initial-group-size 队列参数进行控制。

    3. Stream(流队列):

      需要启用rabbitmq_stream插件

      流使用非破坏性消耗语义对仅追加日志进行建模。这意味着与 RabbitMQ 中的传统队列相反,从流中使用不会删除消息。持久化和复制。这转化为数据安全性和可用性(在节点丢失的情况下)以及扩展(从不同节点读取相同的流)。

      • 大型扇出:许多应用程序需要读取相同的消息(对于传统队列,需要为每个应用程序声明一个队列,并将同一消息的副本传递给每个应用程序)
      • 大量积压:流将消息存储在磁盘上,而不是内存中,因此唯一的限制是磁盘容量
      • 重播和时间旅行:消费者可以使用绝对偏移量或时间戳附加流中的任何位置,并且可以读取和重新读取相同的数据
      • 高吞吐量:与传统队列相比,流速度超快,快几个数量级
  • 信道(channels)

    1. 无论是生产者和消费者都需要通过TCP连接和RabbitMQ Broker建立连接,一个连接就是一个TCP连接。建立连接之后,用户就可以在连接的基础上建立一个AMQP信道,每个信道会被赋值一个唯一的ID(在各自的连接中不同,不同的连接重新算),信道是建立在TCP上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
    2. 适用于多线程/进程进行处理的应用程序使用同一个TCP连接创建多个信道连接,避免创建多个TCP连接带来的资源开销。
    3. 当信道本身的流量很大的时候,多个信道复用同一个Connection就会产生性能瓶颈,进而整体的流量就会被限制了,此时就需要开辟多个Connection,将这些信道均摊道这些Connection中。
    4. 信道不是线程安全的,如果有多个线程,应该保证每个线程独享一个信道。

rabbitmq应用

死信队列(延时队列)

当发生以下任何事件时重新发布消息到交换机:

可通过配合TTL过期来实现延时队列

1.创建死信交换机:死信交换 (DLX) 是正常的交换。它们可以是任何常用类型,并跟正常的交换机一样声明绑定队列。

2.队列绑定:在需要绑定死信队列的队列(需要把过期拒绝的消息发送死信队列的其他队列)上添加参数 x-dead-letter-exchange: my-exchange-dlx (指定死信队列)、x-dead-letter-routing-key: route-dlx(指定Routing key)

集群安装

## 安装erlang
#安装依赖
yum install -y ncurses-devel unixODBC-devel wxWidgets-devel

wget https://github.com/erlang/otp/releases/download/OTP-24.3.4/otp_src_24.3.4.tar.gz
tar zxf otp_src_24.3.4.tar.gz
cd otp_src_24.3.4/
./configure --prefix=/usr/local/erlang
make -j
make install

cat <<EOF > /etc/profile.d/erlang.sh
PATH=\$PATH:/usr/local/erlang/bin
EOF
source /etc/profile

安装rabbitmq

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.1/rabbitmq-server-generic-unix-3.10.1.tar.xz
xz -d rabbitmq-server-generic-unix-3.10.1.tar.xz
tar xvf rabbitmq-server-generic-unix-3.10.1.tar
mv rabbitmq_server-3.10.1 /usr/local/rabbitmq


cat <<EOF > /etc/profile.d/rabbitmq.sh
PATH=\$PATH:/usr/local/rabbitmq/sbin
EOF
source /etc/profile


curl https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/master/deps/rabbit/docs/rabbitmq.conf.example -o /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf


#允许guest远程登录
sed -i '/^# loopback_users.guest = true/s/^# //' /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf
[Unit]
Description=RabbitMQ broker
After=syslog.target network.target

[Service]
Type=notify
WorkingDirectory=/usr/local/rabbitmq
ExecStart=/usr/local/rabbitmq/sbin/rabbitmq-server
ExecStop=/usr/local/rabbitmq/sbin/rabbitmqctl stop

[Install]
WantedBy=multi-user.target

创建集群

rabbitmq必须使用主机名访问需要在各节点上写hosts

#启动rabbitmq
rabbitmq-server rabbitmq-server -detached
rabbitmq-plugins enable rabbitmq_management

#其他节点需要同步第一个节点cookie文件后在启动
scp /root/.erlang.cookie 172.16.7.15:/root
scp /root/.erlang.cookie 172.16.7.16:/root

#在除第一个节点外的节点上执行下面命令加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
#内存节点
#RAM 节点仅将其元数据保存在内存中,用于提高具有高队列、交换或绑定流失的集群的性能。不提供更高的消息速率
rabbitmqctl join_cluster --ram rabbit@node1



#设置镜像队列所有节点镜像,可以使用-p指定虚拟主机
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

json参数说明:
ha-mode:镜像模式,分类:all/exactly/nodes,
all存储在全部节点;
exactly存储x个节点,节点的个数由ha-params指定;
nodes指定存储的节点上名称,经过ha-params指定;
ha-params:做为参数,为ha-mode的补充;
ha-sync-mode:镜像消息同步方式:automatic(自动),manually(手动);

常用操作:

#操作其他节点
rabbitmqctl -n rabbit@node3 ping

#修改密码
rabbitmqctl change_password guest abc123456

#导出配置
rabbitmqctl export_definitions /path/to/definitions.file.json

#导入配置
rabbitmqadmin import /path/to/definitions.file.json


#查看队列消息
rabbitmqctl list_queues

#清空队列
rabbitmqctl purge_queue tt1

#删除队列
rabbitmqctl delete_queue tt1

#查看连接
rabbitmqctl list_connections pid user peer_host peer_port state

#关闭某个连接,pid通过list_connections获取
rabbitmqctl close_connection "<rabbit@master.1652842502.20360.234>" "colse"


#修改集群名称
rabbitmqctl set_cluster_name mq-cluster-tt

#查看镜像队列
rabbitmqctl list_policies

#查看镜像状态(是否同步)
rabbitmqctl list_queues name slave_pids synchronised_slave_pids

#手动同步镜像队列
rabbitmqctl sync_queue tt1

#取消同步镜像队列
rabbitmqctl cancel_sync_queue tt1


#集群移除节点
rabbitmqctl -n rabbit@node3 stop_app
rabbitmqctl -n rabbit@node1 forget_cluster_node rabbit@node3

#修改节点类型,node2由disc改成ram
rabbitmqctl -n rabbit@node2 stop_app
rabbitmqctl -n rabbit@node2 change_cluster_node_type ram
rabbitmqctl -n rabbit@node2 start_app

#通过policy修改已创建队列配置
#设置v1虚拟主机下的t1队列的死信交换机及routing-key
rabbitmqctl set_policy DLX t1 '{"dead-letter-exchange":"d1"}' -p v1
rabbitmqctl set_policy DLX t1 '{"dead-letter-routing-key":"gg"}' -p v1
#设置v1虚拟主机下的t1队列的ttl(毫秒)
rabbitmqctl set_policy TTL t1 '{"message-ttl": 5000}' -p v1


#查看仲裁队列副本
rabbitmq-queues quorum_status "q-t1" -p v1

#仲裁队列添加副本
rabbitmq-queues add_member "q-t1" "rabbit@mq4"