825 lines
32 KiB
Python
825 lines
32 KiB
Python
import os
|
|
import sys
|
|
import threading
|
|
import json
|
|
import csv
|
|
import io
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from flask import Flask, jsonify, request, Response
|
|
from flask_socketio import SocketIO, join_room
|
|
from flask_cors import CORS
|
|
from dotenv import load_dotenv
|
|
from sqlalchemy import desc, func, case, String, Integer
|
|
|
|
# Import your custom core modules and the new models
|
|
from core.mqtt_client import MqttClient
|
|
from core.protobuf_decoder import ProtobufDecoder
|
|
from models import db, Station, User, MqttLog
|
|
from flask_login import login_required, current_user, LoginManager
|
|
from proto.vec_payload_chgSt_pb2 import (
|
|
rpcRequest,
|
|
jobType_e
|
|
)
|
|
|
|
# --- Load Environment Variables ---
|
|
load_dotenv()

# Origin that is allowed to call the REST API and to open Socket.IO
# connections. Falls back to a local development URL when unset.
ALLOWED_ORIGIN = os.getenv("CORS_ALLOWED_ORIGIN", "http://127.0.0.1:5500")

print(f"--- INFO: Configuring CORS to allow requests from: {ALLOWED_ORIGIN} ---")

# --- Pre-startup Check for Essential Configuration ---
# The application cannot do anything useful without a database, so abort early.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    print("FATAL ERROR: DATABASE_URL is not set in .env file.")
    sys.exit(1)

# --- Application Setup ---
app = Flask(__name__)

# Only /api/* is exposed cross-origin. Content-Disposition is exposed so the
# browser can read the filename of exported CSV downloads.
CORS(
    app,
    resources={r"/api/*": {"origins": [ALLOWED_ORIGIN]}},
    supports_credentials=True,
    expose_headers='Content-Disposition',
)

# Flask-Login session handling.
login_manager = LoginManager()
login_manager.init_app(app)

app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# The hard-coded fallback is only acceptable for local development; make the
# fallback visible instead of silently signing sessions with a public value.
if not os.getenv("SECRET_KEY"):
    print("--- WARNING: SECRET_KEY is not set; using an insecure development default. ---")
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key")

db.init_app(app)

# Socket.IO carries station RPC commands, so it is restricted to the same
# origin as the REST API rather than accepting every origin ("*").
socketio = SocketIO(app, cors_allowed_origins=[ALLOWED_ORIGIN])
|
|
|
|
# --- User Loader for Flask-Login ---
|
|
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for the ID stored in the session, or ``None``.

    Flask-Login invokes this callback to reload the user object. A user loader
    is expected to return ``None`` (not raise) when the ID is invalid, so a
    malformed, non-numeric ID is treated as "no such user".
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
|
|
|
|
# --- Global instances ---
|
|
# Decoder applied to incoming MQTT payloads (PERIODIC / EVENTS / REQUEST).
decoder = ProtobufDecoder()

# MqttClient objects created by this process, keyed by station_id.
mqtt_clients = {}

# time.time() of the last PERIODIC message received, keyed by station_id.
last_message_timestamps = {}

# A station is reported "Online" only if its last PERIODIC message is
# younger than this many seconds.
STATION_TIMEOUT_SECONDS = 10
|
|
|
|
|
|
# --- MQTT Message Handling ---
|
|
def on_message_handler(station_id, topic, payload):
    """Decode one MQTT message, store it, and notify dashboard clients.

    The last topic segment selects the decoder (PERIODIC, EVENTS or REQUEST).
    Messages that cannot be decoded, or that belong to any other topic type,
    are dropped without being stored or emitted.

    Args:
        station_id: Station the message belongs to; also the Socket.IO room.
        topic: Full MQTT topic the message arrived on.
        payload: Raw message payload handed to the decoder.
    """
    kind = topic.rsplit('/', 1)[-1]

    # Heartbeat bookkeeping happens before decoding, so even an undecodable
    # PERIODIC message counts as a sign of life.
    if kind == 'PERIODIC':
        last_message_timestamps[station_id] = time.time()

    decoder_method = {
        'PERIODIC': 'decode_periodic',
        'EVENTS': 'decode_event',
        'REQUEST': 'decode_rpc_request',
    }.get(kind)
    decoded = getattr(decoder, decoder_method)(payload) if decoder_method else None

    if not decoded:
        return

    # Persist the decoded message; a database failure must not stop the
    # live update from reaching connected clients.
    try:
        with app.app_context():
            db.session.add(MqttLog(
                station_id=station_id,
                topic=topic,
                topic_type=kind,
                payload=decoded,
            ))
            db.session.commit()
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    # Live update for the main dashboard.
    update = {'stationId': station_id, 'topic': topic, 'data': decoded}
    socketio.emit('dashboard_update', update, room=station_id)

    if kind == 'PERIODIC':
        # Same heartbeat rule the /api/stations route applies.
        last_seen = last_message_timestamps.get(station_id)
        if last_seen is not None and (time.time() - last_seen) < STATION_TIMEOUT_SECONDS:
            status_text = "Online"
        else:
            status_text = "Offline"

        print(f"Sending live status update: {status_text}")
        socketio.emit('status_update', {'status': status_text}, room=station_id)
    elif kind in ('EVENTS', 'REQUEST'):
        # Tell the analytics page that new data is available.
        socketio.emit('analytics_updated', room=station_id)
|
|
|
|
|
|
# --- (WebSocket and API routes remain the same) ---
|
|
@socketio.on('connect')
def handle_connect():
    """Log that a client has opened a Socket.IO connection."""
    print('Client connected to WebSocket')
|
|
|
|
# --- NEW: Function to handle joining a room and sending initial data ---
|
|
@socketio.on('join_station_room')
def handle_join_station_room(data):
    """Subscribe the calling client to a station room and send its last state.

    Args:
        data: Event payload; expected to be a dict containing ``station_id``.

    After joining, the most recent stored PERIODIC log for the station (if
    any) is emitted as ``dashboard_update`` to the joining client only, so
    clients that are already in the room are not sent a stale snapshot.
    """
    station_id = data.get('station_id') if isinstance(data, dict) else None
    if not station_id:
        # Without an ID there is no room to join; ignore the request instead
        # of raising inside the event handler.
        print("join_station_room ignored: no station_id supplied.")
        return

    join_room(station_id)
    print(f"Client joined room for station: {station_id}")

    try:
        # Most recent periodic snapshot stored for this station.
        latest_log = (
            MqttLog.query
            .filter_by(station_id=station_id, topic_type='PERIODIC')
            .order_by(MqttLog.timestamp.desc())
            .first()
        )

        if latest_log:
            print(f"Sending initial state for {station_id} to new client.")
            # request.sid is the session ID of the client that sent this
            # event; using it as the room targets that client alone.
            socketio.emit('dashboard_update', {
                'stationId': station_id,
                'topic': latest_log.topic,
                'data': latest_log.payload
            }, room=request.sid)
    except Exception as e:
        print(f"Error querying or sending initial state for {station_id}: {e}")
|
|
|
|
# --- API Routes ---
|
|
@app.route('/api/login', methods=['POST'])
def login():
    """Check a username/password pair supplied as a JSON object.

    Returns:
        200 on a correct password, 401 on a wrong username or password, and
        400 when the body is not a JSON object or a field is missing/empty.

    No session is established here; the response only reports the outcome.
    """
    # silent=True turns an unparsable body into None so it gets this route's
    # JSON 400 response; the isinstance check rejects bodies such as lists,
    # which have no .get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({"message": "Username and password are required."}), 400

    user = User.query.filter_by(username=data['username']).first()

    if user and user.check_password(data['password']):
        # In a real app, you would create a session token here (e.g., with Flask-Login)
        return jsonify({"message": "Login successful"}), 200

    return jsonify({"message": "Invalid username or password"}), 401
|
|
|
|
# --- Admin-only: Add User ---
|
|
@app.route('/api/users', methods=['POST'])
# NOTE(review): authentication is disabled on this route, so any caller can
# create users, including admins. Re-enable the checks below once the login
# route establishes a session.
# @login_required
def add_user():
    """Create a user from a JSON body.

    Expected JSON fields: ``username`` and ``password`` (required) and
    ``is_admin`` (optional, defaults to ``False``).

    Returns:
        201 on success, 400 when the body is not a JSON object or a required
        field is missing, 409 when the username is already taken.
    """
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403  # Forbidden

    # A missing, malformed or non-object body is answered with 400 instead of
    # failing on data.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"message": "Username and password are required."}), 400

    username = data.get('username')
    password = data.get('password')
    is_admin = data.get('is_admin', False)

    if not username or not password:
        return jsonify({"message": "Username and password are required."}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({"message": "Username already exists."}), 409  # Conflict

    new_user = User(username=username, is_admin=is_admin)
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User added successfully."}), 201
|
|
|
|
# --- Admin-only: Add Station ---
|
|
@app.route('/api/stations', methods=['POST'])
# NOTE(review): authentication is disabled on this route; see the commented
# checks below.
# @login_required
def add_station():
    """Register a station from a JSON body and start its MQTT client.

    Required JSON fields: ``station_id``, ``product_id``, ``name``,
    ``location``, ``mqtt_broker`` and ``mqtt_port``. ``mqtt_user`` and
    ``mqtt_password`` are optional.

    Returns:
        201 on success, 400 when the body is not a JSON object or a required
        field is missing, 409 when the station ID already exists.
    """
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403

    data = request.get_json(silent=True)

    # product_id is read unconditionally below, so it must be validated with
    # the other required fields; otherwise a missing key surfaces as a 500.
    required_fields = ['station_id', 'product_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({"message": "Missing required station details."}), 400

    if Station.query.filter_by(station_id=data['station_id']).first():
        return jsonify({"message": "Station ID already exists."}), 409

    new_station = Station(
        station_id=data['station_id'],
        product_id=data['product_id'],
        name=data['name'],
        location=data['location'],
        mqtt_broker=data['mqtt_broker'],
        mqtt_port=data['mqtt_port'],
        mqtt_user=data.get('mqtt_user'),
        mqtt_password=data.get('mqtt_password')
    )
    db.session.add(new_station)
    db.session.commit()

    # Immediately start the MQTT client for the station just created.
    start_single_mqtt_client(new_station)

    return jsonify({"message": "Station added successfully."}), 201
|
|
|
|
|
|
# The new function with logging
|
|
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
def remove_station(station_id):
    """Delete a station and stop the MQTT client that monitors it.

    Args:
        station_id: Identifier of the station to remove (from the URL).

    Returns:
        200 with a confirmation message; a 404 is raised by
        ``first_or_404`` when the station does not exist.
    """
    print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")

    # 1. Find the station in the database.
    station = Station.query.filter_by(station_id=station_id).first_or_404()
    print(f"[LOG] Found station '{station.name}' in the database.")

    # 2. Stop the running MQTT client for this station.
    client_to_stop = mqtt_clients.get(station_id)
    if client_to_stop:
        print("[LOG] Found active MQTT client. Attempting to stop it now...")
        client_to_stop.stop()
        mqtt_clients.pop(station_id, None)
        print(f"[LOG] Successfully stopped and removed client object for {station_id}.")
    else:
        print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")

    # 3. Delete the station from the database.
    print(f"[LOG] Attempting to delete {station_id} from the database...")
    db.session.delete(station)
    db.session.commit()
    print("[LOG] Successfully deleted station from the database.")

    # 4. Forget the heartbeat so the entry does not accumulate forever and a
    #    station re-created under the same ID does not start out "Online".
    last_message_timestamps.pop(station_id, None)

    print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
    return jsonify({"message": f"Station {station_id} removed successfully."}), 200
|
|
|
|
|
|
@app.route('/api/stations', methods=['GET'])
def get_stations():
    """List every station together with its heartbeat-derived status.

    Returns:
        A JSON array of ``{id, name, location, product_id, status}`` objects,
        or a 500 response with an ``error`` message if the query fails.
    """
    def _status_for(station_id):
        # Online means a PERIODIC message arrived within the timeout window.
        last_seen = last_message_timestamps.get(station_id)
        if last_seen is None:
            return "Offline"
        if (time.time() - last_seen) < STATION_TIMEOUT_SECONDS:
            return "Online"
        return "Offline"

    try:
        summaries = [
            {
                "id": station.station_id,
                "name": station.name,
                "location": station.location,
                "product_id": station.product_id,
                "status": _status_for(station.station_id),
            }
            for station in Station.query.all()
        ]
        return jsonify(summaries)
    except Exception as e:
        return jsonify({"error": f"Database query failed: {e}"}), 500
|
|
|
|
|
|
#--- Daily Stats Route ---
|
|
@app.route('/api/stations/daily-stats', methods=['GET'])
def get_all_station_stats():
    """Report per-station swap counts for the trailing 24 hours.

    Returns:
        A JSON object keyed by station ID, each value holding
        ``total_starts``, ``completed`` and ``aborted``; or a 500 response
        if the query fails.
    """
    def _event_count(event_name, label):
        # COUNT(CASE WHEN payload->>'eventType' = <event_name> THEN 1 END)
        is_match = MqttLog.payload['eventType'].astext == event_name
        return func.count(case((is_match, 1))).label(label)

    try:
        # Rolling 24-hour window ending now, rather than a calendar day.
        window_end = datetime.now(timezone.utc)
        window_start = window_end - timedelta(hours=24)

        rows = (
            db.session.query(
                MqttLog.station_id,
                _event_count('EVENT_SWAP_START', 'total_starts'),
                _event_count('EVENT_SWAP_ENDED', 'completed'),
                _event_count('EVENT_SWAP_ABORTED', 'aborted'),
            )
            .filter(
                MqttLog.topic_type == 'EVENTS',
                MqttLog.timestamp.between(window_start, window_end),
            )
            .group_by(MqttLog.station_id)
            .all()
        )

        stats_by_station = {}
        for sid, started, completed, aborted in rows:
            stats_by_station[sid] = {
                "total_starts": started,
                "completed": completed,
                "aborted": aborted,
            }

        return jsonify(stats_by_station)

    except Exception as e:
        print(f"Error fetching daily stats: {e}")
        return jsonify({"message": "Could not fetch daily station stats."}), 500
|
|
|
|
|
|
@app.route('/api/logs/recent/<string:station_id>', methods=['GET'])
def get_recent_logs(station_id):
    """Return the newest EVENTS/REQUEST logs of a station, oldest first.

    Query parameters:
        start_date, end_date: ``YYYY-MM-DD``; both default to today. The
            range covers whole days, inclusive.
        count: Maximum number of logs to return (default 50).

    Returns:
        A JSON array of ``{topic, payload, timestamp}`` objects; 400 for an
        invalid date or a negative count; 500 if the query fails.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    start_date_str = request.args.get('start_date', today)
    end_date_str = request.args.get('end_date', today)
    limit_count = request.args.get('count', 50, type=int)

    # A negative LIMIT is a client error; reject it here rather than letting
    # it surface as a database failure.
    if limit_count < 0:
        return jsonify({"message": "Invalid count."}), 400

    try:
        start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
        start_datetime = datetime.combine(start_date, datetime.min.time())
        end_datetime = datetime.combine(end_date, datetime.max.time())
    except ValueError:
        return jsonify({"message": "Invalid date format."}), 400

    try:
        # Take the newest `count` rows, then flip them into chronological
        # order for display.
        logs = MqttLog.query.filter(
            MqttLog.station_id == station_id,
            MqttLog.topic_type.in_(['EVENTS', 'REQUEST']),
            MqttLog.timestamp.between(start_datetime, end_datetime)
        ).order_by(desc(MqttLog.timestamp)).limit(limit_count).all()

        logs.reverse()

        log_list = [{
            "topic": log.topic, "payload": log.payload, "timestamp": log.timestamp.isoformat()
        } for log in logs]

        return jsonify(log_list)
    except Exception as e:
        # Log the cause like the sibling routes do; the client only gets a
        # generic message.
        print(f"Error fetching recent logs for {station_id}: {e}")
        return jsonify({"message": "Could not fetch recent logs."}), 500
|
|
|
|
|
|
# Maps the abort-reason codes found in EVENT_SWAP_ABORTED payloads
# (eventData.swapAbortReason) to human-readable chart labels. Codes missing
# from this map are shown as-is by the analytics route.
ABORT_REASON_MAP = {
    "ABORT_UNKNOWN": "Unknown",
    "ABORT_BAT_EXIT_TIMEOUT": "Battery Exit Timeout",
    "ABORT_BAT_ENTRY_TIMEOUT": "Battery Entry Timeout",
    "ABORT_DOOR_CLOSE_TIMEOUT": "Door Close Timeout",
    "ABORT_DOOR_OPEN_TIMEOUT": "Door Open Timeout",
    "ABORT_INVALID_PARAM": "Invalid Parameter",
    "ABORT_REMOTE_REQUESTED": "Remote Abort",
    "ABORT_INVALID_BATTERY": "Invalid Battery"
}
|
|
|
|
|
|
#--- Analytics Route ---
|
|
@app.route('/api/analytics', methods=['GET'])
def get_analytics_data():
    """Build the KPI and chart data for one station over a date range.

    Query parameters (all required): ``station_id``, ``start_date`` and
    ``end_date`` (``YYYY-MM-DD``; the range covers whole days, inclusive).

    Returns:
        JSON with ``kpis``, ``swap_activity``, ``hourly_distribution``,
        ``abort_reasons`` and ``slot_utilization``; 400 for missing or
        malformed parameters; 500 if the log query fails.
    """
    # 1. Read and validate the request parameters.
    params = request.args
    station_id = params.get('station_id')
    raw_start = params.get('start_date')
    raw_end = params.get('end_date')

    if not (station_id and raw_start and raw_end):
        return jsonify({"message": "Missing required parameters."}), 400

    try:
        start_date = datetime.strptime(raw_start, '%Y-%m-%d').date()
        end_date = datetime.strptime(raw_end, '%Y-%m-%d').date()
    except ValueError:
        return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400

    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    # 2. Fetch the swap-related logs and the periodic heartbeats.
    def _logs_in_range(topic_filter):
        return MqttLog.query.filter(
            MqttLog.station_id == station_id,
            topic_filter,
            MqttLog.timestamp.between(start_datetime, end_datetime)
        ).order_by(MqttLog.timestamp.asc()).all()

    try:
        logs = _logs_in_range(MqttLog.topic_type.in_(['EVENTS', 'REQUEST']))
        periodic_logs = _logs_in_range(MqttLog.topic_type == 'PERIODIC')
    except Exception as e:
        return jsonify({"message": f"Could not query logs: {e}"}), 500

    # 3. Accumulators for KPIs and charts.
    session_started_at = {}   # sessionId -> timestamp of its EVENT_SWAP_START
    swap_durations = []       # seconds, for sessions with a start and an exit

    total_initiations = 0
    total_starts = 0
    completed_swaps = 0
    aborted_swaps = 0

    daily_completed = {}
    daily_aborted = {}
    hourly_swaps = [0] * 24
    abort_reason_counts = {}
    slot_utilization_counts = {slot: 0 for slot in range(1, 10)}

    # 4. Single chronological pass over the logs.
    for entry in logs:
        body = entry.payload
        event_type = body.get('eventType')
        job_type = body.get('jobType')
        session_id = body.get('sessionId')
        stamp = entry.timestamp
        day = stamp.date()

        if job_type == 'JOBTYPE_SWAP_AUTH_SUCCESS':
            total_initiations += 1
        elif event_type == 'EVENT_SWAP_START':
            total_starts += 1
            hourly_swaps[stamp.hour] += 1
            if session_id:
                session_started_at[session_id] = stamp
        elif event_type == 'EVENT_BATTERY_EXIT':
            completed_swaps += 1
            daily_completed[day] = daily_completed.get(day, 0) + 1
            if session_id and session_id in session_started_at:
                # Each start is consumed by the first matching exit.
                started = session_started_at.pop(session_id)
                swap_durations.append((stamp - started).total_seconds())
        elif event_type == 'EVENT_SWAP_ABORTED':
            aborted_swaps += 1
            daily_aborted[day] = daily_aborted.get(day, 0) + 1
            reason = body.get('eventData', {}).get('swapAbortReason', 'ABORT_UNKNOWN')
            abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
        elif event_type == 'EVENT_SLOT_LOCK_DISENEGAGED':
            slot_id = body.get('eventData', {}).get('slotId')
            if slot_id and slot_id in slot_utilization_counts:
                slot_utilization_counts[slot_id] += 1

    # 5. Station uptime: any silence longer than the threshold is downtime.
    total_period_seconds = (end_datetime - start_datetime).total_seconds()
    MAX_ONLINE_GAP_SECONDS = 30  # Assume offline if no message for over 30 seconds

    if periodic_logs:
        # Range start -> first message, message -> message, last message -> range end.
        gaps = [(periodic_logs[0].timestamp - start_datetime).total_seconds()]
        gaps.extend(
            (later.timestamp - earlier.timestamp).total_seconds()
            for earlier, later in zip(periodic_logs, periodic_logs[1:])
        )
        gaps.append((end_datetime - periodic_logs[-1].timestamp).total_seconds())
        total_downtime_seconds = sum(gap for gap in gaps if gap > MAX_ONLINE_GAP_SECONDS)
    else:
        total_downtime_seconds = total_period_seconds

    station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
    station_uptime = max(0, min(100, station_uptime))  # Ensure value is between 0 and 100

    # 6. Shape the response.
    if swap_durations:
        avg_swap_time_seconds = sum(swap_durations) / len(swap_durations)
    else:
        avg_swap_time_seconds = None

    kpi_data = {
        "total_swaps_initiated": total_initiations,
        "total_swaps_started": total_starts,
        "completed_swaps": completed_swaps,
        "aborted_swaps": aborted_swaps,
        "avg_swap_time_seconds": avg_swap_time_seconds,
        "station_uptime": round(station_uptime, 2)
    }

    # One chart point per calendar day in the range, including empty days.
    days = [start_date + timedelta(days=offset)
            for offset in range((end_date - start_date).days + 1)]
    swap_activity_data = {
        "labels": [d.strftime('%b %d') for d in days],
        "completed_data": [daily_completed.get(d, 0) for d in days],
        "aborted_data": [daily_aborted.get(d, 0) for d in days],
    }

    hour_labels = [
        f"{h % 12 if h % 12 != 0 else 12} {'AM' if h < 12 else 'PM'}"
        for h in range(24)
    ]
    hourly_distribution_data = {"labels": hour_labels, "swap_data": hourly_swaps}

    abort_reasons_data = {
        "labels": [ABORT_REASON_MAP.get(code, code) for code in abort_reason_counts],
        "reason_data": list(abort_reason_counts.values()),
    }

    # Counts as a plain list ordered by slot 1..9.
    slot_utilization_data = {"counts": [slot_utilization_counts[slot] for slot in range(1, 10)]}

    # 7. Combine all data and return.
    return jsonify({
        "kpis": kpi_data,
        "swap_activity": swap_activity_data,
        "hourly_distribution": hourly_distribution_data,
        "abort_reasons": abort_reasons_data,
        "slot_utilization": slot_utilization_data
    })
|
|
|
|
|
|
@app.route('/api/uptime/<string:station_id>', methods=['GET'])
def get_station_uptime(station_id):
    """Return a station's uptime percentage for the trailing 24 hours.

    Uptime is derived from PERIODIC logs: every silence longer than 30
    seconds counts as downtime.

    Returns:
        ``{"uptime": <percentage rounded to 2 decimals>}``, or
        ``{"uptime": "Error"}`` with status 500 on any failure.
    """
    try:
        window_end = datetime.now(timezone.utc)
        window_start = window_end - timedelta(hours=24)

        heartbeats = (
            MqttLog.query
            .filter(
                MqttLog.station_id == station_id,
                MqttLog.topic_type == 'PERIODIC',
                MqttLog.timestamp.between(window_start, window_end),
            )
            .order_by(MqttLog.timestamp.asc())
            .all()
        )

        window_seconds = (window_end - window_start).total_seconds()
        MAX_ONLINE_GAP_SECONDS = 30

        if heartbeats:
            # The window edges are timezone-aware, so the stored timestamps
            # are tagged as UTC before being compared with them.
            first_seen = heartbeats[0].timestamp.replace(tzinfo=timezone.utc)
            gaps = [(first_seen - window_start).total_seconds()]
            gaps.extend(
                (later.timestamp - earlier.timestamp).total_seconds()
                for earlier, later in zip(heartbeats, heartbeats[1:])
            )
            last_seen = heartbeats[-1].timestamp.replace(tzinfo=timezone.utc)
            gaps.append((window_end - last_seen).total_seconds())
            downtime_seconds = sum(gap for gap in gaps if gap > MAX_ONLINE_GAP_SECONDS)
        else:
            # No heartbeat at all: the whole window counts as downtime.
            downtime_seconds = window_seconds

        uptime_percentage = 100 * (1 - (downtime_seconds / window_seconds))
        uptime_percentage = max(0, min(100, uptime_percentage))

        return jsonify({"uptime": round(uptime_percentage, 2)})

    except Exception as e:
        print(f"Error in uptime calculation for {station_id}: {e}")
        return jsonify({"uptime": "Error"}), 500
|
|
|
|
|
|
# --- CSV Export route (UPDATED) ---
|
|
def _format_periodic_row(payload, num_slots=9):
    """Flatten a periodic payload dictionary into one CSV row.

    Args:
        payload: Decoded periodic message. Reads ``ts`` (epoch seconds),
            ``deviceId``, ``stationDiagnosticCode`` and ``slotLevelPayload``
            (a list of per-slot dicts).
        num_slots: Number of slot column groups to emit (slots 1..num_slots).

    Returns:
        A list of 3 station columns, 14 columns per slot, and a trailing
        empty ``RawHexPayload`` placeholder. Slots that are absent from the
        payload are filled with empty strings.
    """
    # A missing or non-numeric timestamp yields an empty cell instead of
    # aborting the whole export. fromtimestamp() formats in local time.
    try:
        timestamp = datetime.fromtimestamp(float(payload.get("ts"))).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        timestamp = ""

    row = [
        timestamp,
        payload.get("deviceId", ""),
        payload.get("stationDiagnosticCode", "")
    ]

    # (payload key, default, divisor). A divisor scales the raw value into
    # the unit named in the CSV header (e.g. Voltage_V, BatteryTemp_C).
    slot_columns = (
        ("batteryIdentification", "", None),
        ("batteryPresent", 0, None),
        ("chargerPresent", 0, None),
        ("doorStatus", 0, None),
        ("doorLockStatus", 0, None),
        ("voltage", 0, 1000.0),
        ("current", 0, 1000.0),
        ("soc", 0, None),
        ("batteryMaxTemp", 0, 10.0),
        ("slotTemperature", 0, 10.0),
        ("batteryFaultCode", 0, None),
        ("chargerFaultCode", 0, None),
        ("batteryMode", 0, None),
        ("chargerMode", 0, None),
    )

    # Index slots by their slotId; entries without one fall back to their
    # 1-based position in the list.
    slots_data = payload.get("slotLevelPayload", [])
    slot_map = {s.get('slotId', i + 1): s for i, s in enumerate(slots_data)}

    for slot_number in range(1, num_slots + 1):
        slot = slot_map.get(slot_number)
        if not slot:
            row.extend([''] * len(slot_columns))
            continue
        for key, default, divisor in slot_columns:
            value = slot.get(key, default)
            row.append(value / divisor if divisor else value)

    row.append('')  # Placeholder for RawHexPayload
    return row
|
|
|
|
@app.route('/api/logs/export', methods=['GET'])
def export_logs():
    """Export a station's logs for a time window as a CSV download.

    Query parameters:
        station_id, start_datetime, end_datetime: required; datetimes use
            the ``YYYY-MM-DDTHH:MM`` format.
        log_type: ``PERIODIC`` (default) or ``EVENT``. ``EVENT`` exports both
            EVENTS and REQUEST logs.

    Returns:
        A ``text/csv`` attachment; 400 for missing or malformed parameters;
        404 when no logs match.
    """
    params = request.args
    station_id = params.get('station_id')
    raw_start = params.get('start_datetime')
    raw_end = params.get('end_datetime')
    log_type = params.get('log_type', 'PERIODIC')

    if not (station_id and raw_start and raw_end):
        return jsonify({"error": "Missing required parameters"}), 400

    try:
        start_datetime = datetime.strptime(raw_start, '%Y-%m-%dT%H:%M')
        end_datetime = datetime.strptime(raw_end, '%Y-%m-%dT%H:%M')
    except ValueError:
        return jsonify({"error": "Invalid datetime format"}), 400

    # "EVENT" from the frontend covers both EVENTS and REQUEST rows in the
    # database; any other value is matched against topic_type literally.
    if log_type == 'EVENT':
        topic_filter = MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
    else:
        topic_filter = MqttLog.topic_type == log_type

    logs = (
        MqttLog.query
        .filter(
            MqttLog.station_id == station_id,
            MqttLog.timestamp.between(start_datetime, end_datetime),
            topic_filter,
        )
        .order_by(MqttLog.timestamp.asc())
        .all()
    )

    if not logs:
        return jsonify({"message": "No logs found for the selected criteria."}), 404

    output = io.StringIO()
    writer = csv.writer(output)

    # Filename: station name (spaces -> underscores) or, if the station is
    # unknown, its ID; then the log type and the start date.
    station = Station.query.filter_by(station_id=station_id).first()
    if station:
        station_name = station.name.replace(' ', '_')
    else:
        station_name = station_id
    filename = f"{station_name}_{log_type}_{start_datetime.strftime('%Y-%m-%d')}.csv"

    if log_type == 'PERIODIC':
        slot_fields = [
            "BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
            "Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
            "BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
        ]
        header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
        for slot_number in range(1, 10):
            header.extend(f"Slot{slot_number}_{field}" for field in slot_fields)
        header.append("RawHexPayload")
        writer.writerow(header)

        for entry in logs:
            writer.writerow(_format_periodic_row(entry.payload))
    else:
        # Events / RPC requests: one row per log with the payload as JSON.
        writer.writerow(["Timestamp", "Topic", "Payload_JSON"])
        for entry in logs:
            writer.writerow([entry.timestamp, entry.topic, json.dumps(entry.payload)])

    output.seek(0)
    return Response(
        output,
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment;filename={filename}"}
    )
|
|
|
|
@socketio.on('rpc_request')
def handle_rpc_request(payload):
    """Translate a dashboard command into a Protobuf RPC and publish it.

    Args:
        payload: Event data with ``station_id``, ``command`` and ``data``.
            ``data`` is a slot ID for OPEN / CHG_ON / CHG_OFF and a list of
            ``[fromSlot, toSlot]`` pairs for START_SWAP.

    The request is dropped (with a log line) when the station has no
    connected MQTT client or the command is not recognised.
    """
    station_id = payload.get('station_id')
    command = payload.get('command')
    data = payload.get('data')  # This will be the slot_id or swap_pairs array

    print(f"Received RPC request for station {station_id}: {command} with data {data}")

    # Find the correct MQTT client for this station.
    mqtt_client = mqtt_clients.get(station_id)
    if not (mqtt_client and mqtt_client.is_connected):
        print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
        return  # Or emit an error back to the user

    request_payload = rpcRequest(
        ts=int(time.time()),
        jobId=f"job_{int(time.time())}"
    )

    def _slot_job(job_type, state):
        # Commands that act on a single slot share this shape.
        request_payload.jobType = job_type
        request_payload.slotInfo.slotId = data
        request_payload.slotInfo.state = state

    # Map the command string onto a job type and its data.
    if command == 'OPEN':
        _slot_job(jobType_e.JOBTYPE_GATE_OPEN_CLOSE, 1)
    elif command in ('CHG_ON', 'CHG_OFF'):
        # State 1 for ON, state 0 for OFF.
        _slot_job(jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE, 1 if command == 'CHG_ON' else 0)
    elif command == 'START_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
        if data and isinstance(data, list):
            for pair in data:
                swap_info = request_payload.swapInfo.add()
                swap_info.fromSlot = pair[0]
                swap_info.toSlot = pair[1]
    elif command == 'ABORT_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
    elif command == 'STATION_RESET':
        request_payload.jobType = jobType_e.JOBTYPE_REBOOT
    elif command == 'LANGUAGE_UPDATE':
        request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
        # Logic to map language string to enum would go here
    else:
        print(f"Unknown command: {command}")
        return

    # Serialize and publish.
    # NOTE: You may need to fetch client_id and version from your database
    topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"

    print(f"Publishing to {topic}")
    mqtt_client.client.publish(topic, request_payload.SerializeToString())
|
|
|
|
# ADD THIS NEW FUNCTION
|
|
def start_single_mqtt_client(station):
    """Create, start and register the MQTT client for one station.

    Args:
        station: Station record supplying the broker, port, credentials,
            name and station_id.

    Does nothing when a connected client is already registered for the
    station.
    """
    sid = station.station_id

    if sid in mqtt_clients and mqtt_clients[sid].is_connected:
        print(f"MQTT client for {station.station_id} is already running.")
        return

    print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")

    connection = {
        "broker": station.mqtt_broker,
        "port": station.mqtt_port,
        "user": station.mqtt_user,
        "password": station.mqtt_password,
        "station_id": station.station_id,
        "on_message_callback": on_message_handler,
    }
    client = MqttClient(**connection)
    client.start()  # The start method should handle threading
    mqtt_clients[sid] = client
|
|
|
|
# --- Main Application Logic ---
|
|
def start_mqtt_clients():
    """Start an MQTT client for every station stored in the database.

    If the station list cannot be loaded, the error is logged and no client
    is started.
    """
    try:
        with app.app_context():
            known_stations = Station.query.all()
            print(f"Found {len(known_stations)} existing stations to monitor.")
    except Exception as e:
        print(f"CRITICAL: Could not query stations from the database: {e}")
        return

    for known_station in known_stations:
        start_single_mqtt_client(known_station)
|
|
|
|
if __name__ == '__main__':
    # Create the schema and seed first-run defaults; without a reachable
    # database the server cannot start.
    try:
        with app.app_context():
            db.create_all()

            # Add a default user if none exist.
            if not User.query.first():
                print("No users found. Creating a default admin user.")
                # The account is announced as an admin, so it must carry the
                # admin flag.
                # NOTE(review): the default password is a well-known
                # placeholder and should be changed after first start.
                default_user = User(username='admin', is_admin=True)
                default_user.set_password('password')
                db.session.add(default_user)
                db.session.commit()

            # Add a default station if none exist.
            if not Station.query.first():
                print("No stations found. Adding a default station.")
                default_station = Station(
                    station_id="V16000862287077265957",
                    product_id="VEC_PROD_001",
                    name="Test Station 1",
                    mqtt_broker="mqtt.vecmocon.com",
                    mqtt_port=1883,
                    mqtt_user="your_username",
                    mqtt_password="your_password"
                )
                db.session.add(default_station)
                db.session.commit()
    except Exception as e:
        print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
        sys.exit(1)

    # Start the MQTT clients in a daemon thread so they do not delay, or
    # outlive, the web server.
    mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
    mqtt_thread.start()

    print("Starting Flask-SocketIO server on http://0.0.0.0:5000")
    socketio.run(app, host='0.0.0.0', port=5000)
|