| import paho.mqtt.client as mqtt_client import json import random import base64
broker = 'xxxxxxx' port = xxxx topic = "file_transfer" client_id = f'python-mqtt-{random.randint(0, 100)}' username = 'test' password = '123'
def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) return client
def on_message(client, userdata, message): payload = message.payload.decode() print(f"Received message '{payload}' on topic '{message.topic}'") if message.topic == topic + "/info": file_info = json.loads(payload) filename = file_info["filename"] filesize = file_info["filesize"] print(f"Received file info: {filename} of size {filesize} bytes") token1 = client.publish(topic + "/ack", "ACK", qos=1) token1.wait_for_publish() if message.topic == topic + "/data": print(f"Received file data: {filename} of size {filesize} bytes") with open("received.txt", "wb") as f: f.write(message.payload) print("Image saved") token2 = client.publish(topic + "/ack", "ACK", qos=1) token2.wait_for_publish()
def run(): client = connect_mqtt() info_topic = topic + "/info" data_topic = topic + "/data" client.subscribe([(info_topic, 2), (data_topic, 2)]) client.on_message = on_message client.loop_forever()
if __name__ == '__main__': run()
from paho.mqtt import client as mqtt_client import os import random import json import time import base64
broker = 'xxxxxxx' port = xxxx topic = "file_transfer" client_id = f'python-mqtt-{random.randint(0, 100)}' username = 'test' password = '123'
received_ack = False
def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) return client
def on_publish(client, userdata, mid): print(f"data published with mid {mid}\n")
def on_message(client, userdata, message): global received_ack payload = message.payload.decode() print(f"Received message '{payload}' on topic '{message.topic}'") if payload == "ACK": received_ack = True
def run(): client = connect_mqtt() ack_topic = topic + "/ack" client.subscribe(ack_topic) client.on_message = on_message client.loop_start() filename = "文件.txt" filesize = os.path.getsize(filename) file_info = {"filename": filename, "filesize": filesize} info_json = json.dumps(file_info) token1 = client.publish(topic + "/info", info_json, qos=1) token1.wait_for_publish() with open(filename, "rb") as f: filedata = f.read() time.sleep(5) token2 = client.publish(topic + "/data", filedata, qos=1) token2.wait_for_publish() client.on_publish = on_publish print(f"Sent {filename} of size {filesize} bytes") timeout = 100 while not received_ack and timeout > 0: print(f"Waiting for ACK...{timeout}s left") time.sleep(1) timeout -= 1 if received_ack: print("Received ACK from subscriber. Exiting...") client.loop_stop() client.disconnect() else: print("Timeout. No ACK from subscriber. Exiting...") client.loop_stop() client.disconnect()
if __name__ == '__main__': run()