基于python与mqtt的传输代码及其文件传输应用(上)

基于python和mqtt通讯协议的传输过程实现,目标在于实现文件传输,本篇主要解析基础代码。

Posted by R on 2023-02-28

mqtt网络传输协议基础代码解析

订阅端


外部库导入以及连接前设置

使用 python 语言进行开发,导入两个外部包,random 以及 paho.mqtt 。 random 用于随机数生成,paho.mqtt 则用于提供 MQTT 客户端类。

1
2
import random
from paho.mqtt import client as mqtt_client

连接前设置 MQTT Broker 的地址和端口号,订阅主题名称,以及客户端 id 和用户名密码

1
2
3
4
5
6
7
8
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# 用户端 id 由随机数生成
client_id = f'python-mqtt-{random.randint(0, 100)}'
# 在 broker 需要鉴权的情况下,要设置用户名密码
username = 'user'
password = 'xxxxx'

编写连接函数

此处 on_connect 为连接回调函数,客户端连接后,函数可被调用,依据rc来判断是否连接成功。(回调函数是个有意思的概念,之后专门写一篇博客来对比不同语言下的回调函数)下端函数功能为连接客户端并返回客户端配置信息,类型为 mqtt_client。

1
2
3
4
5
6
7
8
9
10
11
12
13
def connect_mqtt() -> mqtt_client:
#'->'用于类型指示,即connect_mqtt函数返回的值类型为 mqtt_client
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)

client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)#设置客户端密码等
client.on_connect = on_connect
client.connect(broker, port)
return client

订阅函数

此处 on_message 同样为消息回调函数,客户端从代理器收到消息后,该函数被调用,在该函数中将打印出订阅的 topic 名称以及接收到的消息内容。函数整体结构同上,回调函数的使用使得程序简洁高效。

1
2
3
4
5
6
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

client.subscribe(topic)
client.on_message = on_message

主程序段

只是粗略程序流程,并未实现循环的终端以及文件的传输,只适用于了解大体框架。

1
2
3
4
5
6
7
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()

if __name__ == '__main__':
run()

消息发布端


发布函数

发布端仅发布函数和订阅端不同,连接设置以及函数部分完全一样,无需赘述。此处发布函数定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端 publish 函数向 python/mqtt 主题发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
#上面这段程序中的f是一种新的字符串格式化方式,叫做f-string或者格式化字符串常量。在这里使用f-string来显示消息计数器的值。
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1

主程序段

1
2
3
4
5
6
7
8
def run():
client = connect_mqtt()
client.loop_start()
publish(client)


if __name__ == '__main__':
run()

以上流程可以初步实现基于mqtt协议的信息传输,实现结果如下:
消息发布
消息订阅

项目代码概览

这些程序是初步的应用,具体投入到项目中时还加入了很多其他规则,目前打算实现文件传输任务,但是始终卡在数据传输环节,暂且搁置。以下为项目代码,目的是实现基于mqtt的文件传输,还未跑通,仅供参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#订阅端
import paho.mqtt.client as mqtt_client
import json
import random
import base64

broker = 'xxxxxxx' # 此处为服务器ip
port = xxxx # 端口信息
topic = "file_transfer"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'test'
password = '123'

def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client

def on_message(client, userdata, message):
payload = message.payload.decode()
print(f"Received message '{payload}' on topic '{message.topic}'")
if message.topic == topic + "/info":
file_info = json.loads(payload)
filename = file_info["filename"]
filesize = file_info["filesize"]
print(f"Received file info: {filename} of size {filesize} bytes")
token1 = client.publish(topic + "/ack", "ACK", qos=1)
token1.wait_for_publish()
if message.topic == topic + "/data":
print(f"Received file data: {filename} of size {filesize} bytes")
with open("received.txt", "wb") as f:
f.write(message.payload)
print("Image saved")
token2 = client.publish(topic + "/ack", "ACK", qos=1)
token2.wait_for_publish()

def run():
client = connect_mqtt()
info_topic = topic + "/info"
data_topic = topic + "/data"
client.subscribe([(info_topic, 2), (data_topic, 2)])
client.on_message = on_message
client.loop_forever()

if __name__ == '__main__':
run()

#发布端
from paho.mqtt import client as mqtt_client
import os
import random
import json
import time
import base64

broker = 'xxxxxxx' # 此处为服务器ip
port = xxxx # 端口信息
topic = "file_transfer"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'test'
password = '123'

# 定义一个全局变量,用来存储是否收到确认消息的标志
received_ack = False

def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client

def on_publish(client, userdata, mid):
print(f"data published with mid {mid}\n")

# 定义一个回调函数,用来处理接收到的确认消息,并修改全局变量的值
def on_message(client, userdata, message):
global received_ack # 声明全局变量
payload = message.payload.decode() # 解码消息内容
print(f"Received message '{payload}' on topic '{message.topic}'")
if payload == "ACK": # 如果收到确认消息,就将标志设为True
received_ack = True

def run():
client = connect_mqtt()
# 订阅一个专门用于传输确认消息的主题,并设置回调函数为on_message
ack_topic = topic + "/ack"
client.subscribe(ack_topic)
client.on_message = on_message
# start the network loop
client.loop_start()
filename = "文件.txt"
filesize = os.path.getsize(filename)
# send file info first
file_info = {"filename": filename, "filesize": filesize}
info_json = json.dumps(file_info)
token1 = client.publish(topic + "/info", info_json, qos=1)
token1.wait_for_publish()
with open(filename, "rb") as f:
filedata = f.read()
time.sleep(5)
# send file data next
token2 = client.publish(topic + "/data", filedata, qos=1)
token2.wait_for_publish()
# set the callback function for publish events
client.on_publish = on_publish
print(f"Sent {filename} of size {filesize} bytes")
# 等待收到确认消息或超时(这里设为10秒)
timeout = 100
while not received_ack and timeout > 0:
print(f"Waiting for ACK...{timeout}s left")
time.sleep(1) # 每隔一秒检查一次
timeout -= 1
if received_ack: # 如果收到确认消息,就打印成功并退出程序
print("Received ACK from subscriber. Exiting...")
client.loop_stop()
client.disconnect()
else: # 如果超时,就打印失败并退出程序
print("Timeout. No ACK from subscriber. Exiting...")
client.loop_stop()
client.disconnect()

if __name__ == '__main__':
run()