# SwapStation_WebApp/backend/main.py
# (file-viewer header from the original paste: 769 lines, 30 KiB, Python)

import os
import sys
import threading
import json
import csv
import io
import time
from datetime import datetime, timedelta
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO, join_room
from flask_cors import CORS
from dotenv import load_dotenv
from sqlalchemy import desc, func, case
# Import your custom core modules and the new models
from core.mqtt_client import MqttClient
from core.protobuf_decoder import ProtobufDecoder
from models import db, Station, User, MqttLog
from flask_login import login_required, current_user, LoginManager
from proto.vec_payload_chgSt_pb2 import (
rpcRequest,
jobType_e
)
# --- Environment ---
# Pull settings from the .env file into the process environment.
load_dotenv()

# The database URL is mandatory: refuse to start without it.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    print("FATAL ERROR: DATABASE_URL is not set in .env file.")
    sys.exit(1)

# --- Application setup ---
app = Flask(__name__)

# CORS is enabled for /api/* routes only, and only for the front-end origins
# listed here. Content-Disposition is exposed because the CSV export route
# sets that header on its response.
allowed_origins = ["http://192.168.1.12:5500", "http://127.0.0.1:5500"]
CORS(
    app,
    resources={r"/api/*": {"origins": allowed_origins}},
    supports_credentials=True,
    expose_headers='Content-Disposition',
)

# Flask-Login wiring.
login_manager = LoginManager()
login_manager.init_app(app)

# NOTE(review): the SECRET_KEY fallback is a fixed, publicly visible string;
# set SECRET_KEY in the environment for any real deployment.
app.config.update(
    SQLALCHEMY_DATABASE_URI=DATABASE_URL,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    SECRET_KEY=os.getenv("SECRET_KEY", "a_very_secret_key"),
)
db.init_app(app)

# NOTE(review): WebSocket connections are accepted from any origin ("*").
socketio = SocketIO(app, cors_allowed_origins="*")
# --- Flask-Login user loader ---
@login_manager.user_loader
def load_user(user_id):
    """Look up a ``User`` by primary key for Flask-Login.

    Args:
        user_id: The stored user identifier; it is converted with ``int()``
            before the lookup.

    Returns:
        Whatever ``User.query.get`` returns for that key.
    """
    primary_key = int(user_id)
    return User.query.get(primary_key)
# --- Global instances ---
# Shared Protobuf decoder used by the MQTT message handler.
decoder = ProtobufDecoder()
# Running MqttClient objects, keyed by station_id.
mqtt_clients = {}
# time.time() of the most recent PERIODIC message seen, keyed by station_id.
last_message_timestamps = {}
# A station is reported "Online" only while its last PERIODIC message is
# younger than this many seconds.
STATION_TIMEOUT_SECONDS = 10
# --- MQTT message handling ---
def on_message_handler(station_id, topic, payload):
    """Decode one MQTT message, store it, and push it to dashboard clients.

    The last segment of ``topic`` selects the decoder (PERIODIC, EVENTS or
    REQUEST). PERIODIC messages also refresh the station heartbeat in
    ``last_message_timestamps``. When decoding yields a truthy result, it is
    written to the database as an ``MqttLog`` row and then emitted as a
    ``dashboard_update`` event to the station's Socket.IO room. A database
    failure is printed and does not prevent the emit.

    Args:
        station_id: Identifier of the station the message belongs to.
        topic: Full MQTT topic string.
        payload: Raw message payload handed to the decoder.
    """
    message_type = topic.split('/')[-1]

    # Heartbeat: only PERIODIC traffic counts towards "online".
    if message_type == 'PERIODIC':
        last_message_timestamps[station_id] = time.time()
    print(f"Main handler received message for station {station_id} on topic {topic}")

    # Map the topic suffix to the decoder method that understands it.
    decoder_method_names = {
        'PERIODIC': 'decode_periodic',
        'EVENTS': 'decode_event',
        'REQUEST': 'decode_rpc_request',
    }
    method_name = decoder_method_names.get(message_type)
    decoded_data = getattr(decoder, method_name)(payload) if method_name else None

    # Unknown topic types and empty decodes are neither stored nor forwarded.
    if not decoded_data:
        return

    try:
        # The handler is called by the MQTT client, not by a Flask request,
        # so an application context is pushed explicitly.
        with app.app_context():
            db.session.add(
                MqttLog(
                    station_id=station_id,
                    topic=topic,
                    topic_type=message_type,
                    payload=decoded_data,
                )
            )
            db.session.commit()
            print(f"Successfully wrote data for {station_id} to PostgreSQL.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    update = {
        'stationId': station_id,
        'topic': topic,
        'data': decoded_data,
    }
    socketio.emit('dashboard_update', update, room=station_id)
# --- (WebSocket and API routes remain the same) ---
@socketio.on('connect')
def handle_connect():
    """Log that a client has opened a WebSocket connection."""
    print('Client connected to WebSocket')
# --- Join a station room and receive that station's last known state ---
@socketio.on('join_station_room')
def handle_join_station_room(data):
    """Add the calling client to a station room and send it the latest state.

    After joining, the most recent PERIODIC ``MqttLog`` row for the station
    (if any) is emitted as a ``dashboard_update`` event to the joining client
    only, so clients already in the room do not receive a stale duplicate.

    Args:
        data: Event payload; expected to be a dict with a ``station_id`` key.
            A payload without a station id is logged and ignored.
    """
    station_id = data.get('station_id') if isinstance(data, dict) else None
    if not station_id:
        print("join_station_room ignored: no station_id supplied.")
        return

    join_room(station_id)
    print(f"Client joined room for station: {station_id}")
    try:
        # Find the most recent periodic log entry for this station.
        latest_log = MqttLog.query.filter_by(
            station_id=station_id,
            topic_type='PERIODIC'
        ).order_by(MqttLog.timestamp.desc()).first()
        if latest_log:
            print(f"Sending initial state for {station_id} to new client.")
            # request.sid is this client's own Socket.IO session id; using it
            # as the room targets the new client instead of the whole room.
            socketio.emit('dashboard_update', {
                'stationId': station_id,
                'topic': latest_log.topic,
                'data': latest_log.payload
            }, room=request.sid)
    except Exception as e:
        print(f"Error querying or sending initial state for {station_id}: {e}")
# --- API Routes ---
@app.route('/api/login', methods=['POST'])
def login():
    """Check a username/password pair sent as a JSON body.

    Returns:
        400 when the body, username or password is missing, 401 when the
        credentials do not match a user, 200 otherwise.

    NOTE: this only verifies the credentials; no session or token is
    created here.
    """
    credentials = request.get_json()
    if not (credentials and credentials.get('username') and credentials.get('password')):
        return jsonify({"message": "Username and password are required."}), 400

    account = User.query.filter_by(username=credentials['username']).first()
    if not account or not account.check_password(credentials['password']):
        return jsonify({"message": "Invalid username or password"}), 401

    return jsonify({"message": "Login successful"}), 200
# --- Admin-only: Add User ---
@app.route('/api/users', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_user():
    """Create a new user from a JSON body.

    Expected JSON keys: ``username``, ``password`` and optional ``is_admin``
    (defaults to ``False``).

    Returns:
        400 when the body is missing, is not a JSON object, or lacks a
        username or password; 409 when the username is taken; 201 on success.

    NOTE(review): the login/admin checks are currently commented out, so this
    endpoint can be called by anyone, including with ``is_admin`` set.
    """
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403 # Forbidden
    data = request.get_json()
    # A missing or non-object body used to crash on data.get() with a 500.
    if not isinstance(data, dict):
        return jsonify({"message": "Username and password are required."}), 400

    username = data.get('username')
    password = data.get('password')
    is_admin = data.get('is_admin', False)
    if not username or not password:
        return jsonify({"message": "Username and password are required."}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({"message": "Username already exists."}), 409 # Conflict

    new_user = User(username=username, is_admin=is_admin)
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "User added successfully."}), 201
# --- Admin-only: Add Station ---
@app.route('/api/stations', methods=['POST'])
# @login_required # Ensures the user is logged in
def add_station():
    """Create a station from a JSON body and start its MQTT client.

    Required JSON keys: ``station_id``, ``product_id``, ``name``,
    ``location``, ``mqtt_broker``, ``mqtt_port``. Optional: ``mqtt_user``,
    ``mqtt_password``.

    Returns:
        400 when the body is missing, is not a JSON object, or lacks a
        required key; 409 when the station id already exists; 201 on success.

    NOTE(review): the login/admin checks are currently commented out.
    """
    # if not current_user.is_admin:
    #     return jsonify({"message": "Admin access required."}), 403
    data = request.get_json()
    # product_id is read unconditionally below, so it has to be validated
    # here as well; otherwise a request without it fails with a KeyError.
    required_fields = ['station_id', 'product_id', 'name', 'location', 'mqtt_broker', 'mqtt_port']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({"message": "Missing required station details."}), 400
    if Station.query.filter_by(station_id=data['station_id']).first():
        return jsonify({"message": "Station ID already exists."}), 409

    new_station = Station(
        station_id=data['station_id'],
        product_id=data['product_id'],
        name=data['name'],
        location=data['location'],
        mqtt_broker=data['mqtt_broker'],
        mqtt_port=data['mqtt_port'],
        mqtt_user=data.get('mqtt_user'),
        mqtt_password=data.get('mqtt_password')
    )
    db.session.add(new_station)
    db.session.commit()

    # Immediately start the MQTT client for the station just created.
    start_single_mqtt_client(new_station)
    return jsonify({"message": "Station added successfully."}), 201
# The new function with logging
@app.route('/api/stations/<string:station_id>', methods=['DELETE'])
def remove_station(station_id):
    """Delete a station and shut down its MQTT client, logging each step.

    Responds with 404 (via ``first_or_404``) when the station id is unknown,
    otherwise with 200 after the client has been stopped and the row deleted.
    """
    print(f"\n--- REMOVE REQUEST RECEIVED for station: {station_id} ---")

    # Step 1: the station must exist in the database.
    station = Station.query.filter_by(station_id=station_id).first_or_404()
    print(f"[LOG] Found station '{station.name}' in the database.")

    # Step 2: stop the MQTT client first, then forget it.
    running_client = mqtt_clients.get(station_id)
    if not running_client:
        print(f"[LOG] No active MQTT client was found for {station_id}. No action needed.")
    else:
        print("[LOG] Found active MQTT client. Attempting to stop it now...")
        running_client.stop()
        mqtt_clients.pop(station_id, None)
        print(f"[LOG] Successfully stopped and removed client object for {station_id}.")

    # Step 3: remove the database row.
    print(f"[LOG] Attempting to delete {station_id} from the database...")
    db.session.delete(station)
    db.session.commit()
    print("[LOG] Successfully deleted station from the database.")

    print(f"--- REMOVE REQUEST COMPLETED for station: {station_id} ---\n")
    return jsonify({"message": f"Station {station_id} removed successfully."}), 200
@app.route('/api/stations', methods=['GET'])
def get_stations():
    """Return every station as JSON, each with an Online/Offline status.

    A station counts as online only when a PERIODIC message was recorded for
    it less than ``STATION_TIMEOUT_SECONDS`` ago. Any failure is reported as
    a 500 with the error text.
    """
    def describe(station):
        # Heartbeat check: no recorded message at all means offline.
        last_seen = last_message_timestamps.get(station.station_id)
        if last_seen is not None and (time.time() - last_seen) < STATION_TIMEOUT_SECONDS:
            status = "Online"
        else:
            status = "Offline"
        return {
            "id": station.station_id,
            "name": station.name,
            "location": station.location,
            "product_id": station.product_id,
            "status": status,
        }

    try:
        return jsonify([describe(station) for station in Station.query.all()])
    except Exception as e:
        return jsonify({"error": f"Database query failed: {e}"}), 500
#--- Daily Stats Route ---
@app.route('/api/stations/daily-stats', methods=['GET'])
def get_all_station_stats():
    """Return today's swap statistics for every station.

    Counts EVENTS logs whose timestamp falls on the current local date,
    grouped by station, into started, completed and aborted swaps.

    Returns:
        JSON object keyed by station id, each value holding
        ``total_starts``, ``completed`` and ``aborted``; or a 500 response
        with a message when the query fails.
    """
    try:
        # `time` in this module is the stdlib *module*, which has no
        # min/max attributes; the day bounds must come from datetime.time
        # values. Read the date once so both bounds refer to the same day.
        today = datetime.now().date()  # local time
        today_start = datetime.combine(today, datetime.min.time())
        today_end = datetime.combine(today, datetime.max.time())

        # One grouped query: each case() yields 1 only for the matching
        # event type, so count() tallies that type per station.
        stats = db.session.query(
            MqttLog.station_id,
            func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_START', 1))).label('total_starts'),
            func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_ENDED', 1))).label('completed'),
            func.count(case((MqttLog.payload['eventType'] == 'EVENT_SWAP_ABORTED', 1))).label('aborted')
        ).filter(
            MqttLog.topic_type == 'EVENTS',
            MqttLog.timestamp.between(today_start, today_end)
        ).group_by(MqttLog.station_id).all()

        # Convert the list of row tuples into a dictionary for easy lookup.
        stats_dict = {
            station_id: {
                "total_starts": total_starts,
                "completed": completed,
                "aborted": aborted
            } for station_id, total_starts, completed, aborted in stats
        }
        return jsonify(stats_dict)
    except Exception as e:
        print(f"Error fetching daily stats: {e}")
        return jsonify({"message": "Could not fetch daily station stats."}), 500
@app.route('/api/logs/recent/<string:station_id>', methods=['GET'])
def get_recent_logs(station_id):
    """Return the 50 newest logs of a station, ordered oldest first.

    Each item carries ``topic``, ``payload`` and an ISO-formatted
    ``timestamp``. A failure is printed and answered with a 500.
    """
    try:
        # Fetch newest-first so LIMIT keeps the most recent 50 rows...
        newest_first = (
            MqttLog.query
            .filter_by(station_id=station_id)
            .order_by(desc(MqttLog.timestamp))
            .limit(50)
            .all()
        )
        # ...then flip to chronological order for display.
        log_list = [
            {
                "topic": entry.topic,
                "payload": entry.payload,
                "timestamp": entry.timestamp.isoformat(),
            }
            for entry in reversed(newest_first)
        ]
        return jsonify(log_list)
    except Exception as e:
        print(f"Error fetching recent logs: {e}")
        return jsonify({"message": "Could not fetch recent logs."}), 500
# Human-readable labels for the swap abort reason codes found in event
# payloads; used when building the analytics "abort reasons" chart.
ABORT_REASON_MAP = dict(
    ABORT_UNKNOWN="Unknown",
    ABORT_BAT_EXIT_TIMEOUT="Battery Exit Timeout",
    ABORT_BAT_ENTRY_TIMEOUT="Battery Entry Timeout",
    ABORT_DOOR_CLOSE_TIMEOUT="Door Close Timeout",
    ABORT_DOOR_OPEN_TIMEOUT="Door Open Timeout",
    ABORT_INVALID_PARAM="Invalid Parameter",
    ABORT_REMOTE_REQUESTED="Remote Abort",
    ABORT_INVALID_BATTERY="Invalid Battery",
)
#--- Analytics Route ---
@app.route('/api/analytics', methods=['GET'])
def get_analytics_data():
    """Build the analytics payload for one station over a date range.

    Query parameters: ``station_id``, ``start_date`` and ``end_date`` (both
    ``YYYY-MM-DD``; the range covers the whole of both days).

    The JSON response contains:
        * ``kpis`` - swap totals, average swap time and uptime percentage;
        * ``swap_activity`` - completed/aborted swaps per day;
        * ``hourly_distribution`` - swap starts per hour of day;
        * ``abort_reasons`` - abort counts per reason label;
        * ``slot_utilization`` - battery-exit counts for slots 1 to 9.

    Returns 400 for missing or malformed parameters and 500 when a database
    query fails.
    """
    # 1. Read and validate the request parameters.
    station_id = request.args.get('station_id')
    start_date_str = request.args.get('start_date')
    end_date_str = request.args.get('end_date')
    if not (station_id and start_date_str and end_date_str):
        return jsonify({"message": "Missing required parameters."}), 400
    try:
        start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    except ValueError:
        return jsonify({"message": "Invalid date format. Please use YYYY-MM-DD."}), 400
    start_datetime = datetime.combine(start_date, datetime.min.time())
    end_datetime = datetime.combine(end_date, datetime.max.time())

    def logs_of_type(topic_type):
        # Shared filter: this station, this topic type, inside the window.
        return MqttLog.query.filter(
            MqttLog.station_id == station_id,
            MqttLog.topic_type == topic_type,
            MqttLog.timestamp.between(start_datetime, end_datetime)
        )

    # 2. EVENTS logs feed the swap figures; PERIODIC logs feed the uptime.
    try:
        event_logs = logs_of_type('EVENTS').all()
    except Exception as e:
        return jsonify({"message": f"Could not query event logs: {e}"}), 500
    try:
        periodic_logs = logs_of_type('PERIODIC').order_by(MqttLog.timestamp.asc()).all()
    except Exception as e:
        return jsonify({"message": f"Could not query periodic logs: {e}"}), 500

    # 3. Tally the event logs.
    total_swaps = completed_swaps = aborted_swaps = 0
    completed_swap_times = []
    completed_per_day = {}
    aborted_per_day = {}
    swaps_per_hour = [0] * 24
    abort_reason_counts = {}
    slot_usage = {slot: 0 for slot in range(1, 10)}  # feeds the heatmap

    for entry in event_logs:
        event_type = entry.payload.get('eventType')
        event_data = entry.payload.get('eventData', {})
        day = entry.timestamp.date()
        hour = entry.timestamp.hour

        if event_type == 'EVENT_SWAP_START':
            total_swaps += 1
            swaps_per_hour[hour] += 1
        elif event_type == 'EVENT_SWAP_ENDED':
            completed_swaps += 1
            completed_per_day[day] = completed_per_day.get(day, 0) + 1
            swap_time = event_data.get('swapTime')
            if swap_time is not None:
                completed_swap_times.append(swap_time)
        elif event_type == 'EVENT_SWAP_ABORTED':
            aborted_swaps += 1
            aborted_per_day[day] = aborted_per_day.get(day, 0) + 1
            reason = event_data.get('swapAbortReason', 'ABORT_UNKNOWN')
            abort_reason_counts[reason] = abort_reason_counts.get(reason, 0) + 1
        elif event_type == 'EVENT_BATTERY_EXIT':
            slot_id = event_data.get('slotId')
            if slot_id and slot_id in slot_usage:
                slot_usage[slot_id] += 1

    # 4. Station uptime: any silence longer than the threshold between two
    #    neighbouring points (window start, each periodic message, window
    #    end) counts entirely as downtime.
    MAX_ONLINE_GAP_SECONDS = 30
    total_period_seconds = (end_datetime - start_datetime).total_seconds()
    if periodic_logs:
        checkpoints = [start_datetime]
        checkpoints.extend(entry.timestamp for entry in periodic_logs)
        checkpoints.append(end_datetime)
        gaps = (
            (later - earlier).total_seconds()
            for earlier, later in zip(checkpoints, checkpoints[1:])
        )
        total_downtime_seconds = sum(gap for gap in gaps if gap > MAX_ONLINE_GAP_SECONDS)
    else:
        # No heartbeat at all: the whole window is downtime.
        total_downtime_seconds = total_period_seconds
    station_uptime = 100 * (1 - (total_downtime_seconds / total_period_seconds))
    station_uptime = max(0, min(100, station_uptime))  # clamp to 0..100

    # 5. Assemble the KPI block.
    if completed_swap_times:
        avg_swap_time_seconds = sum(completed_swap_times) / len(completed_swap_times)
    else:
        avg_swap_time_seconds = 0
    kpi_data = {
        "total_swaps": total_swaps,
        "completed_swaps": completed_swaps,
        "aborted_swaps": aborted_swaps,
        "avg_swap_time_seconds": avg_swap_time_seconds,
        "station_uptime": round(station_uptime, 2),
    }

    # Chart series: one point per calendar day in the requested range.
    days = []
    cursor = start_date
    while cursor <= end_date:
        days.append(cursor)
        cursor += timedelta(days=1)
    swap_activity_data = {
        "labels": [day.strftime('%b %d') for day in days],
        "completed_data": [completed_per_day.get(day, 0) for day in days],
        "aborted_data": [aborted_per_day.get(day, 0) for day in days],
    }
    hourly_distribution_data = {
        "labels": [f"{h % 12 or 12} {'AM' if h < 12 else 'PM'}" for h in range(24)],
        "swap_data": swaps_per_hour,
    }
    abort_reasons_data = {
        "labels": [ABORT_REASON_MAP.get(code, code) for code in abort_reason_counts],
        "reason_data": list(abort_reason_counts.values()),
    }
    # Counts are returned as a plain list ordered by slot number 1..9.
    slot_utilization_data = {"counts": [slot_usage[slot] for slot in range(1, 10)]}

    # 6. Combine everything into the response.
    return jsonify({
        "kpis": kpi_data,
        "swap_activity": swap_activity_data,
        "hourly_distribution": hourly_distribution_data,
        "abort_reasons": abort_reasons_data,
        "slot_utilization": slot_utilization_data
    })
# --- CSV Export route (UPDATED) ---
def _format_periodic_row(payload, num_slots=9):
    """Flatten one decoded periodic payload into a single CSV row.

    Row layout: formatted timestamp, ``deviceId``, ``stationDiagnosticCode``,
    then 14 columns for each slot 1..``num_slots``, then one empty
    placeholder column for the raw hex payload.

    Args:
        payload: Decoded periodic message as a dict. ``ts`` is passed to
            ``datetime.fromtimestamp``; ``slotLevelPayload`` is a list of
            per-slot dicts.
        num_slots: Number of slot column groups to emit.

    Returns:
        A list of cell values. The timestamp cell is an empty string when
        ``ts`` is missing or cannot be converted, so one bad record does not
        abort a whole export.
    """
    # (payload key, default when absent, divisor or None). The divisors are
    # the scalings applied before export: /1000 for voltage and current,
    # /10 for the two temperatures.
    slot_columns = (
        ("batteryIdentification", "", None),
        ("batteryPresent", 0, None),
        ("chargerPresent", 0, None),
        ("doorStatus", 0, None),
        ("doorLockStatus", 0, None),
        ("voltage", 0, 1000.0),
        ("current", 0, 1000.0),
        ("soc", 0, None),
        ("batteryMaxTemp", 0, 10.0),
        ("slotTemperature", 0, 10.0),
        ("batteryFaultCode", 0, None),
        ("chargerFaultCode", 0, None),
        ("batteryMode", 0, None),
        ("chargerMode", 0, None),
    )

    try:
        timestamp = datetime.fromtimestamp(payload.get("ts")).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        # Missing or out-of-range timestamp: leave the cell blank.
        timestamp = ""

    row = [
        timestamp,
        payload.get("deviceId", ""),
        payload.get("stationDiagnosticCode", ""),
    ]

    # Index slots by slotId; an entry without one falls back to its
    # 1-based position in the list.
    slots_data = payload.get("slotLevelPayload", [])
    slot_map = {s.get('slotId', i + 1): s for i, s in enumerate(slots_data)}

    for slot_number in range(1, num_slots + 1):
        slot = slot_map.get(slot_number)
        if not slot:
            # No data for this slot: keep the columns aligned with blanks.
            row.extend([''] * len(slot_columns))
            continue
        for key, default, divisor in slot_columns:
            value = slot.get(key, default)
            row.append(value / divisor if divisor else value)

    row.append('')  # Placeholder for RawHexPayload
    return row
def _safe_filename_part(text):
    """Return *text* with every character outside ``[A-Za-z0-9._-]`` replaced by ``_``."""
    allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._-"
    return "".join(ch if ch in allowed else "_" for ch in str(text))


@app.route('/api/logs/export', methods=['GET'])
def export_logs():
    """Export a station's logs for a time window as a CSV download.

    Query parameters:
        station_id: Station whose logs are exported.
        start_datetime, end_datetime: Window bounds, ``YYYY-MM-DDTHH:MM``.
        log_type: ``PERIODIC`` (default) gives one flattened row per message;
            ``EVENT`` selects both EVENTS and REQUEST logs; any other value
            is matched against ``topic_type`` as given. Non-PERIODIC exports
            use the generic timestamp/topic/JSON layout.

    Returns:
        A ``text/csv`` attachment, 400 for missing or malformed parameters,
        or 404 when no log matches.
    """
    station_id = request.args.get('station_id')
    start_datetime_str = request.args.get('start_datetime')
    end_datetime_str = request.args.get('end_datetime')
    log_type = request.args.get('log_type', 'PERIODIC')
    if not all([station_id, start_datetime_str, end_datetime_str]):
        return jsonify({"error": "Missing required parameters"}), 400
    try:
        start_datetime = datetime.strptime(start_datetime_str, '%Y-%m-%dT%H:%M')
        end_datetime = datetime.strptime(end_datetime_str, '%Y-%m-%dT%H:%M')
    except ValueError:
        return jsonify({"error": "Invalid datetime format"}), 400

    # The front end's "EVENT" means both event and RPC request logs.
    if log_type == 'EVENT':
        type_filter = MqttLog.topic_type.in_(['EVENTS', 'REQUEST'])
    else:
        type_filter = MqttLog.topic_type == log_type
    logs = MqttLog.query.filter(
        MqttLog.station_id == station_id,
        MqttLog.timestamp.between(start_datetime, end_datetime),
        type_filter
    ).order_by(MqttLog.timestamp.asc()).all()
    if not logs:
        return jsonify({"message": "No logs found for the selected criteria."}), 404

    # log_type and station_id come straight from the query string and the
    # station name is free text; all of them end up in a response header, so
    # each part is reduced to filename-safe ASCII first (spaces become "_").
    station = Station.query.filter_by(station_id=station_id).first()
    station_name = _safe_filename_part(station.name if station else station_id)
    date_str = start_datetime.strftime('%Y-%m-%d')
    filename = f"{station_name}_{_safe_filename_part(log_type)}_{date_str}.csv"

    output = io.StringIO()
    writer = csv.writer(output)
    if log_type == 'PERIODIC':
        base_header = ["Timestamp", "DeviceID", "StationDiagnosticCode"]
        slot_fields = [
            "BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
            "Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
            "BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode"
        ]
        slot_header = [f"Slot{i}_{field}" for i in range(1, 10) for field in slot_fields]
        writer.writerow(base_header + slot_header + ["RawHexPayload"])
        for log in logs:
            writer.writerow(_format_periodic_row(log.payload))
    else:
        # Events / RPC (and any other type): generic three-column layout.
        writer.writerow(["Timestamp", "Topic", "Payload_JSON"])
        for log in logs:
            writer.writerow([log.timestamp, log.topic, json.dumps(log.payload)])

    # Hand the finished text to the response as one string rather than
    # passing the file-like object to be iterated.
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment;filename={filename}"}
    )
@socketio.on('rpc_request')
def handle_rpc_request(payload):
    """Translate a dashboard command into a Protobuf RPC and publish it.

    Args:
        payload: Event payload with ``station_id``, ``command`` and ``data``.
            ``data`` is used as the slot id for OPEN / CHG_ON / CHG_OFF and
            as a list of ``[fromSlot, toSlot]`` pairs for START_SWAP.

    The request is dropped, with a printed message, when the station has no
    connected MQTT client or the command is not recognised.
    """
    station_id = payload.get('station_id')
    command = payload.get('command')
    data = payload.get('data')
    print(f"Received RPC request for station {station_id}: {command} with data {data}")

    # The command can only be delivered through this station's own client.
    mqtt_client = mqtt_clients.get(station_id)
    if not (mqtt_client and mqtt_client.is_connected):
        print(f"Cannot send RPC for {station_id}: MQTT client not connected.")
        return

    # The job id is derived from the current epoch second.
    request_payload = rpcRequest(
        ts=int(time.time()),
        jobId=f"job_{int(time.time())}"
    )

    if command in ('OPEN', 'CHG_ON', 'CHG_OFF'):
        # Slot-level commands share one shape: job type + slot id + state.
        request_payload.jobType = (
            jobType_e.JOBTYPE_GATE_OPEN_CLOSE
            if command == 'OPEN'
            else jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE
        )
        request_payload.slotInfo.slotId = data
        # Only CHG_OFF sends state 0; OPEN and CHG_ON send 1.
        request_payload.slotInfo.state = 0 if command == 'CHG_OFF' else 1
    elif command == 'START_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_SWAP_START
        if data and isinstance(data, list):
            for pair in data:
                swap_info = request_payload.swapInfo.add()
                swap_info.fromSlot = pair[0]
                swap_info.toSlot = pair[1]
    elif command == 'ABORT_SWAP':
        request_payload.jobType = jobType_e.JOBTYPE_TRANSACTION_ABORT
    elif command == 'STATION_RESET':
        request_payload.jobType = jobType_e.JOBTYPE_REBOOT
    elif command == 'LANGUAGE_UPDATE':
        # Only the job type is set; no language value is attached yet.
        request_payload.jobType = jobType_e.JOBTYPE_LANGUAGE_UPDATE
    else:
        print(f"Unknown command: {command}")
        return

    # Serialize and publish on the station's RPC request topic.
    # NOTE: the client id and version segment of the topic are hard-coded.
    topic = f"VEC/batterySmartStation/v100/{station_id}/RPC/REQUEST"
    print(f"Publishing to {topic}")
    mqtt_client.client.publish(topic, request_payload.SerializeToString())
# Reusable starter for one station's MQTT client.
def start_single_mqtt_client(station):
    """Create, start and register the MQTT client for one station.

    Does nothing except print a notice when a connected client is already
    registered for the station. Otherwise a new ``MqttClient`` is built from
    the station's broker settings, started, and stored in ``mqtt_clients``
    under the station id, replacing any previous entry.

    Args:
        station: Station record providing ``station_id``, ``name`` and the
            ``mqtt_*`` connection attributes.
    """
    sid = station.station_id
    if sid in mqtt_clients and mqtt_clients[sid].is_connected:
        print(f"MQTT client for {sid} is already running.")
        return

    print(f"Creating and starting MQTT client for station: {station.name} ({sid})")
    connection_settings = {
        "broker": station.mqtt_broker,
        "port": station.mqtt_port,
        "user": station.mqtt_user,
        "password": station.mqtt_password,
        "station_id": sid,
        "on_message_callback": on_message_handler,
    }
    client = MqttClient(**connection_settings)
    client.start()  # presumably runs in its own thread; confirm in MqttClient
    mqtt_clients[sid] = client
# --- Main Application Logic ---
# def start_mqtt_clients():
# """
# Initializes and starts an MQTT client for each station found in the database,
# using the specific MQTT credentials stored for each station.
# """
# try:
# with app.app_context():
# stations = Station.query.all()
# except Exception as e:
# print(f"CRITICAL: Could not query stations from the database in MQTT thread: {e}")
# return
# for station in stations:
# if station.station_id not in mqtt_clients:
# print(f"Creating and starting MQTT client for station: {station.name} ({station.station_id})")
# client = MqttClient(
# broker=station.mqtt_broker,
# port=station.mqtt_port,
# user=station.mqtt_user,
# password=station.mqtt_password,
# station_id=station.station_id,
# on_message_callback=on_message_handler
# )
# client.start()
# mqtt_clients[station.station_id] = client
def start_mqtt_clients():
    """Start an MQTT client for every station stored in the database.

    The stations are loaded inside an application context. If that query
    fails, the error is printed and no client is started.
    """
    try:
        with app.app_context():
            stations = Station.query.all()
            print(f"Found {len(stations)} existing stations to monitor.")
    except Exception as e:
        print(f"CRITICAL: Could not query stations from the database: {e}")
    else:
        for station in stations:
            start_single_mqtt_client(station)
if __name__ == '__main__':
    # Bootstrap the database: create tables and seed defaults on first run.
    try:
        with app.app_context():
            db.create_all()

            # Add a default user if none exist.
            if not User.query.first():
                print("No users found. Creating a default admin user.")
                # The account is announced as an admin, so it must be created
                # with the admin flag set explicitly.
                # NOTE(review): 'admin' / 'password' are well-known
                # credentials; change them right after the first start.
                default_user = User(username='admin', is_admin=True)
                default_user.set_password('password')
                db.session.add(default_user)
                db.session.commit()

            # Add a default station if none exist.
            if not Station.query.first():
                print("No stations found. Adding a default station.")
                default_station = Station(
                    station_id="V16000862287077265957",
                    product_id="VEC_PROD_001",
                    name="Test Station 1",
                    mqtt_broker="mqtt.vecmocon.com",
                    mqtt_port=1883,
                    mqtt_user="your_username",
                    mqtt_password="your_password"
                )
                db.session.add(default_station)
                db.session.commit()
    except Exception as e:
        print(f"FATAL ERROR: Could not connect to PostgreSQL: {e}")
        sys.exit(1)

    # Start the station MQTT clients on a daemon thread so the web server
    # can start without waiting for them.
    mqtt_thread = threading.Thread(target=start_mqtt_clients, daemon=True)
    mqtt_thread.start()

    print(f"Starting Flask-SocketIO server on http://192.168.1.12:5000")
    socketio.run(app, host='192.168.1.12', port=5000)