import os import sys import threading import json import csv import io import time from datetime import datetime from flask import Flask, jsonify, request, Response from flask_socketio import SocketIO, join_room from flask_cors import CORS from dotenv import load_dotenv from sqlalchemy import desc # Import your custom core modules and the new models from core.mqtt_client import MqttClient from core.protobuf_decoder import ProtobufDecoder from models import db, Station, User, MqttLog from flask_login import login_required, current_user, LoginManager from proto.vec_payload_chgSt_pb2 import ( rpcRequest, jobType_e ) # --- Load Environment Variables --- load_dotenv() # --- Pre-startup Check for Essential Configuration --- DATABASE_URL = os.getenv("DATABASE_URL") if not DATABASE_URL: print("FATAL ERROR: DATABASE_URL is not set in .env file.") sys.exit(1) # --- Application Setup --- app = Flask(__name__) # CORS(app) # CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True) # CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True, expose_headers='Content-Disposition') CORS(app, resources={r"/api/*": {"origins": ["http://192.168.1.12:5500","http://127.0.0.1:5500"]}}, supports_credentials=True, expose_headers='Content-Disposition') # CORS(app, resources={r"/api/*": {"origins": "http://localhost:5173"}}) , "http://127.0.0.1:5500" # This tells Flask: "For any route starting with /api/, allow requests # from the frontend running on http://localhost:5173". # ADD THESE LINES FOR FLASK-LOGIN login_manager = LoginManager() login_manager.init_app(app) app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key") db.init_app(app) socketio = SocketIO(app, cors_allowed_origins="*") # --- User Loader for Flask-Login --- @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # --- Global instances --- decoder = ProtobufDecoder() mqtt_clients = {} last_message_timestamps = {} STATION_TIMEOUT_SECONDS = 10 # --- MQTT Message Handling --- def on_message_handler(station_id, topic, payload): message_type = topic.split('/')[-1] if message_type in ['PERIODIC']: last_message_timestamps[station_id] = time.time() print(f"Main handler received message for station {station_id} on topic {topic}") decoded_data = None message_type = topic.split('/')[-1] if message_type == 'PERIODIC': decoded_data = decoder.decode_periodic(payload) elif message_type == 'EVENTS': decoded_data = decoder.decode_event(payload) elif message_type == 'REQUEST': decoded_data = decoder.decode_rpc_request(payload) if decoded_data: # print("DECODED DATA TO BE SENT:", decoded_data) try: with app.app_context(): log_entry = MqttLog( station_id=station_id, topic=topic, topic_type=message_type, payload=decoded_data ) db.session.add(log_entry) db.session.commit() print(f"Successfully wrote data for {station_id} to PostgreSQL.") except Exception as e: print(f"Error writing to PostgreSQL: {e}") socketio.emit('dashboard_update', { 'stationId': station_id, 'topic': topic, 'data': decoded_data }, room=station_id) # --- (WebSocket and API routes remain the same) --- @socketio.on('connect') def handle_connect(): print('Client connected to WebSocket') # --- NEW: Function to handle joining a room and sending initial data --- @socketio.on('join_station_room') def handle_join_station_room(data): station_id = data['station_id'] join_room(station_id) print(f"Client joined room for station: {station_id}") try: # Find the most recent log entry for this station latest_log = MqttLog.query.filter_by( station_id=station_id, topic_type='PERIODIC' ).order_by(MqttLog.timestamp.desc()).first() if latest_log: # If we have a past log, send it immediately to the new client print(f"Sending initial state for {station_id} to new client.") socketio.emit('dashboard_update', { 'stationId': station_id, 'topic': latest_log.topic, 'data': latest_log.payload }, room=station_id) except Exception as e: print(f"Error querying or sending initial state for {station_id}: {e}") # --- API Routes --- @app.route('/api/login', methods=['POST']) def login(): """Handles user login.""" data = request.get_json() if not data or not data.get('username') or not data.get('password'): return jsonify({"message": "Username and password are required."}), 400 user = User.query.filter_by(username=data['username']).first() if user and user.check_password(data['password']): # In a real app, you would create a session token here (e.g., with Flask-Login) return jsonify({"message": "Login successful"}), 200 return jsonify({"message": "Invalid username or password"}), 401 # --- Admin-only: Add User --- @app.route('/api/users', methods=['POST']) # @login_required # Ensures the user is logged in def add_user(): # Check if the logged-in user is an admin # if not current_user.is_admin: # return jsonify({"message": "Admin access required."}), 403 # Forbidden data = request.get_json() username = data.get('username') password = data.get('password') is_admin = data.get('is_admin', False) if not username or not password: return jsonify({"message": "Username and password are required."}), 400 if User.query.filter_by(username=username).first(): return jsonify({"message": "Username already exists."}), 409 # Conflict new_user = User(username=username, is_admin=is_admin) new_user.set_password(password) db.session.add(new_user) db.session.commit() return jsonify({"message": "User added successfully."}), 201 # --- Admin-only: Add Station --- @app.route('/api/stations', methods=['POST']) # @login_required # Ensures the user is logged in def add_station(): # if not current_user.is_admin: # return jsonify({"message": "Admin access required."}), 403 data = request.get_json() # All fields are now expected from the frontend form required_fields = ['station_id', 'name', 'location', 'mqtt_broker', 'mqtt_port'] if not all(field in data for field in required_fields): return jsonify({"message": "Missing required station details."}), 400 if Station.query.filter_by(station_id=data['station_id']).first(): return jsonify({"message": "Station ID already exists."}), 409 new_station = Station( station_id=data['station_id'], product_id=data['product_id'], name=data['name'], location=data['location'], mqtt_broker=data['mqtt_broker'], mqtt_port=data['mqtt_port'], mqtt_user=data.get('mqtt_user'), mqtt_password=data.get('mqtt_password') ) db.session.add(new_station) db.session.commit() # Immediately start the MQTT client for the station just created. start_single_mqtt_client(new_station) return jsonify({"message": "Station added successfully."}), 201 # The new function with logging @app.route('/api/stations/', methods=['DELETE']) def remove_station(station_id): """ Removes a station from the database and stops its MQTT client. """ print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---") # 1. Find the station in the database station = Station.query.filter_by(station_id=station_id).first_or_404() print(f"[LOG] Found station '{station.name}' in the database.") # 2. Stop the running MQTT client for this station client_to_stop = mqtt_clients.get(station_id) if client_to_stop: print(f"[LOG] Found active MQTT client. Attempting to stop it now...") client_to_stop.stop() mqtt_clients.pop(station_id, None) print(f"[LOG] Successfully stopped and removed client object for {station_id}.") else: print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.") # 3. Delete the station from the database print(f"[LOG] Attempting to delete {station_id} from the database...") db.session.delete(station) db.session.commit() print(f"[LOG] Successfully deleted station from the database.") print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n") return jsonify({"message": f"Station {station_id} removed successfully."}), 200 @app.route('/api/stations', methods=['GET']) def get_stations(): try: stations = Station.query.all() station_list = [] for s in stations: # --- NEW: More accurate heartbeat logic --- last_msg_time = last_message_timestamps.get(s.station_id) # A station is online only if we have received a message recently is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS station_list.append({ "id": s.station_id, "name": s.name, "location": s.location, "product_id": s.product_id, "status": "Online" if is_online else "Offline" }) return jsonify(station_list) except Exception as e: return jsonify({"error": f"Database query failed: {e}"}), 500 @app.route('/api/logs/recent/', methods=['GET']) def get_recent_logs(station_id): """ Fetches the 50 most recent logs for a given station from the database. """ try: # Query the MqttLog table, filter by station_id, order by timestamp descending, and take the first 50 logs = MqttLog.query.filter_by(station_id=station_id).order_by(desc(MqttLog.timestamp)).limit(50).all() # We reverse the list so the oldest are first, for correct display order logs.reverse() log_list = [{ "topic": log.topic, "payload": log.payload, "timestamp": log.timestamp.isoformat() } for log in logs] return jsonify(log_list) except Exception as e: print(f"Error fetching recent logs: {e}") return jsonify({"message": "Could not fetch recent logs."}), 500 # --- CSV Export route (UPDATED) --- def _format_periodic_row(payload, num_slots=9): """ Flattens a periodic payload dictionary into a single list for a CSV row. """ row = [ datetime.fromtimestamp(payload.get("ts")).strftime("%Y-%m-%d %H:%M:%S"), payload.get("deviceId", ""), payload.get("stationDiagnosticCode", "") ] slots_data = payload.get("slotLevelPayload", []) slot_map = {s.get('slotId', i+1): s for i, s in enumerate(slots_data)} slot_fields_keys = [ "batteryIdentification", "batteryPresent", "chargerPresent", "doorStatus", "doorLockStatus", "voltage", "current", "soc", "batteryMaxTemp", "slotTemperature", "batteryFaultCode", "chargerFaultCode", "batteryMode", "chargerMode" ] for i in range(1, num_slots + 1): slot = slot_map.get(i) if slot: row.extend([ slot.get('batteryIdentification', ''), slot.get("batteryPresent", 0), slot.get("chargerPresent", 0), slot.get("doorStatus", 0), slot.get("doorLockStatus", 0), slot.get('voltage', 0) / 1000.0, slot.get('current', 0) / 1000.0, slot.get('soc', 0), slot.get('batteryMaxTemp', 0) / 10.0, slot.get('slotTemperature', 0) / 10.0, slot.get('batteryFaultCode', 0), slot.get('chargerFaultCode', 0), slot.get('batteryMode', 0), slot.get('chargerMode', 0) ]) else: row.extend([''] * len(slot_fields_keys)) row.append('') # Placeholder for RawHexPayload return row @app.route('/api/logs/export', methods=['GET']) def export_logs(): station_id = request.args.get('station_id') start_datetime_str = request.args.get('start_datetime') end_datetime_str = request.args.get('end_datetime') log_type = request.args.get('log_type', 'PERIODIC') if not all([station_id, start_datetime_str, end_datetime_str]): return jsonify({"error": "Missing required parameters"}), 400 try: start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M') end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M') except ValueError: return jsonify({"error": "Invalid datetime format"}), 400 # --- FIX 1: Correctly query for Events & RPC --- if log_type == 'EVENT': # If frontend asks for EVENT, search for both EVENTS and REQUEST in the DB query = MqttLog.query.filter( MqttLog.station_id == station_id, MqttLog.timestamp.between(start_datetime, end_datetime), MqttLog.topic_type.in_(['EVENTS', 'REQUEST']) ) else: # Otherwise, query for PERIODIC query = MqttLog.query.filter( MqttLog.station_id == station_id, MqttLog.timestamp.between(start_datetime, end_datetime), MqttLog.topic_type == log_type ) logs = query.order_by(MqttLog.timestamp.asc()).all() if not logs: return jsonify({"message": "No logs found for the selected criteria."}), 404 output = io.StringIO() writer = csv.writer(output) # --- FIX 2: Create a cleaner filename --- station = Station.query.filter_by(station_id=station_id).first() station_name = station.name.replace(' ', '_') if station else station_id date_str = start_datetime.strftime('%Y-%m-%d') filename = f"{station_name}_{log_type}_{date_str}.csv" if log_type == 'PERIODIC': base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"] slot_fields = [ "BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus", "Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C", "BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode" ] slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields] header = base_header + slot_header + ["RawHexPayload"] writer.writerow(header) for log in logs: writer.writerow(_format_periodic_row(log.payload)) else: # For EVENTS_RPC header = ["Timestamp", "Topic", "Payload_JSON"] writer.writerow(header) for log in logs: writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)]) output.seek(0) return Response( output, mimetype="text/csv", headers={"Content-Disposition": f"attachment;filename={filename}"} ) @socketio.on('rpc_request') def handle_rpc_request(payload): """ Receives a command from the web dashboard, creates a Protobuf RPC request, and publishes it to the station via MQTT. """ station_id = payload.get('station_id') command = payload.get('command') data = payload.get('data') # This will be the slot_id or swap_pairs array print(f"Received RPC request for station {station_id}: {command} with data {data}") # Find the correct MQTT client for this station mqtt_client = mqtt_clients.get(station_id) if not mqtt_client or not mqtt_client.is_connected: print(f"Cannot send RPC for {station_id}: MQTT client not connected.") return # Or emit an error back to the user # --- Create the Protobuf message based on the command --- # This is where the logic from your snippet is implemented. request_payload = rpcRequest( ts=int(time.time()), jobId=f"job_{int(time.time())}" ) # Determine the jobType and set data based on the command string if command == 'OPEN': request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE request_payload.slotInfo.slotId = data request_payload.slotInfo.state = 1 elif command == 'CHG_ON': # Replace this with the correct name from your .proto file request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE request_payload.slotInfo.slotId = data request_payload.slotInfo.state = 1 # State 1 for ON elif command == 'CHG_OFF': # Replace this with the correct name from your .proto file request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE request_payload.slotInfo.slotId = data request_payload.slotInfo.state = 0 # State 0 for OFF elif command == 'START_SWAP': # --- THIS IS THE CORRECTED LINE --- request_payload.jobType = jobType_e.JOBTYPE_SWAP_START if data and isinstance(data, list): # Your logic for adding the swap pairs to the payload for pair in data: swap_info = request_payload.swapInfo.add() swap_info.fromSlot = pair[0] swap_info.toSlot = pair[1] # --- NEW: Added handlers for Abort and Reset --- elif command == 'ABORT_SWAP': request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT elif command == 'STATION_RESET': request_payload.jobType = jobType_e.JOBTYPE_REBOOT elif command == 'LANGUAGE_UPDATE': request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE # Logic to map language string to enum would go here else: print(f"Unknown command: {command}") return # --- Serialize and Publish the Message --- serialized_payload = request_payload.SerializeToString() # Construct the MQTT topic # NOTE: You may need to fetch client_id and version from your database topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST" print(f"Publishing to {topic}") mqtt_client.client.publish(topic, serialized_payload) # ADD THIS NEW FUNCTION def start_single_mqtt_client(station): """ Creates and starts a new MQTT client thread for a SINGLE station. This is our new reusable function. """ if station.station_id in mqtt_clients and mqtt_clients[station.station_id].is_connected: print(f"MQTT client for {station.station_id} is already running.") return print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})") client = MqttClient( broker=station.mqtt_broker, port=station.mqtt_port, user=station.mqtt_user, password=station.mqtt_password, station_id=station.station_id, on_message_callback=on_message_handler ) client.start() # The start method should handle threading mqtt_clients[station.station_id] = client # --- Main Application Logic --- # def start_mqtt_clients(): # """ # Initializes and starts an MQTT client for each station found in the database, # using the specific MQTT credentials stored for each station. # """ # try: # with app.app_context(): # stations = Station.query.all() # except Exception as e: # print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}") # return # for station in stations: # if station.station_id not in mqtt_clients: # print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})") # client = MqttClient( # broker=station.mqtt_broker, # port=station.mqtt_port, # user=station.mqtt_user, # password=station.mqtt_password, # station_id=station.station_id, # on_message_callback=on_message_handler # ) # client.start() # mqtt_clients[station.station_id] = client def start_mqtt_clients(): """ Initializes and starts an MQTT client for each station found in the database by calling our new reusable function. """ try: with app.app_context(): stations = Station.query.all() print(f"Found {len(stations)} existing stations to monitor.") except Exception as e: print(f"CRITICAL: Could not query stations from the database: {e}") return for station in stations: start_single_mqtt_client(station) if __name__ == '__main__': try: with app.app_context(): db.create_all() # Add a default user if none exist if not User.query.first(): print("No users found. Creating a default admin user.") default_user = User(username='admin') default_user.set_password('password') db.session.add(default_user) db.session.commit() # Add a default station if none exist if not Station.query.first(): print("No stations found. Adding a default station.") default_station = Station( station_id="V16000862287077265957", product_id="VEC_PROD_001", name="Test Station 1", mqtt_broker="mqtt.vecmocon.com", mqtt_port=1883, mqtt_user="your_username", mqtt_password="your_password" ) db.session.add(default_station) db.session.commit() except Exception as e: print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}") sys.exit(1) mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True) mqtt_thread.start() print(f"Starting Flask-SocketIO server on http://192.168.1.12:5000") socketio.run(app, host='192.168.1.12', port=5000)