import os
import sys
import threading
import json
import csv
import io
import time
from datetime import datetime, timedelta
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO, join_room
from flask_cors import CORS
from dotenv import load_dotenv
from sqlalchemy import desc, func, case

# Project-local modules: MQTT transport, protobuf decoding and the ORM models.
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog
from flask_login import login_required, current_user, LoginManager
from proto.vec_payload_chgSt_pb2 import (
    rpcRequest,
    jobType_e
)

# --- Load Environment Variables ---
# Reads key/value pairs from a .env file (if present) into the process
# environment so the os.getenv() calls below can see them.
load_dotenv()

# --- Pre-startup Check for Essential Configuration ---
# The application cannot work without a database, so abort immediately with a
# non-zero exit status when DATABASE_URL is missing or empty.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    print("FATAL ERROR: DATABASE_URL is not set in .env file.")
    sys.exit(1)

# --- Application Setup ---
app = Flask(__name__)

# Earlier CORS configurations, kept for reference:
# CORS(app)
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True)
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True, expose_headers='Content-Disposition')
# CORS(app, resources={r"/api/*": {"origins": "http://localhost:5173"}}) , "http://127.0.0.1:5500"

# Active CORS policy: for any route starting with /api/, allow credentialed
# requests from the two frontend origins listed below (port 5500), and expose
# the Content-Disposition header so the browser-side code can read the
# download filename set by the CSV export route.
CORS(app, resources={r"/api/*": {"origins": ["http://172.20.10.4:5500","http://127.0.0.1:5500"]}}, supports_credentials=True, expose_headers='Content-Disposition')
# --- Flask-Login setup ---
login_manager = LoginManager()
login_manager.init_app(app)

app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# NOTE(review): the fallback below is a fixed string visible in the source;
# set SECRET_KEY in the environment for any real deployment.
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")

db.init_app(app)
# NOTE(review): "*" accepts WebSocket connections from any origin, which is
# broader than the CORS allow-list used for the /api/ routes.
socketio = SocketIO(app, cors_allowed_origins="*")


# --- User Loader for Flask-Login ---
@login_manager.user_loader
def load_user(user_id):
    """Return the User for a stored session ID, or None if it is not valid.

    Flask-Login expects this callback to return None rather than raise when
    the ID cannot be resolved, so a non-numeric ID is treated as "no user".
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None


# --- Global instances ---
decoder = ProtobufDecoder()
# station_id -> MqttClient instance currently serving that station.
mqtt_clients = {}
# station_id -> time.time() of the most recent PERIODIC message.
last_message_timestamps = {}
# A station is reported "Online" only if a PERIODIC message arrived within
# this many seconds.
STATION_TIMEOUT_SECONDS = 10


def _station_status(station_id):
    """Return "Online" or "Offline" from the station's last heartbeat time."""
    last_msg_time = last_message_timestamps.get(station_id)
    is_online = (
        last_msg_time is not None
        and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
    )
    return "Online" if is_online else "Offline"


def _json_body():
    """Return the request's JSON object, or {} when it is absent or not an object."""
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


# --- MQTT Message Handling ---
def on_message_handler(station_id, topic, payload):
    """Decode one MQTT message, persist it, and fan it out over Socket.IO.

    The message type is the last segment of ``topic``. PERIODIC messages
    refresh the station heartbeat; PERIODIC, EVENTS and REQUEST messages are
    decoded, stored as an MqttLog row and pushed to the station's room.

    Args:
        station_id: ID of the station the message belongs to (also the
            Socket.IO room name).
        topic: Full MQTT topic string.
        payload: Raw message payload handed to the protobuf decoder.
    """
    message_type = topic.split('/')[-1]

    # The heartbeat is refreshed before decoding, so even an undecodable
    # PERIODIC message counts as a sign of life.
    if message_type == 'PERIODIC':
        last_message_timestamps[station_id] = time.time()

    # Decode the message payload based on its type.
    decoded_data = None
    if message_type == 'PERIODIC':
        decoded_data = decoder.decode_periodic(payload)
    elif message_type == 'EVENTS':
        decoded_data = decoder.decode_event(payload)
    elif message_type == 'REQUEST':
        decoded_data = decoder.decode_rpc_request(payload)

    # NOTE(review): the original nesting of the emits below could not be
    # recovered from the flattened source. They are sent only after a
    # successful decode, so clients never receive a payload of None.
    if not decoded_data:
        return

    # Persisting is best-effort: a database failure is logged and the live
    # update is still delivered to connected dashboards.
    try:
        with app.app_context():
            log_entry = MqttLog(
                station_id=station_id,
                topic=topic,
                topic_type=message_type,
                payload=decoded_data
            )
            db.session.add(log_entry)
            db.session.commit()
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    # Emit update to the main dashboard.
    socketio.emit('dashboard_update', {
        'stationId': station_id,
        'topic': topic,
        'data': decoded_data
    }, room=station_id)

    if message_type == 'PERIODIC':
        # For periodic messages, also send the live status computed with the
        # same heartbeat rule used by GET /api/stations.
        status_text = _station_status(station_id)
        print(f"Sending live status update: {status_text}")
        socketio.emit('status_update', {'status': status_text}, room=station_id)

    # Emit update notification to the analytics page.
    if message_type in ['EVENTS', 'REQUEST']:
        socketio.emit('analytics_updated', room=station_id)


# --- WebSocket handlers ---
@socketio.on('connect')
def handle_connect():
    """Log that a WebSocket client connected."""
    print('Client connected to WebSocket')


@socketio.on('join_station_room')
def handle_join_station_room(data):
    """Add the client to a station's room and replay the latest PERIODIC log.

    Args:
        data: Event payload; must contain ``station_id``.
    """
    station_id = data['station_id']
    join_room(station_id)
    print(f"Client joined room for station: {station_id}")

    try:
        # Find the most recent periodic log entry for this station.
        latest_log = MqttLog.query.filter_by(
            station_id=station_id,
            topic_type='PERIODIC'
        ).order_by(MqttLog.timestamp.desc()).first()

        if latest_log:
            print(f"Sending initial state for {station_id} to new client.")
            # NOTE(review): this is emitted to the whole station room, so
            # clients that joined earlier receive the replay as well.
            socketio.emit('dashboard_update', {
                'stationId': station_id,
                'topic': latest_log.topic,
                'data': latest_log.payload
            }, room=station_id)
    except Exception as e:
        print(f"Error querying or sending initial state for {station_id}: {e}")


# --- API Routes ---
@app.route('/api/login', methods=['POST'])
def login():
    """Handles user login.

    Returns 400 when username or password is missing, 401 when the
    credentials do not match, and 200 on success.
    """
    data = _json_body()
    if not data.get('username') or not data.get('password'):
        return jsonify({"message": "Username and password are required."}), 400

    user = User.query.filter_by(username=data['username']).first()
    if user and user.check_password(data['password']):
        # NOTE(review): no session is established here; the response only
        # confirms that the credentials are valid.
        return jsonify({"message": "Login successful"}), 200

    return jsonify({"message": "Invalid username or password"}), 401


# --- Admin-only: Add User ---
@app.route('/api/users', methods=['POST'])
# @login_required  # Ensures the user is logged in
def add_user():
    """Create a user from a JSON body with username, password and is_admin.

    Returns 400 when username or password is missing, 409 when the username
    is taken, and 201 on success.
    """
    # Admin check, currently disabled:
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403  # Forbidden
    data = _json_body()
    username = data.get('username')
    password = data.get('password')
    is_admin = data.get('is_admin', False)

    if not username or not password:
        return jsonify({"message": "Username and password are required."}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({"message": "Username already exists."}), 409  # Conflict

    new_user = User(username=username, is_admin=is_admin)
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User added successfully."}), 201


# --- Admin-only: Add Station ---
@app.route('/api/stations', methods=['POST'])
# @login_required  # Ensures the user is logged in
def add_station():
    """Create a station from a JSON body and start its MQTT client.

    Returns 400 when a required field is missing, 409 when the station ID
    already exists, and 201 on success.
    """
    # Admin check, currently disabled:
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403
    data = _json_body()

    # 'product_id' is included because the Station constructor below reads it
    # unconditionally; omitting it used to surface as a KeyError (HTTP 500).
    required_fields = [
        'station_id', 'product_id', 'name', 'location',
        'mqtt_broker', 'mqtt_port'
    ]
    if not all(field in data for field in required_fields):
        return jsonify({"message": "Missing required station details."}), 400

    if Station.query.filter_by(station_id=data['station_id']).first():
        return jsonify({"message": "Station ID already exists."}), 409

    new_station = Station(
        station_id=data['station_id'],
        product_id=data['product_id'],
        name=data['name'],
        location=data['location'],
        mqtt_broker=data['mqtt_broker'],
        mqtt_port=data['mqtt_port'],
        mqtt_user=data.get('mqtt_user'),
        mqtt_password=data.get('mqtt_password')
    )
    db.session.add(new_station)
    db.session.commit()

    # Immediately start the MQTT client for the station just created.
    start_single_mqtt_client(new_station)

    return jsonify({"message": "Station added successfully."}), 201


# The route must declare the <station_id> variable: the view function takes
# it as a required argument and Flask only passes what the rule captures.
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
def remove_station(station_id):
    """
    Removes a station from the database and stops its MQTT client.

    Responds with 404 when no station has the given ID.
    """
    print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")

    # 1. Find the station in the database.
    station = Station.query.filter_by(station_id=station_id).first_or_404()
    print(f"[LOG] Found station '{station.name}' in the database.")

    # 2. Stop the running MQTT client for this station.
    client_to_stop = mqtt_clients.get(station_id)
    if client_to_stop:
        print(f"[LOG] Found active MQTT client. Attempting to stop it now...")
        client_to_stop.stop()
        mqtt_clients.pop(station_id, None)
        print(f"[LOG] Successfully stopped and removed client object for {station_id}.")
    else:
        print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")

    # 3. Delete the station from the database.
    print(f"[LOG] Attempting to delete {station_id} from the database...")
    db.session.delete(station)
    db.session.commit()
    print(f"[LOG] Successfully deleted station from the database.")

    print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
    return jsonify({"message": f"Station {station_id} removed successfully."}), 200


@app.route('/api/stations', methods=['GET'])
def get_stations():
    """Return every station with its heartbeat-derived Online/Offline status."""
    try:
        station_list = [
            {
                "id": s.station_id,
                "name": s.name,
                "location": s.location,
                "product_id": s.product_id,
                # Online only if a PERIODIC message was received recently.
                "status": _station_status(s.station_id)
            }
            for s in Station.query.all()
        ]
        return jsonify(station_list)
    except Exception as e:
        return jsonify({"error": f"Database query failed: {e}"}), 500


# --- Daily Stats Route ---
@app.route('/api/stations/daily-stats', methods=['GET'])
def get_all_station_stats():
    """
    Calculates the swap statistics for today for all stations.

    "Today" is the current UTC date. The response maps each station ID to its
    counts of swap-start, swap-ended and swap-aborted events.
    """
    try:
        # `time` in this module is the stdlib *module* (used for time.time()),
        # which has no .min/.max; take the day bounds from datetime instead.
        today = datetime.utcnow().date()
        today_start = datetime.combine(today, datetime.min.time())
        today_end = datetime.combine(today, datetime.max.time())

        # One grouped query counts all three event types per station.
        stats = db.session.query(
            MqttLog.station_id,
            func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_START', 1))).label('total_starts'),
            func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_ENDED', 1))).label('completed'),
            func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_ABORTED', 1))).label('aborted')
        ).filter(
            MqttLog.topic_type == 'EVENTS',
            MqttLog.timestamp.between(today_start, today_end)
        ).group_by(MqttLog.station_id).all()

        # Convert the list of tuples into a dictionary for easy lookup.
        stats_dict = {
            station_id: {
                "total_starts": total_starts,
                "completed": completed,
                "aborted": aborted
            }
            for station_id, total_starts, completed, aborted in stats
        }
        return jsonify(stats_dict)
    except Exception as e:
        print(f"Error fetching daily stats: {e}")
        return jsonify({"message": "Could not fetch daily station stats."}), 500


# The route must declare the <station_id> variable the view function expects.
@app.route('/api/logs/recent/<string:station_id>', methods=['GET'])
def get_recent_logs(station_id):
    """Return the latest EVENTS/REQUEST logs of a station, oldest first.

    Query parameters:
        start_date, end_date: YYYY-MM-DD, both defaulting to the current
            local date.
        count: Maximum number of rows to return (default 50).
    """
    today_str = datetime.now().strftime('%Y-%m-%d')
    start_date_str = request.args.get('start_date', today_str)
    end_date_str = request.args.get('end_date', today_str)
    limit_count = request.args.get('count', 50, type=int)

    try:
        start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
        start_datetime = datetime.combine(start_date, datetime.min.time())
        end_datetime = datetime.combine(end_date, datetime.max.time())
    except ValueError:
        return jsonify({"message": "Invalid date format."}), 400

    try:
        # Take the newest `limit_count` rows, then flip them so the response
        # is in chronological order.
        logs = MqttLog.query.filter(
            MqttLog.station_id == station_id,
            MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
            MqttLog.timestamp.between(start_datetime, end_datetime)
        ).order_by(desc(MqttLog.timestamp)).limit(limit_count).all()
        logs.reverse()

        log_list = [{
            "topic": log.topic,
            "payload": log.payload,
            "timestamp": log.timestamp.isoformat()
        } for log in logs]
        return jsonify(log_list)
    except Exception as e:
        print(f"Error fetching recent logs for {station_id}: {e}")
        return jsonify({"message": "Could not fetch recent logs."}), 500


# A helper dictionary to make abort reason labels more readable.
ABORT_REASON_MAP = {
    "ABORT_UNKNOWN": "Unknown",
    "ABORT_BAT_EXIT_TIMEOUT": "Battery Exit Timeout",
    "ABORT_BAT_ENTRY_TIMEOUT": "Battery Entry Timeout",
    "ABORT_DOOR_CLOSE_TIMEOUT": "Door Close Timeout",
    "ABORT_DOOR_OPEN_TIMEOUT": "Door Open Timeout",
    "ABORT_INVALID_PARAM": "Invalid Parameter",
    "ABORT_REMOTE_REQUESTED": "Remote Abort",
    "ABORT_INVALID_BATTERY": "Invalid Battery"
}

# --- Analytics Route (previous implementation, disabled) ---
# @app.route('/api/analytics', methods=['GET'])
# def get_analytics_data():
#     # 1. Get and validate request parameters (same as before)
#     station_id = request.args.get('station_id')
#     start_date_str = request.args.get('start_date')
#     end_date_str = request.args.get('end_date')
#     if not all([station_id, start_date_str, end_date_str]):
#         return jsonify({"message": "Missing required parameters."}), 400
#     try:
#         start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
#         end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
#         start_datetime = datetime.combine(start_date, datetime.min.time())
#         end_datetime = datetime.combine(end_date, datetime.max.time())
#     except ValueError:
#         return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
#     # 2.
Query for EVENT logs (for swap calculations) # try: # event_logs = MqttLog.query.filter( # MqttLog.station_id == station_id, # MqttLog.topic_type == 'EVENTS', # MqttLog.timestamp.between(start_datetime, end_datetime) # ).order_by(MqttLog.timestamp.asc()).all() # <-- ADD THIS SORTING # except Exception as e: # return jsonify({"message": f"Could not query event logs: {e}"}), 500 # # --- NEW: Query for PERIODIC logs (for uptime calculation) --- # try: # periodic_logs = MqttLog.query.filter( # MqttLog.station_id == station_id, # MqttLog.topic_type == 'PERIODIC', # MqttLog.timestamp.between(start_datetime, end_datetime) # ).order_by(MqttLog.timestamp.asc()).all() # except Exception as e: # return jsonify({"message": f"Could not query periodic logs: {e}"}), 500 # # --- 3. REVISED: Process logs to calculate KPIs and chart data --- # swap_starts = {} # Dictionary to store start times by sessionId # completed_swap_times = [] # total_swaps, completed_swaps, aborted_swaps = 0, 0, 0 # daily_completed, daily_aborted, hourly_swaps, abort_reason_counts = {}, {}, [0] * 24, {} # slot_utilization_counts = {i: 0 for i in range(1, 10)} # print("\n--- STARTING SWAP ANALYSIS ---") # Add this line # for log in event_logs: # event_type = log.payload.get('eventType') # session_id = log.payload.get('sessionId') # log_date = log.timestamp.date() # log_hour = log.timestamp.hour # if event_type == 'EVENT_SWAP_START': # total_swaps += 1 # hourly_swaps[log_hour] += 1 # if session_id: # swap_starts[session_id] = log.timestamp # Store start time # print(f"Found START for session '{session_id}' at {log.timestamp}") # Add this line # elif event_type == 'EVENT_SWAP_ENDED': # completed_swaps += 1 # daily_completed[log_date] = daily_completed.get(log_date, 0) + 1 # if session_id and session_id in swap_starts: # # Calculate duration if we have a matching start event # duration = (log.timestamp - swap_starts[session_id]).total_seconds() # completed_swap_times.append(duration) # print(f"Found MATCHING END 
for session '{session_id}'. Duration: {duration}s") # Add this line # del swap_starts[session_id] # Remove to prevent reuse # else: # print(f"Found END event but could not find matching START for session '{session_id}'") # Add this line # elif event_type == 'EVENT_SWAP_ABORTED': # aborted_swaps += 1 # daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1 # reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN') # abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1 # elif event_type == 'EVENT_BATTERY_EXIT': # slot_id = log.payload.get('eventData', {}).get('slotId') # if slot_id and slot_id in slot_utilization_counts: # slot_utilization_counts[slot_id] += 1 # print(f"--- ANALYSIS COMPLETE ---") # Add this line # print(f"Calculated Durations: {completed_swap_times}") # Add this line # # --- NEW: 4. Calculate Station Uptime --- # total_period_seconds = (end_datetime - start_datetime).total_seconds() # total_downtime_seconds = 0 # MAX_ONLINE_GAP_SECONDS = 30 # Assume offline if no message for over 30 seconds # if not periodic_logs: # total_downtime_seconds = total_period_seconds # else: # # Check gap from start time to first message # first_gap = (periodic_logs[0].timestamp - start_datetime).total_seconds() # if first_gap > MAX_ONLINE_GAP_SECONDS: # total_downtime_seconds += first_gap # # Check gaps between consecutive messages # for i in range(1, len(periodic_logs)): # gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds() # if gap > MAX_ONLINE_GAP_SECONDS: # total_downtime_seconds += gap # # Check gap from last message to end time # last_gap = (end_datetime - periodic_logs[-1].timestamp).total_seconds() # if last_gap > MAX_ONLINE_GAP_SECONDS: # total_downtime_seconds += last_gap # station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds)) # station_uptime = max(0, min(100, station_uptime)) # Ensure value is between 0 and 100 # # 5. 
#     Prepare final data structures (KPI section is now updated)
#     avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else 0
#     # avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else None
#     kpi_data = {
#         "total_swaps": total_swaps, "completed_swaps": completed_swaps,
#         "aborted_swaps": aborted_swaps, "avg_swap_time_seconds": avg_swap_time_seconds,
#         "station_uptime": round(station_uptime, 2)  # Add uptime to the KPI object
#     }
#     # (The rest of the chart data preparation is unchanged)
#     date_labels, completed_data, aborted_data = [], [], []
#     current_date = start_date
#     while current_date <= end_date:
#         date_labels.append(current_date.strftime('%b %d'))
#         completed_data.append(daily_completed.get(current_date, 0))
#         aborted_data.append(daily_aborted.get(current_date, 0))
#         current_date += timedelta(days=1)
#     swap_activity_data = {"labels": date_labels, "completed_data": completed_data, "aborted_data": aborted_data}
#     hourly_distribution_data = {"labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)], "swap_data": hourly_swaps}
#     abort_reasons_data = {"labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts.keys()], "reason_data": list(abort_reason_counts.values())}
#     slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]}  # Return counts as a simple list [_ , _, ...]
#     # 6. Combine all data and return
#     return jsonify({
#         "kpis": kpi_data,
#         "swap_activity": swap_activity_data,
#         "hourly_distribution": hourly_distribution_data,
#         "abort_reasons": abort_reasons_data,
#         "slot_utilization": slot_utilization_data  # <-- ADD THIS NEW KEY
#     })


def _station_uptime_percent(heartbeats, period_start, period_end, max_gap_seconds=30):
    """Return the share of a period (0-100) during which the station was online.

    The period boundaries and the heartbeat times form a chain of checkpoints.
    Every interval between neighbouring checkpoints that is longer than
    ``max_gap_seconds`` counts entirely as downtime.

    Args:
        heartbeats: Chronologically ordered datetimes of PERIODIC messages.
        period_start: Start of the reporting period.
        period_end: End of the reporting period.
        max_gap_seconds: Longest silence still considered "online".
    """
    total_period_seconds = (period_end - period_start).total_seconds()
    if total_period_seconds <= 0:
        return 0

    # With no heartbeats the single start-to-end interval is all downtime.
    checkpoints = [period_start, *heartbeats, period_end]
    total_downtime_seconds = 0
    for earlier, later in zip(checkpoints, checkpoints[1:]):
        gap = (later - earlier).total_seconds()
        if gap > max_gap_seconds:
            total_downtime_seconds += gap

    uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
    return max(0, min(100, uptime))  # Clamp to the 0-100 range.


@app.route('/api/analytics', methods=['GET'])
def get_analytics_data():
    """Return swap KPIs and chart series for one station and date range.

    Query parameters (all required): station_id, start_date, end_date
    (dates as YYYY-MM-DD, both inclusive).

    Returns 400 for missing/invalid parameters or a reversed range, 500 when
    the log queries fail, otherwise a JSON object with the keys ``kpis``,
    ``swap_activity``, ``hourly_distribution``, ``abort_reasons`` and
    ``slot_utilization``.
    """
    # 1. Get and validate request parameters.
    station_id = request.args.get('station_id')
    start_date_str = request.args.get('start_date')
    end_date_str = request.args.get('end_date')

    if not all([station_id, start_date_str, end_date_str]):
        return jsonify({"message": "Missing required parameters."}), 400

    try:
        start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    except ValueError:
        return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400

    # A reversed range would give a negative period length and a meaningless
    # uptime figure, so reject it up front.
    if end_date < start_date:
        return jsonify({"message": "end_date must not be before start_date."}), 400

    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    # 2. Query the event/request logs and the periodic (heartbeat) logs.
    try:
        logs = MqttLog.query.filter(
            MqttLog.station_id == station_id,
            MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
            MqttLog.timestamp.between(start_datetime, end_datetime)
        ).order_by(MqttLog.timestamp.asc()).all()

        periodic_logs = MqttLog.query.filter(
            MqttLog.station_id == station_id,
            MqttLog.topic_type == 'PERIODIC',
            MqttLog.timestamp.between(start_datetime, end_datetime)
        ).order_by(MqttLog.timestamp.asc()).all()
    except Exception as e:
        return jsonify({"message": f"Could not query logs: {e}"}), 500

    # 3. Initialize data structures for processing.
    swap_starts_map = {}        # sessionId -> timestamp of its swap start
    completed_swap_times = []   # durations (seconds) of matched swaps
    total_initiations = 0
    total_starts = 0
    completed_swaps = 0
    aborted_swaps = 0
    daily_completed, daily_aborted = {}, {}
    hourly_swaps = [0] * 24
    abort_reason_counts = {}
    slot_utilization_counts = {i: 0 for i in range(1, 10)}

    # 4. Process the logs to calculate all KPIs and chart data.
    for log in logs:
        event_type = log.payload.get('eventType')
        job_type = log.payload.get('jobType')
        session_id = log.payload.get('sessionId')
        log_date = log.timestamp.date()
        log_hour = log.timestamp.hour

        if job_type == 'JOBTYPE_SWAP_AUTH_SUCCESS':
            total_initiations += 1
        elif event_type == 'EVENT_SWAP_START':
            total_starts += 1
            hourly_swaps[log_hour] += 1
            if session_id:
                swap_starts_map[session_id] = log.timestamp
        elif event_type == 'EVENT_BATTERY_EXIT':
            # A battery leaving the station is what counts as a completed swap.
            completed_swaps += 1
            daily_completed[log_date] = daily_completed.get(log_date, 0) + 1
            if session_id and session_id in swap_starts_map:
                # pop() removes the start so one session is measured once.
                started_at = swap_starts_map.pop(session_id)
                completed_swap_times.append((log.timestamp - started_at).total_seconds())
        elif event_type == 'EVENT_SWAP_ABORTED':
            aborted_swaps += 1
            daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1
            reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
            abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
        # NOTE(review): the spelling "DISENEGAGED" is kept exactly as found;
        # presumably it matches the decoded enum name - confirm against the
        # .proto definition.
        elif event_type == 'EVENT_SLOT_LOCK_DISENEGAGED':
            slot_id = log.payload.get('eventData', {}).get('slotId')
            if slot_id and slot_id in slot_utilization_counts:
                slot_utilization_counts[slot_id] += 1

    # 5. Calculate station uptime from the gaps between periodic messages.
    station_uptime = _station_uptime_percent(
        [entry.timestamp for entry in periodic_logs],
        start_datetime,
        end_datetime,
    )

    # 6. Prepare final data structures.
    avg_swap_time_seconds = (
        sum(completed_swap_times) / len(completed_swap_times)
        if completed_swap_times else None
    )

    kpi_data = {
        "total_swaps_initiated": total_initiations,
        "total_swaps_started": total_starts,
        "completed_swaps": completed_swaps,
        "aborted_swaps": aborted_swaps,
        "avg_swap_time_seconds": avg_swap_time_seconds,
        "station_uptime": round(station_uptime, 2)
    }

    # One chart point per calendar day in the range, zero-filled.
    date_labels, completed_data, aborted_data = [], [], []
    current_date = start_date
    while current_date <= end_date:
        date_labels.append(current_date.strftime('%b %d'))
        completed_data.append(daily_completed.get(current_date, 0))
        aborted_data.append(daily_aborted.get(current_date, 0))
        current_date += timedelta(days=1)

    swap_activity_data = {
        "labels": date_labels,
        "completed_data": completed_data,
        "aborted_data": aborted_data
    }
    hourly_distribution_data = {
        "labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)],
        "swap_data": hourly_swaps
    }
    abort_reasons_data = {
        "labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts],
        "reason_data": list(abort_reason_counts.values())
    }
    # Counts are returned as a plain list ordered by slot number 1-9.
    slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]}

    # 7. Combine all data and return.
    return jsonify({
        "kpis": kpi_data,
        "swap_activity": swap_activity_data,
        "hourly_distribution": hourly_distribution_data,
        "abort_reasons": abort_reasons_data,
        "slot_utilization": slot_utilization_data
    })


# --- CSV Export route ---
# Per-slot CSV columns as (payload key, default, divisor). A divisor converts
# the stored integer into the unit named in the CSV header (Voltage_V,
# Current_A, BatteryTemp_C, SlotTemp_C); None means the value is written as is.
_SLOT_CSV_FIELDS = (
    ("batteryIdentification", '', None),
    ("batteryPresent", 0, None),
    ("chargerPresent", 0, None),
    ("doorStatus", 0, None),
    ("doorLockStatus", 0, None),
    ("voltage", 0, 1000.0),
    ("current", 0, 1000.0),
    ("soc", 0, None),
    ("batteryMaxTemp", 0, 10.0),
    ("slotTemperature", 0, 10.0),
    ("batteryFaultCode", 0, None),
    ("chargerFaultCode", 0, None),
    ("batteryMode", 0, None),
    ("chargerMode", 0, None),
)


def _format_periodic_row(payload, num_slots=9):
    """
    Flattens a periodic payload dictionary into a single list for a CSV row.

    Layout: timestamp, deviceId, stationDiagnosticCode, then the fields of
    ``_SLOT_CSV_FIELDS`` for each slot 1..num_slots (blank cells for slots
    absent from the payload), then one empty RawHexPayload placeholder.

    Args:
        payload: Decoded periodic message as a dict.
        num_slots: Number of slot column groups to emit.
    """
    # A payload without "ts" gets an empty timestamp cell instead of aborting
    # the whole export with a TypeError.
    ts = payload.get("ts")
    timestamp_text = (
        datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
        if ts is not None else ""
    )
    row = [
        timestamp_text,
        payload.get("deviceId", ""),
        payload.get("stationDiagnosticCode", "")
    ]

    # Index slots by slotId; entries without one fall back to their 1-based
    # position in the list.
    slots_data = payload.get("slotLevelPayload", [])
    slot_map = {s.get('slotId', i + 1): s for i, s in enumerate(slots_data)}

    for slot_number in range(1, num_slots + 1):
        slot = slot_map.get(slot_number)
        if slot:
            for key, default, divisor in _SLOT_CSV_FIELDS:
                value = slot.get(key, default)
                row.append(value / divisor if divisor else value)
        else:
            row.extend([''] * len(_SLOT_CSV_FIELDS))

    row.append('')  # Placeholder for RawHexPayload
    return row


def _safe_filename(name):
    """Replace every character outside [A-Za-z0-9_.-] with an underscore."""
    import re  # local import: `re` is not among this module's imports
    return re.sub(r'[^A-Za-z0-9_.-]', '_', name)


@app.route('/api/logs/export', methods=['GET'])
def export_logs():
    """Export a station's logs for a time window as a CSV download.

    Query parameters:
        station_id, start_datetime, end_datetime: required; datetimes use the
            format YYYY-MM-DDTHH:MM.
        log_type: 'PERIODIC' (default) for one flattened row per periodic
            message, or 'EVENT' for EVENTS and REQUEST logs as JSON.

    Returns 400 for missing/invalid parameters and 404 when no logs match.
    """
    station_id = request.args.get('station_id')
    start_datetime_str = request.args.get('start_datetime')
    end_datetime_str = request.args.get('end_datetime')
    log_type = request.args.get('log_type', 'PERIODIC')

    if not all([station_id, start_datetime_str, end_datetime_str]):
        return jsonify({"error": "Missing required parameters"}), 400

    try:
        start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
        end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
    except ValueError:
        return jsonify({"error": "Invalid datetime format"}), 400

    # 'EVENT' from the frontend covers both EVENTS and REQUEST rows; any
    # other value is matched against topic_type directly.
    if log_type == 'EVENT':
        type_filter = MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
    else:
        type_filter = MqttLog.topic_type == log_type

    logs = MqttLog.query.filter(
        MqttLog.station_id == station_id,
        MqttLog.timestamp.between(start_datetime, end_datetime),
        type_filter
    ).order_by(MqttLog.timestamp.asc()).all()

    if not logs:
        return jsonify({"message": "No logs found for the selected criteria."}), 404

    output = io.StringIO()
    writer = csv.writer(output)

    # Build the download name from the station name (or ID), log type and
    # start date. log_type and the station name are caller-influenced and end
    # up in a response header, so the result is reduced to safe characters.
    station = Station.query.filter_by(station_id=station_id).first()
    station_name = station.name.replace(' ', '_') if station else station_id
    date_str = start_datetime.strftime('%Y-%m-%d')
    filename = _safe_filename(f"{station_name}_{log_type}_{date_str}.csv")

    if log_type == 'PERIODIC':
        base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
        slot_fields = [
            "BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
            "Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
            "BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
        ]
        # Nine slot groups, matching the default num_slots of
        # _format_periodic_row.
        slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields]
        writer.writerow(base_header + slot_header + ["RawHexPayload"])
        for log in logs:
            writer.writerow(_format_periodic_row(log.payload))
    else:
        # Events / RPC requests: one row per log with the payload as JSON.
        writer.writerow(["Timestamp", "Topic", "Payload_JSON"])
        for log in logs:
            writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])

    # Send the finished CSV text rather than the StringIO object itself.
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment;filename={filename}"}
    )


@socketio.on('rpc_request')
def handle_rpc_request(payload):
    """
    Receives a command from the web dashboard, creates a Protobuf RPC request,
    and publishes it to the station via MQTT.

    Args:
        payload: Dict with ``station_id``, ``command`` and ``data``. ``data``
            is a slot ID for the slot commands and a list of
            (fromSlot, toSlot) pairs for START_SWAP.

    The request is dropped (with a log line) when the station has no
    connected MQTT client or the command is unknown.
    """
    station_id = payload.get('station_id')
    command = payload.get('command')
    data = payload.get('data')  # slot_id or swap_pairs array

    print(f"Received RPC request for station {station_id}: {command} with data {data}")

    # Find the correct MQTT client for this station.
    mqtt_client = mqtt_clients.get(station_id)
    if not mqtt_client or not mqtt_client.is_connected:
        print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
        return  # Or emit an error back to the user

    # --- Create the Protobuf message based on the command ---
    request_payload = rpcRequest(
        ts=int(time.time()),
        jobId=f"job_{int(time.time())}"
    )

    # Determine the jobType and set data based on the command string.
    if command == 'OPEN':
        request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE
        request_payload.slotInfo.slotId = data
        request_payload.slotInfo.state = 1
    elif command == 'CHG_ON':
        request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
        request_payload.slotInfo.slotId = data
        request_payload.slotInfo.state = 1  # State 1 for ON
    elif command == 'CHG_OFF':
        request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
        request_payload.slotInfo.slotId = data
        request_payload.slotInfo.state = 0  # State 0 for OFF
    elif command == 'START_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
        if data and isinstance(data, list):
            # Each pair becomes one swapInfo entry (fromSlot -> toSlot).
            for pair in data:
                swap_info = request_payload.swapInfo.add()
                swap_info.fromSlot = pair[0]
                swap_info.toSlot = pair[1]
    # --- Handlers for Abort and Reset ---
    elif command == 'ABORT_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
    elif command == 'STATION_RESET':
        request_payload.jobType = jobType_e.JOBTYPE_REBOOT
    elif command == 'LANGUAGE_UPDATE':
        request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
        # Logic to map language string to enum would go here
    else:
        print(f"Unknown command: {command}")
        return

    # --- Serialize and Publish the Message ---
    serialized_payload = request_payload.SerializeToString()

    # NOTE: the client and version segments of the topic are hard-coded; they
    # may need to come from the database instead.
    topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"

    print(f"Publishing to {topic}")
    mqtt_client.client.publish(topic, serialized_payload)


def start_single_mqtt_client(station):
    """
    Creates and starts a new MQTT client thread for a SINGLE station.

    Does nothing when a connected client is already registered for the
    station. Otherwise a new client is built from the station's broker
    settings, started, and stored in ``mqtt_clients``.

    Args:
        station: Station row providing station_id, name and the mqtt_*
            connection settings.
    """
    existing = mqtt_clients.get(station.station_id)
    if existing is not None and existing.is_connected:
        print(f"MQTT client for {station.station_id} is already running.")
        return

    print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
    client = MqttClient(
        broker=station.mqtt_broker,
        port=station.mqtt_port,
        user=station.mqtt_user,
        password=station.mqtt_password,
        station_id=station.station_id,
        on_message_callback=on_message_handler
    )
    client.start()  # The start method should handle threading
    mqtt_clients[station.station_id] = client


# --- Main Application Logic ---
# def start_mqtt_clients():
#     """
#     Initializes and starts an MQTT client for each station found in the database,
#     using the specific MQTT credentials stored for each station.
#     """
#     try:
#         with app.app_context():
#             stations = Station.query.all()
#     except Exception as e:
#         print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}")
#         return
#     for station in stations:
#         if station.station_id not in mqtt_clients:
#             print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
#             client = MqttClient(
#                 broker=station.mqtt_broker,
#                 port=station.mqtt_port,
#                 user=station.mqtt_user,
#                 password=station.mqtt_password,
#                 station_id=station.station_id,
#                 on_message_callback=on_message_handler
#             )
#             client.start()
#             mqtt_clients[station.station_id] = client


def start_mqtt_clients():
    """
    Initializes and starts an MQTT client for each station found in the database
    by calling start_single_mqtt_client for every row.

    If the station query fails, the error is logged and no client is started.
    """
    try:
        with app.app_context():
            stations = Station.query.all()
            print(f"Found {len(stations)} existing stations to monitor.")
    except Exception as e:
        print(f"CRITICAL: Could not query stations from the database: {e}")
        return

    for station in stations:
        start_single_mqtt_client(station)


if __name__ == '__main__':
    # Address the server binds to; also used for the startup banner so the
    # printed URL cannot drift from the actual bind address.
    SERVER_HOST = '172.20.10.4'
    SERVER_PORT = 5000

    # Create the schema and seed minimal data. Any failure here is treated as
    # fatal and ends the process with exit status 1.
    try:
        with app.app_context():
            db.create_all()

            # Add a default user if none exist.
            if not User.query.first():
                print("No users found. Creating a default admin user.")
                # The account is announced as an admin, so flag it as one
                # (same keyword the add-user route passes to User).
                # NOTE(review): the password is a well-known default and
                # should be changed right after the first login.
                default_user = User(username='admin', is_admin=True)
                default_user.set_password('password')
                db.session.add(default_user)
                db.session.commit()

            # Add a default station if none exist.
            if not Station.query.first():
                print("No stations found. Adding a default station.")
                default_station = Station(
                    station_id="V16000862287077265957",
                    product_id="VEC_PROD_001",
                    name="Test Station 1",
                    mqtt_broker="mqtt.vecmocon.com",
                    mqtt_port=1883,
                    mqtt_user="your_username",
                    mqtt_password="your_password"
                )
                db.session.add(default_station)
                db.session.commit()
    except Exception as e:
        print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
        sys.exit(1)

    # Start the MQTT clients in a daemon thread so they do not keep the
    # process alive once the web server stops.
    mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
    mqtt_thread.start()

    print(f"Starting Flask-SocketIO server on http://{SERVER_HOST}:{SERVER_PORT}")
    socketio.run(app, host=SERVER_HOST, port=SERVER_PORT)