"""Backend service: station MQTT ingestion, live dashboard feed and CSV export.

Decoded MQTT messages are stored as ``MqttLog`` rows and pushed to Socket.IO
rooms named after the station id. The HTTP API lists stations and exports
stored logs as CSV.
"""
import os
import sys
import threading
import json
import csv
import io
import re
from datetime import datetime, timedelta

from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO
from dotenv import load_dotenv

# Import your custom core modules and the new models
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog

# --- Load Environment Variables ---
load_dotenv()

# --- Pre-startup Check for Essential Configuration ---
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    print("FATAL ERROR: DATABASE_URL is not set in .env file.")
    sys.exit(1)

# --- Application Setup ---
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# NOTE(review): the fallback value is a hard-coded placeholder; set SECRET_KEY
# in the environment for any non-development deployment.
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")

db.init_app(app)
# NOTE(review): "*" accepts WebSocket connections from any origin - confirm
# this is intended outside development.
socketio = SocketIO(app, cors_allowed_origins="*")

# --- Global instances ---
decoder = ProtobufDecoder()
mqtt_clients = {}  # station_id -> running MqttClient

# --- CSV export layout ---
# Number of slot column groups written per PERIODIC row.
NUM_SLOTS = 9

# One entry per slot column: (payload key, CSV header suffix, default, divisor).
# A divisor of None means the value is written unscaled.
# NOTE(review): the divisors presumably convert mV/mA to V/A and tenths of a
# degree to degrees C, going by the header names - confirm against the decoder.
_SLOT_COLUMNS = (
    ("batteryIdentification", "BatteryID", '', None),
    ("batteryPresent", "BatteryPresent", 0, None),
    ("chargerPresent", "ChargerPresent", 0, None),
    ("doorStatus", "DoorStatus", 0, None),
    ("doorLockStatus", "DoorLockStatus", 0, None),
    ("voltage", "Voltage_V", 0, 1000.0),
    ("current", "Current_A", 0, 1000.0),
    ("soc", "SOC_Percent", 0, None),
    ("batteryMaxTemp", "BatteryTemp_C", 0, 10.0),
    ("slotTemperature", "SlotTemp_C", 0, 10.0),
    ("batteryFaultCode", "BatteryFaultCode", 0, None),
    ("chargerFaultCode", "ChargerFaultCode", 0, None),
    ("batteryMode", "BatteryMode", 0, None),
    ("chargerMode", "ChargerMode", 0, None),
)


# --- MQTT Message Handling (UPDATED) ---
def on_message_handler(station_id, topic, payload):
    """
    Handles incoming MQTT messages, decodes them, writes to PostgreSQL,
    and emits to WebSockets.

    The last path segment of ``topic`` selects the decoder (PERIODIC, EVENTS
    or REQUEST). Messages of any other type, or whose decoded result is
    falsy, are ignored. A database failure is logged and rolled back but does
    not prevent the real-time emit.
    """
    print(f"Main handler received message for station {station_id} on topic {topic}")

    decoded_data = None
    message_type = topic.split('/')[-1]

    if message_type == 'PERIODIC':
        decoded_data = decoder.decode_periodic(payload)
    elif message_type == 'EVENTS':
        decoded_data = decoder.decode_event(payload)
    elif message_type == 'REQUEST':
        decoded_data = decoder.decode_rpc_request(payload)

    if not decoded_data:
        return

    # 1. Write the data to PostgreSQL for historical storage
    try:
        with app.app_context():
            try:
                log_entry = MqttLog(
                    station_id=station_id,
                    topic=topic,
                    topic_type=message_type,  # <-- Save the new topic_type
                    payload=decoded_data
                )
                db.session.add(log_entry)
                db.session.commit()
                print(f"Successfully wrote data for {station_id} to PostgreSQL.")
            except Exception:
                # Leave the session usable for the next message, then let the
                # outer handler report the failure.
                db.session.rollback()
                raise
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    # 2. Emit the data to the frontend for real-time view
    socketio.emit('dashboard_update', {
        'stationId': station_id,
        'topic': topic,
        'data': decoded_data
    }, room=station_id)


# --- (WebSocket and API routes remain the same) ---
@socketio.on('connect')
def handle_connect():
    """Log that a WebSocket client connected."""
    print('Client connected to WebSocket')


# ... (other socketio handlers)


@app.route('/api/stations', methods=['GET'])
def get_stations():
    """Return all stations as a JSON list of ``{"id", "name"}`` objects.

    Responds with HTTP 500 and an error message if the query fails.
    """
    try:
        stations = Station.query.all()
        return jsonify([{"id": s.station_id, "name": s.name} for s in stations])
    except Exception as e:
        return jsonify({"error": f"Database query failed: {e}"}), 500


# --- CSV Export route (UPDATED) ---
def _format_timestamp(ts):
    """Format an epoch timestamp as ``YYYY-MM-DD HH:MM:SS``, or '' if unusable."""
    if ts is None:
        return ''
    try:
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        # Non-numeric or out-of-range value: leave the cell blank instead of
        # failing the whole export.
        return ''


def _slot_value(slot, key, default, divisor):
    """Read one slot field, substituting ``default`` for missing/None and scaling."""
    value = slot.get(key, default)
    if value is None:
        value = default
    return value / divisor if divisor else value


def _periodic_header(num_slots=NUM_SLOTS):
    """Build the CSV header row matching ``_format_periodic_row``."""
    slot_header = [
        f"Slot{i}_{suffix}"
        for i in range(1, num_slots + 1)
        for _key, suffix, _default, _divisor in _SLOT_COLUMNS
    ]
    return ["Timestamp", "DeviceID", "StationDiagnosticCode"] + slot_header + ["RawHexPayload"]


def _safe_filename_part(value):
    """Replace every character outside ``[A-Za-z0-9_.-]`` with an underscore."""
    return re.sub(r'[^A-Za-z0-9_.-]', '_', value)


def _format_periodic_row(payload, num_slots=9):
    """
    Flattens a periodic payload dictionary into a single list for a CSV row.

    Args:
        payload: Decoded periodic message. Reads ``ts``, ``deviceId``,
            ``stationDiagnosticCode`` and ``slotLevelPayload`` (a list of
            per-slot dicts).
        num_slots: Number of slot column groups to emit.

    Returns:
        A list holding the three base columns, then one group of slot columns
        for each slot id from 1 to ``num_slots`` (blank when that slot is
        absent), then an empty RawHexPayload placeholder.
    """
    row = [
        _format_timestamp(payload.get("ts")),
        payload.get("deviceId", ""),
        payload.get("stationDiagnosticCode", "")
    ]

    slots_data = payload.get("slotLevelPayload") or []
    # Slots without an explicit slotId are numbered by their list position.
    slot_map = {s.get('slotId', i + 1): s for i, s in enumerate(slots_data)}

    for slot_id in range(1, num_slots + 1):
        slot = slot_map.get(slot_id)
        if slot:
            row.extend(
                _slot_value(slot, key, default, divisor)
                for key, _suffix, default, divisor in _SLOT_COLUMNS
            )
        else:
            row.extend([''] * len(_SLOT_COLUMNS))

    row.append('')  # Placeholder for RawHexPayload
    return row


@app.route('/api/logs/export', methods=['GET'])
def export_logs():
    """Export stored logs for one station and date range as a CSV download.

    Query parameters:
        station_id: Required. Station whose logs are exported.
        start_date, end_date: Required, ``YYYY-MM-DD``. Both days are included
            in full.
        log_type: Optional, defaults to ``PERIODIC``. Matched against the
            ``topic_type`` column. PERIODIC rows are flattened to one column
            group per slot; any other type is written as timestamp, topic and
            JSON payload.

    Returns HTTP 400 with a JSON error when a required parameter is missing
    or a date does not parse.
    """
    station_id = request.args.get('station_id')
    start_date_str = request.args.get('start_date')
    end_date_str = request.args.get('end_date')
    log_type = request.args.get('log_type', 'PERIODIC')

    if not all([station_id, start_date_str, end_date_str]):
        return jsonify({"error": "Missing required parameters: station_id, start_date, end_date"}), 400

    try:
        start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
        # Exclusive upper bound at the start of the following day, so rows
        # stamped in the final second of end_date are not lost.
        end_exclusive = datetime.strptime(end_date_str, '%Y-%m-%d') + timedelta(days=1)
    except ValueError:
        return jsonify({"error": "Invalid date format. Use YYYY-MM-DD."}), 400

    # UPDATED QUERY: Filter by the new 'topic_type' column for better performance
    query = MqttLog.query.filter(
        MqttLog.station_id == station_id,
        MqttLog.timestamp >= start_date,
        MqttLog.timestamp < end_exclusive,
        MqttLog.topic_type == log_type
    )
    logs = query.order_by(MqttLog.timestamp.asc()).all()

    output = io.StringIO()
    writer = csv.writer(output)

    if log_type == 'PERIODIC':
        writer.writerow(_periodic_header())
        for log in logs:
            writer.writerow(_format_periodic_row(log.payload))
    else:  # For EVENTS_RPC
        writer.writerow(["Timestamp", "Topic", "Payload_JSON"])
        for log in logs:
            writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])

    # The id and type come straight from the query string; keep only
    # filename-safe characters before placing them in a response header.
    filename = (
        f"logs_{_safe_filename_part(station_id)}_{_safe_filename_part(log_type)}"
        f"_{_safe_filename_part(start_date_str)}_to_{_safe_filename_part(end_date_str)}.csv"
    )
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment;filename={filename}"}
    )


# --- Main Application Logic ---
def start_mqtt_clients():
    """
    Initializes and starts an MQTT client for each station found in the database,
    using the specific MQTT credentials stored for each station.

    Stations that already have an entry in ``mqtt_clients`` are skipped. A
    station whose client cannot be created or started is logged and skipped
    so the remaining stations are still attempted.
    """
    try:
        with app.app_context():
            # Copy what is needed into plain dicts while the session is
            # still open, so nothing below touches a detached ORM instance.
            station_configs = [
                {
                    "name": station.name,
                    "station_id": station.station_id,
                    "broker": station.mqtt_broker,
                    "port": station.mqtt_port,
                    "user": station.mqtt_user,
                    "password": station.mqtt_password,
                }
                for station in Station.query.all()
            ]
    except Exception as e:
        print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}")
        return

    for config in station_configs:
        station_id = config["station_id"]
        if station_id in mqtt_clients:
            continue

        print(f"Creating and starting MQTT client for station: {config['name']} ({station_id})")
        try:
            client = MqttClient(
                broker=config["broker"],
                port=config["port"],
                user=config["user"],
                password=config["password"],
                station_id=station_id,
                on_message_callback=on_message_handler
            )
            client.start()
        except Exception as e:
            print(f"Error starting MQTT client for station {station_id}: {e}")
            continue
        mqtt_clients[station_id] = client


if __name__ == '__main__':
    try:
        with app.app_context():
            db.create_all()
            if not Station.query.first():
                print("No stations found. Adding a default station with default MQTT config.")
                # NOTE(review): development seed data with hard-coded broker
                # credentials - confirm this should ship.
                default_station = Station(
                    station_id="V16000868210069259709",
                    name="Test Station 2",
                    mqtt_broker="mqtt-dev.upgrid.in",
                    mqtt_port=1883,
                    mqtt_user="guest",
                    mqtt_password="password"
                )
                db.session.add(default_station)
                db.session.commit()
    except Exception as e:
        print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
        sys.exit(1)

    # Daemon thread: MQTT startup must not keep the process alive on shutdown.
    mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
    mqtt_thread.start()

    print("Starting Flask-SocketIO server on http://localhost:5000")
    socketio.run(app, host='0.0.0.0', port=5000)