563 lines
21 KiB
Python
563 lines
21 KiB
Python
import os
|
|
import sys
|
|
import threading
|
|
import json
|
|
import csv
|
|
import io
|
|
import time
|
|
from datetime import datetime
|
|
from flask import Flask, jsonify, request, Response
|
|
from flask_socketio import SocketIO, join_room
|
|
from flask_cors import CORS
|
|
from dotenv import load_dotenv
|
|
|
|
# Import your custom core modules and the new models
|
|
from core.mqtt_client import MqttClient
|
|
from core.protobuf_decoder import ProtobufDecoder
|
|
from models import db, Station, User, MqttLog
|
|
from flask_login import login_required, current_user, LoginManager
|
|
from proto.vec_payload_chgSt_pb2 import (
|
|
rpcRequest,
|
|
jobType_e
|
|
)
|
|
|
|
# --- Load Environment Variables ---
|
|
load_dotenv()
|
|
|
|
# --- Pre-startup Check for Essential Configuration ---
|
|
DATABASE_URL = os.getenv("DATABASE_URL")
|
|
if not DATABASE_URL:
|
|
print("FATAL ERROR: DATABASE_URL is not set in .env file.")
|
|
sys.exit(1)
|
|
|
|
# --- Application Setup ---
|
|
app = Flask(__name__)
|
|
# CORS(app)
|
|
|
|
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True)
|
|
|
|
# CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True, expose_headers='Content-Disposition')
|
|
|
|
CORS(app, resources={r"/api/*": {"origins": ["http://192.168.1.12:5500","http://127.0.0.1:5500"]}}, supports_credentials=True, expose_headers='Content-Disposition')
|
|
|
|
# CORS(app, resources={r"/api/*": {"origins": "http://localhost:5173"}}) , "http://127.0.0.1:5500"
|
|
# This tells Flask: "For any route starting with /api/, allow requests
|
|
# from the frontend running on http://localhost:5173".
|
|
|
|
# ADD THESE LINES FOR FLASK-LOGIN
|
|
login_manager = LoginManager()
|
|
login_manager.init_app(app)
|
|
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
|
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")
|
|
db.init_app(app)
|
|
socketio = SocketIO(app, cors_allowed_origins="*")
|
|
|
|
# --- User Loader for Flask-Login ---
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return User.query.get(int(user_id))
|
|
|
|
# --- Global instances ---
|
|
decoder = ProtobufDecoder()
|
|
mqtt_clients = {}
|
|
last_message_timestamps = {}
|
|
STATION_TIMEOUT_SECONDS = 10
|
|
|
|
# --- MQTT Message Handling ---
|
|
def on_message_handler(station_id, topic, payload):
|
|
last_message_timestamps[station_id] = time.time()
|
|
|
|
print(f"Main handler received message for station {station_id} on topic {topic}")
|
|
|
|
decoded_data = None
|
|
message_type = topic.split('/')[-1]
|
|
|
|
if message_type == 'PERIODIC':
|
|
decoded_data = decoder.decode_periodic(payload)
|
|
elif message_type == 'EVENTS':
|
|
decoded_data = decoder.decode_event(payload)
|
|
elif message_type == 'REQUEST':
|
|
decoded_data = decoder.decode_rpc_request(payload)
|
|
|
|
if decoded_data:
|
|
# print("DECODED DATA TO BE SENT:", decoded_data)
|
|
try:
|
|
with app.app_context():
|
|
log_entry = MqttLog(
|
|
station_id=station_id,
|
|
topic=topic,
|
|
topic_type=message_type,
|
|
payload=decoded_data
|
|
)
|
|
db.session.add(log_entry)
|
|
db.session.commit()
|
|
print(f"Successfully wrote data for {station_id} to PostgreSQL.")
|
|
except Exception as e:
|
|
print(f"Error writing to PostgreSQL: {e}")
|
|
|
|
socketio.emit('dashboard_update', {
|
|
'stationId': station_id,
|
|
'topic': topic,
|
|
'data': decoded_data
|
|
}, room=station_id)
|
|
|
|
# --- (WebSocket and API routes remain the same) ---
|
|
@socketio.on('connect')
|
|
def handle_connect():
|
|
print('Client connected to WebSocket')
|
|
|
|
# --- NEW: Function to handle joining a room and sending initial data ---
|
|
@socketio.on('join_station_room')
|
|
def handle_join_station_room(data):
|
|
station_id = data['station_id']
|
|
join_room(station_id)
|
|
print(f"Client joined room for station: {station_id}")
|
|
|
|
try:
|
|
# Find the most recent log entry for this station
|
|
latest_log = MqttLog.query.filter_by(
|
|
station_id=station_id,
|
|
topic_type='PERIODIC'
|
|
).order_by(MqttLog.timestamp.desc()).first()
|
|
|
|
if latest_log:
|
|
# If we have a past log, send it immediately to the new client
|
|
print(f"Sending initial state for {station_id} to new client.")
|
|
socketio.emit('dashboard_update', {
|
|
'stationId': station_id,
|
|
'topic': latest_log.topic,
|
|
'data': latest_log.payload
|
|
}, room=station_id)
|
|
except Exception as e:
|
|
print(f"Error querying or sending initial state for {station_id}: {e}")
|
|
|
|
# --- API Routes ---
|
|
@app.route('/api/login', methods=['POST'])
|
|
def login():
|
|
"""Handles user login."""
|
|
data = request.get_json()
|
|
if not data or not data.get('username') or not data.get('password'):
|
|
return jsonify({"message": "Username and password are required."}), 400
|
|
|
|
user = User.query.filter_by(username=data['username']).first()
|
|
|
|
if user and user.check_password(data['password']):
|
|
# In a real app, you would create a session token here (e.g., with Flask-Login)
|
|
return jsonify({"message": "Login successful"}), 200
|
|
|
|
return jsonify({"message": "Invalid username or password"}), 401
|
|
|
|
# --- Admin-only: Add User ---
|
|
@app.route('/api/users', methods=['POST'])
|
|
# @login_required # Ensures the user is logged in
|
|
def add_user():
|
|
# Check if the logged-in user is an admin
|
|
# if not current_user.is_admin:
|
|
# return jsonify({"message": "Admin access required."}), 403 # Forbidden
|
|
|
|
data = request.get_json()
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
is_admin = data.get('is_admin', False)
|
|
|
|
if not username or not password:
|
|
return jsonify({"message": "Username and password are required."}), 400
|
|
if User.query.filter_by(username=username).first():
|
|
return jsonify({"message": "Username already exists."}), 409 # Conflict
|
|
|
|
new_user = User(username=username, is_admin=is_admin)
|
|
new_user.set_password(password)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
return jsonify({"message": "User added successfully."}), 201
|
|
|
|
# --- Admin-only: Add Station ---
|
|
@app.route('/api/stations', methods=['POST'])
|
|
# @login_required # Ensures the user is logged in
|
|
def add_station():
|
|
# if not current_user.is_admin:
|
|
# return jsonify({"message": "Admin access required."}), 403
|
|
|
|
data = request.get_json()
|
|
# All fields are now expected from the frontend form
|
|
required_fields = ['station_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
|
|
if not all(field in data for field in required_fields):
|
|
return jsonify({"message": "Missing required station details."}), 400
|
|
|
|
if Station.query.filter_by(station_id=data['station_id']).first():
|
|
return jsonify({"message": "Station ID already exists."}), 409
|
|
|
|
new_station = Station(
|
|
station_id=data['station_id'],
|
|
product_id=data['product_id'],
|
|
name=data['name'],
|
|
location=data['location'],
|
|
mqtt_broker=data['mqtt_broker'],
|
|
mqtt_port=data['mqtt_port'],
|
|
mqtt_user=data.get('mqtt_user'),
|
|
mqtt_password=data.get('mqtt_password')
|
|
)
|
|
db.session.add(new_station)
|
|
db.session.commit()
|
|
|
|
# Immediately start the MQTT client for the station just created.
|
|
start_single_mqtt_client(new_station)
|
|
|
|
return jsonify({"message": "Station added successfully."}), 201
|
|
|
|
|
|
# The new function with logging
|
|
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
|
|
def remove_station(station_id):
|
|
"""
|
|
Removes a station from the database and stops its MQTT client.
|
|
"""
|
|
print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")
|
|
|
|
# 1. Find the station in the database
|
|
station = Station.query.filter_by(station_id=station_id).first_or_404()
|
|
print(f"[LOG] Found station '{station.name}' in the database.")
|
|
|
|
# 2. Stop the running MQTT client for this station
|
|
client_to_stop = mqtt_clients.get(station_id)
|
|
if client_to_stop:
|
|
print(f"[LOG] Found active MQTT client. Attempting to stop it now...")
|
|
client_to_stop.stop()
|
|
mqtt_clients.pop(station_id, None)
|
|
print(f"[LOG] Successfully stopped and removed client object for {station_id}.")
|
|
else:
|
|
print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")
|
|
|
|
# 3. Delete the station from the database
|
|
print(f"[LOG] Attempting to delete {station_id} from the database...")
|
|
db.session.delete(station)
|
|
db.session.commit()
|
|
print(f"[LOG] Successfully deleted station from the database.")
|
|
|
|
print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
|
|
return jsonify({"message": f"Station {station_id} removed successfully."}), 200
|
|
|
|
|
|
@app.route('/api/stations', methods=['GET'])
|
|
def get_stations():
|
|
try:
|
|
stations = Station.query.all()
|
|
station_list = []
|
|
for s in stations:
|
|
# --- NEW: More accurate heartbeat logic ---
|
|
last_msg_time = last_message_timestamps.get(s.station_id)
|
|
# A station is online only if we have received a message recently
|
|
is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS
|
|
|
|
station_list.append({
|
|
"id": s.station_id,
|
|
"name": s.name,
|
|
"location": s.location,
|
|
"product_id": s.product_id,
|
|
"status": "Online" if is_online else "Offline"
|
|
})
|
|
return jsonify(station_list)
|
|
except Exception as e:
|
|
return jsonify({"error": f"Database query failed: {e}"}), 500
|
|
|
|
|
|
# --- CSV Export route (UPDATED) ---
|
|
def _format_periodic_row(payload, num_slots=9):
|
|
"""
|
|
Flattens a periodic payload dictionary into a single list for a CSV row.
|
|
"""
|
|
row = [
|
|
datetime.fromtimestamp(payload.get("ts")).strftime("%Y-%m-%d %H:%M:%S"),
|
|
payload.get("deviceId", ""),
|
|
payload.get("stationDiagnosticCode", "")
|
|
]
|
|
|
|
slots_data = payload.get("slotLevelPayload", [])
|
|
slot_map = {s.get('slotId', i+1): s for i, s in enumerate(slots_data)}
|
|
|
|
slot_fields_keys = [
|
|
"batteryIdentification", "batteryPresent", "chargerPresent", "doorStatus", "doorLockStatus",
|
|
"voltage", "current", "soc", "batteryMaxTemp", "slotTemperature",
|
|
"batteryFaultCode", "chargerFaultCode", "batteryMode", "chargerMode"
|
|
]
|
|
|
|
for i in range(1, num_slots + 1):
|
|
slot = slot_map.get(i)
|
|
if slot:
|
|
row.extend([
|
|
slot.get('batteryIdentification', ''),
|
|
slot.get("batteryPresent", 0),
|
|
slot.get("chargerPresent", 0),
|
|
slot.get("doorStatus", 0),
|
|
slot.get("doorLockStatus", 0),
|
|
slot.get('voltage', 0) / 1000.0,
|
|
slot.get('current', 0) / 1000.0,
|
|
slot.get('soc', 0),
|
|
slot.get('batteryMaxTemp', 0) / 10.0,
|
|
slot.get('slotTemperature', 0) / 10.0,
|
|
slot.get('batteryFaultCode', 0),
|
|
slot.get('chargerFaultCode', 0),
|
|
slot.get('batteryMode', 0),
|
|
slot.get('chargerMode', 0)
|
|
])
|
|
else:
|
|
row.extend([''] * len(slot_fields_keys))
|
|
|
|
row.append('') # Placeholder for RawHexPayload
|
|
return row
|
|
|
|
@app.route('/api/logs/export', methods=['GET'])
|
|
def export_logs():
|
|
station_id = request.args.get('station_id')
|
|
start_datetime_str = request.args.get('start_datetime')
|
|
end_datetime_str = request.args.get('end_datetime')
|
|
log_type = request.args.get('log_type', 'PERIODIC')
|
|
|
|
if not all([station_id, start_datetime_str, end_datetime_str]):
|
|
return jsonify({"error": "Missing required parameters"}), 400
|
|
|
|
try:
|
|
start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
|
|
end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
|
|
except ValueError:
|
|
return jsonify({"error": "Invalid datetime format"}), 400
|
|
|
|
# --- FIX 1: Correctly query for Events & RPC ---
|
|
if log_type == 'EVENT':
|
|
# If frontend asks for EVENT, search for both EVENTS and REQUEST in the DB
|
|
query = MqttLog.query.filter(
|
|
MqttLog.station_id == station_id,
|
|
MqttLog.timestamp.between(start_datetime, end_datetime),
|
|
MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
|
|
)
|
|
else: # Otherwise, query for PERIODIC
|
|
query = MqttLog.query.filter(
|
|
MqttLog.station_id == station_id,
|
|
MqttLog.timestamp.between(start_datetime, end_datetime),
|
|
MqttLog.topic_type == log_type
|
|
)
|
|
|
|
logs = query.order_by(MqttLog.timestamp.asc()).all()
|
|
|
|
if not logs:
|
|
return jsonify({"message": "No logs found for the selected criteria."}), 404
|
|
|
|
output = io.StringIO()
|
|
writer = csv.writer(output)
|
|
|
|
# --- FIX 2: Create a cleaner filename ---
|
|
station = Station.query.filter_by(station_id=station_id).first()
|
|
station_name = station.name.replace(' ', '_') if station else station_id
|
|
date_str = start_datetime.strftime('%Y-%m-%d')
|
|
filename = f"{station_name}_{log_type}_{date_str}.csv"
|
|
|
|
if log_type == 'PERIODIC':
|
|
base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
|
|
slot_fields = [
|
|
"BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
|
|
"Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
|
|
"BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
|
|
]
|
|
slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields]
|
|
header = base_header + slot_header + ["RawHexPayload"]
|
|
writer.writerow(header)
|
|
|
|
for log in logs:
|
|
writer.writerow(_format_periodic_row(log.payload))
|
|
else: # For EVENTS_RPC
|
|
header = ["Timestamp", "Topic", "Payload_JSON"]
|
|
writer.writerow(header)
|
|
for log in logs:
|
|
writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])
|
|
|
|
output.seek(0)
|
|
return Response(
|
|
output,
|
|
mimetype="text/csv",
|
|
headers={"Content-Disposition": f"attachment;filename={filename}"}
|
|
)
|
|
|
|
@socketio.on('rpc_request')
|
|
def handle_rpc_request(payload):
|
|
"""
|
|
Receives a command from the web dashboard, creates a Protobuf RPC request,
|
|
and publishes it to the station via MQTT.
|
|
"""
|
|
station_id = payload.get('station_id')
|
|
command = payload.get('command')
|
|
data = payload.get('data') # This will be the slot_id or swap_pairs array
|
|
|
|
print(f"Received RPC request for station {station_id}: {command} with data {data}")
|
|
|
|
# Find the correct MQTT client for this station
|
|
mqtt_client = mqtt_clients.get(station_id)
|
|
if not mqtt_client or not mqtt_client.is_connected:
|
|
print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
|
|
return # Or emit an error back to the user
|
|
|
|
# --- Create the Protobuf message based on the command ---
|
|
# This is where the logic from your snippet is implemented.
|
|
request_payload = rpcRequest(
|
|
ts=int(time.time()),
|
|
jobId=f"job_{int(time.time())}"
|
|
)
|
|
|
|
# Determine the jobType and set data based on the command string
|
|
if command == 'OPEN':
|
|
request_payload.jobType = jobType_e.JOBTYPE_GATE_OPEN_CLOSE
|
|
request_payload.slotInfo.slotId = data
|
|
request_payload.slotInfo.state = 1
|
|
|
|
elif command == 'CHG_ON':
|
|
# Replace this with the correct name from your .proto file
|
|
request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
|
|
request_payload.slotInfo.slotId = data
|
|
request_payload.slotInfo.state = 1 # State 1 for ON
|
|
|
|
elif command == 'CHG_OFF':
|
|
# Replace this with the correct name from your .proto file
|
|
request_payload.jobType = jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
|
|
request_payload.slotInfo.slotId = data
|
|
request_payload.slotInfo.state = 0 # State 0 for OFF
|
|
|
|
elif command == 'START_SWAP':
|
|
# --- THIS IS THE CORRECTED LINE ---
|
|
request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
|
|
|
|
if data and isinstance(data, list):
|
|
# Your logic for adding the swap pairs to the payload
|
|
for pair in data:
|
|
swap_info = request_payload.swapInfo.add()
|
|
swap_info.fromSlot = pair[0]
|
|
swap_info.toSlot = pair[1]
|
|
|
|
# --- NEW: Added handlers for Abort and Reset ---
|
|
elif command == 'ABORT_SWAP':
|
|
request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
|
|
|
|
elif command == 'STATION_RESET':
|
|
request_payload.jobType = jobType_e.JOBTYPE_REBOOT
|
|
|
|
elif command == 'LANGUAGE_UPDATE':
|
|
request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
|
|
# Logic to map language string to enum would go here
|
|
|
|
else:
|
|
print(f"Unknown command: {command}")
|
|
return
|
|
|
|
# --- Serialize and Publish the Message ---
|
|
serialized_payload = request_payload.SerializeToString()
|
|
|
|
# Construct the MQTT topic
|
|
# NOTE: You may need to fetch client_id and version from your database
|
|
topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"
|
|
|
|
print(f"Publishing to {topic}")
|
|
mqtt_client.client.publish(topic, serialized_payload)
|
|
|
|
# ADD THIS NEW FUNCTION
|
|
def start_single_mqtt_client(station):
|
|
"""
|
|
Creates and starts a new MQTT client thread for a SINGLE station.
|
|
This is our new reusable function.
|
|
"""
|
|
if station.station_id in mqtt_clients and mqtt_clients[station.station_id].is_connected:
|
|
print(f"MQTT client for {station.station_id} is already running.")
|
|
return
|
|
|
|
print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
|
|
|
|
client = MqttClient(
|
|
broker=station.mqtt_broker,
|
|
port=station.mqtt_port,
|
|
user=station.mqtt_user,
|
|
password=station.mqtt_password,
|
|
station_id=station.station_id,
|
|
on_message_callback=on_message_handler
|
|
)
|
|
client.start() # The start method should handle threading
|
|
mqtt_clients[station.station_id] = client
|
|
|
|
# --- Main Application Logic ---
|
|
# def start_mqtt_clients():
|
|
# """
|
|
# Initializes and starts an MQTT client for each station found in the database,
|
|
# using the specific MQTT credentials stored for each station.
|
|
# """
|
|
# try:
|
|
# with app.app_context():
|
|
# stations = Station.query.all()
|
|
# except Exception as e:
|
|
# print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}")
|
|
# return
|
|
|
|
# for station in stations:
|
|
# if station.station_id not in mqtt_clients:
|
|
# print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
|
|
|
|
# client = MqttClient(
|
|
# broker=station.mqtt_broker,
|
|
# port=station.mqtt_port,
|
|
# user=station.mqtt_user,
|
|
# password=station.mqtt_password,
|
|
# station_id=station.station_id,
|
|
# on_message_callback=on_message_handler
|
|
# )
|
|
# client.start()
|
|
# mqtt_clients[station.station_id] = client
|
|
|
|
def start_mqtt_clients():
|
|
"""
|
|
Initializes and starts an MQTT client for each station found in the database
|
|
by calling our new reusable function.
|
|
"""
|
|
try:
|
|
with app.app_context():
|
|
stations = Station.query.all()
|
|
print(f"Found {len(stations)} existing stations to monitor.")
|
|
except Exception as e:
|
|
print(f"CRITICAL: Could not query stations from the database: {e}")
|
|
return
|
|
|
|
for station in stations:
|
|
start_single_mqtt_client(station)
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
with app.app_context():
|
|
db.create_all()
|
|
# Add a default user if none exist
|
|
if not User.query.first():
|
|
print("No users found. Creating a default admin user.")
|
|
default_user = User(username='admin')
|
|
default_user.set_password('password')
|
|
db.session.add(default_user)
|
|
db.session.commit()
|
|
|
|
# Add a default station if none exist
|
|
if not Station.query.first():
|
|
print("No stations found. Adding a default station.")
|
|
default_station = Station(
|
|
station_id="V16000862287077265957",
|
|
product_id="VEC_PROD_001",
|
|
name="Test Station 1",
|
|
mqtt_broker="mqtt.vecmocon.com",
|
|
mqtt_port=1883,
|
|
mqtt_user="your_username",
|
|
mqtt_password="your_password"
|
|
)
|
|
db.session.add(default_station)
|
|
db.session.commit()
|
|
except Exception as e:
|
|
print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
|
|
sys.exit(1)
|
|
|
|
mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
|
|
mqtt_thread.start()
|
|
|
|
print(f"Starting Flask-SocketIO server on http://192.168.1.12:5000")
|
|
socketio.run(app, host='192.168.1.12', port=5000)
|