背景
在物联网中,MQTT 是一个非常通用的协议,为了能更好地认识和理解 mqtt,需要实际跑起来一个 mqtt 的项目。这里将会记录实践 MQTT 的过程。
基本结构
MQTT 协议的实现过程中有 3 个角色: ① publisher (发布者) ② broker (服务器) ③ subscriber (订阅者)。
消息包含两个部分: 消息主题(topic)、消息内容 (payload)
MQTT 协议会解决: 建立连接、session、顺序、消息质量、消息加密、消息传输 这些问题,当然有些特性是可配置的,比如消息质量、消息加密等。
broker 选择
服务器实现比较多,例如 aedes、hivemq、mosquitto、volantmq、EMQX、moquette、hmq、Rabbimq + mqtt plugin 等等,更多信息可以参考 MQTT broker 选型
基于一系列原因,我选择了 EMQX 这个实现,这里是 官方 docker 镜像,这里是 中文文档。
搭建过程
broker
k8s 中可以这么操作:
1 2 3 4 5
| k run emqx --image emqx:4.4.4 --label app=emqx
k create svc loadbalancer emqx --tcp=18083:18083 --tcp=1883:1883
|
如果在 docker 中的话这么操作:
1
| docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx:4.4.4
|
当然,你也可以加些环境变量作为 emqx 的配置,具体参考:emqx 配置文件
比较重要的可以设置一下消息保留: retainer
控制台
发现了 MQTTX 这个控制台。
安装:
1
| brew install --cask mqttx
|
或者直接从 github 上下载安装包
可以先尝试从控制面板上进行 mqtt 的消息发送和接收,建立感性认识。
client 选择
使用 golang 的 eclipse 的 paho.mqtt.golang,虽然这包名很不 golang……
其他客户端可以参考 mqtt-client-sdk 。
稍微写点代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| func subit() error { urlStr := "mqtt://xxx.xxx.xx.xx:1883" u, err := url.Parse(urlStr) if err != nil { return err } cfg := autopaho.ClientConfig{ BrokerUrls: []*url.URL{u}, Debug: log.Default(), PahoDebug: log.Default(), KeepAlive: 10, ClientConfig: paho.ClientConfig{ PingHandler: paho.DefaultPingerWithCustomFailHandler(func(err error) { fmt.Println("ping fail ", err) }), Router: paho.NewStandardRouter(), }, }
cfg.Router.RegisterHandler("longtesttopic", func(p *paho.Publish) { fmt.Printf("on longtesttoic : %s\n", p.Payload) })
m, err := autopaho.NewConnection(context.Background(), cfg) if err != nil { fmt.Println("new connect fail ", err) return err }
go func() { for { time.Sleep(time.Second * time.Duration(rand.Intn(10)))
res, err := m.Publish(context.Background(), &paho.Publish{ Topic: "longtesttopic", QoS: 1, Properties: &paho.PublishProperties{}, Payload: []byte(fmt.Sprintf("hello world %s", time.Now().String())), }) if err != nil { log.Printf("publish fail : %s\n", err) continue }
log.Printf("publish ok : %+v", res) } }()
err = m.AwaitConnection(context.Background()) if err != nil { return err }
_, err = m.Subscribe(context.Background(), &paho.Subscribe{ Subscriptions: map[string]paho.SubscribeOptions{ "longtesttopic": {QoS: 1}, }, }) if err != nil { return err }
time.Sleep(60 * time.Second * 3)
return nil }
|
以上是一个简单的测试。
一些想法
整体来说,mqtt 的协议是很简单的,和 websocket 一样简单,但是有 消息质量
的概念,一些场景下更好用,ws 没有 topic 这些概念,适用范围更广。
和 ws 相比,mqtt broker 算是一个特色,有比较成型的项目可以用,基于 ws 的 mqtt 在一些小场景里可能还挺好用。
MQTT broker 相比于 RocketMQ 这类消息队列,其实还比较类似,都是支持大量 topic 的,这点和 kafka 的数据结构有些差别。
发现一个比较有意思的项目: DGIOT工业物联网平台
后面可以看下 golang broker 的实现 volantmq
TODO
A leader or a man of action in a crisis almost always acts subconsciously and then thinks of the reasons for his action.
— Jawaharlal Nehru