# SwapStation_WebApp/backend/main.py
#
# 528 lines
# 19 KiB
# Python

import os
import sys
import threading
import json
import csv
import io
import time
from datetime import datetime
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO, join_room
from flask_cors import CORS
from dotenv import load_dotenv
# Import your custom core modules and the new models
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog
from flask_login import login_required, current_user, LoginManager
from proto.vec_payload_chgSt_pb2 import (
rpcRequest,
jobType_e
)
# --- Load Environment Variables ---
load_dotenv()

# --- Pre-startup Check for Essential Configuration ---
# Abort before any Flask/SQLAlchemy setup if no database URL is configured.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    print("FATAL ERROR: DATABASE_URL is not set in .env file.")
    sys.exit(1)

# The secret key should come from the environment. The built-in fallback is
# kept so existing development setups still start, but it is a publicly known
# value, so its use is reported loudly instead of happening silently.
_secret_key = os.getenv("SECRET_KEY")
if not _secret_key:
    print("WARNING: SECRET_KEY is not set; falling back to an insecure default key.")
    _secret_key = "a_very_secret_key"

# --- Application Setup ---
app = Flask(__name__)

# Allow the listed frontend origins to call any route under /api/ with
# credentials, and expose Content-Disposition so the CSV export's file name
# is readable by browser code.
CORS(
    app,
    resources={r"/api/*": {"origins": ["http://192.168.1.12:5500", "http://127.0.0.1:5500"]}},
    supports_credentials=True,
    expose_headers='Content-Disposition',
)

# Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)

# Configuration must be in place before db.init_app(app) is called.
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = _secret_key
db.init_app(app)

# NOTE(review): the WebSocket endpoint accepts every origin, unlike the
# restricted /api/ CORS policy above - confirm this is intentional.
socketio = SocketIO(app, cors_allowed_origins="*")
# --- User Loader for Flask-Login ---
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` whose primary key is ``user_id``, or ``None``.

    Flask-Login calls this with the id stored in the session. A value that
    cannot be converted to an integer is treated as "no such user" rather
    than raising, because a user loader is expected to return ``None`` for
    an invalid id.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
# --- Global instances ---
# Decoder shared by the MQTT message handler.
decoder = ProtobufDecoder()
# station_id -> MqttClient created for that station.
mqtt_clients = dict()
# station_id -> time.time() value recorded when its latest MQTT message arrived.
last_message_timestamps = dict()
# A station counts as "Online" only if a message arrived within this many seconds.
STATION_TIMEOUT_SECONDS = 10
# --- MQTT Message Handling ---
def on_message_handler(station_id, topic, payload):
    """Decode one MQTT message, store it, and push it to dashboard clients.

    The last segment of ``topic`` selects the decoder ('PERIODIC', 'EVENTS'
    or 'REQUEST'). Messages of any other type, or whose decoded result is
    empty, only refresh the station's last-seen timestamp.

    Args:
        station_id: Identifier of the station the message belongs to.
        topic: Full MQTT topic the message arrived on.
        payload: Raw message payload handed to the decoder.
    """
    # Any message at all counts as a heartbeat for the online/offline status.
    last_message_timestamps[station_id] = time.time()
    print(f"Main handler received message for station {station_id} on topic {topic}")

    message_type = topic.split('/')[-1]
    decoder_method_name = {
        'PERIODIC': 'decode_periodic',
        'EVENTS': 'decode_event',
        'REQUEST': 'decode_rpc_request',
    }.get(message_type)

    if decoder_method_name is None:
        decoded_data = None
    else:
        decoded_data = getattr(decoder, decoder_method_name)(payload)

    if not decoded_data:
        return

    # A failed database write is reported but must not stop the live update.
    try:
        with app.app_context():
            db.session.add(
                MqttLog(
                    station_id=station_id,
                    topic=topic,
                    topic_type=message_type,
                    payload=decoded_data,
                )
            )
            db.session.commit()
            print(f"Successfully wrote data for {station_id} to PostgreSQL.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    update = {
        'stationId': station_id,
        'topic': topic,
        'data': decoded_data,
    }
    socketio.emit('dashboard_update', update, room=station_id)
# --- WebSocket event handlers ---
@socketio.on('connect')
def handle_connect():
    """Log that a Socket.IO client has connected."""
    print("Client connected to WebSocket")
# --- Joining a station room and sending its initial state ---
@socketio.on('join_station_room')
def handle_join_station_room(data):
    """Subscribe the calling client to a station's room and send it a snapshot.

    After joining, the most recent stored 'PERIODIC' log for the station (if
    any) is sent to the joining client only, so that its dashboard is
    populated before the next live message arrives.

    Args:
        data: Event payload; expected to be a dict with a 'station_id' key.
            Anything else is logged and ignored.
    """
    station_id = data.get('station_id') if isinstance(data, dict) else None
    if not station_id:
        print(f"Ignoring join_station_room request without a station_id: {data!r}")
        return

    join_room(station_id)
    print(f"Client joined room for station: {station_id}")
    try:
        # Find the most recent periodic log entry for this station.
        latest_log = MqttLog.query.filter_by(
            station_id=station_id,
            topic_type='PERIODIC'
        ).order_by(MqttLog.timestamp.desc()).first()
        if latest_log:
            print(f"Sending initial state for {station_id} to new client.")
            # Target the caller's own session room: emitting to the station
            # room would re-send this stored snapshot to every client that
            # is already watching the station.
            socketio.emit('dashboard_update', {
                'stationId': station_id,
                'topic': latest_log.topic,
                'data': latest_log.payload
            }, room=request.sid)
    except Exception as e:
        print(f"Error querying or sending initial state for {station_id}: {e}")
# --- API Routes ---
@app.route('/api/login', methods=['POST'])
def login():
    """Check a username/password pair sent as a JSON object.

    Returns:
        200 with a success message when the credentials match,
        400 when the body is not a JSON object containing both fields,
        401 when the credentials are wrong.
    """
    # silent=True turns an unparsable or non-JSON body into None so that the
    # client gets the JSON 400 below; the isinstance check covers valid JSON
    # that is not an object (e.g. a list), which has no .get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({"message": "Username and password are required."}), 400

    user = User.query.filter_by(username=data['username']).first()
    if user and user.check_password(data['password']):
        # NOTE: no session is created here; the client only receives a
        # confirmation that the credentials are valid.
        return jsonify({"message": "Login successful"}), 200
    return jsonify({"message": "Invalid username or password"}), 401
# --- Admin-only: Add User ---
@app.route('/api/users', methods=['POST'])
# @login_required  # Ensures the user is logged in
def add_user():
    """Create a user from a JSON object with 'username', 'password', 'is_admin'.

    Returns:
        201 when the user was created,
        400 when the body is not a JSON object or a required field is missing,
        409 when the username is already taken.
    """
    # SECURITY NOTE(review): the login/admin checks below are disabled, so
    # this endpoint can currently be called by anyone, including to create
    # admin accounts. Re-enable them once login establishes a session.
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403  # Forbidden
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, unparsable or non-object body: there is nothing to read
        # fields from, so report it like any other missing-field request.
        return jsonify({"message": "Username and password are required."}), 400

    username = data.get('username')
    password = data.get('password')
    is_admin = data.get('is_admin', False)
    if not username or not password:
        return jsonify({"message": "Username and password are required."}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({"message": "Username already exists."}), 409  # Conflict

    new_user = User(username=username, is_admin=is_admin)
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User added successfully."}), 201
# --- Admin-only: Add Station ---
@app.route('/api/stations', methods=['POST'])
# @login_required  # Ensures the user is logged in
def add_station():
    """Register a station from a JSON object and start monitoring it.

    Required keys: 'station_id', 'name', 'location', 'mqtt_broker',
    'mqtt_port'. Optional keys: 'mqtt_user', 'mqtt_password'.

    Returns:
        201 when the station was stored and its MQTT client started,
        400 when the body is not a JSON object or a required key is missing,
        409 when the station id already exists.
    """
    # SECURITY NOTE(review): the login/admin checks below are disabled, so
    # this endpoint can currently be called by anyone.
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403
    data = request.get_json(silent=True)
    required_fields = ['station_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
    # The isinstance check keeps a missing or non-object body from raising
    # inside the membership test.
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({"message": "Missing required station details."}), 400
    if Station.query.filter_by(station_id=data['station_id']).first():
        return jsonify({"message": "Station ID already exists."}), 409

    new_station = Station(
        station_id=data['station_id'],
        name=data['name'],
        location=data['location'],
        mqtt_broker=data['mqtt_broker'],
        mqtt_port=data['mqtt_port'],
        mqtt_user=data.get('mqtt_user'),
        mqtt_password=data.get('mqtt_password')
    )
    db.session.add(new_station)
    db.session.commit()

    # Immediately start the MQTT client for the station just created.
    start_single_mqtt_client(new_station)
    return jsonify({"message": "Station added successfully."}), 201
@app.route('/api/stations', methods=['GET'])
def get_stations():
    """Return every station as JSON together with its Online/Offline status.

    A station is "Online" only when a message from it has been recorded and
    that message is less than STATION_TIMEOUT_SECONDS old. Any failure is
    reported as a 500 response carrying the error text.
    """
    try:
        station_list = []
        for station in Station.query.all():
            last_seen = last_message_timestamps.get(station.station_id)
            if last_seen is None:
                # No message has been recorded for this station.
                status = "Offline"
            elif (time.time() - last_seen) < STATION_TIMEOUT_SECONDS:
                status = "Online"
            else:
                status = "Offline"
            station_list.append({
                "id": station.station_id,
                "name": station.name,
                "location": station.location,
                "status": status,
            })
        return jsonify(station_list)
    except Exception as e:
        return jsonify({"error": f"Database query failed: {e}"}), 500
# --- CSV Export route (UPDATED) ---
def _format_periodic_row(payload, num_slots=9):
"""
Flattens a periodic payload dictionary into a single list for a CSV row.
"""
row = [
datetime.fromtimestamp(payload.get("ts")).strftime("%Y-%m-%d %H:%M:%S"),
payload.get("deviceId", ""),
payload.get("stationDiagnosticCode", "")
]
slots_data = payload.get("slotLevelPayload", [])
slot_map = {s.get('slotId', i+1): s for i, s in enumerate(slots_data)}
slot_fields_keys = [
"batteryIdentification", "batteryPresent", "chargerPresent", "doorStatus", "doorLockStatus",
"voltage", "current", "soc", "batteryMaxTemp", "slotTemperature",
"batteryFaultCode", "chargerFaultCode", "batteryMode", "chargerMode"
]
for i in range(1, num_slots + 1):
slot = slot_map.get(i)
if slot:
row.extend([
slot.get('batteryIdentification', ''),
slot.get("batteryPresent", 0),
slot.get("chargerPresent", 0),
slot.get("doorStatus", 0),
slot.get("doorLockStatus", 0),
slot.get('voltage', 0) / 1000.0,
slot.get('current', 0) / 1000.0,
slot.get('soc', 0),
slot.get('batteryMaxTemp', 0) / 10.0,
slot.get('slotTemperature', 0) / 10.0,
slot.get('batteryFaultCode', 0),
slot.get('chargerFaultCode', 0),
slot.get('batteryMode', 0),
slot.get('chargerMode', 0)
])
else:
row.extend([''] * len(slot_fields_keys))
row.append('') # Placeholder for RawHexPayload
return row
# Characters allowed to appear in the generated CSV file name.
_FILENAME_SAFE_CHARS = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-"
)


def _safe_filename_part(text):
    """Return ``text`` with every character outside A-Z a-z 0-9 . _ - replaced by '_'."""
    return "".join(ch if ch in _FILENAME_SAFE_CHARS else "_" for ch in str(text))


@app.route('/api/logs/export', methods=['GET'])
def export_logs():
    """Export a station's stored MQTT logs for a time range as a CSV download.

    Query parameters:
        station_id: Station whose logs are exported (required).
        start_datetime, end_datetime: Range bounds in ``YYYY-MM-DDTHH:MM``
            format (required).
        log_type: 'PERIODIC' (default) for the flattened per-slot layout;
            'EVENT' for both 'EVENTS' and 'REQUEST' logs; any other value is
            matched against the stored topic type as-is.

    Returns:
        A ``text/csv`` attachment, 400 for missing or malformed parameters,
        or 404 when no log matches.
    """
    station_id = request.args.get('station_id')
    start_datetime_str = request.args.get('start_datetime')
    end_datetime_str = request.args.get('end_datetime')
    log_type = request.args.get('log_type', 'PERIODIC')
    if not all([station_id, start_datetime_str, end_datetime_str]):
        return jsonify({"error": "Missing required parameters"}), 400
    try:
        start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
        end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
    except ValueError:
        return jsonify({"error": "Invalid datetime format"}), 400

    # The frontend's 'EVENT' option covers both event and RPC-request logs.
    if log_type == 'EVENT':
        type_filter = MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
    else:
        type_filter = MqttLog.topic_type == log_type
    logs = MqttLog.query.filter(
        MqttLog.station_id == station_id,
        MqttLog.timestamp.between(start_datetime, end_datetime),
        type_filter
    ).order_by(MqttLog.timestamp.asc()).all()
    if not logs:
        return jsonify({"message": "No logs found for the selected criteria."}), 404

    # Build the file name from sanitised parts only: the station name and
    # log_type are user-controlled and end up in a response header, where
    # quotes, separators or non-ASCII characters would corrupt it.
    station = Station.query.filter_by(station_id=station_id).first()
    station_name = _safe_filename_part(station.name if station else station_id)
    date_str = start_datetime.strftime('%Y-%m-%d')
    filename = f"{station_name}_{_safe_filename_part(log_type)}_{date_str}.csv"

    output = io.StringIO()
    writer = csv.writer(output)
    if log_type == 'PERIODIC':
        # A single slot count drives both the header and the row formatter
        # so the two cannot drift apart.
        num_slots = 9
        base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
        slot_fields = [
            "BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
            "Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
            "BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
        ]
        slot_header = [
            f"Slot{i}_{field}" for i in range(1, num_slots + 1) for field in slot_fields
        ]
        writer.writerow(base_header + slot_header + ["RawHexPayload"])
        for log in logs:
            writer.writerow(_format_periodic_row(log.payload, num_slots=num_slots))
    else:  # Events / RPC requests (or any other stored topic type)
        writer.writerow(["Timestamp", "Topic", "Payload_JSON"])
        for log in logs:
            writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])

    # Send the finished text rather than the StringIO object itself, which
    # would be iterated line by line as the response body.
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment;filename={filename}"}
    )
@socketio.on('rpc_request')
def handle_rpc_request(payload):
    """Translate a dashboard command into a Protobuf RPC and publish it via MQTT.

    Args:
        payload: Event payload; expected to be a dict with 'station_id',
            'command' and, depending on the command, 'data' (a slot id for
            OPEN / CHG_ON / CHG_OFF, a list of (from, to) slot pairs for
            START_SWAP).

    Malformed payloads, unknown commands, unusable ``data`` and stations
    whose MQTT client is not connected are logged and ignored; nothing is
    published in those cases.
    """
    if not isinstance(payload, dict):
        print(f"Ignoring malformed RPC request: {payload!r}")
        return

    station_id = payload.get('station_id')
    command = payload.get('command')
    data = payload.get('data')  # slot id, or list of swap pairs
    print(f"Received RPC request for station {station_id}: {command} with data {data}")

    # Find the MQTT client for this station.
    mqtt_client = mqtt_clients.get(station_id)
    if not mqtt_client or not mqtt_client.is_connected:
        print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
        return

    # Read the clock once so 'ts' and the job id always agree.
    now = int(time.time())
    request_payload = rpcRequest(ts=now, jobId=f"job_{now}")

    # Fill in the job type and command-specific fields. Client-supplied
    # 'data' of the wrong type or shape must not raise out of the handler.
    try:
        if command == 'OPEN':
            request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE
            request_payload.slotInfo.slotId = data
            request_payload.slotInfo.state = 1
        elif command in ('CHG_ON', 'CHG_OFF'):
            request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
            request_payload.slotInfo.slotId = data
            request_payload.slotInfo.state = 1 if command == 'CHG_ON' else 0
        elif command == 'START_SWAP':
            request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
            if data and isinstance(data, list):
                for pair in data:
                    swap_info = request_payload.swapInfo.add()
                    swap_info.fromSlot = pair[0]
                    swap_info.toSlot = pair[1]
        elif command == 'ABORT_SWAP':
            request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
        elif command == 'STATION_RESET':
            request_payload.jobType = jobType_e.JOBTYPE_REBOOT
        elif command == 'LANGUAGE_UPDATE':
            request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
            # TODO: map the requested language string to its enum value.
        else:
            print(f"Unknown command: {command}")
            return
    except (TypeError, ValueError, IndexError, KeyError) as e:
        print(f"Invalid data for RPC command {command} on station {station_id}: {e}")
        return

    # --- Serialize and Publish the Message ---
    serialized_payload = request_payload.SerializeToString()
    # NOTE: the client id and version segments of the topic are fixed here;
    # they may need to come from the database instead.
    topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"
    print(f"Publishing to {topic}")
    mqtt_client.client.publish(topic, serialized_payload)
# --- MQTT client management ---
def start_single_mqtt_client(station):
    """Create, start and register the MQTT client for one station.

    Does nothing except log when a client for the station is already
    registered and reports itself as connected. Otherwise a new client is
    built from the station's broker settings, started, and stored in
    ``mqtt_clients`` under the station id.

    NOTE(review): a registered client that is not currently connected is
    replaced without being stopped here - confirm that MqttClient cleans up
    after itself in that case.
    """
    station_key = station.station_id
    already_running = station_key in mqtt_clients and mqtt_clients[station_key].is_connected
    if already_running:
        print(f"MQTT client for {station.station_id} is already running.")
        return

    print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
    connection_settings = {
        'broker': station.mqtt_broker,
        'port': station.mqtt_port,
        'user': station.mqtt_user,
        'password': station.mqtt_password,
        'station_id': station.station_id,
        'on_message_callback': on_message_handler,
    }
    new_client = MqttClient(**connection_settings)
    new_client.start()  # The start method should handle threading
    mqtt_clients[station.station_id] = new_client
# --- Main Application Logic ---
def start_mqtt_clients():
    """Start an MQTT client for every station stored in the database.

    Each station is handed to ``start_single_mqtt_client``. A station whose
    client cannot be created or started is reported and skipped so that the
    remaining stations are still monitored. If the station list itself
    cannot be read, the error is logged and nothing is started.
    """
    try:
        with app.app_context():
            stations = Station.query.all()
            print(f"Found {len(stations)} existing stations to monitor.")
    except Exception as e:
        print(f"CRITICAL: Could not query stations from the database: {e}")
        return

    for station in stations:
        try:
            start_single_mqtt_client(station)
        except Exception as e:
            # This function is the top of its own thread; one bad station
            # configuration must not end the loop for the others.
            print(f"Could not start MQTT client for station {station.station_id}: {e}")
if __name__ == '__main__':
    # Create the tables and seed a first user and station on an empty
    # database; any failure here is fatal.
    try:
        with app.app_context():
            db.create_all()

            # Add a default user if none exist.
            if not User.query.first():
                print("No users found. Creating a default admin user.")
                # is_admin=True so the account really is the admin that the
                # message above announces.
                # NOTE(review): the password is a well-known placeholder and
                # should be changed after the first login.
                default_user = User(username='admin', is_admin=True)
                default_user.set_password('password')
                db.session.add(default_user)
                db.session.commit()

            # Add a default station if none exist.
            if not Station.query.first():
                print("No stations found. Adding a default station.")
                # The MQTT credentials below are placeholders.
                default_station = Station(
                    station_id="V16000862287077265957",
                    name="Test Station 1",
                    mqtt_broker="mqtt.vecmocon.com",
                    mqtt_port=1883,
                    mqtt_user="your_username",
                    mqtt_password="your_password"
                )
                db.session.add(default_station)
                db.session.commit()
    except Exception as e:
        print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
        sys.exit(1)

    # Connect to the stations' brokers in the background so the web server
    # can start right away.
    mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
    mqtt_thread.start()

    # One definition of the bind address, used for both the log line and
    # the server, so the two cannot disagree.
    host = '192.168.1.12'
    port = 5000
    print(f"Starting Flask-SocketIO server on http://{host}:{port}")
    socketio.run(app, host=host, port=port)