MQTT简单实践

背景

在物联网中,MQTT 是一个非常通用的协议,为了能更好地认识和理解 mqtt,需要实际跑起来一个 mqtt 的项目。这里将会记录实践 MQTT 的过程。

基本结构

MQTT 协议的实现过程中有 3 个角色: ① publisher (发布者) ② broker (服务器) ③ subscriber (订阅者)。

消息包含两个部分: 消息主题(topic)、消息内容 (payload)

MQTT 协议会解决: 建立连接、session、顺序、消息质量、消息加密、消息传输 这些问题,当然有些特性是可配置的,比如消息质量、消息加密等。

broker 选择

服务器实现比较多,例如 aedeshivemqmosquittovolantmqEMQXmoquettehmq、Rabbimq + mqtt plugin 等等,更多信息可以参考 MQTT broker 选型
基于一系列原因,我选择了 EMQX 这个实现,这里是 官方 docker 镜像,这里是 中文文档

搭建过程

broker

k8s 中可以这么操作:

1
2
3
4
5
# 运行pod
k run emqx --image emqx:4.4.4 --label app=emqx

# 创建外部访问
k create svc loadbalancer emqx --tcp=18083:18083 --tcp=1883:1883

如果在 docker 中的话这么操作:

1
docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx:4.4.4

当然,你也可以加些环境变量作为 emqx 的配置,具体参考:emqx 配置文件
比较重要的可以设置一下消息保留: retainer

控制台

发现了 MQTTX 这个控制台。
安装:

1
brew install --cask mqttx

或者直接从 github 上下载安装包

可以先尝试从控制面板上进行 mqtt 的消息发送和接收,建立感性认识。

client 选择

使用 golang 的 eclipse 的 paho.mqtt.golang,虽然这包名很不 golang……
其他客户端可以参考 mqtt-client-sdk

稍微写点代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func subit() error {
urlStr := "mqtt://xxx.xxx.xx.xx:1883"
u, err := url.Parse(urlStr)
if err != nil {
return err
}

cfg := autopaho.ClientConfig{
BrokerUrls: []*url.URL{u},
Debug: log.Default(),
PahoDebug: log.Default(),
KeepAlive: 10,
ClientConfig: paho.ClientConfig{
PingHandler: paho.DefaultPingerWithCustomFailHandler(func(err error) {
fmt.Println("ping fail ", err)
}),
Router: paho.NewStandardRouter(),
},
}

cfg.Router.RegisterHandler("longtesttopic", func(p *paho.Publish) {
fmt.Printf("on longtesttoic : %s\n", p.Payload)
})

m, err := autopaho.NewConnection(context.Background(), cfg)
if err != nil {
fmt.Println("new connect fail ", err)
return err
}

go func() {
for {
time.Sleep(time.Second * time.Duration(rand.Intn(10)))

res, err := m.Publish(context.Background(), &paho.Publish{
Topic: "longtesttopic",
QoS: 1,
Properties: &paho.PublishProperties{},
Payload: []byte(fmt.Sprintf("hello world %s", time.Now().String())),
})
if err != nil {
log.Printf("publish fail : %s\n", err)
continue
}

log.Printf("publish ok : %+v", res)
}
}()

err = m.AwaitConnection(context.Background())
if err != nil {
return err
}

_, err = m.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
"longtesttopic": {QoS: 1},
},
})
if err != nil {
return err
}

time.Sleep(60 * time.Second * 3)

return nil
}

以上是一个简单的测试。

一些想法

整体来说,mqtt 的协议是很简单的,和 websocket 一样简单,但是有 消息质量 的概念,一些场景下更好用,ws 没有 topic 这些概念,适用范围更广。

和 ws 相比,mqtt broker 算是一个特色,有比较成型的项目可以用,基于 ws 的 mqtt 在一些小场景里可能还挺好用。

MQTT broker 相比于 RocketMQ 这类消息队列,其实还比较类似,都是支持大量 topic 的,这点和 kafka 的数据结构有些差别。

发现一个比较有意思的项目: DGIOT工业物联网平台
后面可以看下 golang broker 的实现 volantmq

TODO

  • 参考 MQTT sdk 的设计,参考 socket.io 等的设计,设计一个 ws 下的通信框架。
  • 把 MQTT 用在物联网小项目中

A leader or a man of action in a crisis almost always acts subconsciously and then thinks of the reasons for his action.
Jawaharlal Nehru


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!