164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
import os
|
|
import sys
|
|
import threading
|
|
import json
|
|
import csv
|
|
import io
|
|
import time # Import the time module
|
|
from datetime import datetime
|
|
from flask import Flask, jsonify, request, Response
|
|
from flask_socketio import SocketIO, join_room # <-- IMPORTANT: Add join_room
|
|
from flask_cors import CORS
|
|
from dotenv import load_dotenv
|
|
|
|
# Import your custom core modules and the new models
|
|
from core.mqtt_client import MqttClient
|
|
from core.protobuf_decoder import ProtobufDecoder
|
|
from models import db, Station, User, MqttLog
|
|
from flask_login import LoginManager, login_required, current_user
|
|
|
|
# --- Load Environment Variables ---
|
|
load_dotenv()

# --- Pre-startup Check for Essential Configuration ---
# The database connection string is mandatory; refuse to start without it.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    # Passing a string to sys.exit() writes it to stderr and exits with
    # status 1, so the diagnostic does not get mixed into normal stdout output.
    sys.exit("FATAL ERROR: DATABASE_URL is not set in .env file.")
|
|
|
|
# --- Application Setup ---
|
|
app = Flask(__name__)
# Cross-origin access to the REST API is limited to this single origin.
CORS(app, resources={r"/api/*": {"origins": "http://127.0.0.1:5500"}}, supports_credentials=True)

login_manager = LoginManager()
login_manager.init_app(app)

app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Never fall back to a hard-coded secret: anyone who knows the literal could
# forge signed session cookies. When SECRET_KEY is unset, use a random key
# that is only valid for this process and say so loudly.
_secret_key = os.getenv("SECRET_KEY")
if not _secret_key:
    _secret_key = os.urandom(32).hex()
    print(
        "WARNING: SECRET_KEY is not set; using a random key for this process. "
        "Sessions will not survive a restart or be shared between workers."
    )
app.config['SECRET_KEY'] = _secret_key

db.init_app(app)

# NOTE(review): the Socket.IO endpoint accepts every origin while the REST API
# above is restricted to one origin -- confirm the wildcard is intentional.
socketio = SocketIO(app, cors_allowed_origins="*")
|
|
|
|
# --- User Loader for Flask-Login ---
|
|
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None.

    Args:
        user_id: The user identifier handed over by Flask-Login.

    Returns:
        The matching ``User`` row, or ``None`` when the id is not a valid
        integer or no such row exists.
    """
    # A user loader must return None for an invalid id, not raise; a tampered
    # or stale session cookie could otherwise turn into a server error.
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
|
|
|
|
# --- Global instances ---
|
|
# A station counts as "Online" only if its last message is younger than this.
STATION_TIMEOUT_SECONDS: int = 90

# Shared decoder used by the MQTT message handler.
decoder = ProtobufDecoder()

# MQTT client registry -- presumably keyed by station id; it is populated
# outside this section of the file, so verify against the code that fills it.
mqtt_clients: dict = {}

# station_id -> time.time() value recorded when its most recent message arrived.
last_message_timestamps: dict = {}
|
|
|
|
# --- MQTT Message Handling ---
|
|
def on_message_handler(station_id, topic, payload):
    """Decode one MQTT message, store it, and forward it to dashboard clients.

    The arrival time is always recorded for ``station_id``. The last segment
    of ``topic`` selects the decoder method; messages with an unrecognised
    segment, or whose decoded result is falsy, are neither stored nor emitted.

    Args:
        station_id: Identifier of the station; also used as the Socket.IO room.
        topic: Full MQTT topic string; its final '/'-separated segment is the
            message type.
        payload: Raw message body, passed unchanged to the decoder.
    """
    last_message_timestamps[station_id] = time.time()

    print(f"Main handler received message for station {station_id} on topic {topic}")

    message_type = topic.rsplit('/', 1)[-1]

    # Message type -> name of the decoder method that understands it. Names are
    # resolved lazily so only the method actually needed is looked up.
    decoder_method_names = {
        'PERIODIC': 'decode_periodic',
        'EVENTS': 'decode_event',
        'REQUEST': 'decode_rpc_request',
    }
    method_name = decoder_method_names.get(message_type)
    decoded_data = getattr(decoder, method_name)(payload) if method_name else None

    if not decoded_data:
        return

    print("DECODED DATA TO BE SENT:", decoded_data)

    # A failed database write is logged but must not prevent the live update.
    try:
        with app.app_context():
            db.session.add(MqttLog(
                station_id=station_id,
                topic=topic,
                topic_type=message_type,
                payload=decoded_data,
            ))
            db.session.commit()
            print(f"Successfully wrote data for {station_id} to PostgreSQL.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    update = {
        'stationId': station_id,
        'topic': topic,
        'data': decoded_data,
    }
    socketio.emit('dashboard_update', update, room=station_id)
|
|
|
|
# --- WebSocket Handlers ---
|
|
@socketio.on('connect')
def handle_connect():
    """Log that a client has opened a WebSocket connection."""
    print('Client connected to WebSocket')
|
|
|
|
# --- Join a station room and send the station's latest stored state ---
|
|
@socketio.on('join_station_room')
def handle_join_station_room(data):
    """Subscribe the calling client to a station room and send it the latest state.

    Args:
        data: Event payload; expected to be a dict with a ``'station_id'`` key.
            Payloads without a usable station id are logged and ignored.

    After joining, the most recent stored ``PERIODIC`` log for the station (if
    any) is emitted as a ``dashboard_update`` event to the joining client only.
    Query or emit failures are logged and otherwise swallowed.
    """
    station_id = data.get('station_id') if isinstance(data, dict) else None
    if not station_id:
        print(f"Ignoring join_station_room request without a station_id: {data!r}")
        return

    join_room(station_id)
    print(f"Client joined room for station: {station_id}")

    try:
        # Find the most recent periodic log entry for this station.
        latest_log = (
            MqttLog.query
            .filter_by(station_id=station_id, topic_type='PERIODIC')
            .order_by(MqttLog.timestamp.desc())
            .first()
        )

        if latest_log:
            print(f"Sending initial state for {station_id} to new client.")
            # Target only the joining client's session. Emitting to the station
            # room would replay this stored snapshot to every client already
            # subscribed each time somebody new joins.
            socketio.emit('dashboard_update', {
                'stationId': station_id,
                'topic': latest_log.topic,
                'data': latest_log.payload,
            }, room=request.sid)
    except Exception as e:
        print(f"Error querying or sending initial state for {station_id}: {e}")
|
|
|
|
# ... (rest of your API routes remain the same) ...
|
|
|
|
# --- API Routes ---
|
|
@app.route('/api/login', methods=['POST'])
def login():
    """Handle ``POST /api/login``.

    Placeholder: no implementation is present here, so the view currently
    returns ``None``.
    """
    # ... (code omitted for brevity)
|
|
|
|
@app.route('/api/users', methods=['POST'])
# TODO(security): authentication is switched off here; restore the decorator
# below before this endpoint is exposed outside a test environment.
# @login_required # Temporarily disabled for testing
def add_user():
    """Handle ``POST /api/users``.

    Placeholder: no implementation is present here, so the view currently
    returns ``None``.
    """
    # ... (code omitted for brevity)
|
|
|
|
@app.route('/api/stations', methods=['POST'])
# TODO(security): authentication is switched off here; restore the decorator
# below before this endpoint is exposed outside a test environment.
# @login_required # Temporarily disabled for testing
def add_station():
    """Handle ``POST /api/stations``.

    Placeholder: no implementation is present here, so the view currently
    returns ``None``.
    """
    # ... (code omitted for brevity)
|
|
|
|
@app.route('/api/stations', methods=['GET'])
def get_stations():
    """Return every station together with its current online/offline status.

    A station is reported as ``"Online"`` when a message from it was recorded
    less than ``STATION_TIMEOUT_SECONDS`` seconds ago, otherwise ``"Offline"``.

    Returns:
        A JSON array of objects with the keys ``id``, ``name``, ``location``
        and ``status``; or a JSON ``{"error": ...}`` object with HTTP status
        500 when building the list fails.
    """
    try:
        stations = Station.query.all()
        # Take one clock reading so every station in this response is judged
        # against the same instant.
        now = time.time()

        station_list = []
        for s in stations:
            last_msg_time = last_message_timestamps.get(s.station_id)
            is_online = (
                last_msg_time is not None
                and (now - last_msg_time) < STATION_TIMEOUT_SECONDS
            )
            station_list.append({
                "id": s.station_id,
                "name": s.name,
                "location": s.location,
                "status": "Online" if is_online else "Offline",
            })
        return jsonify(station_list)
    except Exception as e:
        # Keep the details in the server log; raw exception text can expose
        # internals (SQL, connection details) and must not reach API clients.
        print(f"Error listing stations: {e}")
        return jsonify({"error": "Database query failed"}), 500
|
|
|
|
# ... (your CSV export and MQTT client start functions remain the same) ...
|
|
|
|
if __name__ == "__main__":
    # Script entry point. The startup logic is not present in this section of
    # the file, so running the module directly does nothing here.
    # ... (your main startup logic remains the same) ...
    pass