有几个要点需要提前说明,以 python 为例,一段程序最有新意、值得学习的地方在于它的逻辑安排和它的语法结构,而不是它实现了什么功能。功能的实现无非是函数指令的调用,关键在于各个指令如何通过语法进行搭配,使其能够实现更流畅的综合功能。此处用类定义的结构写,主要目的是为以后其他功能的加入服务,文件传输只是项目功能实现的一块,因此用类将各功能块进行区分是最为有效的。
在结构上使用了 python 常用的文件读写结构 with open() as f: 。 ‘rb’ 这部分是先关参数,用于表明读写的格式以及模式,有很多种参数形式,这里是以二进制格式只读文件。f则有很多方法属性,用于返回文件的内容或定义编码等,详细内容可参照链接。另外还有回调函数 的调用,这是两个需要学习的点,功能整体并不复杂,因此不需要通过高级语法实现。
文件传输(订阅端) 类的使用能够在很大程度上让代码变的清晰易懂,好的类语法代码能够把各个功能分割成块,不会因为参数问题造成整个逻辑的混乱。之前做上位机程序第一次接触类语法,理解用了好几天,就算是现在再次使用也是勉强掌握,之后需要找时间专门写博客进行回顾,这一部分的熟练掌握是必要的。过程中还用到了一些 python 文件处理指令,这同样需要进行总结回顾,文件操作不论是在何种功能下都是必须的。以下内容为订阅端各个功能块的简析:
库导入及初始化 先划定类名,确定好类名之后在其他程序可以直接继承,具体内容参照类语法定义。初始化的部分相当于一个基本设置,后面功能中需要频繁使用的、固定的、属性类的东西可以在初始化的部分进行设置,以便后续使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import paho.mqtt.client as mqttimport randomimport jsonimport osclass Local_Sub : def __init__ (self ): self.broker = 'xxxxx' self.port = xxxxx self.client_id = f'python-mqtt-{random.randint(0 , 100 )} ' self.topic = 'file_transfer' self.qos = 2 self.retain = True print (f"主题:{self.topic} ,质量:{self.qos} " ) self.path = os.getcwd() self.dir_path = os.path.join(self.path, 'savepng' )
mqtt连接函数 连接功能和之前实现信息通讯时的连接函数没有太大区别,在敲代码的过程中发现回调函数真的是一个很好用的函数,定义好以后放在那边,特定事件一出现它就开始响应,这种功能的应用范围实在太广了。但是追根朔源应该是涉及到 C 语言的指针问题,因此还没有深入了解学习。
1 2 3 4 5 6 7 8 9 10 11 def connect_mqtt (self ) -> mqtt: def on_connect (client, userdata, flags, rc ): if rc == 0 : print ("Connected to MQTT Broker!" ) else : print ("Failed to connect, return code %d\n" , rc) self.client = mqtt.Client(self.client_id) self.client.on_connect = on_connect self.client.username_pw_set('test' , '123' ) self.client.connect(self.broker, self.port, 60 ) return self.client
接收回调函数(核心) 接收回调函数,需要实现文件的接收、存取,同时还要给发布端发送确认信息,是整个代码功能的核心体现。这部分功能划成了三个部分:文件信息、图像文件以及 txt 文件的接收,同时它们各自有着自己的反馈信息。 这里写的其实还是有些笨重的,只针对项目要实现的两种形式文件传输做了对应的块,要改进的话可以再加文件类型判断指令,将得到的文件直接对接解码和命名,这样不论是什么格式的文件都可以完成保存。
还有一个问题是反馈信息的发送,还处于最开始单个文件的确认阶段,也就是说每接收一个文件就要发送确认的信息,此处代码已经实现了多个文件的传输,那么就需要在确认所有文件收到后再发送确认信息。这种功能的实现需要发布端发送所有文件后传出完结信息,再根据完结信息作出反馈,这一部分实现我觉得可以模仿三次握手和四次挥手 ,毕竟 IP/TCP 协议也是 mqtt 的基础。
回到代码本身,这段代码更多的还是围绕文件处理指令打转,包括文本转换处理的任务,真正用到传输的地方其实很少。换句话说,mqtt 本身的指令并不多,无非是连接、订阅、发布以及回调函数的循环调用,关键还是在于与你要实现的功能进行逻辑上的搭配。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def on_message (self, client, userdata, msg ): self.payload = msg.payload print (f"Received message on topic '{msg.topic} '" ) if msg.topic == self.topic + "/info" : file_info = json.loads(self.payload) self.filename = file_info["filename" ] filesize = file_info["filesize" ] print (f"Received file info: {self.filename} of size {filesize} bytes" ) token1 = client.publish(self.topic + "/ack1" , "ACK" , qos=1 ) print ('ack1 send' ) if msg.topic == self.topic + "/data/png" : with open (f"{self.dir_path} /new{self.filename} " , "wb" ) as f: f.write(msg.payload) print ("file saved" ) token2 = client.publish(self.topic + "/ack2" , "ACK" , qos=1 ) print ('ack2 send' ) if msg.topic == self.topic + "/data/txt" : with open (f"{self.dir_path} /new{self.filename} " , "wb" ) as f: f.write(msg.payload) print ("file saved" ) token2 = client.publish(self.topic + "/ack2" , "ACK" , qos=1 ) print ('ack2 send' )
主程序段 主程序段需要保证以上几块功能先后顺序不要错,同时我将确认主题放到了主程序段进行处理,作为订阅端要一直保持连接。
1 2 3 4 5 6 7 8 9 10 11 12 def Main (self ): client = self.connect_mqtt() self.client.on_message = self.on_message info_topic = self.topic + "/info" data_topic1 = self.topic + "/data/png" data_topic2 = self.topic + "/data/txt" client.subscribe([(info_topic, 2 ), (data_topic1, 2 ), (data_topic2, 2 )]) self.client.loop_forever() if __name__ == '__main__' : sub = Local_Sub() sub.Main()
文件传输(发布端) 发布端的函数和订阅端在初始化及连接函数方面基本相同,只需要添加用于判断是否收到确认信息的全局变量:
1 self.received_ack = False
文件发送函数 先获取存放数据文件的目录信息,遍历目录下每个文件,将其解码发送,这便是这部分函数的主要功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def send_large_file (self ): path = os.getcwd() dir_path = os.path.join(path, 'png' ) for file in os.listdir(dir_path): self.filename = file file_path = os.path.join(dir_path, file) self.filesize = os.path.getsize(file_path) file_info = {"filename" : self.filename, "filesize" : self.filesize} info_json = json.dumps(file_info) token1 = self.client.publish(self.topic + "/info" , info_json, qos=2 ) token1.wait_for_publish() with open (file_path, "rb" ) as f: filedata = f.read() token2 = self.client.publish(self.topic + "/data/png" , filedata, qos=2 )
等待函数 我通过定义三个函数进行了实现,这部分在后续可能会增删,因此尽可能避免混乱。三个函数分别用于设置主题、消息回调以及等待结束。主题设置不用多说,和订阅端相同;消息回调用于处理从订阅端返回来的消息,根据对应情况进行处理;等待结束有 100s 的延时,用于等待最终到来的结束传输过程的命令,后续可能因为文件的增多增加等待时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def check_sub (self ): ack_topic1 = self.topic + "/ack1" ack_topic2 = self.topic + "/ack2" self.client.subscribe([(ack_topic1, 1 ), (ack_topic2, 1 )]) def on_message (self, client, userdata, msg ): self.payload = msg.payload.decode() print (f"Received message on topic '{msg.topic} '" ) if self.payload == "ACK" : print ("Received ACK from subscriber." ) if self.payload == "ACK" and msg.topic == self.topic + "/ack1" : print ("Received ACK1 from subscriber." ) if self.payload == "ACK" and msg.topic == self.topic + "/ack2" : self.received_ack = True def wait (self ): timeout = 100 while not self.received_ack and timeout > 0 : print (f"Waiting for ACK...{timeout} s left" ) time.sleep(1 ) timeout -= 1 if self.received_ack: print ("Received ACK from subscriber. Exiting..." ) self.client.loop_stop() self.client.disconnect() else : print ("Timeout. No ACK from subscriber. Exiting..." ) self.client.loop_stop() self.client.disconnect()
整体程序 这仅是项目整体流程实现的一部分代码,很多优化工作还没有去搞,很多功能拓展也没来得及做。但是用于了解 mqtt 基本传输流程已经足够了,mqtt 是轻量级的传输协议,通过它这种一对多的特性(订阅端可以有很多,并不是只有一个)可以实现很多有意思的功能,比如远程开关灯、远程控制机械等等。在文件传输方面虽然只能承载 200MB 以下的文件,但我想用于日常应用以及足够,毕竟它的特性在于一对多实现,而不是大负载传输。总之,希望之后还能在其它地方应用到这个协议。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import timeimport paho.mqtt.client as mqttimport osimport randomimport jsonclass Local_Pub : def __init__ (self ): self.broker = 'xxxxx' self.port = xxxxx self.client_id = f'python-mqtt-{random.randint(0 , 100 )} ' self.topic='file_transfer' self.qos=1 self.retain=True self.received_ack = False print (f"主题:{self.topic} ,质量:{self.qos} " ) def connect_mqtt (self ) -> mqtt: def on_connect (client, userdata, flags, rc ): if rc == 0 : print ("Connected to MQTT Broker!" ) else : print ("Failed to connect, return code %d\n" , rc) self.client = mqtt.Client(self.client_id) self.client.on_connect = on_connect self.client.username_pw_set('test' , '123' ) self.client.connect(self.broker, self.port, 60 ) return self.client def on_publish (self, client, userdata, mid ): print (f"data published with mid {mid} \n" ) def on_message (self, client, userdata, msg ): self.payload = msg.payload.decode() print (f"Received message on topic '{msg.topic} '" ) if self.payload == "ACK" : print ("Received ACK from subscriber." ) if self.payload == "ACK" and msg.topic == self.topic + "/ack1" : print ("Received ACK1 from subscriber." ) if self.payload == "ACK" and msg.topic == self.topic + "/ack2" : self.received_ack = True def check_sub (self ): ack_topic1 = self.topic + "/ack1" ack_topic2 = self.topic + "/ack2" self.client.subscribe([(ack_topic1, 1 ), (ack_topic2, 1 )]) def send_large_file (self ): path = os.getcwd() dir_path = os.path.join(path, 'png' ) for file in os.listdir(dir_path): self.filename = file file_path = os.path.join(dir_path, file) self.filesize = os.path.getsize(file_path) file_info = {"filename" : self.filename, "filesize" : self.filesize} info_json = json.dumps(file_info) token1 = self.client.publish(self.topic + "/info" , info_json, qos=2 ) token1.wait_for_publish() with open (file_path, "rb" ) as f: filedata = f.read() token2 = self.client.publish(self.topic + "/data/png" , filedata, qos=2 ) def wait (self ): timeout = 100 while not self.received_ack and timeout > 0 : print (f"Waiting for ACK...{timeout} s left" ) time.sleep(1 ) timeout -= 1 if self.received_ack: print ("Received ACK from subscriber. Exiting..." ) self.client.loop_stop() self.client.disconnect() else : print ("Timeout. No ACK from subscriber. Exiting..." ) self.client.loop_stop() self.client.disconnect() def Main (self ): self.client = self.connect_mqtt() self.client.on_message = self.on_message self.check_sub() self.client.loop_start() self.send_large_file() self.client.on_publish = self.on_publish print (f"Sent {self.filename} of size {self.filesize} bytes" ) self.wait() if __name__ == '__main__' : pub = Local_Pub() pub.Main()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 import paho.mqtt.client as mqttimport randomimport jsonimport osclass Local_Sub : def __init__ (self ): self.broker = '8.130.8.222' self.port = 31883 self.client_id = f'python-mqtt-{random.randint(0 , 100 )} ' self.topic = 'file_transfer' self.qos = 2 self.retain = True print (f"主题:{self.topic} ,质量:{self.qos} " ) self.path = os.getcwd() self.dir_path = os.path.join(self.path, 'savepng' ) def connect_mqtt (self ) -> mqtt: def on_connect (client, userdata, flags, rc ): if rc == 0 : print ("Connected to MQTT Broker!" ) else : print ("Failed to connect, return code %d\n" , rc) self.client = mqtt.Client(self.client_id) self.client.on_connect = on_connect self.client.username_pw_set('test' , '123' ) self.client.connect(self.broker, self.port, 60 ) return self.client def on_message (self, client, userdata, msg ): self.payload = msg.payload print (f"Received message on topic '{msg.topic} '" ) if msg.topic == self.topic + "/info" : file_info = json.loads(self.payload) self.filename = file_info["filename" ] filesize = file_info["filesize" ] print (f"Received file info: {self.filename} of size {filesize} bytes" ) token1 = client.publish(self.topic + "/ack1" , "ACK" , qos=1 ) print ('ack1 send' ) if msg.topic == self.topic + "/data/png" : with open (f"{self.dir_path} /new{self.filename} " , "wb" ) as f: f.write(msg.payload) print ("file saved" ) token2 = client.publish(self.topic + "/ack2" , "ACK" , qos=1 ) print ('ack2 send' ) if msg.topic == self.topic + "/data/txt" : with open (f"{self.dir_path} /new{self.filename} " , "wb" ) as f: f.write(msg.payload) print ("file saved" ) token2 = client.publish(self.topic + "/ack2" , "ACK" , qos=1 ) print ('ack2 send' ) def Main (self ): client = self.connect_mqtt() self.client.on_message = self.on_message info_topic = self.topic + "/info" data_topic1 = self.topic + "/data/png" data_topic2 = self.topic + "/data/txt" client.subscribe([(info_topic, 2 ), (data_topic1, 2 ), (data_topic2, 2 )]) self.client.loop_forever() if __name__ == '__main__' : sub = Local_Sub() sub.Main()