From 45d2bbdab8f1222bfdf0be31c6c8f5df4169f168 Mon Sep 17 00:00:00 2001 From: Kirubakaran Date: Sat, 20 Sep 2025 22:03:00 +0530 Subject: [PATCH] fix: added on_message_handler in the main.py --- backend/main.py | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/backend/main.py b/backend/main.py index 1765553..1e3d7f7 100644 --- a/backend/main.py +++ b/backend/main.py @@ -66,7 +66,47 @@ mqtt_clients = {} last_message_timestamps = {} STATION_TIMEOUT_SECONDS = 10 - + +# --- MQTT Message Handling --- +def on_message_handler(station_id, topic, payload): + message_type = topic.split('/')[-1] + + if message_type in ['PERIODIC']: + last_message_timestamps[station_id] = time.time() + + # Decode the message payload based on its type + decoded_data = None + if message_type == 'PERIODIC': + decoded_data = decoder.decode_periodic(payload) + elif message_type == 'EVENTS': + decoded_data = decoder.decode_event(payload) + elif message_type == 'REQUEST': + decoded_data = decoder.decode_rpc_request(payload) + + if decoded_data: + try: + with app.app_context(): + log_entry = MqttLog( + station_id=station_id, + topic=topic, + topic_type=message_type, + payload=decoded_data + ) + db.session.add(log_entry) + db.session.commit() + except Exception as e: + print(f"Error writing to PostgreSQL: {e}") + + # Emit update to the main dashboard + socketio.emit('dashboard_update', { + 'stationId': station_id, + 'topic': topic, + 'data': decoded_data + }, room=station_id) + + # Emit update notification to the analytics page + if message_type in ['EVENTS', 'PERIODIC']: + socketio.emit('analytics_updated', room=station_id) # --- (WebSocket and API routes remain the same) ---