单文件 MongoDB 服务器(1)

引子

是的我找回了自己的博客

起因是我的小伙伴在假期提出了一个需求:

在单个文件中存储数据,并使用 MongoDB 的方式来存取
还附上了已有开发者做的项目 TinyMongo
但上面这个项目,并不能使用现有的工具来进行连接(比如说 MongoDB Compass)
原因也很简单,项目只实现了一个 Client,并没有一个 Server 来与其他软件进行通信。

我寻思,这个问题看起来不难~~(真的吗?)~~,而且我本身也对数据库服务器这块感兴趣,便开始着手研究和编写代码。
由于读写已经由别的项目完成了,所以我只需要专注于完成 “服务器” 应该干的事情。

那么,数据库服务器应该干什么呢?
不知道,没一点头绪,该死的本科水专业课,只会让人背 SQL 语句。
教程,貌似也没有完全面向小白的。
那就撸袖子干,写到哪里算哪里,等对问题有基本认知了再来回头重构代码,学习更优秀的设计,参考开源代码。
GO,GO,先搓起来先~

注意!!!
本人对数据库是完全的小白,该系列纯粹是为本人在学习过程中的记录,包括但不限于“我寻思应该这样写”。代码可能不美观,但我相信它会随着我对这个问题的掌握逐渐越来越好了。

希望哪天做 AI 失业了还能靠学习的这个吃口饭

小目标是搭一个能跑起来的服务!不问性能

MongoDB 基于 TCP 协议来实现通信,在 Python 里实现一个 TCP 服务器也很简单,基于 socket 库即可:

Python 中的 socket 模块是用于实现网络通信的模块,提供了底层网络操作的接口,使得用户可以通过网络实现客户端和服务器之间的数据传输、连接和通信。

1
2
3
4
5
6
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
# set maximum number of connections
server_socket.listen(5)
# log server start
print(f"Server started on {host}:{port}")

host, port 分别是地址和端口,例如host="127.0.0.1", port="27019"

MongoDB 官方文档给出了定义的通信格式 mongodb-wire-protocol 我只能说可读性很差的。
这里对它的内容进行简单的整理:

请求应包括消息头请求有效负载,而响应则包括消息头响应有效负载

需要注意:

  • 请求和响应的有效负载并不相同
  • 消息头和有效负载都是 BSON 格式的二进制数据
  • 消息的排序遵循小端序(little-endian)

而 BSON(Binary JSON)是一种二进制形式的 JSON 文档。它是 MongoDB 数据库的默认数据存储格式。

消息头

官方文档给出消息头的定义为

1
2
3
4
5
6
7
struct MsgHeader {
int32 messageLength; // total message size, including this
int32 requestID; // identifier for this message
int32 responseTo; // requestID from the original request
// (used in responses from the database)
int32 opCode; // message type
}
  • messageLength:整个消息的大小,包括消息头
  • requestID:每个请求的唯一标识符,由客户端或数据库生成
  • responseTo:如果是响应,则为请求的 requestID;如果是请求,则为 0
  • opCode:消息类型,包括请求、响应、错误、通知等

无论是请求还是响应,都要遵循该消息头来进行传输

有效负载

这个就五花八门了,现在网上大部分的资料都是基于 5.1 版本的前的,对于 opCode 中的例如 OP_QUERY, OP_INSERT, OP_QUERY 等等都有很详尽的定义,详细的可以在旧版本的操作码中找到,至于新版,换成了统一的了。

从 5.1 版本开始,OP_MSGOP_COMPRESSED 是向 Server 发送请求时唯一支持的操作码

OP_COMPRESSED 是 5.1 版本的压缩请求,用于支持压缩数据。OP_MSG 是 5.1 版本的消息请求,用于支持多文档事务。

1
2
3
4
5
6
OP_MSG {
MsgHeader header; // standard message header
uint32 flagBits; // message flags
Sections[] sections; // data sections
optional<uint32> checksum; // optional CRC-32C checksum
}

比较关键的部分在于:flagBits 标志位和 sections 消息数组:

  • flagBits: 前 16 位必须有,我个人的理解是只能设置有实际含义的二进制位,否则,解析器则必须报错,对应原文是 "The first 16 bits (0-15) are required and parsers MUST error if an unknown bit is set."
    最后 16 位是可选的,允许设置一些额外的标志位,但代理和其他消息转发器必须在转发消息之前清除任何未知的可选位。

sections 中可以包含单个的 BSON 对象,也可以包含一个文档。
文档是 MongoDB 的基本数据单元,使用 BSON 格式存储。一个文档包含了一组键值对,类似于 JSON 对象。
如: {"name": "John", "age": 30, "city": "New York"}

看完了,SO?首先作为客户端连接服务器要发什么请求?作为服务端返回的内容又包含了什么?

握手请求

我们都知道和服务器建立连接前都要先尝试“握手”,即发送一些配置信息来确定双方后续沟通的一些细节。我姑且把这样的部分成为“握手阶段”。

客户端要和 MongoDB 的服务器建立通信,同样也会先发送一些类似测试的指令,然而,官方文档关于这方面写的极其少,并且隐藏的很深(不好找),那么很简单,抓包看官方客户端和服务端如何通信的即可。
我用的抓包工具是 WireShark,具体如何抓就不赘述了,直接贴有哪些请求的结论:

isMaster

选择 MongoDB Compass 作为客户端(版本 > 5.1),连接到 MongoDB Server。连接刚开始的时候会不断地尝试发送 OP_QUERY 请求。请求内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"flags": 0,
"fullCollectionName": "admin.$cmd",
"numberToSkip": 0,
"numberToReturn": -1,
"query": {
"ismaster": 1,
"helloOk": True,
"client": {
"application": {
"name": "MongoDB Compass"
},
"driver": {
"name": "nodejs",
"version": "6.12.0"
},
"platform": "Node.js v20.18.1, LE",
"os": {
"name": "win32",
"architecture": "x64",
"version": "10.0.22631",
"type": "Windows_NT"
}
},
"compression": ["none"]
},
"returnFieldsSelector": None
}

这里我把请求的全部 payload 展示出来,方便查看。isMaster 请求是一个 OP_QUERY 类型的消息。
此事在官方文档中已有记载,虽然我们使用了新版本的 OP_MSG,但唯一支持的旧操作码就是这个 OP_QUERY
我本人 MongoDB 的使用经验很少,很多命令一知半解,只能大致的猜测一些字段的含义。

当收到 OP_QUERY 的 “isMaster” 请求的回复后,会接着发送一个 OP_MSG 的版本

1
2
3
4
5
6
7
8
9
10
11
12
{
'flagBits': 65536,
'sections': [{
'hello': 1,
'maxAwaitTimeMS': 10000,
'topologyVersion': {
'processId': ObjectId('679f5fe50ab84b41447995a7'),
'counter': 0
},
'$db': 'admin'
}]
}

必须要指出,topologyVersion 的 counter 的字段类型为 Long,不仔细就会当做 int 来处理了 -_-||

maxAwaitTimeMS 貌似是客户端最大会等待服务器的响应时间,(后续实验出来)一旦超过这个时间,该请求就会重发

topologyVersion 是一个我目前完全无法理解的东西,跳过。

当这个请求也被回复后,客户端发送另外一个 OP_MSG 请求:

1
2
3
4
5
6
7
8
9
10
{
'flagBits': 0,
'sections': [{
'ping': 1,
'lsid': {
'id': Binary(b '\xad\x98\xeb\xe2\xa1\xf5G\xef\xbd\xcc\xde\x01H\xaeA\xa9', 4)
},
'$db': 'admin'
}]
}

跟据官方文档的说明:

ping 命令是一个空操作,用于测试服务器是否在响应命令。即使服务器处于写锁定状态,此命令也会立即返回:
lsid 指定与该命令关联的会话的唯一 ID

至此我个人认为 isMaster 请求全部完成,至于如何响应这些请求,我会在下一章介绍。
明面上,完成这些步骤后 MongoDB Compass 上已经弹出服务器连接成功的提示框了,但实际上在你主动操作前,客户端还有请求发出。

aggregate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
'flagBits': 0,
'sections': [{
'aggregate': 1,
'pipeline': [{
'$currentOp': {
'allUsers': True,
'idleConnections': False,
'truncateOps': False
}
}],
'cursor': {},
'lsid': {
'id': Binary(b '\xcb\xf3\xe4\x99\xca|H\xe7\x9eqZ\xea<k\xec\x91', 4)
},
'$db': 'admin'
}]
}

一个相当复杂的命令,官网有解释,顾名思义,是查看当然执行的命令的相关信息,更多的我暂时也不知道了。

top & buildInfo & hostInfo

1
2
3
4
5
6
7
8
9
10
{
'flagBits': 0,
'sections': [{
'top': 1,
'lsid': {
'id': Binary(b 'l\xf3\xceRD\x08I\xf9\x8bV\x14\x91\x88aDL', 4)
},
'$db': 'admin'
}]
}

其余的命令和,比如 buildInfo 和 top 类似,只不过 sections 中的 'top': 1 变成了 'buildInfo': 1

像这样的命令有还有 dbStats 和 atlasVersion,推测是获取一些服务器配置信息的命令。

getParameter

1
2
3
4
5
6
7
8
9
10
11
{
'flagBits': 0,
'sections': [{
'getParameter': 1,
'featureCompatibilityVersion': 1
'lsid': {
'id': Binary(b 'l\xf3\xceRD\x08I\xf9\x8bV\xa1\x99\x88aDL', 4)
},
'$db': 'admin'
}]
}

connectionStatus

1
2
3
4
5
6
7
8
9
10
11
{
'flagBits': 0,
'sections': [{
'connectionStatus': 1,
'showPrivileges': True
'lsid': {
'id': Binary(b 'B\xfd2\xc0\x8c\xf8I\xc2\x84\x08):|\x9e\x95y', 4)
},
'$db': 'admin'
}]
}

listDatabases

1
2
3
4
5
6
7
8
9
10
11
{
'flagBits': 0,
'sections': [{
'listDatabases': 1,
'nameOnly': True
'lsid': {
'id': Binary(b"V'\xe1\xcf\xe2mL&\xbf`\xf0\x87\x13\xfd\xc3\xe6", 4)
},
'$db': 'admin'
}]
}

最符合名字的一集,从名字就能推断出是列出现在所有的表,并且由于有 nameOnly 所以只需要列出名字。

响应

只从文档无法推测出究竟需要返回什么样子的字段,还好我们有抓包,可以以此结果来反推信息。
不过在那之前,我们要先在 Python 里完成一些基础协议数据结构的东西。

BSON

BSON(Binary JSON)是一种二进制形式的 JSON 文档。它是 MongoDB 数据库的默认数据存储格式。
如何序列化这样的数据,不需要我们来完成,Python 有 支持库 bson
我们可以调用 bson.encode() 来将 Python 的字典变为 BSON 文档,同时使用 bson.decode() 将 BSON 文档转换为 Python 字典。
需要注意的是,Python 的 整数默认为 Int32,当我们需要用到 Int64 类型时,可以考虑使用 bson.int64.Int64()

消息头

众所周知,对于 socket 获取的流数据,一般都是二进制,并且使用 socket 发送内容的时候也要用二进制。
Python 内置一个二进制数据处理的相关库 struct,通过使用 struct.pack() | struct.unpack() 可以快速将Python数据打包或者解包一个二进制数据,使用不同的“格式化字符串”可以指定打包成不同的二进制格式,比如说大小端,uint类型等等。详细的可以去查看使用文档。

要解包消息头,使用 struct 就非常的简单。

1
2
3
4
5
6
7
8
9
10
11
12
class HeadParser:

def do_decode(self, data):
# MongoDB message header
# 16 bytes in total, 4 bytes each: (message length), (request id), (response to), (op code)
header = struct.unpack("<iiii", data[:16])
return {
"message_length": header[0],
"request_id": header[1],
"response_to": header[2],
"op_code": header[3]
}

而有效负载,不同的有不同的格式,
我们先搓几个工具函数,它从二进制数据流中读取特定的 Python 数据。更具体的实现可以移步到该项目我的 Github 仓库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def byte2string(data, offset):
"""
Convert byte data to string.
:param data: byte data
:param offset: pointer to the start of the string
:return: pointer to the end of the string and the string
"""
full_str = ""
# cstring end with b'\0'
while data[offset] != 0:
full_str += chr(data[offset])
offset += 1
return offset+1, full_str

def byte2int32(data, offset):
"""
Convert byte data to integer.
:param data: byte data
:param offset: pointer to the start of the integer
:return: pointer to the end of the integer and the integer
"""
integer = struct.unpack("<i", data[offset:offset+4])[0]
return offset+4, integer

def byte2int64(data, offset):
"""
Convert byte data to integer.
:param data: byte data
:param offset: pointer to the start of the integer
:return: pointer to the end of the integer and the integer
"""
integer = struct.unpack("<q", data[offset:offset+8])[0]
return offset+8, integer

def byte2document(data, offset):
"""
Covert byte data to dict object in Python
:param data: byte data
:param offset: pointer to the start of the document
:return: offset and decoded document result
"""
doc_length = struct.unpack("<i", data[offset: offset+4])[0]
document = bson.decode(data[offset: offset+doc_length])
offset += doc_length
return offset, document

def byte2uint32(data, offset):
"""
Convert byte data to unsigned integer.
:param data: byte data
:param offset: pointer to the start of the integer
:return: pointer to the end of the integer and the integer
"""
integer = struct.unpack("<I", data[offset:offset+4])[0]
return offset+4, integer

def byte2sections(data, offset):
current_offset = offset
current_offset, section_size = byte2int32(data, current_offset)
section_name = byte2string(data, current_offset)
documents = []
while current_offset < offset + section_size:
current_offset, document = byte2document(data, current_offset)
documents.append(document)
res = {
section_name: documents
}
return current_offset, res

CRC 校验

不算太难,内置库 zlib 有现成的轮子调用,直接给出实现:

1
2
3
4
5
6
7
8
9
def crc32_checksum(raw_data, checksum):
"""
Calculate the CRC32 checksum of the given data and compare it with the given checksum.
:param raw_data: Total data except the checksum
:param checksum: The expected checksum
:return:
"""
computed_checksum = zlib.crc32(raw_data) & 0xFFFFFFFF
return computed_checksum == checksum

有了这些基本的认知还有工具,下一节我们来实现一个应答这些请求的服务器。里面也有很多坑

单文件 MongoDB 服务器(1)

http://cyx0706.github.io/2025/03/20/mongodb/

Author

Ctwo

Posted on

2025-03-20

Updated on

2025-05-08

Licensed under

Comments