import os import sys import threading import json import csv import io import time # Import the time module from datetime import datetime from flask import Flask, jsonify, request, Response from flask_socketio import SocketIO, join_room # <-- IMPORTANT: Add join_room from flask_cors import CORS from dotenv import load_dotenv # Import your custom core modules and the new models from core.mqtt_client import MqttClient from core.protobuf_decoder import ProtobufDecoder from models import db, Station, User, MqttLog from flask_login import LoginManager, login_required, current_user # --- Load Environment Variables --- load_dotenv() # --- Pre-startup Check for Essential Configuration --- DATABASE_URL = os.getenv("DATABASE_URL") if not DATABASE_URL: print("FATAL ERROR: DATABASE_URL is not set in .env file.") sys.exit(1) # --- Application Setup --- app = Flask(__name__) CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True) login_manager = LoginManager() login_manager.init_app(app) app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "a_very_secret_key") db.init_app(app) socketio = SocketIO(app, cors_allowed_origins="*") # --- User Loader for Flask-Login --- @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # --- Global instances --- decoder = ProtobufDecoder() mqtt_clients = {} last_message_timestamps = {} STATION_TIMEOUT_SECONDS = 90 # --- MQTT Message Handling --- def on_message_handler(station_id, topic, payload): last_message_timestamps[station_id] = time.time() print(f"Main handler received message for station {station_id} on topic {topic}") decoded_data = None message_type = topic.split('/')[-1] if message_type == 'PERIODIC': decoded_data = decoder.decode_periodic(payload) elif message_type == 'EVENTS': decoded_data = decoder.decode_event(payload) elif message_type == 'REQUEST': decoded_data = decoder.decode_rpc_request(payload) if decoded_data: print("DECODED DATA TO BE SENT:", decoded_data) try: with app.app_context(): log_entry = MqttLog( station_id=station_id, topic=topic, topic_type=message_type, payload=decoded_data ) db.session.add(log_entry) db.session.commit() print(f"Successfully wrote data for {station_id} to PostgreSQL.") except Exception as e: print(f"Error writing to PostgreSQL: {e}") socketio.emit('dashboard_update', { 'stationId': station_id, 'topic': topic, 'data': decoded_data }, room=station_id) # --- WebSocket Handlers --- @socketio.on('connect') def handle_connect(): print('Client connected to WebSocket') # --- NEW: Function to handle joining a room and sending initial data --- @socketio.on('join_station_room') def handle_join_station_room(data): station_id = data['station_id'] join_room(station_id) print(f"Client joined room for station: {station_id}") try: # Find the most recent log entry for this station latest_log = MqttLog.query.filter_by( station_id=station_id, topic_type='PERIODIC' ).order_by(MqttLog.timestamp.desc()).first() if latest_log: # If we have a past log, send it immediately to the new client print(f"Sending initial state for {station_id} to new client.") socketio.emit('dashboard_update', { 'stationId': station_id, 'topic': latest_log.topic, 'data': latest_log.payload }, room=station_id) except Exception as e: print(f"Error querying or sending initial state for {station_id}: {e}") # ... (rest of your API routes remain the same) ... # --- API Routes --- @app.route('/api/login', methods=['POST']) def login(): # ... (code omitted for brevity) pass @app.route('/api/users', methods=['POST']) # @login_required # Temporarily disabled for testing def add_user(): # ... (code omitted for brevity) pass @app.route('/api/stations', methods=['POST']) # @login_required # Temporarily disabled for testing def add_station(): # ... (code omitted for brevity) pass @app.route('/api/stations', methods=['GET']) def get_stations(): try: stations = Station.query.all() station_list = [] for s in stations: last_msg_time = last_message_timestamps.get(s.station_id) is_online = last_msg_time is not None and (time.time() - last_msg_time) < STATION_TIMEOUT_SECONDS station_list.append({ "id": s.station_id, "name": s.name, "location": s.location, "status": "Online" if is_online else "Offline" }) return jsonify(station_list) except Exception as e: return jsonify({"error": f"Database query failed: {e}"}), 500 # ... (your CSV export and MQTT client start functions remain the same) ... if __name__ == '__main__': # ... (your main startup logic remains the same) ... pass