SwapStation_WebApp/backend/main.py

965 lines
39 KiB
Python

import os
import sys
import threading
import json
import csv
import io
import time
from datetime import datetime, timedelta, timezone, time as dt_time
from zoneinfo import ZoneInfo
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO, join_room
from flask_cors import CORS
from dotenv import load_dotenv
from sqlalchemy import desc, func, case
# Import your custom core modules and the new models
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog
from flask_login import login_required, current_user, LoginManager
from proto.vec_payload_chgSt_pb2 import (
rpcRequest,
jobType_e
)
# --- Load Environment Variables ---
load_dotenv()
# --- Pre-startup Check for Essential Configuration ---
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
print("FATAL ERROR: DATABASE_URL is not set in .env file.")
sys.exit(1)
# --- Application Setup ---
app = Flask(__name__)
# CORS(app)
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True)
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True, expose_headers='Content-Disposition')
CORS(app, resources={r"/api/*": {"origins": ["http://192.168.1.10:5500","http://127.0.0.1:5500"]}}, supports_credentials=True, expose_headers='Content-Disposition')
# CORS(app, resources={r"/api/*": {"origins": "http://localhost:5173"}}) , "http://127.0.0.1:5500"
# This tells Flask: "For any route starting with /api/, allow requests
# from the frontend running on http://localhost:5173".
# ADD THESE LINES FOR FLASK-LOGIN
login_manager = LoginManager()
login_manager.init_app(app)
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")
db.init_app(app)
socketio = SocketIO(app, cors_allowed_origins="*")
IST = ZoneInfo("Asia/Kolkata")
# --- User Loader for Flask-Login ---
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# --- Global instances ---
decoder = ProtobufDecoder()
mqtt_clients = {}
last_message_timestamps = {}
STATION_TIMEOUT_SECONDS = 10
# --- MQTT Message Handling ---
def on_message_handler(station_id, topic, payload):
message_type = topic.split('/')[-1]
if message_type in ['PERIODIC']:
last_message_timestamps[station_id] = time.time()
# Decode the message payload based on its type
decoded_data = None
if message_type == 'PERIODIC':
decoded_data = decoder.decode_periodic(payload)
elif message_type == 'EVENTS':
decoded_data = decoder.decode_event(payload)
elif message_type == 'REQUEST':
decoded_data = decoder.decode_rpc_request(payload)
if decoded_data:
try:
with app.app_context():
log_entry = MqttLog(
station_id=station_id,
topic=topic,
topic_type=message_type,
payload=decoded_data
)
db.session.add(log_entry)
db.session.commit()
except Exception as e:
print(f"Error writing to PostgreSQL: {e}")
# Emit update to the main dashboard
socketio.emit('dashboard_update', {
'stationId': station_id,
'topic': topic,
'data': decoded_data
}, room=station_id)
if message_type == 'PERIODIC':
# For periodic messages, only calculate and send the live status
# This logic is from your /api/stations route
last_msg_time = last_message_timestamps.get(station_id)
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
status_text = "Online" if is_online else "Offline"
print(f"Sending live status update: {status_text}")
socketio.emit('status_update', {'status': status_text}, room=station_id)
# Emit update notification to the analytics page
if message_type in ['EVENTS', 'REQUEST']:
socketio.emit('analytics_updated', room=station_id)
# --- (WebSocket and API routes remain the same) ---
@socketio.on('connect')
def handle_connect():
print('Client connected to WebSocket')
# --- NEW: Function to handle joining a room and sending initial data ---
@socketio.on('join_station_room')
def handle_join_station_room(data):
station_id = data['station_id']
join_room(station_id)
print(f"Client joined room for station: {station_id}")
try:
# Find the most recent log entry for this station
latest_log = MqttLog.query.filter_by(
station_id=station_id,
topic_type='PERIODIC'
).order_by(MqttLog.timestamp.desc()).first()
if latest_log:
# If we have a past log, send it immediately to the new client
print(f"Sending initial state for {station_id} to new client.")
socketio.emit('dashboard_update', {
'stationId': station_id,
'topic': latest_log.topic,
'data': latest_log.payload
}, room=station_id)
except Exception as e:
print(f"Error querying or sending initial state for {station_id}: {e}")
# --- API Routes ---
@app.route('/api/login', methods=['POST'])
def login():
"""Handles user login."""
data = request.get_json()
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Username and password are required."}), 400
user = User.query.filter_by(username=data['username']).first()
if user and user.check_password(data['password']):
# In a real app, you would create a session token here (e.g., with Flask-Login)
return jsonify({"message": "Login successful"}), 200
return jsonify({"message": "Invalid username or password"}), 401
# --- Admin-only: Add User ---
@app.route('/api/users', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_user():
# Check if the logged-in user is an admin
# if not current_user.is_admin:
# return jsonify({"message": "Admin access required."}), 403 # Forbidden
data = request.get_json()
username = data.get('username')
password = data.get('password')
is_admin = data.get('is_admin', False)
if not username or not password:
return jsonify({"message": "Username and password are required."}), 400
if User.query.filter_by(username=username).first():
return jsonify({"message": "Username already exists."}), 409 # Conflict
new_user = User(username=username, is_admin=is_admin)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
return jsonify({"message": "User added successfully."}), 201
# --- Admin-only: Add Station ---
@app.route('/api/stations', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_station():
# if not current_user.is_admin:
# return jsonify({"message": "Admin access required."}), 403
data = request.get_json()
# All fields are now expected from the frontend form
required_fields = ['station_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
if not all(field in data for field in required_fields):
return jsonify({"message": "Missing required station details."}), 400
if Station.query.filter_by(station_id=data['station_id']).first():
return jsonify({"message": "Station ID already exists."}), 409
new_station = Station(
station_id=data['station_id'],
product_id=data['product_id'],
name=data['name'],
location=data['location'],
mqtt_broker=data['mqtt_broker'],
mqtt_port=data['mqtt_port'],
mqtt_user=data.get('mqtt_user'),
mqtt_password=data.get('mqtt_password')
)
db.session.add(new_station)
db.session.commit()
# Immediately start the MQTT client for the station just created.
start_single_mqtt_client(new_station)
return jsonify({"message": "Station added successfully."}), 201
# The new function with logging
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
def remove_station(station_id):
"""
Removes a station from the database and stops its MQTT client.
"""
print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")
# 1. Find the station in the database
station = Station.query.filter_by(station_id=station_id).first_or_404()
print(f"[LOG] Found station '{station.name}' in the database.")
# 2. Stop the running MQTT client for this station
client_to_stop = mqtt_clients.get(station_id)
if client_to_stop:
print(f"[LOG] Found active MQTT client. Attempting to stop it now...")
client_to_stop.stop()
mqtt_clients.pop(station_id, None)
print(f"[LOG] Successfully stopped and removed client object for {station_id}.")
else:
print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")
# 3. Delete the station from the database
print(f"[LOG] Attempting to delete {station_id} from the database...")
db.session.delete(station)
db.session.commit()
print(f"[LOG] Successfully deleted station from the database.")
print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
return jsonify({"message": f"Station {station_id} removed successfully."}), 200
@app.route('/api/stations', methods=['GET'])
def get_stations():
try:
stations = Station.query.all()
station_list = []
for s in stations:
# --- NEW: More accurate heartbeat logic ---
last_msg_time = last_message_timestamps.get(s.station_id)
# A station is online only if we have received a message recently
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
station_list.append({
"id": s.station_id,
"name": s.name,
"location": s.location,
"product_id": s.product_id,
"status": "Online" if is_online else "Offline"
})
return jsonify(station_list)
except Exception as e:
return jsonify({"error": f"Database query failed: {e}"}), 500
#--- Daily Stats Route ---
@app.route('/api/stations/daily-stats', methods=['GET'])
def get_all_station_stats():
"""
Calculates the swap statistics for the current calendar day in the IST timezone.
"""
try:
# Get the current time and date in your timezone (IST is defined globally)
now_ist = datetime.now(IST)
today_ist = now_ist.date()
# Calculate the precise start and end of that day
start_of_day_ist = datetime.combine(today_ist, dt_time.min, tzinfo=IST)
end_of_day_ist = datetime.combine(today_ist, dt_time.max, tzinfo=IST)
# --- The rest of the query uses this new date range ---
stats = db.session.query(
MqttLog.station_id,
func.count(case((MqttLog.payload['eventType'].astext == 'EVENT_SWAP_START', 1))).label('total_starts'),
func.count(case((MqttLog.payload['eventType'].astext == 'EVENT_SWAP_ENDED', 1))).label('completed'),
func.count(case((MqttLog.payload['eventType'].astext == 'EVENT_SWAP_ABORTED', 1))).label('aborted')
).filter(
MqttLog.topic_type == 'EVENTS',
# Use the new timezone-aware date range
MqttLog.timestamp.between(start_of_day_ist, end_of_day_ist)
).group_by(MqttLog.station_id).all()
stats_dict = {
station_id: {
"total_starts": total_starts,
"completed": completed,
"aborted": aborted
} for station_id, total_starts, completed, aborted in stats
}
return jsonify(stats_dict)
except Exception as e:
print(f"Error fetching daily stats: {e}")
return jsonify({"message": "Could not fetch daily station stats."}), 500
@app.route('/api/logs/recent/<string:station_id>', methods=['GET'])
def get_recent_logs(station_id):
# Get parameters from the request, with defaults
start_date_str = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
end_date_str = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d'))
limit_count = request.args.get('count', 50, type=int)
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
start_datetime = datetime.combine(start_date, datetime.min.time()) # <-- FIX
end_datetime = datetime.combine(end_date, datetime.max.time())
except ValueError:
return jsonify({"message": "Invalid date format."}), 400
try:
# The query now uses all three filters
logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(desc(MqttLog.timestamp)).limit(limit_count).all()
logs.reverse()
log_list = [{
"topic": log.topic, "payload": log.payload, "timestamp": log.timestamp.isoformat()
} for log in logs]
return jsonify(log_list)
except Exception as e:
return jsonify({"message": "Could not fetch recent logs."}), 500
# A helper dictionary to make abort reason labels more readable
ABORT_REASON_MAP = {
"ABORT_UNKNOWN": "Unknown",
"ABORT_BAT_EXIT_TIMEOUT": "Battery Exit Timeout",
"ABORT_BAT_ENTRY_TIMEOUT": "Battery Entry Timeout",
"ABORT_DOOR_CLOSE_TIMEOUT": "Door Close Timeout",
"ABORT_DOOR_OPEN_TIMEOUT": "Door Open Timeout",
"ABORT_INVALID_PARAM": "Invalid Parameter",
"ABORT_REMOTE_REQUESTED": "Remote Abort",
"ABORT_INVALID_BATTERY": "Invalid Battery"
}
#--- Analytics Route ---
# @app.route('/api/analytics', methods=['GET'])
# def get_analytics_data():
# # 1. Get and validate request parameters (same as before)
# station_id = request.args.get('station_id')
# start_date_str = request.args.get('start_date')
# end_date_str = request.args.get('end_date')
# if not all([station_id, start_date_str, end_date_str]):
# return jsonify({"message": "Missing required parameters."}), 400
# try:
# start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
# end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
# start_datetime = datetime.combine(start_date, datetime.min.time())
# end_datetime = datetime.combine(end_date, datetime.max.time())
# except ValueError:
# return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
# # 2. Query for EVENT logs (for swap calculations)
# try:
# event_logs = MqttLog.query.filter(
# MqttLog.station_id == station_id,
# MqttLog.topic_type == 'EVENTS',
# MqttLog.timestamp.between(start_datetime, end_datetime)
# ).order_by(MqttLog.timestamp.asc()).all() # <-- ADD THIS SORTING
# except Exception as e:
# return jsonify({"message": f"Could not query event logs: {e}"}), 500
# # --- NEW: Query for PERIODIC logs (for uptime calculation) ---
# try:
# periodic_logs = MqttLog.query.filter(
# MqttLog.station_id == station_id,
# MqttLog.topic_type == 'PERIODIC',
# MqttLog.timestamp.between(start_datetime, end_datetime)
# ).order_by(MqttLog.timestamp.asc()).all()
# except Exception as e:
# return jsonify({"message": f"Could not query periodic logs: {e}"}), 500
# # --- 3. REVISED: Process logs to calculate KPIs and chart data ---
# swap_starts = {} # Dictionary to store start times by sessionId
# completed_swap_times = []
# total_swaps, completed_swaps, aborted_swaps = 0, 0, 0
# daily_completed, daily_aborted, hourly_swaps, abort_reason_counts = {}, {}, [0] * 24, {}
# slot_utilization_counts = {i: 0 for i in range(1, 10)}
# print("\n--- STARTING SWAP ANALYSIS ---") # Add this line
# for log in event_logs:
# event_type = log.payload.get('eventType')
# session_id = log.payload.get('sessionId')
# log_date = log.timestamp.date()
# log_hour = log.timestamp.hour
# if event_type == 'EVENT_SWAP_START':
# total_swaps += 1
# hourly_swaps[log_hour] += 1
# if session_id:
# swap_starts[session_id] = log.timestamp # Store start time
# print(f"Found START for session '{session_id}' at {log.timestamp}") # Add this line
# elif event_type == 'EVENT_SWAP_ENDED':
# completed_swaps += 1
# daily_completed[log_date] = daily_completed.get(log_date, 0) + 1
# if session_id and session_id in swap_starts:
# # Calculate duration if we have a matching start event
# duration = (log.timestamp - swap_starts[session_id]).total_seconds()
# completed_swap_times.append(duration)
# print(f"Found MATCHING END for session '{session_id}'. Duration: {duration}s") # Add this line
# del swap_starts[session_id] # Remove to prevent reuse
# else:
# print(f"Found END event but could not find matching START for session '{session_id}'") # Add this line
# elif event_type == 'EVENT_SWAP_ABORTED':
# aborted_swaps += 1
# daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1
# reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
# abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
# elif event_type == 'EVENT_BATTERY_EXIT':
# slot_id = log.payload.get('eventData', {}).get('slotId')
# if slot_id and slot_id in slot_utilization_counts:
# slot_utilization_counts[slot_id] += 1
# print(f"--- ANALYSIS COMPLETE ---") # Add this line
# print(f"Calculated Durations: {completed_swap_times}") # Add this line
# # --- NEW: 4. Calculate Station Uptime ---
# total_period_seconds = (end_datetime - start_datetime).total_seconds()
# total_downtime_seconds = 0
# MAX_ONLINE_GAP_SECONDS = 30 # Assume offline if no message for over 30 seconds
# if not periodic_logs:
# total_downtime_seconds = total_period_seconds
# else:
# # Check gap from start time to first message
# first_gap = (periodic_logs[0].timestamp - start_datetime).total_seconds()
# if first_gap > MAX_ONLINE_GAP_SECONDS:
# total_downtime_seconds += first_gap
# # Check gaps between consecutive messages
# for i in range(1, len(periodic_logs)):
# gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds()
# if gap > MAX_ONLINE_GAP_SECONDS:
# total_downtime_seconds += gap
# # Check gap from last message to end time
# last_gap = (end_datetime - periodic_logs[-1].timestamp).total_seconds()
# if last_gap > MAX_ONLINE_GAP_SECONDS:
# total_downtime_seconds += last_gap
# station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
# station_uptime = max(0, min(100, station_uptime)) # Ensure value is between 0 and 100
# # 5. Prepare final data structures (KPI section is now updated)
# avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else 0
# # avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else None
# kpi_data = {
# "total_swaps": total_swaps, "completed_swaps": completed_swaps,
# "aborted_swaps": aborted_swaps, "avg_swap_time_seconds": avg_swap_time_seconds,
# "station_uptime": round(station_uptime, 2) # Add uptime to the KPI object
# }
# # (The rest of the chart data preparation is unchanged)
# date_labels, completed_data, aborted_data = [], [], []
# current_date = start_date
# while current_date <= end_date:
# date_labels.append(current_date.strftime('%b %d'))
# completed_data.append(daily_completed.get(current_date, 0))
# aborted_data.append(daily_aborted.get(current_date, 0))
# current_date += timedelta(days=1)
# swap_activity_data = {"labels": date_labels, "completed_data": completed_data, "aborted_data": aborted_data}
# hourly_distribution_data = {"labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)], "swap_data": hourly_swaps}
# abort_reasons_data = {"labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts.keys()], "reason_data": list(abort_reason_counts.values())}
# slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]} # Return counts as a simple list [_ , _, ...]
# # 6. Combine all data and return
# return jsonify({
# "kpis": kpi_data,
# "swap_activity": swap_activity_data,
# "hourly_distribution": hourly_distribution_data,
# "abort_reasons": abort_reasons_data,
# "slot_utilization": slot_utilization_data # <-- ADD THIS NEW KEY
# })
@app.route('/api/analytics', methods=['GET'])
def get_analytics_data():
# 1. Get and validate request parameters
station_id = request.args.get('station_id')
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if not all([station_id, start_date_str, end_date_str]):
return jsonify({"message": "Missing required parameters."}), 400
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
except ValueError:
return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
# 2. Query for ALL relevant logs (EVENTS and REQUESTS) in one go
try:
logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
periodic_logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type == 'PERIODIC',
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
except Exception as e:
return jsonify({"message": f"Could not query logs: {e}"}), 500
# 3. Initialize data structures for processing
swap_starts_map = {}
completed_swap_times = []
total_initiations = 0
total_starts = 0
completed_swaps = 0
aborted_swaps = 0
daily_completed, daily_aborted, hourly_swaps, abort_reason_counts = {}, {}, [0] * 24, {}
slot_utilization_counts = {i: 0 for i in range(1, 10)}
# 4. Process the logs to calculate all KPIs and chart data
for log in logs:
event_type = log.payload.get('eventType')
job_type = log.payload.get('jobType')
session_id = log.payload.get('sessionId')
log_date = log.timestamp.date()
log_hour = log.timestamp.hour
if job_type == 'JOBTYPE_SWAP_AUTH_SUCCESS':
total_initiations += 1
elif event_type == 'EVENT_SWAP_START':
total_starts += 1
hourly_swaps[log_hour] += 1
if session_id:
swap_starts_map[session_id] = log.timestamp
elif event_type == 'EVENT_SWAP_ENDED':
completed_swaps += 1
daily_completed[log_date] = daily_completed.get(log_date, 0) + 1
if session_id and session_id in swap_starts_map:
duration = (log.timestamp - swap_starts_map[session_id]).total_seconds()
completed_swap_times.append(duration)
del swap_starts_map[session_id]
elif event_type == 'EVENT_SWAP_ABORTED':
aborted_swaps += 1
daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1
reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
elif event_type == 'EVENT_SLOT_LOCK_DISENEGAGED':
slot_id = log.payload.get('eventData', {}).get('slotId')
if slot_id and slot_id in slot_utilization_counts:
slot_utilization_counts[slot_id] += 1
# --- NEW: 4. Calculate Station Uptime ---
total_period_seconds = (end_datetime - start_datetime).total_seconds()
total_downtime_seconds = 0
MAX_ONLINE_GAP_SECONDS = 30 # Assume offline if no message for over 30 seconds
if not periodic_logs:
total_downtime_seconds = total_period_seconds
else:
# Check gap from start time to first message
first_gap = (periodic_logs[0].timestamp - start_datetime).total_seconds()
if first_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += first_gap
# Check gaps between consecutive messages
for i in range(1, len(periodic_logs)):
gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds()
if gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += gap
# Check gap from last message to end time
last_gap = (end_datetime - periodic_logs[-1].timestamp).total_seconds()
if last_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += last_gap
station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
station_uptime = max(0, min(100, station_uptime)) # Ensure value is between 0 and 100
# 6. Prepare final data structures
avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else None
kpi_data = {
"total_swaps_initiated": total_initiations,
"total_swaps_started": total_starts,
"completed_swaps": completed_swaps,
"aborted_swaps": aborted_swaps,
"avg_swap_time_seconds": avg_swap_time_seconds,
"station_uptime": round(station_uptime, 2)
}
date_labels, completed_data, aborted_data = [], [], []
current_date = start_date
while current_date <= end_date:
date_labels.append(current_date.strftime('%b %d'))
completed_data.append(daily_completed.get(current_date, 0))
aborted_data.append(daily_aborted.get(current_date, 0))
current_date += timedelta(days=1)
swap_activity_data = {"labels": date_labels, "completed_data": completed_data, "aborted_data": aborted_data}
hourly_distribution_data = {"labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)], "swap_data": hourly_swaps}
abort_reasons_data = {"labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts.keys()], "reason_data": list(abort_reason_counts.values())}
slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]} # Return counts as a simple list [_ , _, ...]
# 7. Combine all data and return
return jsonify({
"kpis": kpi_data,
"swap_activity": swap_activity_data,
"hourly_distribution": hourly_distribution_data,
"abort_reasons": abort_reasons_data,
"slot_utilization": slot_utilization_data
})
# --- CSV Export route (UPDATED) ---
def _format_periodic_row(payload, num_slots=9):
"""
Flattens a periodic payload dictionary into a single list for a CSV row.
"""
row = [
datetime.fromtimestamp(payload.get("ts")).strftime("%Y-%m-%d %H:%M:%S"),
payload.get("deviceId", ""),
payload.get("stationDiagnosticCode", "")
]
slots_data = payload.get("slotLevelPayload", [])
slot_map = {s.get('slotId', i+1): s for i, s in enumerate(slots_data)}
slot_fields_keys = [
"batteryIdentification", "batteryPresent", "chargerPresent", "doorStatus", "doorLockStatus",
"voltage", "current", "soc", "batteryMaxTemp", "slotTemperature",
"batteryFaultCode", "chargerFaultCode", "batteryMode", "chargerMode"
]
for i in range(1, num_slots + 1):
slot = slot_map.get(i)
if slot:
# Convert boolean values to readable text
# door_status_text = "OPEN" if slot.get("doorStatus", 0) == 1 else "CLOSED"
# door_lock_status_text = "UNLOCKED" if slot.get("doorLockStatus", 0) == 1 else "LOCKED"
# battery_present_text = "YES" if slot.get("batteryPresent", 0) == 1 else "NO"
# charger_present_text = "YES" if slot.get("chargerPresent", 0) == 1 else "NO"
row.extend([
slot.get('batteryIdentification', ''),
slot.get("batteryPresent", 0),
slot.get("chargerPresent", 0),
slot.get("doorStatus", 0),
slot.get("doorLockStatus", 0),
slot.get('voltage', 0) / 1000.0,
slot.get('current', 0) / 1000.0,
slot.get('soc', 0),
slot.get('batteryMaxTemp', 0) / 10.0,
slot.get('slotTemperature', 0) / 10.0,
slot.get('batteryFaultCode', 0),
slot.get('chargerFaultCode', 0),
slot.get('batteryMode', 0),
slot.get('chargerMode', 0)
])
else:
row.extend([''] * len(slot_fields_keys))
row.append('') # Placeholder for RawHexPayload
return row
@app.route('/api/logs/export', methods=['GET'])
def export_logs():
station_id = request.args.get('station_id')
start_datetime_str = request.args.get('start_datetime')
end_datetime_str = request.args.get('end_datetime')
log_type = request.args.get('log_type', 'PERIODIC')
if not all([station_id, start_datetime_str, end_datetime_str]):
return jsonify({"error": "Missing required parameters"}), 400
try:
start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
except ValueError:
return jsonify({"error": "Invalid datetime format"}), 400
# --- FIX 1: Correctly query for Events & RPC ---
if log_type == 'EVENT':
# If frontend asks for EVENT, search for both EVENTS and REQUEST in the DB
query = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.timestamp.between(start_datetime, end_datetime),
MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
)
else: # Otherwise, query for PERIODIC
query = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.timestamp.between(start_datetime, end_datetime),
MqttLog.topic_type == log_type
)
logs = query.order_by(MqttLog.timestamp.asc()).all()
if not logs:
return jsonify({"message": "No logs found for the selected criteria."}), 404
output = io.StringIO()
writer = csv.writer(output)
# --- FIX 2: Create a cleaner filename ---
station = Station.query.filter_by(station_id=station_id).first()
station_name = station.name.replace(' ', '_') if station else station_id
date_str = start_datetime.strftime('%Y-%m-%d')
filename = f"{station_name}_{log_type}_{date_str}.csv"
if log_type == 'PERIODIC':
base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
slot_fields = [
"BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
"Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
"BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
]
slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields]
header = base_header + slot_header + ["RawHexPayload"]
writer.writerow(header)
for log in logs:
writer.writerow(_format_periodic_row(log.payload))
else: # For EVENTS_RPC
header = ["Timestamp", "Topic", "Payload_JSON"]
writer.writerow(header)
for log in logs:
writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])
output.seek(0)
return Response(
output,
mimetype="text/csv",
headers={"Content-Disposition": f"attachment;filename={filename}"}
)
@socketio.on('rpc_request')
def handle_rpc_request(payload):
"""
Receives a command from the web dashboard, creates a Protobuf RPC request,
and publishes it to the station via MQTT.
"""
station_id = payload.get('station_id')
command = payload.get('command')
data = payload.get('data') # This will be the slot_id or swap_pairs array
print(f"Received RPC request for station {station_id}: {command} with data {data}")
# Find the correct MQTT client for this station
mqtt_client = mqtt_clients.get(station_id)
if not mqtt_client or not mqtt_client.is_connected:
print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
return # Or emit an error back to the user
# --- Create the Protobuf message based on the command ---
# This is where the logic from your snippet is implemented.
request_payload = rpcRequest(
ts=int(time.time()),
jobId=f"job_{int(time.time())}"
)
# Determine the jobType and set data based on the command string
if command == 'OPEN':
request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE
request_payload.slotInfo.slotId = data
request_payload.slotInfo.state = 1
elif command == 'CHG_ON':
# Replace this with the correct name from your .proto file
request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
request_payload.slotInfo.slotId = data
request_payload.slotInfo.state = 1 # State 1 for ON
elif command == 'CHG_OFF':
# Replace this with the correct name from your .proto file
request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
request_payload.slotInfo.slotId = data
request_payload.slotInfo.state = 0 # State 0 for OFF
elif command == 'START_SWAP':
# --- THIS IS THE CORRECTED LINE ---
request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
if data and isinstance(data, list):
# Your logic for adding the swap pairs to the payload
for pair in data:
swap_info = request_payload.swapInfo.add()
swap_info.fromSlot = pair[0]
swap_info.toSlot = pair[1]
# --- NEW: Added handlers for Abort and Reset ---
elif command == 'ABORT_SWAP':
request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
elif command == 'STATION_RESET':
request_payload.jobType = jobType_e.JOBTYPE_REBOOT
elif command == 'LANGUAGE_UPDATE':
request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
# Logic to map language string to enum would go here
else:
print(f"Unknown command: {command}")
return
# --- Serialize and Publish the Message ---
serialized_payload = request_payload.SerializeToString()
# Construct the MQTT topic
# NOTE: You may need to fetch client_id and version from your database
topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"
print(f"Publishing to {topic}")
mqtt_client.client.publish(topic, serialized_payload)
# ADD THIS NEW FUNCTION
def start_single_mqtt_client(station):
"""
Creates and starts a new MQTT client thread for a SINGLE station.
This is our new reusable function.
"""
if station.station_id in mqtt_clients and mqtt_clients[station.station_id].is_connected:
print(f"MQTT client for {station.station_id} is already running.")
return
print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
client = MqttClient(
broker=station.mqtt_broker,
port=station.mqtt_port,
user=station.mqtt_user,
password=station.mqtt_password,
station_id=station.station_id,
on_message_callback=on_message_handler
)
client.start() # The start method should handle threading
mqtt_clients[station.station_id] = client
# --- Main Application Logic ---
# def start_mqtt_clients():
# """
# Initializes and starts an MQTT client for each station found in the database,
# using the specific MQTT credentials stored for each station.
# """
# try:
# with app.app_context():
# stations = Station.query.all()
# except Exception as e:
# print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}")
# return
# for station in stations:
# if station.station_id not in mqtt_clients:
# print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
# client = MqttClient(
# broker=station.mqtt_broker,
# port=station.mqtt_port,
# user=station.mqtt_user,
# password=station.mqtt_password,
# station_id=station.station_id,
# on_message_callback=on_message_handler
# )
# client.start()
# mqtt_clients[station.station_id] = client
def start_mqtt_clients():
"""
Initializes and starts an MQTT client for each station found in the database
by calling our new reusable function.
"""
try:
with app.app_context():
stations = Station.query.all()
print(f"Found {len(stations)} existing stations to monitor.")
except Exception as e:
print(f"CRITICAL: Could not query stations from the database: {e}")
return
for station in stations:
start_single_mqtt_client(station)
if __name__ == '__main__':
try:
with app.app_context():
db.create_all()
# Add a default user if none exist
if not User.query.first():
print("No users found. Creating a default admin user.")
default_user = User(username='admin')
default_user.set_password('password')
db.session.add(default_user)
db.session.commit()
# Add a default station if none exist
if not Station.query.first():
print("No stations found. Adding a default station.")
default_station = Station(
station_id="V16000862287077265957",
product_id="VEC_PROD_001",
name="Test Station 1",
mqtt_broker="mqtt.vecmocon.com",
mqtt_port=1883,
mqtt_user="your_username",
mqtt_password="your_password"
)
db.session.add(default_station)
db.session.commit()
except Exception as e:
print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
sys.exit(1)
mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
mqtt_thread.start()
print(f"Starting Flask-SocketIO server on http://192.168.1.10:5000")
socketio.run(app, host='192.168.1.10', port=5000)