# SwapStation_WebApp/backend/main.py

import os
import sys
import threading
import json
import csv
import io
import time
from datetime import datetime, timedelta, timezone
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO, emit, join_room
from flask_cors import CORS
from dotenv import load_dotenv
from sqlalchemy import desc, func, case, String, Integer
# Import your custom core modules and the new models
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog
from flask_login import login_required, current_user, login_user, LoginManager
from proto.vec_payload_chgSt_pb2 import (
rpcRequest,
jobType_e
)
# --- Load Environment Variables ---
load_dotenv()
# Load the allowed origin for CORS from an environment variable.
# Default to a local development URL if the variable is not set.
ALLOWED_ORIGIN = os.getenv("CORS_ALLOWED_ORIGIN", "http://127.0.0.1:5500")
print(f"--- INFO: Configuring CORS to allow requests from: {ALLOWED_ORIGIN} ---")
# --- Pre-startup Check for Essential Configuration ---
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
print("FATAL ERROR: DATABASE_URL is not set in .env file.")
sys.exit(1)
# --- Application Setup ---
app = Flask(__name__)
CORS(app, resources={r"/api/*": {"origins": [ALLOWED_ORIGIN]}}, supports_credentials=True, expose_headers='Content-Disposition')
# --- Flask-Login Setup ---
login_manager = LoginManager()
login_manager.init_app(app)
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")  # fallback is for local development only
db.init_app(app)
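# NOTE: Socket.IO currently accepts any origin; in production it should
# probably be restricted to ALLOWED_ORIGIN, like the REST API above.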
socketio = SocketIO(app, cors_allowed_origins="*")
# --- User Loader for Flask-Login ---
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# --- Global instances ---
decoder = ProtobufDecoder()
mqtt_clients = {}
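# Heartbeat bookkeeping: station_id -> epoch seconds of the last PERIODIC
# message. A station counts as "Online" if it has been heard from within
# STATION_TIMEOUT_SECONDS.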
last_message_timestamps = {}
STATION_TIMEOUT_SECONDS = 10
# --- MQTT Message Handling ---
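# Topics are assumed to end in the message-type segment (PERIODIC, EVENTS,
# or REQUEST); the last path element selects the decoder below.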
def on_message_handler(station_id, topic, payload):
message_type = topic.split('/')[-1]
    if message_type == 'PERIODIC':
last_message_timestamps[station_id] = time.time()
# Decode the message payload based on its type
decoded_data = None
if message_type == 'PERIODIC':
decoded_data = decoder.decode_periodic(payload)
elif message_type == 'EVENTS':
decoded_data = decoder.decode_event(payload)
elif message_type == 'REQUEST':
decoded_data = decoder.decode_rpc_request(payload)
if decoded_data:
try:
with app.app_context():
log_entry = MqttLog(
station_id=station_id,
topic=topic,
topic_type=message_type,
payload=decoded_data
)
db.session.add(log_entry)
db.session.commit()
except Exception as e:
print(f"Error writing to PostgreSQL: {e}")
# Emit update to the main dashboard
socketio.emit('dashboard_update', {
'stationId': station_id,
'topic': topic,
'data': decoded_data
}, room=station_id)
if message_type == 'PERIODIC':
        # Recompute and broadcast the live status using the same heartbeat
        # rule as the /api/stations route. The timestamp was refreshed just
        # above, so this normally reports "Online".
last_msg_time = last_message_timestamps.get(station_id)
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
status_text = "Online" if is_online else "Offline"
print(f"Sending live status update: {status_text}")
socketio.emit('status_update', {'status': status_text}, room=station_id)
# Emit update notification to the analytics page
if message_type in ['EVENTS', 'REQUEST']:
socketio.emit('analytics_updated', room=station_id)
# --- WebSocket Handlers ---
@socketio.on('connect')
def handle_connect():
print('Client connected to WebSocket')
# Handles a client joining a station's room and sends the latest known state.
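# Client-side usage (Socket.IO JS, field name as read by this handler):
#   socket.emit('join_station_room', { station_id: 'V16000862287077265957' });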
@socketio.on('join_station_room')
def handle_join_station_room(data):
    station_id = data.get('station_id')
    if not station_id:
        return
join_room(station_id)
print(f"Client joined room for station: {station_id}")
try:
# Find the most recent log entry for this station
latest_log = MqttLog.query.filter_by(
station_id=station_id,
topic_type='PERIODIC'
).order_by(MqttLog.timestamp.desc()).first()
if latest_log:
# If we have a past log, send it immediately to the new client
print(f"Sending initial state for {station_id} to new client.")
socketio.emit('dashboard_update', {
'stationId': station_id,
'topic': latest_log.topic,
'data': latest_log.payload
}, room=station_id)
except Exception as e:
print(f"Error querying or sending initial state for {station_id}: {e}")
# --- API Routes ---
@app.route('/api/login', methods=['POST'])
def login():
"""Handles user login."""
data = request.get_json()
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Username and password are required."}), 400
user = User.query.filter_by(username=data['username']).first()
    if user and user.check_password(data['password']):
        # Establish a Flask-Login session so the @login_required routes can
        # be enabled without further changes here.
        login_user(user)
        return jsonify({"message": "Login successful"}), 200
return jsonify({"message": "Invalid username or password"}), 401
# --- Admin-only: Add User ---
@app.route('/api/users', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_user():
# Check if the logged-in user is an admin
# if not current_user.is_admin:
# return jsonify({"message": "Admin access required."}), 403 # Forbidden
    data = request.get_json(silent=True) or {}
username = data.get('username')
password = data.get('password')
is_admin = data.get('is_admin', False)
if not username or not password:
return jsonify({"message": "Username and password are required."}), 400
if User.query.filter_by(username=username).first():
return jsonify({"message": "Username already exists."}), 409 # Conflict
new_user = User(username=username, is_admin=is_admin)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
return jsonify({"message": "User added successfully."}), 201
# --- Admin-only: Add Station ---
@app.route('/api/stations', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_station():
# if not current_user.is_admin:
# return jsonify({"message": "Admin access required."}), 403
    data = request.get_json(silent=True) or {}
    # All of these fields are expected from the frontend form.
required_fields = ['station_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
if not all(field in data for field in required_fields):
return jsonify({"message": "Missing required station details."}), 400
if Station.query.filter_by(station_id=data['station_id']).first():
return jsonify({"message": "Station ID already exists."}), 409
new_station = Station(
station_id=data['station_id'],
        product_id=data.get('product_id'),
name=data['name'],
location=data['location'],
mqtt_broker=data['mqtt_broker'],
mqtt_port=data['mqtt_port'],
mqtt_user=data.get('mqtt_user'),
mqtt_password=data.get('mqtt_password')
)
db.session.add(new_station)
db.session.commit()
# Immediately start the MQTT client for the station just created.
start_single_mqtt_client(new_station)
return jsonify({"message": "Station added successfully."}), 201
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
def remove_station(station_id):
"""
Removes a station from the database and stops its MQTT client.
"""
print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")
# 1. Find the station in the database
station = Station.query.filter_by(station_id=station_id).first_or_404()
print(f"[LOG] Found station '{station.name}' in the database.")
# 2. Stop the running MQTT client for this station
client_to_stop = mqtt_clients.get(station_id)
if client_to_stop:
print(f"[LOG] Found active MQTT client. Attempting to stop it now...")
client_to_stop.stop()
mqtt_clients.pop(station_id, None)
print(f"[LOG] Successfully stopped and removed client object for {station_id}.")
else:
print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")
# 3. Delete the station from the database
print(f"[LOG] Attempting to delete {station_id} from the database...")
db.session.delete(station)
db.session.commit()
print(f"[LOG] Successfully deleted station from the database.")
print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
return jsonify({"message": f"Station {station_id} removed successfully."}), 200
@app.route('/api/stations', methods=['GET'])
def get_stations():
try:
stations = Station.query.all()
station_list = []
for s in stations:
# --- NEW: More accurate heartbeat logic ---
last_msg_time = last_message_timestamps.get(s.station_id)
# A station is online only if we have received a message recently
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
station_list.append({
"id": s.station_id,
"name": s.name,
"location": s.location,
"product_id": s.product_id,
"status": "Online" if is_online else "Offline"
})
return jsonify(station_list)
except Exception as e:
return jsonify({"error": f"Database query failed: {e}"}), 500
# --- Daily Stats Route ---
@app.route('/api/stations/daily-stats', methods=['GET'])
def get_all_station_stats():
"""
Calculates the swap statistics for the last 24 hours for all stations.
"""
try:
        # Use a rolling 24-hour window rather than a fixed calendar day.
now_utc = datetime.now(timezone.utc)
start_of_period = now_utc - timedelta(hours=24)
        # .astext extracts JSONB values as text so they compare against plain strings.
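        # count(case((cond, 1))) counts only matching rows: CASE yields NULL
        # when the condition is false, and COUNT() ignores NULLs.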
stats = db.session.query(
MqttLog.station_id,
func.count(case((MqttLog.payload['eventType'].astext == 'EVENT_SWAP_START', 1))).label('total_starts'),
func.count(case((MqttLog.payload['eventType'].astext == 'EVENT_SWAP_ENDED', 1))).label('completed'),
func.count(case((MqttLog.payload['eventType'].astext == 'EVENT_SWAP_ABORTED', 1))).label('aborted')
).filter(
MqttLog.topic_type == 'EVENTS',
            MqttLog.timestamp.between(start_of_period, now_utc)
).group_by(MqttLog.station_id).all()
stats_dict = {
station_id: {
"total_starts": total_starts,
"completed": completed,
"aborted": aborted
} for station_id, total_starts, completed, aborted in stats
}
return jsonify(stats_dict)
except Exception as e:
print(f"Error fetching daily stats: {e}")
return jsonify({"message": "Could not fetch daily station stats."}), 500
@app.route('/api/logs/recent/<string:station_id>', methods=['GET'])
def get_recent_logs(station_id):
# Get parameters from the request, with defaults
start_date_str = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
end_date_str = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d'))
limit_count = request.args.get('count', 50, type=int)
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
        start_datetime = datetime.combine(start_date, datetime.min.time())
        end_datetime = datetime.combine(end_date, datetime.max.time())
except ValueError:
return jsonify({"message": "Invalid date format."}), 400
try:
        # Filter by station, log type, and date range.
logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(desc(MqttLog.timestamp)).limit(limit_count).all()
logs.reverse()
log_list = [{
"topic": log.topic, "payload": log.payload, "timestamp": log.timestamp.isoformat()
} for log in logs]
return jsonify(log_list)
    except Exception as e:
        print(f"Error fetching recent logs for {station_id}: {e}")
        return jsonify({"message": "Could not fetch recent logs."}), 500
# A helper dictionary to make abort reason labels more readable
ABORT_REASON_MAP = {
"ABORT_UNKNOWN": "Unknown",
"ABORT_BAT_EXIT_TIMEOUT": "Battery Exit Timeout",
"ABORT_BAT_ENTRY_TIMEOUT": "Battery Entry Timeout",
"ABORT_DOOR_CLOSE_TIMEOUT": "Door Close Timeout",
"ABORT_DOOR_OPEN_TIMEOUT": "Door Open Timeout",
"ABORT_INVALID_PARAM": "Invalid Parameter",
"ABORT_REMOTE_REQUESTED": "Remote Abort",
"ABORT_INVALID_BATTERY": "Invalid Battery"
}
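# Reasons missing from this map fall back to their raw enum name in the
# analytics response (see ABORT_REASON_MAP.get(r, r) below).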
# --- Analytics Route ---
@app.route('/api/analytics', methods=['GET'])
def get_analytics_data():
# 1. Get and validate request parameters
station_id = request.args.get('station_id')
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if not all([station_id, start_date_str, end_date_str]):
return jsonify({"message": "Missing required parameters."}), 400
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
except ValueError:
return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
# 2. Query for ALL relevant logs (EVENTS and REQUESTS) in one go
try:
logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
periodic_logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type == 'PERIODIC',
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
except Exception as e:
return jsonify({"message": f"Could not query logs: {e}"}), 500
# 3. Initialize data structures for processing
swap_starts_map = {}
completed_swap_times = []
total_initiations = 0
total_starts = 0
completed_swaps = 0
aborted_swaps = 0
daily_completed, daily_aborted, hourly_swaps, abort_reason_counts = {}, {}, [0] * 24, {}
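    # Slots are assumed to be numbered 1-9, matching num_slots=9 in the CSV
    # exporter further down.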
slot_utilization_counts = {i: 0 for i in range(1, 10)}
# 4. Process the logs to calculate all KPIs and chart data
for log in logs:
event_type = log.payload.get('eventType')
job_type = log.payload.get('jobType')
session_id = log.payload.get('sessionId')
log_date = log.timestamp.date()
log_hour = log.timestamp.hour
if job_type == 'JOBTYPE_SWAP_AUTH_SUCCESS':
total_initiations += 1
elif event_type == 'EVENT_SWAP_START':
total_starts += 1
hourly_swaps[log_hour] += 1
if session_id:
swap_starts_map[session_id] = log.timestamp
elif event_type == 'EVENT_BATTERY_EXIT':
completed_swaps += 1
daily_completed[log_date] = daily_completed.get(log_date, 0) + 1
if session_id and session_id in swap_starts_map:
duration = (log.timestamp - swap_starts_map[session_id]).total_seconds()
completed_swap_times.append(duration)
del swap_starts_map[session_id]
elif event_type == 'EVENT_SWAP_ABORTED':
aborted_swaps += 1
daily_aborted[log_date] = daily_aborted.get(log_date, 0) + 1
reason = log.payload.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
        elif event_type == 'EVENT_SLOT_LOCK_DISENEGAGED':  # (sic) spelling assumed to match the firmware enum
slot_id = log.payload.get('eventData', {}).get('slotId')
if slot_id and slot_id in slot_utilization_counts:
slot_utilization_counts[slot_id] += 1
    # 5. Calculate station uptime: downtime is the sum of gaps between
    # PERIODIC heartbeats that exceed MAX_ONLINE_GAP_SECONDS.
total_period_seconds = (end_datetime - start_datetime).total_seconds()
total_downtime_seconds = 0
MAX_ONLINE_GAP_SECONDS = 30 # Assume offline if no message for over 30 seconds
if not periodic_logs:
total_downtime_seconds = total_period_seconds
else:
# Check gap from start time to first message
first_gap = (periodic_logs[0].timestamp - start_datetime).total_seconds()
if first_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += first_gap
# Check gaps between consecutive messages
for i in range(1, len(periodic_logs)):
gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds()
if gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += gap
# Check gap from last message to end time
last_gap = (end_datetime - periodic_logs[-1].timestamp).total_seconds()
if last_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += last_gap
station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
station_uptime = max(0, min(100, station_uptime)) # Ensure value is between 0 and 100
# 6. Prepare final data structures
avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times) if completed_swap_times else None
kpi_data = {
"total_swaps_initiated": total_initiations,
"total_swaps_started": total_starts,
"completed_swaps": completed_swaps,
"aborted_swaps": aborted_swaps,
"avg_swap_time_seconds": avg_swap_time_seconds,
"station_uptime": round(station_uptime, 2)
}
date_labels, completed_data, aborted_data = [], [], []
current_date = start_date
while current_date <= end_date:
date_labels.append(current_date.strftime('%b %d'))
completed_data.append(daily_completed.get(current_date, 0))
aborted_data.append(daily_aborted.get(current_date, 0))
current_date += timedelta(days=1)
swap_activity_data = {"labels": date_labels, "completed_data": completed_data, "aborted_data": aborted_data}
hourly_distribution_data = {"labels": [f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}" for h in range(24)], "swap_data": hourly_swaps}
abort_reasons_data = {"labels": [ABORT_REASON_MAP.get(r, r) for r in abort_reason_counts.keys()], "reason_data": list(abort_reason_counts.values())}
    slot_utilization_data = {"counts": [slot_utilization_counts[i] for i in range(1, 10)]}  # counts for slots 1-9
# 7. Combine all data and return
return jsonify({
"kpis": kpi_data,
"swap_activity": swap_activity_data,
"hourly_distribution": hourly_distribution_data,
"abort_reasons": abort_reasons_data,
"slot_utilization": slot_utilization_data
})
@app.route('/api/uptime/<string:station_id>', methods=['GET'])
def get_station_uptime(station_id):
"""
A lightweight endpoint to calculate only the station uptime for the last 24 hours.
"""
try:
end_datetime = datetime.now(timezone.utc)
start_datetime = end_datetime - timedelta(hours=24)
periodic_logs = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.topic_type == 'PERIODIC',
MqttLog.timestamp.between(start_datetime, end_datetime)
).order_by(MqttLog.timestamp.asc()).all()
total_period_seconds = (end_datetime - start_datetime).total_seconds()
total_downtime_seconds = 0
MAX_ONLINE_GAP_SECONDS = 30
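        # DB timestamps are naive; they are assumed to be UTC, hence the
        # .replace(tzinfo=timezone.utc) calls below when comparing them with
        # the timezone-aware window boundaries.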
if not periodic_logs:
total_downtime_seconds = total_period_seconds
else:
first_gap = (periodic_logs[0].timestamp.replace(tzinfo=timezone.utc) - start_datetime).total_seconds()
if first_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += first_gap
for i in range(1, len(periodic_logs)):
gap = (periodic_logs[i].timestamp - periodic_logs[i-1].timestamp).total_seconds()
if gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += gap
last_gap = (end_datetime - periodic_logs[-1].timestamp.replace(tzinfo=timezone.utc)).total_seconds()
if last_gap > MAX_ONLINE_GAP_SECONDS:
total_downtime_seconds += last_gap
uptime_percentage = 100 * (1 - (total_downtime_seconds / total_period_seconds))
uptime_percentage = max(0, min(100, uptime_percentage))
return jsonify({"uptime": round(uptime_percentage, 2)})
except Exception as e:
print(f"Error in uptime calculation for {station_id}: {e}")
return jsonify({"uptime": "Error"}), 500
# --- CSV Export Route ---
def _format_periodic_row(payload, num_slots=9):
"""
Flattens a periodic payload dictionary into a single list for a CSV row.
"""
    ts = payload.get("ts")
    row = [
        datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") if ts else "",
        payload.get("deviceId", ""),
        payload.get("stationDiagnosticCode", "")
    ]
slots_data = payload.get("slotLevelPayload", [])
slot_map = {s.get('slotId', i+1): s for i, s in enumerate(slots_data)}
slot_fields_keys = [
"batteryIdentification", "batteryPresent", "chargerPresent", "doorStatus", "doorLockStatus",
"voltage", "current", "soc", "batteryMaxTemp", "slotTemperature",
"batteryFaultCode", "chargerFaultCode", "batteryMode", "chargerMode"
]
for i in range(1, num_slots + 1):
slot = slot_map.get(i)
if slot:
row.extend([
slot.get('batteryIdentification', ''),
slot.get("batteryPresent", 0),
slot.get("chargerPresent", 0),
slot.get("doorStatus", 0),
slot.get("doorLockStatus", 0),
                slot.get('voltage', 0) / 1000.0,        # mV -> V
                slot.get('current', 0) / 1000.0,        # mA -> A
                slot.get('soc', 0),
                slot.get('batteryMaxTemp', 0) / 10.0,   # deci-degC -> degC
                slot.get('slotTemperature', 0) / 10.0,  # deci-degC -> degC
slot.get('batteryFaultCode', 0),
slot.get('chargerFaultCode', 0),
slot.get('batteryMode', 0),
slot.get('chargerMode', 0)
])
else:
row.extend([''] * len(slot_fields_keys))
row.append('') # Placeholder for RawHexPayload
return row
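# Example export request (hypothetical host; datetimes use the
# '%Y-%m-%dT%H:%M' format parsed below):
#   curl "http://localhost:5000/api/logs/export?station_id=V16000862287077265957&start_datetime=2024-01-01T00:00&end_datetime=2024-01-02T00:00&log_type=PERIODIC" -o logs.csv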
@app.route('/api/logs/export', methods=['GET'])
def export_logs():
station_id = request.args.get('station_id')
start_datetime_str = request.args.get('start_datetime')
end_datetime_str = request.args.get('end_datetime')
log_type = request.args.get('log_type', 'PERIODIC')
if not all([station_id, start_datetime_str, end_datetime_str]):
return jsonify({"error": "Missing required parameters"}), 400
try:
start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
except ValueError:
return jsonify({"error": "Invalid datetime format"}), 400
    # The frontend's 'EVENT' log type covers both EVENTS and REQUEST topics.
    if log_type == 'EVENT':
query = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.timestamp.between(start_datetime, end_datetime),
MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
)
else: # Otherwise, query for PERIODIC
query = MqttLog.query.filter(
MqttLog.station_id == station_id,
MqttLog.timestamp.between(start_datetime, end_datetime),
MqttLog.topic_type == log_type
)
logs = query.order_by(MqttLog.timestamp.asc()).all()
if not logs:
return jsonify({"message": "No logs found for the selected criteria."}), 404
output = io.StringIO()
writer = csv.writer(output)
    # Build a readable filename from the station name, log type, and start date.
station = Station.query.filter_by(station_id=station_id).first()
station_name = station.name.replace(' ', '_') if station else station_id
date_str = start_datetime.strftime('%Y-%m-%d')
filename = f"{station_name}_{log_type}_{date_str}.csv"
if log_type == 'PERIODIC':
base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
slot_fields = [
"BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
"Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
"BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
]
slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields]
header = base_header + slot_header + ["RawHexPayload"]
writer.writerow(header)
for log in logs:
writer.writerow(_format_periodic_row(log.payload))
    else:  # EVENTS / REQUEST logs
header = ["Timestamp", "Topic", "Payload_JSON"]
writer.writerow(header)
for log in logs:
            writer.writerow([log.timestamp.isoformat(), log.topic, json.dumps(log.payload)])
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )
@socketio.on('rpc_request')
def handle_rpc_request(payload):
"""
Receives a command from the web dashboard, creates a Protobuf RPC request,
and publishes it to the station via MQTT.
"""
station_id = payload.get('station_id')
command = payload.get('command')
data = payload.get('data') # This will be the slot_id or swap_pairs array
print(f"Received RPC request for station {station_id}: {command} with data {data}")
# Find the correct MQTT client for this station
mqtt_client = mqtt_clients.get(station_id)
    if not mqtt_client or not mqtt_client.is_connected:
        print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
        # 'rpc_error' is a new event name; the frontend needs a handler for it.
        emit('rpc_error', {'station_id': station_id, 'message': 'MQTT client not connected.'})
        return
    # --- Build the Protobuf RPC request for the command ---
request_payload = rpcRequest(
ts=int(time.time()),
jobId=f"job_{int(time.time())}"
)
# Determine the jobType and set data based on the command string
    if command == 'OPEN':
        request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE
        request_payload.slotInfo.slotId = int(data)
        request_payload.slotInfo.state = 1
    elif command == 'CHG_ON':
        request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
        request_payload.slotInfo.slotId = int(data)
        request_payload.slotInfo.state = 1  # 1 = charger ON
    elif command == 'CHG_OFF':
        request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
        request_payload.slotInfo.slotId = int(data)
        request_payload.slotInfo.state = 0  # 0 = charger OFF
    elif command == 'START_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
        if data and isinstance(data, list):
            # Each pair is [fromSlot, toSlot].
            for pair in data:
                swap_info = request_payload.swapInfo.add()
                swap_info.fromSlot = pair[0]
                swap_info.toSlot = pair[1]
    # Abort and reset commands carry no slot data.
elif command == 'ABORT_SWAP':
request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
elif command == 'STATION_RESET':
request_payload.jobType = jobType_e.JOBTYPE_REBOOT
elif command == 'LANGUAGE_UPDATE':
request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
# Logic to map language string to enum would go here
else:
print(f"Unknown command: {command}")
return
# --- Serialize and Publish the Message ---
serialized_payload = request_payload.SerializeToString()
    # Construct the MQTT topic.
    # NOTE: the protocol version segment ('v100') is hardcoded here; it may
    # need to be fetched from the station record instead.
topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"
print(f"Publishing to {topic}")
mqtt_client.client.publish(topic, serialized_payload)
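    # NOTE: mqtt_client.client is assumed to be a paho-mqtt client; publish()
    # defaults to QoS 0, so delivery is fire-and-forget.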
def start_single_mqtt_client(station):
"""
    Creates and starts an MQTT client thread for a single station.
    Used both at startup and when a station is added at runtime.
"""
    existing = mqtt_clients.get(station.station_id)
    if existing and existing.is_connected:
        print(f"MQTT client for {station.station_id} is already running.")
        return
print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
client = MqttClient(
broker=station.mqtt_broker,
port=station.mqtt_port,
user=station.mqtt_user,
password=station.mqtt_password,
station_id=station.station_id,
on_message_callback=on_message_handler
)
client.start() # The start method should handle threading
mqtt_clients[station.station_id] = client
# --- Main Application Logic ---
def start_mqtt_clients():
"""
    Initializes and starts an MQTT client for each station found in the
    database at startup.
"""
try:
with app.app_context():
stations = Station.query.all()
print(f"Found {len(stations)} existing stations to monitor.")
except Exception as e:
print(f"CRITICAL: Could not query stations from the database: {e}")
return
for station in stations:
start_single_mqtt_client(station)
if __name__ == '__main__':
try:
with app.app_context():
db.create_all()
# Add a default user if none exist
if not User.query.first():
print("No users found. Creating a default admin user.")
default_user = User(username='admin')
default_user.set_password('password')
db.session.add(default_user)
db.session.commit()
# Add a default station if none exist
if not Station.query.first():
print("No stations found. Adding a default station.")
                default_station = Station(
                    station_id="V16000862287077265957",
                    product_id="VEC_PROD_001",
                    name="Test Station 1",
                    location="Test Location",  # placeholder; set the real site
                    mqtt_broker="mqtt.vecmocon.com",
                    mqtt_port=1883,
                    mqtt_user="your_username",      # placeholder credentials
                    mqtt_password="your_password"
                )
db.session.add(default_station)
db.session.commit()
except Exception as e:
print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
sys.exit(1)
mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
mqtt_thread.start()
print(f"Starting Flask-SocketIO server on http://0.0.0.0:5000")
socketio.run(app, host='0.0.0.0', port=5000)