如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

在部分业务场景中,需要业务上游消息流下发到终端设备,从而需要引入MQTT消息服务器。由于前期使用过emqtt,从而选择EMQX(老版本就是emqtt)。EMQX 的使用场景非常广泛,比如车联网、智能家居、物流跟踪、智能制造等。用户可以将 EMQX 作为消息中间件,在不同的终端设备和服务端之间进行消息的传递和处理。

针对业务消息流通知客服接待客户的场景,EMQX 能够胜任以下事务:

  1. 客户端通过 MQTT 协议与 EMQX 建立连接,发送消息。
  2. EMQX 接收来自客户端的消息,并根据规则引擎或订阅关系转发消息到接待客服端。
  3. 客服端订阅了相关主题(Topic),当新的消息发布到这些主题时,EMQX 会推送消息到相应的订阅者(即客服端)。
  4. 客服通过客户端接收到消息,并能够实时响应客户的需求。

这样,业务消息流就可以利用 MQTT 协议的发布-订阅机制,实现消息的实时传递,保证客服人员能及时接收到客户的消息,并作出回应

EMQX架构介绍

Mria 是 Mnesia 的一个开源扩展,它为集群增加了最终的一致性。启用异步方式同步事务日志后,EMQX 节点之间的连接模式从 Mnesia 的全网状拓扑结构转向 Mria 的网状+星型状拓扑结构,集群中节点可以按角色分为核心节点(Core)或复制者节点(Replicant)。

如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

节点角色介绍

核心节点

核心节点作为数据库的数据层,节点间以全网状连接,每个节点都包含一个最新的数据副本,这保证了容错性:只要有一个节点存活,数据就不会丢失。核心节点一般是静态和持久的,不建议进行自动伸缩(即经常添加、删除或替换节点)

复制节点

复制节点会连接到核心节点,并被动地复制来自核心节点的数据更新。复制节点不允许执行任何的写操作,而是将其转交给核心节点代为执行。同时,由于复制节点有一个完整的本地数据副本,因此数据读取速度非常快,这样有助于降低 EMQX 路由的时延。

架构优势

我们可以将这种数据复制模型当做无主复制和主从复制的一种混合,这种结构的优势在于:

  • 更高的水平可扩展性:EMQX 5.0 已能支持包含 23 个节点的大规模集群。
  • 更轻松的集群自动扩展:通过复制节点的自动伸缩简化集群的自动扩展。

相比与 4.x 版本所有节点采用全连接的方式,节点数量越多节点之间完成数据同步的成本就越高,EMQX 5.0 中由于复制节点不参与数据写入,当更多的复制节点加入集群时,表的更新效率不会受到影响,进而允许创建更大的 EMQX 集群。

另外,复制节点被设计成可以按需增删,添加或删除它们不会改变数据冗余,所以它们可以被放在一个自动伸缩组中,从而实现更好的 DevOps 实践。

但随着总数据量的增大,从核心节点初始化复制数据是一个相对繁重的操作,所以复制节点的自动伸缩策略不也能太过于激进。

高可用搭建

搭建3节点高可用集群

  • 如果有多台服务器,可以直接rpm或者二进制直接安装运行,如果资源受限,可以用docker部署,本次实验通过多台服务器验证
  • 同一个集群, cookie = “emqxsecretcookie”必须一致,否则后期加入不了集群
wget https://www.emqx.com/zh/downloads/broker/5.7.2/emqx-5.7.2-el7-amd64.tar.gz
mkdir -pv /usr/local/emqx01..03
tar xf emqx-5.7.2-el7-amd64.tar.gz -C /usr/local/emqx01..03

#启动(修改node.name根据实际情况配置即可)
cd /usr/local/emqx01..03
./bin/emqx start

#因为默认集群组成方式为manual,所以实例启动后,需要执行cluster join
例如固定第一台emqx01,emqx02/03实例可以执行join emqx01即可,命令如下:
emqx02: emqx ctl cluster join emqx01
emqx03: emqx ctl cluster join emqx01

扩容

  • node.role可选参数:core/replicant,默认值为core

扩容core节点

wget https://www.emqx.com/zh/downloads/broker/5.7.2/emqx-5.7.2-el7-amd64.tar.gz
mkdir -pv /usr/local/emqx04
tar xf emqx-5.7.2-el7-amd64.tar.gz -C /usr/local/emqx04

#启动(修改node.name)
cd /usr/local/emqx04
./bin/emqx start

#因为默认集群组成方式为manual,所以实例启动后,需要执行cluster join
例如固定第一台emqx01,其他实例可以执行join emqx01即可,命令如下:
emqx04: emqx ctl cluster join emqx01

扩容replicant节点

wget https://www.emqx.com/zh/downloads/broker/5.7.2/emqx-5.7.2-el7-amd64.tar.gz
mkdir -pv /usr/local/emqx05
tar xf emqx-5.7.2-el7-amd64.tar.gz -C /usr/local/emqx05

#启动(修改node.name和node.role为replicant)
cd /usr/local/emqx05
./bin/emqx start

#因为默认集群组成方式为manual,所以实例启动后,需要执行cluster join
例如固定第一台emqx01,其他实例可以执行join emqx01即可,命令如下:
emqx05: emqx ctl cluster join emqx01

集群总览

如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

缩容

  • 节点疏散和集群负载重平衡是 EMQX 企业版功能,开源版本缩容就没有节点疏散和集群负载重平衡的操作

缩容core节点

#缩容emqx04,在emqx04上执行如下命令
emqx04: emqx ctl cluster leave
#当节点出现故障或无响应时,可以通过如下命令执行强制剔除
emqx04: emqx ctl cluster force-leave

缩容replicant节点

  • 从前端负载均衡剔除后,关闭replicant节点后,集群元数据就已经清除,勿需再做其他操作

集群总览

如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

测试

#安装压测工具
curl -LO https://www.emqx.com/en/downloads/MQTTX/v1.10.1/mqttx-cli-linux-x64
mv mqttx-cli-linux-x64 /usr/local/bin/mqttx
chmod +x /usr/local/bin/mqttx
如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

nginx实现高可用代理

代理tcp


stream {
  upstream mqtt_servers {
    least_conn;
    server ip1:1883 max_fails=2 fail_timeout=10s;
    server ip2:1883 max_fails=2 fail_timeout=10s;
    server ip3:1883 max_fails=2 fail_timeout=10s;    
  }

  server {
    listen 1883;
    proxy_pass mqtt_servers;
    proxy_protocol off;
    proxy_connect_timeout 10s;
    proxy_timeout 1800s;
    proxy_buffer_size 3M;
    tcp_nodelay on;
  }
}

连接地址:nginxip:1883

代理websocket

upstream mqtt_websocket_servers {
  server ip1:8083;
  server ip2:8083;
  server ip3:8083;  
}

server {
  listen 80;
  server_name emqx.test.com;

  location /mqtt {
    proxy_pass http://mqtt_websocket_servers;

    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";

    # 禁用缓存
    proxy_buffering off;

    proxy_connect_timeout 10s;
    # WebSocket 连接有效时间
    # 在该时间内没有数据交互的话 WebSocket 连接会自动断开,默认为 60s
    proxy_send_timeout 3600s;
    proxy_read_timeout 3600s;

    # 反向代理真实 IP
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header REMOTE-HOST $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  }
}

websocket连接地址:ws://emqx.test.com/mqtt

拓展

数据集成

  • 可以通过webhook开发,将消息流转存来用于后期排查bug和分析

webhook开发

#安装flask
pip install flask

#webhook.py脚本
from flask import Flask, json, request

api = Flask(__name__)

@api.route('/', methods=['POST'])
def print_messages():
  reply= {"result": "ok", "message": "success"}
  print("got post request: ", request.get_data())
  return json.dumps(reply), 200

if __name__ == '__main__':
  api.run(host='0.0.0.0', port=5000)

配置

如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

测试

如何利用 EMQX 构建高可用的 MQTT 消息服务器?详细步骤与最佳实践!

延展阅读:

如何利用KtConnect简化Kubernetes开发调试?

CAP理论与Raft协议如何在分布式系统中确保一致性和可用性?

如何实现高稳定性编码?深入探索防御性编码与关键技术策略!

如何在Nginx上优化TLS配置以平衡并提升安全性与连接性能?

MongoDB Change Streams可以应用在哪些场景?它有哪些局限性?

咨询方案 获取更多方案详情                        
(0)
研发专家-曾曾研发专家-曾曾
上一篇 2024年8月6日 下午2:38
下一篇 2024年8月23日 下午3:13

相关推荐