零号机之后的反思

零号机的回顾

在零号机中,我们实现了消息系统的基本能力 – 消息发送

这个过程也引发了很多思考,例如,为什么我们的接口设计使用的是 xxx/xxx 这种格式,它有什么可解释的含义?为什么不是 xxx.xxx 这种?是否有 通道 这个隐藏概念在里面? 一系列的问题和思考我在 零号机 中都有罗列。

在这些问题中,我觉得最值得思考的问题是 “消息系统的交互模型是怎样的?”, 这个问题是之后一切代码设计的基础。

如果我们认为,消息系统 和 通常写的 以 http 为典型的业务系统没什么差别,不过都是 发送请求、检查参数、执行逻辑 ,只不过消息系统为了能及时反馈,拥有 ws 这类长连接而已。 那么,代码的设计就很容易变成 零号机 中的这种模式: 在 ws 之上,封装了 请求路由,甚至可以认为,ws 只是一种实现实时通知的方式而已,稍微封装一下就可以变成基于 任何传输层协议 之上的消息系统了,比如基于 http 轮询、http2、tcp、quic 等等……

从 rpc 框架中找思路

如果我们走到了这一步,很可能这套框架就逐步走上了 一套 rpc 框架 的道路。如果看过鸟窝大佬的 rpcx 框架,就会发现基本就是这样的思路。

那么,直接用一套 rpc 框架,能不能比较好地满足消息系统的需求呢?这需要继续分析。

现代的 rpc 框架,基本都会支持两种通信模型: ping/pong 和 stream。

前者,就是我们最常使用的 发送一个请求,等待一个响应 的模式,不论是 http 请求 还是 常用的 grpc 的请求,都是此类。 这种模型最大的特点就是: 简单可控、易理解。

理解起来就是:我(客户端)向你(服务端)发了一个指令(请求),你就一定会给我一个答案,不给我就等着,还是不给我可能会重发一次,还是不给我就认为你出问题了,然后就走了(也算是得到一个结果: 服务端挂了)。

这种模式在代码的逻辑上,都是 同步 的(和语言实现的同步异步无关,仅指逻辑层面),一定是 发了请求就等着,等到结果就处理之后的逻辑

后者,stream,中文名 ,从通信模型的角度可以分成 3 种类型,发送端流、返回端流、双向流,分别对应着 ① 我可以发很多次,但只会收到一次回复 ② 我只会发一次,但会收到很多次回复 ③ 发送端和返回端都可以持续发。 其中,双向流最为典型,也是和我们场景最相符的,因此,要理解清楚这种模式的特征。

在双向流中,同样可以分为 多个消息,发送的消息可以是 独立的,也可能是 有关联关系的发送的消息接收到的消息,有可能是独立的,也有可能是有关联关系的。那么这里会存在这几个问题:

  • 如果是独立的,应该怎样处理这条消息
  • 如果是关联的,应该怎样标识关联?又怎样处理?

独立的消息,在消息系统中就像是收到一个外部的偶然消息一样。
关联消息,一个场景是:在消息系统中 我发了一个消息,并且我需要这条消息的处理状态。
关联消息,另一个场景是: 在消息系统中,我发了一条消息 1,我又发了一条消息 2,其中,如果消息 1 没有成功,不能发送消息 2。

因此,消息系统的框架需要处理这些场景。这个留待最后梳理。

从事件机制中找思路

在程序世界中, 消息事件 就像是一个人,在不同的场合换上了不同衣服。 如果我们站在 事件 的角度看消息系统,是否有不同的启发呢?

和上面的 用 rpc 的视角看消息系统 不同,事件的视角下,我们处理的不再是 请求 了,而是一个我们关心的 事件 出现了,我们需要对这个事件做出反应。

对象实体上,通常包含这样几个角色:

  • listener,事件的监听者,也就是我们的业务逻辑处理器,需要在收到事件时进行处理。
  • publisher,事件发布者,根据业务逻辑决定触发什么事件。
  • eventhub,登记着各类 listener,并且提供 发布一个事件 的能力,在收到一个事件时,把事件分发给各类 listener。

这几个只是逻辑上的角色,实现上不一定有直接的结构体来承接,但一定有承接这些职能的主体存在。

如果对前端了解一些,可能对这种模式就很熟悉了,毕竟整个前端体系都是以 事件分发 为基础的,在 nodejs 中,有现成的 events 官方库可以使用。 要创建一个事件管理器(eventhub),可以直接这么用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import EventEmitter from 'events';

class XXEventHub extends EventEmitter {
// …… 一些自己的业务逻辑
}

// 创建一个 hub
let hub = new XXEventHub()

// 创建一个处理器
let listener = function(e) {
// 处理逻辑
}

// 注册一个监听
hub.on("eventname1", listener)

// 触发一个事件
hub.emit("eventname1", {
// 自定义事件结构
})

// 取消一个监听
hub.off("eventname1", listener)

大概就是这么简单。

一般来说,这类事件机制在前端都是只在当前页面生效的,要跨页面,就需要使用浏览器提供的 postMessage 的机制,或者自己用 后台worker 进行一次转发。

如果希望跨机器,就没有通用的、广泛实践的、开箱即用的技术方案了。 这类需求,在协同类产品中很常见,因此,很多协同类的产品,大都使用 websocket 作为连接通道,在服务端接收消息后再进行一次分发。

这类需求往往从前端产生,很多时候也是直接由前端同学直接上手做,因此在 nodejs 体系下这类工具还是比较成熟的,用的最最最多的,就是 socket.io ,这几乎是每一个前端同学上手搞 ws 的必经之路。 这个库做的还是很不错的,sdk 从最初的 web端和 nodejs 端,逐渐扩展到很多语言生态,例如 javaC++DartPython.NETGo 等。

socket.io 是基于 engine.io 的,后者才是真正实现了 websocket 协议的库,而 socket.io 更多做的是 定义交互 api实现session实现降级 之类的。

在接口上,socket.io 和上面说的 nodejs 的 EventEmitter 几乎保持了一致,对前端同学来说非常友好,官网的例子是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 服务器监听连接建立
io.on("connection", (socket) => {
// 服务器监听事件
socket.on("hello", (arg) => {
console.log("client send hello : ", arg); // client send hello : I am client

socket.emit("hi", "I am server")
});
});

// 客户端 监听事件
socket.on("hi", (arg) => {
console.log("server send hi : ", arg); // server send hi : I am server
});


// 客户端 发送事件
socket.emit("hello", "I am client");

回答一下关心的几个话题:

  • 是否支持重连和 session: 是
  • 是否有 通道 的概念: 是
  • 是否有 回复 的概念: 是 (有 ack,但机制不明朗)
  • 集群支持如何: 简单使用还ok,扩展性待考察

补充一点,socket.io 有admin 实现,这对于数据观测而言非常有价值

不得不说,对于一些协同类小游戏而言,socket.io 是真的神器。比如像这种游戏,一个 redis + socket.io 就搞定所有事儿了。

大数据量下,socket.io 的集群机制不大好用,这时候消息系统使用 nats、go-emitter 这类扩展性更好的消息系统。

从流式中间件找灵感

说到 消息 ,一定跑不了 消息中间件,最出名的例如 kafkarocketMQredis pub/subnatsgo-emitter 等等。
我们可以分析一下这类中间件的实现,看看他们怎么处理的。

通信协议的选择

  • websocket
  • mqtt
  • http
  • webrtc

消息中间件和消息分发框架的异同

  • 消息中间件大多会持久化数据,消息分发框架大多不会
  • 消息中间件无法在server端添加业务逻辑,消息分发框架主要就是在server端写业务逻辑
  • 其他方面似乎很相似

TODO

  • 回顾一下 sarama 的 stream 的实现。

文档直通车


Discipline is the bridge between goals and accomplishment.
Jim Rohn