在部分业务场景中,需要业务上游消息流下发到终端设备,从而需要引入MQTT消息服务器。由于前期使用过emqtt,从而选择EMQX(老版本就是emqtt)。EMQX 的使用场景非常广泛,比如车联网、智能家居、物流跟踪、智能制造等。用户可以将 EMQX 作为消息中间件,在不同的终端设备和服务端之间进行消息的传递和处理。
针对业务消息流通知客服接待客户的场景,EMQX 能够胜任以下事务:
- 客户端通过 MQTT 协议与 EMQX 建立连接,发送消息。
- EMQX 接收来自客户端的消息,并根据规则引擎或订阅关系转发消息到接待客服端。
- 客服端订阅了相关主题(Topic),当新的消息发布到这些主题时,EMQX 会推送消息到相应的订阅者(即客服端)。
- 客服通过客户端接收到消息,并能够实时响应客户的需求。
这样,业务消息流就可以利用 MQTT 协议的发布-订阅机制,实现消息的实时传递,保证客服人员能及时接收到客户的消息,并作出回应
文章导航
EMQX架构介绍
Mria 是 Mnesia 的一个开源扩展,它为集群增加了最终的一致性。启用异步方式同步事务日志后,EMQX 节点之间的连接模式从 Mnesia 的全网状拓扑结构转向 Mria 的网状+星型状拓扑结构,集群中节点可以按角色分为核心节点(Core)或复制者节点(Replicant)。
节点角色介绍
核心节点
核心节点作为数据库的数据层,节点间以全网状连接,每个节点都包含一个最新的数据副本,这保证了容错性:只要有一个节点存活,数据就不会丢失。核心节点一般是静态和持久的,不建议进行自动伸缩(即经常添加、删除或替换节点)。
复制节点
复制节点会连接到核心节点,并被动地复制来自核心节点的数据更新。复制节点不允许执行任何的写操作,而是将其转交给核心节点代为执行。同时,由于复制节点有一个完整的本地数据副本,因此数据读取速度非常快,这样有助于降低 EMQX 路由的时延。
架构优势
我们可以将这种数据复制模型当做无主复制和主从复制的一种混合,这种结构的优势在于:
- 更高的水平可扩展性:EMQX 5.0 已能支持包含 23 个节点的大规模集群。
- 更轻松的集群自动扩展:通过复制节点的自动伸缩简化集群的自动扩展。
相比与 4.x 版本所有节点采用全连接的方式,节点数量越多节点之间完成数据同步的成本就越高,EMQX 5.0 中由于复制节点不参与数据写入,当更多的复制节点加入集群时,表的更新效率不会受到影响,进而允许创建更大的 EMQX 集群。
另外,复制节点被设计成可以按需增删,添加或删除它们不会改变数据冗余,所以它们可以被放在一个自动伸缩组中,从而实现更好的 DevOps 实践。
但随着总数据量的增大,从核心节点初始化复制数据是一个相对繁重的操作,所以复制节点的自动伸缩策略不也能太过于激进。
高可用搭建
搭建3节点高可用集群
- 如果有多台服务器,可以直接rpm或者二进制直接安装运行,如果资源受限,可以用docker部署,本次实验通过多台服务器验证
- 同一个集群, cookie = “emqxsecretcookie”必须一致,否则后期加入不了集群
wget https://www.emqx.com/zh/downloads/broker/5.7.2/emqx-5.7.2-el7-amd64.tar.gz
mkdir -pv /usr/local/emqx01..03
tar xf emqx-5.7.2-el7-amd64.tar.gz -C /usr/local/emqx01..03
#启动(修改node.name根据实际情况配置即可)
cd /usr/local/emqx01..03
./bin/emqx start
#因为默认集群组成方式为manual,所以实例启动后,需要执行cluster join
例如固定第一台emqx01,emqx02/03实例可以执行join emqx01即可,命令如下:
emqx02: emqx ctl cluster join emqx01
emqx03: emqx ctl cluster join emqx01
扩容
- node.role可选参数:core/replicant,默认值为core
扩容core节点
wget https://www.emqx.com/zh/downloads/broker/5.7.2/emqx-5.7.2-el7-amd64.tar.gz
mkdir -pv /usr/local/emqx04
tar xf emqx-5.7.2-el7-amd64.tar.gz -C /usr/local/emqx04
#启动(修改node.name)
cd /usr/local/emqx04
./bin/emqx start
#因为默认集群组成方式为manual,所以实例启动后,需要执行cluster join
例如固定第一台emqx01,其他实例可以执行join emqx01即可,命令如下:
emqx04: emqx ctl cluster join emqx01
扩容replicant节点
wget https://www.emqx.com/zh/downloads/broker/5.7.2/emqx-5.7.2-el7-amd64.tar.gz
mkdir -pv /usr/local/emqx05
tar xf emqx-5.7.2-el7-amd64.tar.gz -C /usr/local/emqx05
#启动(修改node.name和node.role为replicant)
cd /usr/local/emqx05
./bin/emqx start
#因为默认集群组成方式为manual,所以实例启动后,需要执行cluster join
例如固定第一台emqx01,其他实例可以执行join emqx01即可,命令如下:
emqx05: emqx ctl cluster join emqx01
集群总览
缩容
- 节点疏散和集群负载重平衡是 EMQX 企业版功能,开源版本缩容就没有节点疏散和集群负载重平衡的操作
缩容core节点
#缩容emqx04,在emqx04上执行如下命令
emqx04: emqx ctl cluster leave
#当节点出现故障或无响应时,可以通过如下命令执行强制剔除
emqx04: emqx ctl cluster force-leave
缩容replicant节点
- 从前端负载均衡剔除后,关闭replicant节点后,集群元数据就已经清除,勿需再做其他操作
集群总览
测试
#安装压测工具
curl -LO https://www.emqx.com/en/downloads/MQTTX/v1.10.1/mqttx-cli-linux-x64
mv mqttx-cli-linux-x64 /usr/local/bin/mqttx
chmod +x /usr/local/bin/mqttx
nginx实现高可用代理
代理tcp
stream {
upstream mqtt_servers {
least_conn;
server ip1:1883 max_fails=2 fail_timeout=10s;
server ip2:1883 max_fails=2 fail_timeout=10s;
server ip3:1883 max_fails=2 fail_timeout=10s;
}
server {
listen 1883;
proxy_pass mqtt_servers;
proxy_protocol off;
proxy_connect_timeout 10s;
proxy_timeout 1800s;
proxy_buffer_size 3M;
tcp_nodelay on;
}
}
连接地址:nginxip:1883
代理websocket
upstream mqtt_websocket_servers {
server ip1:8083;
server ip2:8083;
server ip3:8083;
}
server {
listen 80;
server_name emqx.test.com;
location /mqtt {
proxy_pass http://mqtt_websocket_servers;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
# 禁用缓存
proxy_buffering off;
proxy_connect_timeout 10s;
# WebSocket 连接有效时间
# 在该时间内没有数据交互的话 WebSocket 连接会自动断开,默认为 60s
proxy_send_timeout 3600s;
proxy_read_timeout 3600s;
# 反向代理真实 IP
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header REMOTE-HOST $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
websocket连接地址:ws://emqx.test.com/mqtt
拓展
数据集成
- 可以通过webhook开发,将消息流转存来用于后期排查bug和分析
webhook开发
#安装flask
pip install flask
#webhook.py脚本
from flask import Flask, json, request
api = Flask(__name__)
@api.route('/', methods=['POST'])
def print_messages():
reply= {"result": "ok", "message": "success"}
print("got post request: ", request.get_data())
return json.dumps(reply), 200
if __name__ == '__main__':
api.run(host='0.0.0.0', port=5000)
配置
测试
延展阅读:
如何利用KtConnect简化Kubernetes开发调试?
CAP理论与Raft协议如何在分布式系统中确保一致性和可用性?
如何在Nginx上优化TLS配置以平衡并提升安全性与连接性能?
MongoDB Change Streams可以应用在哪些场景?它有哪些局限性?
咨询方案 获取更多方案详情