feat(versioning): centralize dashboard version management

- Added VERSION.txt at project root to serve as the single source of truth
- Added core/versioning.py with get_version() helper:
  • Reads APP_VERSION env var if present
  • Reads VERSION.txt (works in dev and frozen exe)
  • Falls back to "v0.0.0"
- Updated main_window.py:
  • Import get_version() and store self.app_version
  • Set window title dynamically with dashboard version
  • Replaced hardcoded QLabel("Version 4.2") with QLabel(f"Version {get_version()}")
- Updated build script (generate_executable.py):
  • Prompt for version at build time
  • Write VERSION.txt with entered version
  • Bundle VERSION.txt into PyInstaller exe (--add-data)
  • Use version in exe filename (SwapStationDashboard_vX.Y.exe)
- Result: version number now changes automatically everywhere (badge, title, About, logs)
  just by editing VERSION.txt or providing APP_VERSION at build/runtime.
main
Kirubakaran 2025-08-27 20:16:02 +05:30
parent 37500e3877
commit 68f6de695f
5 changed files with 253 additions and 273 deletions

1
VERSION.txt Normal file
View File

@ -0,0 +1 @@
v4.3

View File

@ -1,172 +1,25 @@
# # In core/mqtt_client.py
# import socket
# from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
# import paho.mqtt.client as mqtt
# import uuid
# class MqttClient(QObject):
# # --- MODIFIED SIGNAL: Now sends a bool and a string ---
# connection_status_changed = pyqtSignal(bool, str)
# message_received = pyqtSignal(str, bytes)
# connection_error = pyqtSignal(str)
# stop_logging_signal = pyqtSignal()
# connected = pyqtSignal()
# disconnected = pyqtSignal()
# def __init__(self, broker, port, user, password, client_id):
# super().__init__()
# self.broker = broker
# self.port = port
# self._is_connected = False
# self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
# if user and password:
# self.client.username_pw_set(user, password)
# self.client.on_connect = self.on_connect
# self.client.on_disconnect = self.on_disconnect
# self.client.on_message = self.on_message
# # self.client.on_subscribe = self.on_subscribe
# def on_connect(self, client, userdata, flags, rc, properties):
# if rc == 0:
# self._is_connected = True # Set flag on success
# print("Connection to MQTT Broker successful!")
# self.connection_status_changed.emit(True, "✅ Connected")
# self.connected.emit()
# else:
# # This block now handles the failure message, but not the disconnect signal
# error_message = f"Connection failed (Code: {rc})"
# if rc == 5:
# error_message = "❌ Not Authorized: Check username and password."
# print(f"Failed to connect: {error_message}")
# self.connection_status_changed.emit(False, error_message)
# # The on_disconnect callback will handle the disconnected signal
# def on_disconnect(self, client, userdata, flags, rc, properties):
# # --- MODIFIED: This entire block is now protected by the flag ---
# if not self._is_connected and rc != 0:
# # This is a connection failure, on_connect already handled the message
# pass
# else:
# # This is a true disconnection from an active session
# print("Disconnected from MQTT Broker.")
# self.connection_status_changed.emit(False, "💔 Disconnected")
# # This logic now runs only ONCE per disconnect/failure event
# if self._is_connected or rc != 0:
# self.disconnected.emit()
# self._is_connected = False
# def on_message(self, client, userdata, msg):
# # print(f"Received {len(msg.payload)} bytes of binary data from topic `{msg.topic}`")
# self.message_received.emit(msg.topic, msg.payload)
# # --- MODIFIED connect_to_broker METHOD ---
# def connect_to_broker(self):
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# try:
# self.client.connect(self.broker, self.port, 120)
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# self.client.loop_start()
# except socket.gaierror:
# msg = "Host not found. Check internet."
# print(f"❌ Connection Error: {msg}")
# self.connection_status_changed.emit(False, f"❌ {msg}")
# except (socket.error, TimeoutError):
# msg = "Connection failed. Server offline?"
# print(f"❌ Connection Error: {msg}")
# self.connection_status_changed.emit(False, f"❌ {msg}")
# except Exception as e:
# msg = f"An unexpected error occurred: {e}"
# print(f"❌ {msg}")
# self.connection_status_changed.emit(False, f"❌ Error")
# def run(self):
# """
# Connects to the broker and starts the network loop.
# Handles all common connection errors gracefully.
# """
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# try:
# # 1. Attempt to connect
# # print(f"Attempting to connect to {self.broker}:{self.port}...")
# self.client.connect(self.broker, self.port, 120)
# # 2. Run the blocking network loop
# # This will run until self.client.disconnect() is called
# self.client.loop_forever()
# except socket.gaierror:
# msg = "Host not found. Check the broker address or your internet connection."
# print(f"❌ {msg}")
# self.connection_error.emit(msg) # Report error to the main window
# except (socket.error, ConnectionRefusedError):
# msg = "Connection refused. Is the server offline or the port incorrect?"
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# except TimeoutError:
# msg = "Connection timed out. The server is not responding."
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# except Exception as e:
# # Catch any other unexpected errors during connection or loop
# msg = f"An unexpected error occurred: {e}"
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# # def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
# # """Callback function for when the broker responds to a subscription request."""
# # if reason_code_list[0].is_failure:
# # print(f"❌ Broker rejected subscription: {reason_code_list[0]}")
# # else:
# # print(f"✅ Broker accepted subscription with QoS: {reason_code_list[0].value}")
# # --- (The rest of the file remains the same) ---
# @pyqtSlot()
# def disconnect_from_broker(self):
# """Stops the MQTT client's network loop."""
# if self.client:
# self.client.loop_stop()
# self.client.disconnect()
# print("Stopping MQTT network loop.")
# def subscribe_to_topic(self, topic): # Add qos parameter
# print(f"Subscribing to topic: {topic}")
# self.client.subscribe(topic)
# def publish_message(self, topic, payload):
# self.client.publish(topic, payload)
# def cleanup(self):
# print("Stopping MQTT network loop.")
# self.client.loop_stop()
# In core/mqtt_client.py # In core/mqtt_client.py
import socket import socket
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import uuid
class MqttClient(QObject): class MqttClient(QObject):
# Sends connection state (bool) and a message (str) # --- MODIFIED SIGNAL: Now sends a bool and a string ---
connection_status_changed = pyqtSignal(bool, str) connection_status_changed = pyqtSignal(bool, str)
# Generic signals for success or failure/disconnection message_received = pyqtSignal(str, bytes)
connection_error = pyqtSignal(str)
stop_logging_signal = pyqtSignal()
connected = pyqtSignal() connected = pyqtSignal()
disconnected = pyqtSignal() disconnected = pyqtSignal()
# Sends topic (str) and payload (bytes) when a message is received
message_received = pyqtSignal(str, bytes)
def __init__(self, broker, port, user, password, client_id): def __init__(self, broker, port, user, password, client_id):
super().__init__() super().__init__()
self.broker = broker self.broker = broker
self.port = port self.port = port
self._is_connected = False # Flag to track connection state self._is_connected = False
self._reported_bad_creds = False
self._suppress_next_disconnect_notice = False
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id) self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
if user and password: if user and password:
@ -175,57 +28,118 @@ class MqttClient(QObject):
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message self.client.on_message = self.on_message
# self.client.on_subscribe = self.on_subscribe
def on_connect(self, client, userdata, flags, rc, properties): def on_connect(self, client, userdata, flags, rc, properties=None, *args, **kwargs):
if rc == 0: if rc == 0:
# success
self._is_connected = True self._is_connected = True
self._ever_connected = True
self._suppress_next_disconnect_notice = False
print("Connection to MQTT Broker successful!") print("Connection to MQTT Broker successful!")
self.connection_status_changed.emit(True, "✅ Connected") self.connection_status_changed.emit(True, "✅ Connected")
self.connected.emit() self.connected.emit()
else: else:
# Report the specific error from the broker # auth or other failure — log ONCE, and suppress the auto "Disconnected" message that will follow
error_message = f"Connection failed (Code: {rc})" msg = "Bad user name or password" if rc == 5 else f"Connection failed (Code: {rc})"
if rc == 5: print(f"Failed to connect: {msg}")
error_message = "❌ Not Authorized: Check username and password." self.connection_status_changed.emit(False, msg)
print(f"Failed to connect: {error_message}") # make sure we do NOT show a "Disconnected" notice right after this
self.connection_status_changed.emit(False, error_message) self._suppress_next_disconnect_notice = True
# The on_disconnect callback will handle the generic 'disconnected' signal
def on_disconnect(self, client, userdata, flags, rc, properties): # stop any retry loop immediately
# Only show the generic "Disconnected" message if we were actually connected before. try:
if self._is_connected: client.disconnect()
print("Disconnected from MQTT Broker.") client.loop_stop()
self.connection_status_changed.emit(False, "💔 Disconnected") except Exception:
pass
# Always emit the generic disconnected signal to trigger cleanup in the main window. def on_disconnect(self, client, userdata, rc, properties=None, *args, **kwargs):
self.disconnected.emit()
self._is_connected = False self._is_connected = False
if self._suppress_next_disconnect_notice or not self._ever_connected:
self._suppress_next_disconnect_notice = False
return
print("Disconnected from MQTT Broker.")
self.disconnected.emit()
self.connection_status_changed.emit(False, "Disconnected")
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
# print(f"Received {len(msg.payload)} bytes of binary data from topic `{msg.topic}`")
self.message_received.emit(msg.topic, msg.payload) self.message_received.emit(msg.topic, msg.payload)
@pyqtSlot() # --- MODIFIED connect_to_broker METHOD ---
def connect_to_broker(self): def connect_to_broker(self):
print(f"Attempting to connect to {self.broker}:{self.port}...") print(f"Attempting to connect to {self.broker}:{self.port}...")
try: try:
self.client.connect(self.broker, self.port, 60) self.client.connect(self.broker, self.port, 120)
self.client.loop_start() self.client.loop_start()
except socket.gaierror:
msg = "Host not found. Check internet."
print(f"❌ Connection Error: {msg}")
self.connection_status_changed.emit(False, f"{msg}")
except (socket.error, TimeoutError):
msg = "Connection failed. Server offline?"
print(f"❌ Connection Error: {msg}")
self.connection_status_changed.emit(False, f"{msg}")
except Exception as e: except Exception as e:
# Catch any exception during the initial connect call msg = f"An unexpected error occurred: {e}"
msg = f"Connection Error: {e}"
print(f"{msg}") print(f"{msg}")
self.connection_status_changed.emit(False, msg) self.connection_status_changed.emit(False, f"❌ Error")
self.disconnected.emit()
def run(self):
"""
Connects to the broker and starts the network loop.
Handles all common connection errors gracefully.
"""
print(f"Attempting to connect to {self.broker}:{self.port}...")
try:
# 1. Attempt to connect
# print(f"Attempting to connect to {self.broker}:{self.port}...")
self.client.connect(self.broker, self.port, 120)
# 2. Run the blocking network loop
# This will run until self.client.disconnect() is called
self.client.loop_forever()
except socket.gaierror:
msg = "Host not found. Check the broker address or your internet connection."
print(f"{msg}")
self.connection_error.emit(msg) # Report error to the main window
except (socket.error, ConnectionRefusedError):
msg = "Connection refused. Is the server offline or the port incorrect?"
print(f"{msg}")
self.connection_error.emit(msg)
except TimeoutError:
msg = "Connection timed out. The server is not responding."
print(f"{msg}")
self.connection_error.emit(msg)
except Exception as e:
# Catch any other unexpected errors during connection or loop
msg = f"An unexpected error occurred: {e}"
print(f"{msg}")
self.connection_error.emit(msg)
@pyqtSlot() @pyqtSlot()
def disconnect_from_broker(self): def disconnect_from_broker(self):
"""Stops the MQTT client's network loop."""
if self.client: if self.client:
self.client.loop_stop() self.client.loop_stop()
self.client.disconnect() self.client.disconnect()
print("Stopping MQTT network loop.")
def subscribe_to_topic(self, topic): def subscribe_to_topic(self, topic): # Add qos parameter
print(f"Subscribing to topic: {topic}")
self.client.subscribe(topic) self.client.subscribe(topic)
def publish_message(self, topic, payload): def publish_message(self, topic, payload):
self.client.publish(topic, payload) self.client.publish(topic, payload)
def cleanup(self):
print("Stopping MQTT network loop.")
self.client.loop_stop()

26
core/versioning.py Normal file
View File

@ -0,0 +1,26 @@
# core/versioning.py
from pathlib import Path
import os, sys
DEFAULT_VERSION = "v0.0.0"
def _bundle_root() -> Path:
# When frozen by PyInstaller, files are unpacked in sys._MEIPASS
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
return Path(sys._MEIPASS)
# Dev/run-from-source: project root = two levels up from this file (adjust if needed)
return Path(__file__).resolve().parents[1]
def get_version() -> str:
# 1) ENV override (useful for CI)
env = os.getenv("APP_VERSION")
if env:
return env.strip()
# 2) VERSION.txt (works in dev and frozen)
root = _bundle_root()
ver_file = root / "VERSION.txt"
if ver_file.exists():
return ver_file.read_text(encoding="utf-8").strip()
return DEFAULT_VERSION

View File

@ -1,29 +1,98 @@
# import subprocess
# import sys
# def generate_executable():
# """Prompts for a version and generates a single-file executable."""
# # 1. Ask the user for the version number
# version = ""
# while not version:
# version = input("Enter the version for the executable (e.g., 4.1): ")
# if not version:
# print("Version cannot be empty. Please try again.")
# executable_name = f"SwapStationDashboard_v{version}"
# print(f"Generating executable with name: {executable_name}")
# # Check if pyinstaller is installed
# try:
# subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True)
# except (subprocess.CalledProcessError, FileNotFoundError):
# print("PyInstaller is not found. Please install it using: pip install pyinstaller")
# return
# print("Starting executable generation...")
# # 2. Use the version to create the command
# command = [
# sys.executable,
# "-m", "PyInstaller",
# f"--name={executable_name}",
# "--onefile",
# "--icon=assets/icon.ico",
# "--add-data=logo;logo",
# "--add-data=assets;assets",
# "--add-data=proto;proto",
# "--hidden-import=paho.mqtt",
# "--hidden-import=google.protobuf",
# "--hidden-import=PyQt6",
# "--hidden-import=PyQt6.Qt6",
# "--hidden-import=PyQt6.sip",
# "--hidden-import=setuptools",
# "main.py"
# ]
# try:
# # 3. Execute the command
# subprocess.run(command, check=True)
# print("\n✅ Executable generated successfully!")
# print(f"Look for '{executable_name}.exe' in the 'dist' folder.")
# except subprocess.CalledProcessError as e:
# print("\n❌ An error occurred during executable generation.")
# print(f"Command failed with return code: {e.returncode}")
# except FileNotFoundError:
# print("\n❌ Error: The 'main.py' file was not found. Please run this script from the project's root directory.")
# if __name__ == "__main__":
# generate_executable()
import subprocess import subprocess
import sys import sys
from pathlib import Path
def generate_executable(): def generate_executable():
"""Prompts for a version and generates a single-file executable.""" """Prompts for a version, writes VERSION.txt, and builds a single-file exe that shows the same version inside the app."""
# --- Ask version ---
# 1. Ask the user for the version number
version = "" version = ""
while not version: while not version:
version = input("Enter the version for the executable (e.g., 4.1): ") version = input("Enter the version for the executable (e.g., 4.1): ").strip()
if not version: if not version:
print("Version cannot be empty. Please try again.") print("Version cannot be empty. Please try again.")
executable_name = f"SwapStationDashboard_v{version}" # Normalize how you want to display it in the app:
print(f"Generating executable with name: {executable_name}") display_version = f"v{version}"
# Check if pyinstaller is installed # --- Paths ---
project_root = Path(__file__).resolve().parent
version_file = project_root / "VERSION.txt"
executable_name = f"SwapStationDashboard_{display_version}"
# --- Persist version for the app ---
version_file.write_text(display_version, encoding="utf-8")
print(f"📦 Wrote {version_file} with '{display_version}'")
# --- Check PyInstaller ---
try: try:
subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True) subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True)
except (subprocess.CalledProcessError, FileNotFoundError): except (subprocess.CalledProcessError, FileNotFoundError):
print("PyInstaller is not found. Please install it using: pip install pyinstaller") print("PyInstaller is not found. Please install it using: pip install pyinstaller")
return return
print("Starting executable generation...") print(f"🚀 Building: {executable_name}")
# 2. Use the version to create the command # NOTE: On Windows, --add-data uses 'src;dst' (semicolon). (You already follow this.)
command = [ command = [
sys.executable, sys.executable,
"-m", "PyInstaller", "-m", "PyInstaller",
@ -33,20 +102,20 @@ def generate_executable():
"--add-data=logo;logo", "--add-data=logo;logo",
"--add-data=assets;assets", "--add-data=assets;assets",
"--add-data=proto;proto", "--add-data=proto;proto",
"--add-data=VERSION.txt;.", # <-- bundle VERSION.txt at app root inside the exe
"--hidden-import=paho.mqtt", "--hidden-import=paho.mqtt",
"--hidden-import=google.protobuf", "--hidden-import=google.protobuf",
"--hidden-import=PyQt6", "--hidden-import=PyQt6",
"--hidden-import=PyQt6.Qt6", "--hidden-import=PyQt6.Qt6",
"--hidden-import=PyQt6.sip", "--hidden-import=PyQt6.sip",
"--hidden-import=setuptools", "--hidden-import=setuptools",
"main.py" "main.py",
] ]
try: try:
# 3. Execute the command
subprocess.run(command, check=True) subprocess.run(command, check=True)
print("\n✅ Executable generated successfully!") print("\n✅ Executable generated successfully!")
print(f"Look for '{executable_name}.exe' in the 'dist' folder.") print(f"📁 Find '{executable_name}.exe' in the 'dist' folder.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print("\n❌ An error occurred during executable generation.") print("\n❌ An error occurred during executable generation.")
print(f"Command failed with return code: {e.returncode}") print(f"Command failed with return code: {e.returncode}")

View File

@ -6,7 +6,7 @@ import time
import uuid import uuid
from functools import partial from functools import partial
from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot, QMetaObject
from PyQt6.QtGui import QIcon, QFont, QPixmap, QCloseEvent from PyQt6.QtGui import QIcon, QFont, QPixmap, QCloseEvent
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget,
@ -17,6 +17,7 @@ from PyQt6.QtWidgets import (
from PyQt6.QtSvgWidgets import QSvgWidget from PyQt6.QtSvgWidgets import QSvgWidget
from google.protobuf.json_format import MessageToDict from google.protobuf.json_format import MessageToDict
from math import floor from math import floor
from core.versioning import get_version
# Make sure your proto import is correct for your project structure # Make sure your proto import is correct for your project structure
@ -43,7 +44,9 @@ class MainWindow(QMainWindow):
super().__init__() super().__init__()
# self.setWindowIcon(QIcon("logo/v_logo.png")) # self.setWindowIcon(QIcon("logo/v_logo.png"))
self.scale_factor = scale_factor self.scale_factor = scale_factor
self.setWindowTitle("Battery Swap Station Dashboard v4.2")
self.app_version = get_version()
self.setWindowTitle(f"Swap Station Dashboard {self.app_version}")
self.setWindowIcon(QIcon(resource_path("assets/icon.ico"))) self.setWindowIcon(QIcon(resource_path("assets/icon.ico")))
self.settings = QSettings("VECMOCON", "BatterySwapDashboard") self.settings = QSettings("VECMOCON", "BatterySwapDashboard")
@ -355,7 +358,7 @@ class MainWindow(QMainWindow):
title_box.addWidget(subtitle) title_box.addWidget(subtitle)
header.addLayout(title_box, 1) header.addLayout(title_box, 1)
badge = QLabel("Version 4.2") badge = QLabel(f"Version {self.app_version}")
badge.setObjectName("badge") badge.setObjectName("badge")
header.addWidget(badge, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter) header.addWidget(badge, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
@ -1366,9 +1369,6 @@ class MainWindow(QMainWindow):
self.mqtt_thread.quit() self.mqtt_thread.quit()
self.mqtt_thread.wait(1000) # Use a timeout to prevent freezing self.mqtt_thread.wait(1000) # Use a timeout to prevent freezing
if self.save_logs_checkbox.isChecked():
self.start_csv_logger()
self.reset_dashboard_ui() self.reset_dashboard_ui()
broker = self.broker_input.text() broker = self.broker_input.text()
user = self.username_input.text() user = self.username_input.text()
@ -1407,7 +1407,9 @@ class MainWindow(QMainWindow):
# Connect signals # Connect signals
self.mqtt_client.connection_status_changed.connect(self.on_connection_status_changed) self.mqtt_client.connection_status_changed.connect(self.on_connection_status_changed)
self.mqtt_client.message_received.connect(self.on_message_received) self.mqtt_client.message_received.connect(self.on_message_received)
self.mqtt_client.connected.connect(self.start_csv_logger)
if self.save_logs_checkbox.isChecked():
self.mqtt_client.connected.connect(self.start_csv_logger)
self.mqtt_client.disconnected.connect(self.stop_csv_logger) self.mqtt_client.disconnected.connect(self.stop_csv_logger)
self.mqtt_thread.started.connect(self.mqtt_client.connect_to_broker) self.mqtt_thread.started.connect(self.mqtt_client.connect_to_broker)
@ -1454,7 +1456,9 @@ class MainWindow(QMainWindow):
def stop_csv_logger(self): def stop_csv_logger(self):
if self.logger_thread and self.logger_thread.isRunning(): if self.logger_thread and self.logger_thread.isRunning():
self.csv_logger.stop_logging() # Run stop_logging() in the logger thread
QMetaObject.invokeMethod(self.csv_logger, "stop_logging",
Qt.ConnectionType.QueuedConnection)
self.logger_thread.quit() self.logger_thread.quit()
self.logger_thread.wait() self.logger_thread.wait()
self.csv_logger = None self.csv_logger = None
@ -1512,60 +1516,6 @@ class MainWindow(QMainWindow):
self.status_bar_device_id_label.setText("Device ID: --- |") # Clear the device ID label on disconnect self.status_bar_device_id_label.setText("Device ID: --- |") # Clear the device ID label on disconnect
self.status_bar_timestamp_label.setText("Last Update: ---") # Clear the timestamp label on disconnect self.status_bar_timestamp_label.setText("Last Update: ---") # Clear the timestamp label on disconnect
# def update_main_dashboard(self, data):
# try:
# ts = datetime.datetime.fromtimestamp(data.get('ts', datetime.datetime.now().timestamp())).strftime('%Y-%m-%d %H:%M:%S')
# device_id = data.get("deviceId", "---- ---- ---- ---- ----")
# self.device_id_display_field.setText(device_id)
# self.last_recv_ts_field.setText(ts)
# slot_payloads = data.get("slotLevelPayload", [])
# for i, slot_data in enumerate(slot_payloads):
# if i < len(self.chamber_widgets):
# self.chamber_widgets[i].update_data(slot_data)
# # print("Updating chamber", i+1, slot_data)
# if (i+1) in self.swap_buttons:
# is_present = slot_data.get("batteryPresent") == 1
# self.swap_buttons[i+1].setStyleSheet("background-color: #2ecc71;" if is_present else "")
# sdc_value = data.get("stationDiagnosticCode", 0)
# self.sdc_field.setText(str(sdc_value))
# self.update_diagnostic_alarms(sdc_value)
# backup_status = data.get("backupSupplyStatus", 0) # Default to 0 if not present
# if backup_status == 1:
# self.backup_supply_indicator.setText("Backup ON")
# self.backup_supply_indicator.setStyleSheet(
# """
# QLabel {
# background-color: transparent;
# border: 2px solid #28a745; /* Green */
# color: #28a745;
# border-radius: 5px;
# font-weight: bold;
# }
# """
# )
# # self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #28a745; }")
# else:
# self.backup_supply_indicator.setText("Backup OFF")
# self.backup_supply_indicator.setStyleSheet(
# """
# QLabel {
# background-color: transparent;
# border: 2px solid #dc3545; /* Red */
# color: #dc3545;
# border-radius: 5px;
# font-weight: bold;
# }
# """
# )
# # self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #dc3545; }")
# except Exception as e:
# print(f"Error updating dashboard: {e}")
def update_main_dashboard(self, data): def update_main_dashboard(self, data):
try: try:
ts = datetime.datetime.fromtimestamp(data.get('ts', datetime.datetime.now().timestamp())).strftime('%Y-%m-%d %H:%M:%S') ts = datetime.datetime.fromtimestamp(data.get('ts', datetime.datetime.now().timestamp())).strftime('%Y-%m-%d %H:%M:%S')
@ -1633,18 +1583,18 @@ class MainWindow(QMainWindow):
station_sdc = decoded_payload.stationDiagnosticCode station_sdc = decoded_payload.stationDiagnosticCode
# --- Main Information --- # --- Main Information ---
print("\n\033[1m" + "="*50 + " PERIODIC DATA " + "="*50 + "\033[0m") print("\n\033[1m" + "="*56 + " PERIODIC DATA " + "="*56 + "\033[0m")
print(f"\033[1m Timestamp:\033[0m {current_time} | \033[1mDevice ID:\033[0m {device_id}") print(f"\033[1m Timestamp:\033[0m {current_time} | \033[1mDevice ID:\033[0m {device_id}")
print(f"\033[1m Backup Supply:\033[0m {backup_supply} | \033[1mStation SDC:\033[0m {station_sdc}") print(f"\033[1m Backup Supply:\033[0m {backup_supply} | \033[1mStation SDC:\033[0m {station_sdc}")
print("-" * 120) print("-" * 127)
# --- Table Header --- # --- Table Header ---
header = "| {:^7} | {:^18} | {:^8} | {:^8} | {:^7} | {:^10} | {:^10} | {:^10} | {:^10} | {:^10} |" header = "| {:^7} | {:^18} | {:^9} | {:^9} | {:^6} | {:^9} | {:^9} | {:^10} | {:^10} | {:^9} |"
print(header.format("Chamber", "Battery ID", "Present", "Charging", "SOC", "Voltage", "Current", "Slot Temp", "Bat Temp", "Door")) print(header.format("Chamber", "Battery ID", "Present", "Charging", "SOC", "Voltage", "Current", "Slot Temp", "Bat Temp", "Door"))
print("-" * 120) print("-" * 127)
# --- Table Rows --- # --- Table Rows ---
row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}°C | {:>8}°C | {:^10} |" row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}°C | {:>8}°C | {:^9} |"
for i, chamber_data in enumerate(data_dict.get("slotLevelPayload", []), start=1): for i, chamber_data in enumerate(data_dict.get("slotLevelPayload", []), start=1):
is_present = chamber_data.get("batteryPresent") == 1 is_present = chamber_data.get("batteryPresent") == 1
is_charging = chamber_data.get("chargingStatus") == 1 is_charging = chamber_data.get("chargingStatus") == 1
@ -1658,20 +1608,38 @@ class MainWindow(QMainWindow):
battery_temp_raw = chamber_data.get('batteryMaxTemp', ' ') battery_temp_raw = chamber_data.get('batteryMaxTemp', ' ')
battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else " " battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else " "
# Voltage (mV -> V)
volt_raw = chamber_data.get('voltage', chamber_data.get('batVoltage', None))
if isinstance(volt_raw, int):
volt_str = f"{volt_raw/1000:.1f}"
elif isinstance(volt_raw, float):
volt_str = f"{volt_raw:.1f}"
else:
volt_str = "0.0"
# Current (mA -> A)
curr_raw = chamber_data.get('current', None)
if isinstance(curr_raw, int):
curr_str = f"{curr_raw/1000:.1f}"
elif isinstance(curr_raw, float):
curr_str = f"{curr_raw:.1f}"
else:
curr_str = "0.0"
print(row_format.format( print(row_format.format(
i, i,
chamber_data.get('batteryIdentification', ' '), chamber_data.get('batteryIdentification', ' '),
"" if is_present else "", "" if is_present else "",
"" if is_charging else "", "" if is_charging else "",
chamber_data.get('soc', ' '), chamber_data.get('soc', 0), # show 0 if missing
chamber_data.get('batVoltage', ' '), volt_str, # <-- fixed
chamber_data.get('current', ' '), curr_str, # <-- scaled
slot_temp_celsius, slot_temp_celsius,
battery_temp_celsius, battery_temp_celsius,
"Open" if is_door_open else "Closed" "Open" if is_door_open else "Closed"
)) ))
print("=" * 120 + "\n") print("=" * 127 + "\n")
except Exception as e: except Exception as e:
print(f"Error printing periodic log to terminal: {e}") print(f"Error printing periodic log to terminal: {e}")
@ -1690,13 +1658,12 @@ class MainWindow(QMainWindow):
if isinstance(w, (QLineEdit, QPushButton, QCheckBox, QComboBox)): if isinstance(w, (QLineEdit, QPushButton, QCheckBox, QComboBox)):
w.setEnabled(enabled) w.setEnabled(enabled)
@pyqtSlot() # This new slot handles the disconnected signal @pyqtSlot()
def handle_disconnection(self): def handle_disconnection(self):
print("Main window sees disconnection, stopping logger if active.") print("Main window sees disconnection, stopping logger if active.")
if self.csv_logger and self.csv_logger.timer.isActive(): if self.csv_logger and self.csv_logger.timer.isActive():
self.csv_logger.stop_logging() QMetaObject.invokeMethod(self.csv_logger, "stop_logging",
Qt.ConnectionType.QueuedConnection)
# You might also want to update UI elements here
self.connect_button.setText("Connect") self.connect_button.setText("Connect")
self.connection_status_label.setText("Disconnected") self.connection_status_label.setText("Disconnected")
@ -1706,6 +1673,9 @@ class MainWindow(QMainWindow):
""" """
print("--- Close event triggered. Shutting down gracefully... ---") print("--- Close event triggered. Shutting down gracefully... ---")
if self.csv_logger:
self.stop_csv_logger()
if self.mqtt_thread and self.mqtt_thread.isRunning(): if self.mqtt_thread and self.mqtt_thread.isRunning():
print(" > Stopping MQTT client...") print(" > Stopping MQTT client...")
# Tell the client to disconnect (which will stop its loop) # Tell the client to disconnect (which will stop its loop)