基于python与mqtt的传输代码及其文件传输应用(下)

本篇实现了文件的传输任务,mqtt协议为轻量级协议,文件大小需要控制在200mb以下。

Posted by R on 2023-03-01

有几个要点需要提前说明,以 python 为例,一段程序最有新意、值得学习的地方在于它的逻辑安排和它的语法结构,而不是它实现了什么功能。功能的实现无非是函数指令的调用,关键在于各个指令如何通过语法进行搭配,使其能够实现更流畅的综合功能。此处用类定义的结构写,主要目的是为以后其他功能的加入服务,文件传输只是项目功能实现的一块,因此用类将各功能块进行区分是最为有效的。

在结构上使用了 python 常用的文件读写结构 with open() as f:‘rb’ 这部分是先关参数,用于表明读写的格式以及模式,有很多种参数形式,这里是以二进制格式只读文件。f则有很多方法属性,用于返回文件的内容或定义编码等,详细内容可参照链接。另外还有回调函数的调用,这是两个需要学习的点,功能整体并不复杂,因此不需要通过高级语法实现。

文件传输(订阅端)

类的使用能够在很大程度上让代码变的清晰易懂,好的类语法代码能够把各个功能分割成块,不会因为参数问题造成整个逻辑的混乱。之前做上位机程序第一次接触类语法,理解用了好几天,就算是现在再次使用也是勉强掌握,之后需要找时间专门写博客进行回顾,这一部分的熟练掌握是必要的。过程中还用到了一些 python 文件处理指令,这同样需要进行总结回顾,文件操作不论是在何种功能下都是必须的。以下内容为订阅端各个功能块的简析:

库导入及初始化

先划定类名,确定好类名之后在其他程序可以直接继承,具体内容参照类语法定义。初始化的部分相当于一个基本设置,后面功能中需要频繁使用的、固定的、属性类的东西可以在初始化的部分进行设置,以便后续使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import paho.mqtt.client as mqtt
import random
import json
import os
class Local_Sub:
# 初始化端口号、订阅主题等
def __init__(self):
self.broker = 'xxxxx' # ip
self.port = xxxxx # 端口
self.client_id = f'python-mqtt-{random.randint(0, 100)}' # 用户id(随机数)
self.topic = 'file_transfer' # 订阅主题(后面出于功能需要还会有其他主题,但这里是整个类功能需要实现的主题)
self.qos = 2 # qos 等级(qos 是 mqtt 里面的概念,相当于设置信息准确到达的等级)
self.retain = True # 消息保留标志位,后续会使用
print(f"主题:{self.topic},质量:{self.qos}")
self.path = os.getcwd() # 确定当前路径
self.dir_path = os.path.join(self.path, 'savepng') #确定接收文件的存放路径(绝对路径)

mqtt连接函数

连接功能和之前实现信息通讯时的连接函数没有太大区别,在敲代码的过程中发现回调函数真的是一个很好用的函数,定义好以后放在那边,特定事件一出现它就开始响应,这种功能的应用范围实在太广了。但是追根朔源应该是涉及到 C 语言的指针问题,因此还没有深入了解学习。

1
2
3
4
5
6
7
8
9
10
11
def connect_mqtt(self) -> mqtt:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
self.client = mqtt.Client(self.client_id) # 实例化
self.client.on_connect = on_connect
self.client.username_pw_set('test', '123')
self.client.connect(self.broker, self.port, 60)
return self.client

接收回调函数(核心)

接收回调函数,需要实现文件的接收、存取,同时还要给发布端发送确认信息,是整个代码功能的核心体现。这部分功能划成了三个部分:文件信息、图像文件以及 txt 文件的接收,同时它们各自有着自己的反馈信息。这里写的其实还是有些笨重的,只针对项目要实现的两种形式文件传输做了对应的块,要改进的话可以再加文件类型判断指令,将得到的文件直接对接解码和命名,这样不论是什么格式的文件都可以完成保存。

还有一个问题是反馈信息的发送,还处于最开始单个文件的确认阶段,也就是说每接收一个文件就要发送确认的信息,此处代码已经实现了多个文件的传输,那么就需要在确认所有文件收到后再发送确认信息。这种功能的实现需要发布端发送所有文件后传出完结信息,再根据完结信息作出反馈,这一部分实现我觉得可以模仿三次握手和四次挥手,毕竟 IP/TCP 协议也是 mqtt 的基础。

回到代码本身,这段代码更多的还是围绕文件处理指令打转,包括文本转换处理的任务,真正用到传输的地方其实很少。换句话说,mqtt 本身的指令并不多,无非是连接、订阅、发布以及回调函数的循环调用,关键还是在于与你要实现的功能进行逻辑上的搭配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def on_message(self, client, userdata, msg):
self.payload = msg.payload
print(f"Received message on topic '{msg.topic}'")
if msg.topic == self.topic + "/info": # 处理文件信息
file_info = json.loads(self.payload) # 文件信息转换为字符串形式,以便显示文件名
self.filename = file_info["filename"] # 字符串形式的文件信息读取
filesize = file_info["filesize"]
print(f"Received file info: {self.filename} of size {filesize} bytes")
token1 = client.publish(self.topic + "/ack1", "ACK", qos=1) # 确认收到文件信息(可有可无,但多获取点信息也不是坏事)
print('ack1 send')
if msg.topic == self.topic + "/data/png": # 处理图像文件数据
with open(f"{self.dir_path}/new{self.filename}", "wb") as f:
# 文件写操作,存储位置为初始化设置好的路径( mqtt 的文件传输只能以二进制格式处理)
f.write(msg.payload)
print("file saved")
token2 = client.publish(self.topic + "/ack2", "ACK", qos=1)
print('ack2 send')
if msg.topic == self.topic + "/data/txt": # 处理文本文件数据
with open(f"{self.dir_path}/new{self.filename}", "wb") as f:
f.write(msg.payload)
print("file saved")
token2 = client.publish(self.topic + "/ack2", "ACK", qos=1)
print('ack2 send')

主程序段

主程序段需要保证以上几块功能先后顺序不要错,同时我将确认主题放到了主程序段进行处理,作为订阅端要一直保持连接。

1
2
3
4
5
6
7
8
9
10
11
12
   def Main(self):
client = self.connect_mqtt()
self.client.on_message = self.on_message
info_topic = self.topic + "/info"
data_topic1 = self.topic + "/data/png"
data_topic2 = self.topic + "/data/txt"
client.subscribe([(info_topic, 2), (data_topic1, 2), (data_topic2, 2)])
self.client.loop_forever() # 保持连接

if __name__ == '__main__':
sub = Local_Sub()
sub.Main()

文件传输(发布端)

发布端的函数和订阅端在初始化及连接函数方面基本相同,只需要添加用于判断是否收到确认信息的全局变量:

1
self.received_ack = False

文件发送函数

先获取存放数据文件的目录信息,遍历目录下每个文件,将其解码发送,这便是这部分函数的主要功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def send_large_file(self):
path = os.getcwd() # 获取当前目录
dir_path = os.path.join(path, 'png') # 拼接完整的子目录路径
for file in os.listdir(dir_path): # 遍历循环
self.filename = file
file_path = os.path.join(dir_path, file)
self.filesize = os.path.getsize(file_path)
# 先发送文件信息
file_info = {"filename": self.filename, "filesize": self.filesize}
info_json = json.dumps(file_info)
token1 = self.client.publish(self.topic + "/info", info_json, qos=2)
token1.wait_for_publish()
with open(file_path, "rb") as f:
filedata = f.read()
# 再发送文件数据
token2 = self.client.publish(self.topic + "/data/png", filedata, qos=2)

等待函数

我通过定义三个函数进行了实现,这部分在后续可能会增删,因此尽可能避免混乱。三个函数分别用于设置主题、消息回调以及等待结束。主题设置不用多说,和订阅端相同;消息回调用于处理从订阅端返回来的消息,根据对应情况进行处理;等待结束有 100s 的延时,用于等待最终到来的结束传输过程的命令,后续可能因为文件的增多增加等待时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def check_sub(self):
ack_topic1 = self.topic + "/ack1"
ack_topic2 = self.topic + "/ack2"
self.client.subscribe([(ack_topic1, 1), (ack_topic2, 1)])
def on_message(self, client, userdata, msg):
self.payload = msg.payload.decode()
print(f"Received message on topic '{msg.topic}'")
if self.payload == "ACK": # 如果收到确认消息,就将标志设为True
print("Received ACK from subscriber.")
if self.payload == "ACK" and msg.topic == self.topic + "/ack1":
print("Received ACK1 from subscriber.")
if self.payload == "ACK" and msg.topic == self.topic + "/ack2": # 如果收到确认消息,就将标志设为True
self.received_ack = True
def wait(self):
# 等待收到确认消息或超时(这里设为10秒)
timeout = 100
while not self.received_ack and timeout > 0:
print(f"Waiting for ACK...{timeout}s left")
time.sleep(1) # 每隔一秒检查一次
timeout -= 1
if self.received_ack: # 如果收到确认消息,就打印成功并退出程序
print("Received ACK from subscriber. Exiting...")
self.client.loop_stop()
self.client.disconnect()
else: # 如果超时,就打印失败并退出程序
print("Timeout. No ACK from subscriber. Exiting...")
self.client.loop_stop()
self.client.disconnect()

整体程序

这仅是项目整体流程实现的一部分代码,很多优化工作还没有去搞,很多功能拓展也没来得及做。但是用于了解 mqtt 基本传输流程已经足够了,mqtt 是轻量级的传输协议,通过它这种一对多的特性(订阅端可以有很多,并不是只有一个)可以实现很多有意思的功能,比如远程开关灯、远程控制机械等等。在文件传输方面虽然只能承载 200MB 以下的文件,但我想用于日常应用以及足够,毕竟它的特性在于一对多实现,而不是大负载传输。总之,希望之后还能在其它地方应用到这个协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#发布
import time
import paho.mqtt.client as mqtt
import os
import random
import json

class Local_Pub:
def __init__(self):
self.broker = 'xxxxx'
self.port = xxxxx
self.client_id = f'python-mqtt-{random.randint(0, 100)}'
self.topic='file_transfer'
self.qos=1
self.retain=True
self.received_ack = False
print(f"主题:{self.topic},质量:{self.qos}")

def connect_mqtt(self) -> mqtt:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
self.client = mqtt.Client(self.client_id) # 实例化
self.client.on_connect = on_connect
self.client.username_pw_set('test', '123')
self.client.connect(self.broker, self.port, 60)
return self.client

def on_publish(self, client, userdata, mid):
print(f"data published with mid {mid}\n")

def on_message(self, client, userdata, msg):
self.payload = msg.payload.decode()
print(f"Received message on topic '{msg.topic}'")
if self.payload == "ACK": # 如果收到确认消息,就将标志设为True
print("Received ACK from subscriber.")
if self.payload == "ACK" and msg.topic == self.topic + "/ack1":
print("Received ACK1 from subscriber.")
if self.payload == "ACK" and msg.topic == self.topic + "/ack2": # 如果收到确认消息,就将标志设为True
self.received_ack = True

def check_sub(self):
ack_topic1 = self.topic + "/ack1"
ack_topic2 = self.topic + "/ack2"
self.client.subscribe([(ack_topic1, 1), (ack_topic2, 1)])

def send_large_file(self):
path = os.getcwd() # 获取当前目录
dir_path = os.path.join(path, 'png') # 拼接完整的子目录路径
for file in os.listdir(dir_path):
self.filename = file
file_path = os.path.join(dir_path, file)
self.filesize = os.path.getsize(file_path)
# send file info first
file_info = {"filename": self.filename, "filesize": self.filesize}
info_json = json.dumps(file_info)
token1 = self.client.publish(self.topic + "/info", info_json, qos=2)
token1.wait_for_publish()
with open(file_path, "rb") as f:
filedata = f.read()
# send file data next
token2 = self.client.publish(self.topic + "/data/png", filedata, qos=2)

def wait(self):
# 等待收到确认消息或超时(这里设为10秒)
timeout = 100
while not self.received_ack and timeout > 0:
print(f"Waiting for ACK...{timeout}s left")
time.sleep(1) # 每隔一秒检查一次
timeout -= 1
if self.received_ack: # 如果收到确认消息,就打印成功并退出程序
print("Received ACK from subscriber. Exiting...")
self.client.loop_stop()
self.client.disconnect()
else: # 如果超时,就打印失败并退出程序
print("Timeout. No ACK from subscriber. Exiting...")
self.client.loop_stop()
self.client.disconnect()

def Main(self):
self.client = self.connect_mqtt()
self.client.on_message = self.on_message
self.check_sub()
self.client.loop_start()
self.send_large_file()
self.client.on_publish = self.on_publish
print(f"Sent {self.filename} of size {self.filesize} bytes")
self.wait()

if __name__ == '__main__':
pub = Local_Pub()
pub.Main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 订阅
import paho.mqtt.client as mqtt
import random
import json
import os

class Local_Sub:
# 初始化端口号、订阅主题等
def __init__(self):
self.broker = '8.130.8.222'
self.port = 31883
self.client_id = f'python-mqtt-{random.randint(0, 100)}'
self.topic = 'file_transfer'
self.qos = 2
self.retain = True
print(f"主题:{self.topic},质量:{self.qos}")
self.path = os.getcwd()
self.dir_path = os.path.join(self.path, 'savepng')
#连接函数,连接客户端并显示连接信息
def connect_mqtt(self) -> mqtt:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
self.client = mqtt.Client(self.client_id) # 实例化
self.client.on_connect = on_connect
self.client.username_pw_set('test', '123')
self.client.connect(self.broker, self.port, 60)
return self.client
# 回调函数,用于接收、存取文件并发送确认信息
def on_message(self, client, userdata, msg):
self.payload = msg.payload
print(f"Received message on topic '{msg.topic}'")
if msg.topic == self.topic + "/info":
file_info = json.loads(self.payload)
self.filename = file_info["filename"]
filesize = file_info["filesize"]
print(f"Received file info: {self.filename} of size {filesize} bytes")
token1 = client.publish(self.topic + "/ack1", "ACK", qos=1)
print('ack1 send')
if msg.topic == self.topic + "/data/png":
# print(f"Received file data: {filename} of size {filesize} bytes")
with open(f"{self.dir_path}/new{self.filename}", "wb") as f:
f.write(msg.payload)
print("file saved")
token2 = client.publish(self.topic + "/ack2", "ACK", qos=1)
print('ack2 send')
if msg.topic == self.topic + "/data/txt":
# print(f"Received file data: {filename} of size {filesize} bytes")
with open(f"{self.dir_path}/new{self.filename}", "wb") as f:
f.write(msg.payload)
print("file saved")
token2 = client.publish(self.topic + "/ack2", "ACK", qos=1)
print('ack2 send')
# 主程序段
def Main(self):
client = self.connect_mqtt()
self.client.on_message = self.on_message
info_topic = self.topic + "/info"
data_topic1 = self.topic + "/data/png"
data_topic2 = self.topic + "/data/txt"
client.subscribe([(info_topic, 2), (data_topic1, 2), (data_topic2, 2)])
self.client.loop_forever() # 保持连接

if __name__ == '__main__':
sub = Local_Sub()
sub.Main()