SwapStation_WebApp/backend/main.py

958 lines
39 KiB
Python

import os
import sys
import threading
import json
import csv
import io
import time
from datetime import datetime, timedelta
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO, join_room
from flask_cors import CORS
from dotenv import load_dotenv
from sqlalchemy import desc, func, case
# Import your custom core modules and the new models
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog
from flask_login import login_required, current_user, LoginManager
from proto.vec_payload_chgSt_pb2 import (
rpcRequest,
jobType_e
)
# --- Load Environment Variables ---
load_dotenv()
# --- Pre-startup Check for Essential Configuration ---
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
print("FATAL ERROR: DATABASE_URL is not set in .env file.")
sys.exit(1)
# --- Application Setup ---
app = Flask(__name__)
# CORS(app)
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True)
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True, expose_headers='Content-Disposition')
CORS(app, resources={r"/api/*": {"origins": ["http://192.168.1.10:5500","http://127.0.0.1:5500"]}}, supports_credentials=True, expose_headers='Content-Disposition')
# CORS(app, resources={r"/api/*": {"origins": "http://localhost:5173"}}) , "http://127.0.0.1:5500"
# This tells Flask: "For any route starting with /api/, allow requests
# from the frontend running on http://localhost:5173".
# ADD THESE LINES FOR FLASK-LOGIN
login_manager = LoginManager()
login_manager.init_app(app)
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")
db.init_app(app)
socketio = SocketIO(app, cors_allowed_origins="*")
# --- User Loader for Flask-Login ---
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# --- Global instances ---
decoder = ProtobufDecoder()
mqtt_clients = {}
last_message_timestamps = {}
STATION_TIMEOUT_SECONDS = 10
# --- MQTT Message Handling ---
def on_message_handler(station_id, topic, payload):
message_type = topic.split('/')[-1]
if message_type in ['PERIODIC']:
last_message_timestamps[station_id] = time.time()
# Decode the message payload based on its type
decoded_data = None
if message_type == 'PERIODIC':
decoded_data = decoder.decode_periodic(payload)
elif message_type == 'EVENTS':
decoded_data = decoder.decode_event(payload)
elif message_type == 'REQUEST':
decoded_data = decoder.decode_rpc_request(payload)
if decoded_data:
try:
with app.app_context():
log_entry = MqttLog(
station_id=station_id,
topic=topic,
topic_type=message_type,
payload=decoded_data
)
db.session.add(log_entry)
db.session.commit()
except Exception as e:
print(f"Error writing to PostgreSQL: {e}")
# Emit update to the main dashboard
socketio.emit('dashboard_update', {
'stationId': station_id,
'topic': topic,
'data': decoded_data
}, room=station_id)
if message_type == 'PERIODIC':
# For periodic messages, only calculate and send the live status
# This logic is from your /api/stations route
last_msg_time = last_message_timestamps.get(station_id)
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
status_text = "Online" if is_online else "Offline"
print(f"Sending live status update: {status_text}")
socketio.emit('status_update', {'status': status_text}, room=station_id)
# Emit update notification to the analytics page
if message_type in ['EVENTS', 'REQUEST']:
socketio.emit('analytics_updated', room=station_id)
# --- (WebSocket and API routes remain the same) ---
@socketio.on('connect')
def handle_connect():
print('Client connected to WebSocket')
# --- NEW: Function to handle joining a room and sending initial data ---
@socketio.on('join_station_room')
def handle_join_station_room(data):
station_id = data['station_id']
join_room(station_id)
print(f"Client joined room for station: {station_id}")
try:
# Find the most recent log entry for this station
latest_log = MqttLog.query.filter_by(
station_id=station_id,
topic_type='PERIODIC'
).order_by(MqttLog.timestamp.desc()).first()
if latest_log:
# If we have a past log, send it immediately to the new client
print(f"Sending initial state for {station_id} to new client.")
socketio.emit('dashboard_update', {
'stationId': station_id,
'topic': latest_log.topic,
'data': latest_log.payload
}, room=station_id)
except Exception as e:
print(f"Error querying or sending initial state for {station_id}: {e}")
# --- API Routes ---
@app.route('/api/login', methods=['POST'])
def login():
"""Handles user login."""
data = request.get_json()
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Username and password are required."}), 400
user = User.query.filter_by(username=data['username']).first()
if user and user.check_password(data['password']):
# In a real app, you would create a session token here (e.g., with Flask-Login)
return jsonify({"message": "Login successful"}), 200
return jsonify({"message": "Invalid username or password"}), 401
# --- Admin-only: Add User ---
@app.route('/api/users', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_user():
# Check if the logged-in user is an admin
# if not current_user.is_admin:
# return jsonify({"message": "Admin access required."}), 403 # Forbidden
data = request.get_json()
username = data.get('username')
password = data.get('password')
is_admin = data.get('is_admin', False)
if not username or not password:
return jsonify({"message": "Username and password are required."}), 400
if User.query.filter_by(username=username).first():
return jsonify({"message": "Username already exists."}), 409 # Conflict
new_user = User(username=username, is_admin=is_admin)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
return jsonify({"message": "User added successfully."}), 201
# --- Admin-only: Add Station ---
@app.route('/api/stations', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_station():
# if not current_user.is_admin:
# return jsonify({"message": "Admin access required."}), 403
data = request.get_json()
# All fields are now expected from the frontend form
required_fields = ['station_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
if not all(field in data for field in required_fields):
return jsonify({"message": "Missing required station details."}), 400
if Station.query.filter_by(station_id=data['station_id']).first():
return jsonify({"message": "Station ID already exists."}), 409
new_station = Station(
station_id=data['station_id'],
product_id=data['product_id'],
name=data['name'],
location=data['location'],
mqtt_broker=data['mqtt_broker'],
mqtt_port=data['mqtt_port'],
mqtt_user=data.get('mqtt_user'),
mqtt_password=data.get('mqtt_password')
)
db.session.add(new_station)
db.session.commit()
# Immediately start the MQTT client for the station just created.
start_single_mqtt_client(new_station)
return jsonify({"message": "Station added successfully."}), 201
# The new function with logging
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
def remove_station(station_id):
"""
Removes a station from the database and stops its MQTT client.
"""
print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")
# 1. Find the station in the database
station = Station.query.filter_by(station_id=station_id).first_or_404()
print(f"[LOG] Found station '{station.name}' in the database.")
# 2. Stop the running MQTT client for this station
client_to_stop = mqtt_clients.get(station_id)
if client_to_stop:
print(f"[LOG] Found active MQTT client. Attempting to stop it now...")
client_to_stop.stop()
mqtt_clients.pop(station_id, None)
print(f"[LOG] Successfully stopped and removed client object for {station_id}.")
else:
print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")
# 3. Delete the station from the database
print(f"[LOG] Attempting to delete {station_id} from the database...")
db.session.delete(station)
db.session.commit()
print(f"[LOG] Successfully deleted station from the database.")
print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
return jsonify({"message": f"Station {station_id} removed successfully."}), 200
@app.route('/api/stations', methods=['GET'])
def get_stations():
try:
stations = Station.query.all()
station_list = []
for s in stations:
# --- NEW: More accurate heartbeat logic ---
last_msg_time = last_message_timestamps.get(s.station_id)
# A station is online only if we have received a message recently
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
station_list.append({
"id": s.station_id,
"name": s.name,
"location": s.location,
"product_id": s.product_id,
"status": "Online" if is_online else "Offline"
})
return jsonify(station_list)
except Exception as e:
return jsonify({"error": f"Database query failed: {e}"}), 500
#--- Daily Stats Route ---
@app.route('/api/stations/daily-stats', methods=['GET'])
def get_all_station_stats():
"""
Calculates the swap statistics for today for all stations.
"""
try:
# --- CHANGE THESE TWO LINES ---
today_start = datetime.combine(datetime.utcnow().date(), time.min)
today_end = datetime.combine(datetime.utcnow().date(), time.max)
# This is an efficient query that groups by station_id and counts events in one go
stats = db.session.query(
MqttLog.station_id,
func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_START', 1))).label('total_starts'),
func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_ENDED', 1))).label('completed'),
func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_ABORTED', 1))).label('aborted')
).filter(
MqttLog.topic_type == 'EVENTS',
MqttLog.timestamp.between(today_start, today_end)
).group_by(MqttLog.station_id).all()
# Convert the list of tuples into a dictionary for easy lookup
stats_dict = {
station_id: {
"total_starts": total_starts,
"completed": completed,
"aborted": aborted
} for station_id, total_starts, completed, aborted in stats
}
return jsonify(stats_dict)
except Exception as e:
print(f"Error fetching daily stats: {e}")
return jsonify({"message": "Could not fetch daily station stats."}), 500
@app.route('/api/logs/recent/<string:station_id>', methods=['GET'])
def get_recent_logs(station_id):
# Get parameters from the request, with defaults
start_date_str = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
end_date_str = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d'))
limit_count = request.args.get('count', 50, type=int)
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
start_datetime = datetime.combine(start_date, datetime.min.time()) # <-- FIX
end_datetime = datetime.combine(end_date, datetime.max.time())
except ValueError:
return jsonify({"message": "Invalid date format."}), 400
try:
# The query now uses all three filters
logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(desc(MqttLog.timestamp)).limit(limit_count).all()
logs.reverse()
log_list = [{
"topic": log.topic, "payload": log.payload, "timestamp": log.timestamp.isoformat()
} for log in logs]
return jsonify(log_list)
except Exception as e:
return jsonify({"message": "Could not fetch recent logs."}), 500
# A helper dictionary to make abort reason labels more readable
ABORT_REASON_MAP = {
"ABORT_UNKNOWN": "Unknown",
"ABORT_BAT_EXIT_TIMEOUT": "Battery Exit Timeout",
"ABORT_BAT_ENTRY_TIMEOUT": "Battery Entry Timeout",
"ABORT_DOOR_CLOSE_TIMEOUT": "Door Close Timeout",
"ABORT_DOOR_OPEN_TIMEOUT": "Door Open Timeout",
"ABORT_INVALID_PARAM": "Invalid Parameter",
"ABORT_REMOTE_REQUESTED": "Remote Abort",
"ABORT_INVALID_BATTERY": "Invalid Battery"
}
#--- Analytics Route ---
# @app.route('/api/analytics', methods=['GET'])
# def get_analytics_data():
# # 1. Get and validate request parameters (same as before)
# station_id = request.args.get('station_id')
# start_date_str = request.args.get('start_date')
# end_date_str = request.args.get('end_date')
# if not all([station_id, start_date_str, end_date_str]):
# return jsonify({"message": "Missing required parameters."}), 400
# try:
# start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
# end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
# start_datetime = datetime.combine(start_date, datetime.min.time())
# end_datetime = datetime.combine(end_date, datetime.max.time())
# except ValueError:
# return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
# # 2. Query for EVENT logs (for swap calculations)
# try:
# event_logs = MqttLog.query.filter(
# MqttLog.station_id == station_id,
# MqttLog.topic_type == 'EVENTS',
# MqttLog.timestamp.between(start_datetime, end_datetime)
# ).order_by(MqttLog.timestamp.asc()).all() # <-- ADD THIS SORTING
# except Exception as e:
# return jsonify({"message": f"Could not query event logs: {e}"}), 500
# # --- NEW: Query for PERIODIC logs (for uptime calculation) ---
# try:
# periodic_logs = MqttLog.query.filter(
# MqttLog.station_id == station_id,
# MqttLog.topic_type == 'PERIODIC',
# MqttLog.timestamp.between(start_datetime, end_datetime)
# ).order_by(MqttLog.timestamp.asc()).all()
# except Exception as e:
# return jsonify({"message": f"Could not query periodic logs: {e}"}), 500
# # --- 3. REVISED: Process logs to calculate KPIs and chart data ---
# swap_starts = {} # Dictionary to store start times by sessionId
# completed_swap_times = []
# total_swaps, completed_swaps, aborted_swaps = 0, 0, 0
# daily_completed, daily_aborted, hourly_swaps, abort_reason_counts = {}, {}, [0] * 24, {}
# slot_utilization_counts = {i: 0 for i in range(1, 10)}
# print("\n--- STARTING SWAP ANALYSIS ---") # Add this line
# for log in event_logs:
# event_type = log.payload.get('eventType')
# session_id = log.payload.get('sessionId')
# log_date = log.timestamp.date()
# log_hour = log.timestamp.hour
# if event_type == 'EVENT_SWAP_START':
# total_swaps += 1
# hourly_swaps[log_hour] += 1
# if session_id:
# swap_starts[session_id] = log.timestamp # Store start time
# print(f"Found START for session '{session_id}' at {log.timestamp}") # Add this line
# elif event_type == 'EVENT_SWAP_ENDED':
# completed_swaps += 1
# daily_completed[log_date] = daily_completed.get(log_date, 0) + 1
# if session_id and session_id in swap_starts:
# # Calculate duration if we have a matching start event
# duration = (log.timestamp - swap_starts[session_id]).total_seconds()
# completed_swap_times.append(duration)
# print(f"Found MATCHING END for session '{session_id}'. Duration: {duration}s") # Add this line
# del swap_starts[session_id] # Remove to prevent reuse
# else:
# print(f"Found END event but could not find matching START for session '{session_id}'") # Add this line
# elif event_type == 'EVENT_SWAP_ABORTED':
# aborted_swaps += 1
# daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1
# reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
# abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
# elif event_type == 'EVENT_BATTERY_EXIT':
# slot_id = log.payload.get('eventData', {}).get('slotId')
# if slot_id and slot_id in slot_utilization_counts:
# slot_utilization_counts[slot_id] += 1
# print(f"--- ANALYSIS COMPLETE ---") # Add this line
# print(f"Calculated Durations: {completed_swap_times}") # Add this line
# # --- NEW: 4. Calculate Station Uptime ---
# total_period_seconds = (end_datetime - start_datetime).total_seconds()
# total_downtime_seconds = 0
# MAX_ONLINE_GAP_SECONDS = 30 # Assume offline if no message for over 30 seconds
# if not periodic_logs:
# total_downtime_seconds = total_period_seconds
# else:
# # Check gap from start time to first message
# first_gap = (periodic_logs[0].timestamp - start_datetime).total_seconds()
# if first_gap > MAX_ONLINE_GAP_SECONDS:
# total_downtime_seconds += first_gap
# # Check gaps between consecutive messages
# for i in range(1, len(periodic_logs)):
# gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds()
# if gap > MAX_ONLINE_GAP_SECONDS:
# total_downtime_seconds += gap
# # Check gap from last message to end time
# last_gap = (end_datetime - periodic_logs[-1].timestamp).total_seconds()
# if last_gap > MAX_ONLINE_GAP_SECONDS:
# total_downtime_seconds += last_gap
# station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
# station_uptime = max(0, min(100, station_uptime)) # Ensure value is between 0 and 100
# # 5. Prepare final data structures (KPI section is now updated)
# avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else 0
# # avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else None
# kpi_data = {
# "total_swaps": total_swaps, "completed_swaps": completed_swaps,
# "aborted_swaps": aborted_swaps, "avg_swap_time_seconds": avg_swap_time_seconds,
# "station_uptime": round(station_uptime, 2) # Add uptime to the KPI object
# }
# # (The rest of the chart data preparation is unchanged)
# date_labels, completed_data, aborted_data = [], [], []
# current_date = start_date
# while current_date <= end_date:
# date_labels.append(current_date.strftime('%b %d'))
# completed_data.append(daily_completed.get(current_date, 0))
# aborted_data.append(daily_aborted.get(current_date, 0))
# current_date += timedelta(days=1)
# swap_activity_data = {"labels": date_labels, "completed_data": completed_data, "aborted_data": aborted_data}
# hourly_distribution_data = {"labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)], "swap_data": hourly_swaps}
# abort_reasons_data = {"labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts.keys()], "reason_data": list(abort_reason_counts.values())}
# slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]} # Return counts as a simple list [_ , _, ...]
# # 6. Combine all data and return
# return jsonify({
# "kpis": kpi_data,
# "swap_activity": swap_activity_data,
# "hourly_distribution": hourly_distribution_data,
# "abort_reasons": abort_reasons_data,
# "slot_utilization": slot_utilization_data # <-- ADD THIS NEW KEY
# })
@app.route('/api/analytics', methods=['GET'])
def get_analytics_data():
# 1. Get and validate request parameters
station_id = request.args.get('station_id')
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if not all([station_id, start_date_str, end_date_str]):
return jsonify({"message": "Missing required parameters."}), 400
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
except ValueError:
return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
# 2. Query for ALL relevant logs (EVENTS and REQUESTS) in one go
try:
logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
periodic_logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type == 'PERIODIC',
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
except Exception as e:
return jsonify({"message": f"Could not query logs: {e}"}), 500
# 3. Initialize data structures for processing
swap_starts_map = {}
completed_swap_times = []
total_initiations = 0
total_starts = 0
completed_swaps = 0
aborted_swaps = 0
daily_completed, daily_aborted, hourly_swaps, abort_reason_counts = {}, {}, [0] * 24, {}
slot_utilization_counts = {i: 0 for i in range(1, 10)}
# 4. Process the logs to calculate all KPIs and chart data
for log in logs:
event_type = log.payload.get('eventType')
job_type = log.payload.get('jobType')
session_id = log.payload.get('sessionId')
log_date = log.timestamp.date()
log_hour = log.timestamp.hour
if job_type == 'JOBTYPE_SWAP_AUTH_SUCCESS':
total_initiations += 1
elif event_type == 'EVENT_SWAP_START':
total_starts += 1
hourly_swaps[log_hour] += 1
if session_id:
swap_starts_map[session_id] = log.timestamp
elif event_type == 'EVENT_SWAP_ENDED':
completed_swaps += 1
daily_completed[log_date] = daily_completed.get(log_date, 0) + 1
if session_id and session_id in swap_starts_map:
duration = (log.timestamp - swap_starts_map[session_id]).total_seconds()
completed_swap_times.append(duration)
del swap_starts_map[session_id]
elif event_type == 'EVENT_SWAP_ABORTED':
aborted_swaps += 1
daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1
reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
elif event_type == 'EVENT_SLOT_LOCK_DISENEGAGED':
slot_id = log.payload.get('eventData', {}).get('slotId')
if slot_id and slot_id in slot_utilization_counts:
slot_utilization_counts[slot_id] += 1
# --- NEW: 4. Calculate Station Uptime ---
total_period_seconds = (end_datetime - start_datetime).total_seconds()
total_downtime_seconds = 0
MAX_ONLINE_GAP_SECONDS = 30 # Assume offline if no message for over 30 seconds
if not periodic_logs:
total_downtime_seconds = total_period_seconds
else:
# Check gap from start time to first message
first_gap = (periodic_logs[0].timestamp - start_datetime).total_seconds()
if first_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += first_gap
# Check gaps between consecutive messages
for i in range(1, len(periodic_logs)):
gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds()
if gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += gap
# Check gap from last message to end time
last_gap = (end_datetime - periodic_logs[-1].timestamp).total_seconds()
if last_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += last_gap
station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
station_uptime = max(0, min(100, station_uptime)) # Ensure value is between 0 and 100
# 6. Prepare final data structures
avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else None
kpi_data = {
"total_swaps_initiated": total_initiations,
"total_swaps_started": total_starts,
"completed_swaps": completed_swaps,
"aborted_swaps": aborted_swaps,
"avg_swap_time_seconds": avg_swap_time_seconds,
"station_uptime": round(station_uptime, 2)
}
date_labels, completed_data, aborted_data = [], [], []
current_date = start_date
while current_date <= end_date:
date_labels.append(current_date.strftime('%b %d'))
completed_data.append(daily_completed.get(current_date, 0))
aborted_data.append(daily_aborted.get(current_date, 0))
current_date += timedelta(days=1)
swap_activity_data = {"labels": date_labels, "completed_data": completed_data, "aborted_data": aborted_data}
hourly_distribution_data = {"labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)], "swap_data": hourly_swaps}
abort_reasons_data = {"labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts.keys()], "reason_data": list(abort_reason_counts.values())}
slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]} # Return counts as a simple list [_ , _, ...]
# 7. Combine all data and return
return jsonify({
"kpis": kpi_data,
"swap_activity": swap_activity_data,
"hourly_distribution": hourly_distribution_data,
"abort_reasons": abort_reasons_data,
"slot_utilization": slot_utilization_data
})
# --- CSV Export route (UPDATED) ---
def _format_periodic_row(payload, num_slots=9):
"""
Flattens a periodic payload dictionary into a single list for a CSV row.
"""
row = [
datetime.fromtimestamp(payload.get("ts")).strftime("%Y-%m-%d %H:%M:%S"),
payload.get("deviceId", ""),
payload.get("stationDiagnosticCode", "")
]
slots_data = payload.get("slotLevelPayload", [])
slot_map = {s.get('slotId', i+1): s for i, s in enumerate(slots_data)}
slot_fields_keys = [
"batteryIdentification", "batteryPresent", "chargerPresent", "doorStatus", "doorLockStatus",
"voltage", "current", "soc", "batteryMaxTemp", "slotTemperature",
"batteryFaultCode", "chargerFaultCode", "batteryMode", "chargerMode"
]
for i in range(1, num_slots + 1):
slot = slot_map.get(i)
if slot:
# Convert boolean values to readable text
# door_status_text = "OPEN" if slot.get("doorStatus", 0) == 1 else "CLOSED"
# door_lock_status_text = "UNLOCKED" if slot.get("doorLockStatus", 0) == 1 else "LOCKED"
# battery_present_text = "YES" if slot.get("batteryPresent", 0) == 1 else "NO"
# charger_present_text = "YES" if slot.get("chargerPresent", 0) == 1 else "NO"
row.extend([
slot.get('batteryIdentification', ''),
slot.get("batteryPresent", 0),
slot.get("chargerPresent", 0),
slot.get("doorStatus", 0),
slot.get("doorLockStatus", 0),
slot.get('voltage', 0) / 1000.0,
slot.get('current', 0) / 1000.0,
slot.get('soc', 0),
slot.get('batteryMaxTemp', 0) / 10.0,
slot.get('slotTemperature', 0) / 10.0,
slot.get('batteryFaultCode', 0),
slot.get('chargerFaultCode', 0),
slot.get('batteryMode', 0),
slot.get('chargerMode', 0)
])
else:
row.extend([''] * len(slot_fields_keys))
row.append('') # Placeholder for RawHexPayload
return row
@app.route('/api/logs/export', methods=['GET'])
def export_logs():
station_id = request.args.get('station_id')
start_datetime_str = request.args.get('start_datetime')
end_datetime_str = request.args.get('end_datetime')
log_type = request.args.get('log_type', 'PERIODIC')
if not all([station_id, start_datetime_str, end_datetime_str]):
return jsonify({"error": "Missing required parameters"}), 400
try:
start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
except ValueError:
return jsonify({"error": "Invalid datetime format"}), 400
# --- FIX 1: Correctly query for Events & RPC ---
if log_type == 'EVENT':
# If frontend asks for EVENT, search for both EVENTS and REQUEST in the DB
query = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.timestamp.between(start_datetime, end_datetime),
MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
)
else: # Otherwise, query for PERIODIC
query = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.timestamp.between(start_datetime, end_datetime),
MqttLog.topic_type == log_type
)
logs = query.order_by(MqttLog.timestamp.asc()).all()
if not logs:
return jsonify({"message": "No logs found for the selected criteria."}), 404
output = io.StringIO()
writer = csv.writer(output)
# --- FIX 2: Create a cleaner filename ---
station = Station.query.filter_by(station_id=station_id).first()
station_name = station.name.replace(' ', '_') if station else station_id
date_str = start_datetime.strftime('%Y-%m-%d')
filename = f"{station_name}_{log_type}_{date_str}.csv"
if log_type == 'PERIODIC':
base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
slot_fields = [
"BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
"Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
"BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
]
slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields]
header = base_header + slot_header + ["RawHexPayload"]
writer.writerow(header)
for log in logs:
writer.writerow(_format_periodic_row(log.payload))
else: # For EVENTS_RPC
header = ["Timestamp", "Topic", "Payload_JSON"]
writer.writerow(header)
for log in logs:
writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])
output.seek(0)
return Response(
output,
mimetype="text/csv",
headers={"Content-Disposition": f"attachment;filename={filename}"}
)
@socketio.on('rpc_request')
def handle_rpc_request(payload):
"""
Receives a command from the web dashboard, creates a Protobuf RPC request,
and publishes it to the station via MQTT.
"""
station_id = payload.get('station_id')
command = payload.get('command')
data = payload.get('data') # This will be the slot_id or swap_pairs array
print(f"Received RPC request for station {station_id}: {command} with data {data}")
# Find the correct MQTT client for this station
mqtt_client = mqtt_clients.get(station_id)
if not mqtt_client or not mqtt_client.is_connected:
print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
return # Or emit an error back to the user
# --- Create the Protobuf message based on the command ---
# This is where the logic from your snippet is implemented.
request_payload = rpcRequest(
ts=int(time.time()),
jobId=f"job_{int(time.time())}"
)
# Determine the jobType and set data based on the command string
if command == 'OPEN':
request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE
request_payload.slotInfo.slotId = data
request_payload.slotInfo.state = 1
elif command == 'CHG_ON':
# Replace this with the correct name from your .proto file
request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
request_payload.slotInfo.slotId = data
request_payload.slotInfo.state = 1 # State 1 for ON
elif command == 'CHG_OFF':
# Replace this with the correct name from your .proto file
request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
request_payload.slotInfo.slotId = data
request_payload.slotInfo.state = 0 # State 0 for OFF
elif command == 'START_SWAP':
# --- THIS IS THE CORRECTED LINE ---
request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
if data and isinstance(data, list):
# Your logic for adding the swap pairs to the payload
for pair in data:
swap_info = request_payload.swapInfo.add()
swap_info.fromSlot = pair[0]
swap_info.toSlot = pair[1]
# --- NEW: Added handlers for Abort and Reset ---
elif command == 'ABORT_SWAP':
request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
elif command == 'STATION_RESET':
request_payload.jobType = jobType_e.JOBTYPE_REBOOT
elif command == 'LANGUAGE_UPDATE':
request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
# Logic to map language string to enum would go here
else:
print(f"Unknown command: {command}")
return
# --- Serialize and Publish the Message ---
serialized_payload = request_payload.SerializeToString()
# Construct the MQTT topic
# NOTE: You may need to fetch client_id and version from your database
topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"
print(f"Publishing to {topic}")
mqtt_client.client.publish(topic, serialized_payload)
# ADD THIS NEW FUNCTION
def start_single_mqtt_client(station):
"""
Creates and starts a new MQTT client thread for a SINGLE station.
This is our new reusable function.
"""
if station.station_id in mqtt_clients and mqtt_clients[station.station_id].is_connected:
print(f"MQTT client for {station.station_id} is already running.")
return
print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
client = MqttClient(
broker=station.mqtt_broker,
port=station.mqtt_port,
user=station.mqtt_user,
password=station.mqtt_password,
station_id=station.station_id,
on_message_callback=on_message_handler
)
client.start() # The start method should handle threading
mqtt_clients[station.station_id] = client
# --- Main Application Logic ---
# def start_mqtt_clients():
# """
# Initializes and starts an MQTT client for each station found in the database,
# using the specific MQTT credentials stored for each station.
# """
# try:
# with app.app_context():
# stations = Station.query.all()
# except Exception as e:
# print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}")
# return
# for station in stations:
# if station.station_id not in mqtt_clients:
# print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
# client = MqttClient(
# broker=station.mqtt_broker,
# port=station.mqtt_port,
# user=station.mqtt_user,
# password=station.mqtt_password,
# station_id=station.station_id,
# on_message_callback=on_message_handler
# )
# client.start()
# mqtt_clients[station.station_id] = client
def start_mqtt_clients():
"""
Initializes and starts an MQTT client for each station found in the database
by calling our new reusable function.
"""
try:
with app.app_context():
stations = Station.query.all()
print(f"Found {len(stations)} existing stations to monitor.")
except Exception as e:
print(f"CRITICAL: Could not query stations from the database: {e}")
return
for station in stations:
start_single_mqtt_client(station)
if __name__ == '__main__':
try:
with app.app_context():
db.create_all()
# Add a default user if none exist
if not User.query.first():
print("No users found. Creating a default admin user.")
default_user = User(username='admin')
default_user.set_password('password')
db.session.add(default_user)
db.session.commit()
# Add a default station if none exist
if not Station.query.first():
print("No stations found. Adding a default station.")
default_station = Station(
station_id="V16000862287077265957",
product_id="VEC_PROD_001",
name="Test Station 1",
mqtt_broker="mqtt.vecmocon.com",
mqtt_port=1883,
mqtt_user="your_username",
mqtt_password="your_password"
)
db.session.add(default_station)
db.session.commit()
except Exception as e:
print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
sys.exit(1)
mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
mqtt_thread.start()
print(f"Starting Flask-SocketIO server on http://192.168.1.10:5000")
socketio.run(app, host='192.168.1.10', port=5000)