From 494e6b4c7c9538e855c001f3b08b83b5bfea31a4 Mon Sep 17 00:00:00 2001 From: Kirubakaran Date: Sat, 13 Sep 2025 02:42:20 +0530 Subject: [PATCH] chore: added the instant thread for new station --- backend/main.py | 77 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/backend/main.py b/backend/main.py index f933d25..c49f813 100644 --- a/backend/main.py +++ b/backend/main.py @@ -200,9 +200,9 @@ def add_station(): ) db.session.add(new_station) db.session.commit() - - # You might want to start the new MQTT client here as well - # start_single_mqtt_client(new_station) + + # Immediately start the MQTT client for the station just created. + start_single_mqtt_client(new_station) return jsonify({"message": "Station added successfully."}), 201 @@ -424,33 +424,72 @@ def handle_rpc_request(payload): print(f"Publishing to {topic}") mqtt_client.client.publish(topic, serialized_payload) +# ADD THIS NEW FUNCTION +def start_single_mqtt_client(station): + """ + Creates and starts a new MQTT client thread for a SINGLE station. + This is our new reusable function. + """ + if station.station_id in mqtt_clients and mqtt_clients[station.station_id].is_connected: + print(f"MQTT client for {station.station_id} is already running.") + return + + print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})") + + client = MqttClient( + broker=station.mqtt_broker, + port=station.mqtt_port, + user=station.mqtt_user, + password=station.mqtt_password, + station_id=station.station_id, + on_message_callback=on_message_handler + ) + client.start() # The start method should handle threading + mqtt_clients[station.station_id] = client + # --- Main Application Logic --- +# def start_mqtt_clients(): +# """ +# Initializes and starts an MQTT client for each station found in the database, +# using the specific MQTT credentials stored for each station. +# """ +# try: +# with app.app_context(): +# stations = Station.query.all() +# except Exception as e: +# print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}") +# return + +# for station in stations: +# if station.station_id not in mqtt_clients: +# print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})") + +# client = MqttClient( +# broker=station.mqtt_broker, +# port=station.mqtt_port, +# user=station.mqtt_user, +# password=station.mqtt_password, +# station_id=station.station_id, +# on_message_callback=on_message_handler +# ) +# client.start() +# mqtt_clients[station.station_id] = client + def start_mqtt_clients(): """ - Initializes and starts an MQTT client for each station found in the database, - using the specific MQTT credentials stored for each station. + Initializes and starts an MQTT client for each station found in the database + by calling our new reusable function. """ try: with app.app_context(): stations = Station.query.all() + print(f"Found {len(stations)} existing stations to monitor.") except Exception as e: - print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}") + print(f"CRITICAL: Could not query stations from the database: {e}") return for station in stations: - if station.station_id not in mqtt_clients: - print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})") - - client = MqttClient( - broker=station.mqtt_broker, - port=station.mqtt_port, - user=station.mqtt_user, - password=station.mqtt_password, - station_id=station.station_id, - on_message_callback=on_message_handler - ) - client.start() - mqtt_clients[station.station_id] = client + start_single_mqtt_client(station) if __name__ == '__main__': try: