fix: added on_message_handler in the main.py
parent
eb88959660
commit
45d2bbdab8
|
|
@ -66,7 +66,47 @@ mqtt_clients = {}
|
|||
last_message_timestamps = {}
|
||||
STATION_TIMEOUT_SECONDS = 10
|
||||
|
||||
|
||||
|
||||
# --- MQTT Message Handling ---
|
||||
def on_message_handler(station_id, topic, payload):
|
||||
message_type = topic.split('/')[-1]
|
||||
|
||||
if message_type in ['PERIODIC']:
|
||||
last_message_timestamps[station_id] = time.time()
|
||||
|
||||
# Decode the message payload based on its type
|
||||
decoded_data = None
|
||||
if message_type == 'PERIODIC':
|
||||
decoded_data = decoder.decode_periodic(payload)
|
||||
elif message_type == 'EVENTS':
|
||||
decoded_data = decoder.decode_event(payload)
|
||||
elif message_type == 'REQUEST':
|
||||
decoded_data = decoder.decode_rpc_request(payload)
|
||||
|
||||
if decoded_data:
|
||||
try:
|
||||
with app.app_context():
|
||||
log_entry = MqttLog(
|
||||
station_id=station_id,
|
||||
topic=topic,
|
||||
topic_type=message_type,
|
||||
payload=decoded_data
|
||||
)
|
||||
db.session.add(log_entry)
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
print(f"Error writing to PostgreSQL: {e}")
|
||||
|
||||
# Emit update to the main dashboard
|
||||
socketio.emit('dashboard_update', {
|
||||
'stationId': station_id,
|
||||
'topic': topic,
|
||||
'data': decoded_data
|
||||
}, room=station_id)
|
||||
|
||||
# Emit update notification to the analytics page
|
||||
if message_type in ['EVENTS', 'PERIODIC']:
|
||||
socketio.emit('analytics_updated', room=station_id)
|
||||
|
||||
|
||||
# --- (WebSocket and API routes remain the same) ---
|
||||
|
|
|
|||
Loading…
Reference in New Issue