1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
| import paho.mqtt.client as mqtt_client import json import random import base64
broker = 'xxxxxxx' port = xxxx topic = "file_transfer" client_id = f'python-mqtt-{random.randint(0, 100)}' username = 'test' password = '123'
def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) return client
def on_message(client, userdata, message): payload = message.payload.decode() print(f"Received message '{payload}' on topic '{message.topic}'") if message.topic == topic + "/info": file_info = json.loads(payload) filename = file_info["filename"] filesize = file_info["filesize"] print(f"Received file info: {filename} of size {filesize} bytes") token1 = client.publish(topic + "/ack", "ACK", qos=1) token1.wait_for_publish() if message.topic == topic + "/data": print(f"Received file data: {filename} of size {filesize} bytes") with open("received.txt", "wb") as f: f.write(message.payload) print("Image saved") token2 = client.publish(topic + "/ack", "ACK", qos=1) token2.wait_for_publish()
def run(): client = connect_mqtt() info_topic = topic + "/info" data_topic = topic + "/data" client.subscribe([(info_topic, 2), (data_topic, 2)]) client.on_message = on_message client.loop_forever()
if __name__ == '__main__': run()
from paho.mqtt import client as mqtt_client import os import random import json import time import base64
broker = 'xxxxxxx' port = xxxx topic = "file_transfer" client_id = f'python-mqtt-{random.randint(0, 100)}' username = 'test' password = '123'
received_ack = False
def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) return client
def on_publish(client, userdata, mid): print(f"data published with mid {mid}\n")
def on_message(client, userdata, message): global received_ack payload = message.payload.decode() print(f"Received message '{payload}' on topic '{message.topic}'") if payload == "ACK": received_ack = True
def run(): client = connect_mqtt() ack_topic = topic + "/ack" client.subscribe(ack_topic) client.on_message = on_message client.loop_start() filename = "文件.txt" filesize = os.path.getsize(filename) file_info = {"filename": filename, "filesize": filesize} info_json = json.dumps(file_info) token1 = client.publish(topic + "/info", info_json, qos=1) token1.wait_for_publish() with open(filename, "rb") as f: filedata = f.read() time.sleep(5) token2 = client.publish(topic + "/data", filedata, qos=1) token2.wait_for_publish() client.on_publish = on_publish print(f"Sent {filename} of size {filesize} bytes") timeout = 100 while not received_ack and timeout > 0: print(f"Waiting for ACK...{timeout}s left") time.sleep(1) timeout -= 1 if received_ack: print("Received ACK from subscriber. Exiting...") client.loop_stop() client.disconnect() else: print("Timeout. No ACK from subscriber. Exiting...") client.loop_stop() client.disconnect()
if __name__ == '__main__': run()
|