EMQX打通终端和后端
EMQX打通终端和后端
部署EMQX
华为云开服务器,在安全组中放18083、1883端口,装ubuntu2004系统。运行以下命令安装EMQX:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx启动:
sudo systemctl start emqx浏览器访问18083端口就能看到控制面板,初始账号admin,密码public。
点击最下面的图标 -> API密钥,新建一个密钥,记住KEY和SECRET,日后通过api连接EMQX都需要带上这两个参数。
大致流程
MQTT本质上是逻辑中转,后端发布一个主题,终端订阅它。一般的mqtt直接用协议连接即可,但是我使用的Hono后端不支持mqtt库,必须使用EMQX提供的api,所以后端和EMQX之间本质上依然是通过HTTP交互的。所以只需做两件事:后端通过api发布主题;终端通过mqtt订阅。
代码实现
后端
后端着重实现两个接口:display和devices,分别是发布信息、获取在线设备。
display
app.post('/api/display', async (c) => {
try {
const body = await c.req.json();
const message = body.message || '';
if (!message) {
return c.json({ ok: false, error: 'Message is required' }, 400);
}
const result = await publishMqtt('device/display', message, { qos: 1 });
if (result.success) {
return c.json({ ok: true, message: result.message });
} else {
return c.json({ ok: false, error: result.error }, 503);
}
} catch (err) {
const errorMessage = err instanceof Error ? err.message : 'Unknown error';
console.error('Error:', errorMessage);
return c.json({ ok: false, error: 'Invalid request' }, 400);
}
});核心在于发布消息这一步:
const result = await publishMqtt('device/display', message, { qos: 1 });publishMqtt 的签名如下:
export async function publishMqtt(
topic: string,
payload: string,
options: PublishOptions = { qos: 1, retain: false }
): Promise<{ success: boolean; message?: string; error?: string }>前两个参数好理解,第三个参数是发布选项,它是这样的:
interface PublishOptions {
qos?: 0 | 1 | 2;
retain?: boolean;
}其中QoS代表服务质量等级:
| QoS | 含义 | 可靠性 |
|---|---|---|
| 0 | 最多一次 | 可能丢失 |
| 1 | 至少一次 | 可能重复 |
| 2 | 仅一次 | 最可靠 |
retain 是是否保留消息。
返回一个Promise方便后续处理。实现上比较容易,按EMQX的api要求构造请求即可,核心格式如下:
const body = {
topic,
payload,
qos: options.qos || 1,
retain: options.retain || false,
};
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Basic ' + btoa(`${EMQX_API_KEY}:${EMQX_API_SECRET}`),
},
body: JSON.stringify(body),
});EMQX_API_KEY 和 EMQX_API_SECRET 是刚才定义的api-key,EMQX_API_URL 取 http://121.37.194.185:18083。
devices
这个接口用来配合前端显示在线状态。一样是调api:url = ${EMQX_API_URL}/api/v5/clients
终端
需要安装这两个库:
#include <PubSubClient.h>
#include <WiFiClient.h>PubSubClient.h 提供mqtt客户端的实现,比如连接broker、订阅、发布等。WiFiClient.h 用来创建TCP客户端(因为mqtt是基于TCP的)。在代码上体现也很直观:要创建 PubSubClient 实例,是需要传入 WiFiClient 对象的。
头文件所有的字段如下:
class MqttService {
private:
PubSubClient client;
WiFiClient wifiClient;
const char* mqttServer = "121.37.194.185";
const int mqttPort = 1883;
const char* deviceId = "esp32_device";
static void messageCallback(char* topic, byte* payload, unsigned int length);
public:
MqttService();
bool connect();
void maintain();
bool isConnected();
void subscribe(const char* topic);
typedef void (*MessageHandler)(const String& message);
static MessageHandler onMessage;
};messageCallback是PublicSubClient的标准消息接收回调,三个参数分别是:消息来自哪个topic、消息内容、长度。由于后续需要client.setCallback(callback);,setCallback()只接受普通函数指针,所以要用static来抹掉this参数;connect用来连接broker;maintain用来放在主循环里维持连接;isConnected是预留的检查接口;subscribe中传入topic的路径就能订阅这个topic;MessageHandler是一个函数指针类型,它指向一个消息处理函数。
消息接收回调
void MqttService::messageCallback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (unsigned int i = 0; i < length; i++) {
message += (char)payload[i];
}
if (onMessage) {
onMessage(message);
}
}mqtt返回的payload是编码格式,需要先转化成字符串消息,之后调用处理函数。这个函数会写在main里(松耦合,让用户自定义)。
连接broker
bool MqttService::connect() {
if (!client.connected()) {
client.setServer(mqttServer, mqttPort);
client.setCallback(messageCallback);
String clientId = String(deviceId) + "_" + String(random(0xffff), HEX);
if (client.connect(clientId.c_str())) {
Serial.println("MQTT connected!");
subscribe("device/display");
return true;
} else {
Serial.print("MQTT connection failed, rc=");
Serial.println(client.state());
return false;
}
}
return true;
}这一步其实就是调现成的方法。
维护函数
void MqttService::maintain() {
if (!client.connected()) {
connect();
}
client.loop();
}一旦断开mqtt连接就立刻重连。
测试
写一个简单的屏幕显示服务,构造如下请求:
curl http://localhost:8787/api/display `
-Method POST `
-Body '{"message":"Hello MQTT"}' `
-ContentType "application/json"发给后端display接口后可以看到屏幕显示。
