Compare commits

..

2 Commits

Author SHA1 Message Date
Kirubakaran 68f6de695f feat(versioning): centralize dashboard version management
- Added VERSION.txt at project root to serve as the single source of truth
- Added core/versioning.py with get_version() helper:
  • Reads APP_VERSION env var if present
  • Reads VERSION.txt (works in dev and frozen exe)
  • Falls back to "v0.0.0"
- Updated main_window.py:
  • Import get_version() and store self.app_version
  • Set window title dynamically with dashboard version
  • Replaced hardcoded QLabel("Version 4.2") with QLabel(f"Version {get_version()}")
- Updated build script (generate_executable.py):
  • Prompt for version at build time
  • Write VERSION.txt with entered version
  • Bundle VERSION.txt into PyInstaller exe (--add-data)
  • Use version in exe filename (SwapStationDashboard_vX.Y.exe)
- Result: version number now changes automatically everywhere (badge, title, About, logs)
  just by editing VERSION.txt or providing APP_VERSION at build/runtime.
2025-08-27 20:16:02 +05:30
Kirubakaran 37500e3877 fix: Bugs & UI 2025-08-27 17:36:12 +05:30
7 changed files with 285 additions and 225 deletions

1
VERSION.txt Normal file
View File

@ -0,0 +1 @@
v4.3

View File

@ -149,6 +149,7 @@ class CsvLogger(QObject):
print(f"❌ An unexpected error occurred in the logger thread: {e}")
continue
@pyqtSlot()
def stop_logging(self):
self.timer.stop()
self._process_queue()

View File

@ -1,160 +1,26 @@
# # In core/mqtt_client.py
# import socket
# from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
# import paho.mqtt.client as mqtt
# import uuid
# class MqttClient(QObject):
# # --- MODIFIED SIGNAL: Now sends a bool and a string ---
# connection_status_changed = pyqtSignal(bool, str)
# message_received = pyqtSignal(str, bytes)
# connection_error = pyqtSignal(str)
# stop_logging_signal = pyqtSignal()
# connected = pyqtSignal()
# disconnected = pyqtSignal()
# def __init__(self, broker, port, user, password, client_id):
# super().__init__()
# self.broker = broker
# self.port = port
# self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
# if user and password:
# self.client.username_pw_set(user, password)
# self.client.on_connect = self.on_connect
# self.client.on_disconnect = self.on_disconnect
# self.client.on_message = self.on_message
# # self.client.on_subscribe = self.on_subscribe
# def on_connect(self, client, userdata, flags, rc, properties):
# if rc == 0:
# print("Connection to MQTT Broker successful!")
# self.connection_status_changed.emit(True, "✅ Connected")
# self.connected.emit()
# # The app is connected, so we should NOT emit disconnected signals here.
# else:
# print(f"Failed to connect, return code {rc}\n")
# self.connection_status_changed.emit(False, f"❌ Connection failed (Code: {rc})")
# self.disconnected.emit() # Connection failed, so we are disconnected
# def on_disconnect(self, client, userdata, flags, rc, properties):
# print("Disconnected from MQTT Broker.")
# # Correctly emit signals for a disconnection
# # Change the icon in the line below from 🔌 to 🔴 ❌ 🚫 💔
# self.connection_status_changed.emit(False, "💔 Disconnected")
# self.disconnected.emit()
# self.stop_logging_signal.emit() # It's appropriate to stop logging here
# def on_message(self, client, userdata, msg):
# # print(f"Received {len(msg.payload)} bytes of binary data from topic `{msg.topic}`")
# self.message_received.emit(msg.topic, msg.payload)
# # --- MODIFIED connect_to_broker METHOD ---
# def connect_to_broker(self):
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# try:
# self.client.connect(self.broker, self.port, 120)
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# self.client.loop_start()
# except socket.gaierror:
# msg = "Host not found. Check internet."
# print(f"❌ Connection Error: {msg}")
# self.connection_status_changed.emit(False, f"❌ {msg}")
# except (socket.error, TimeoutError):
# msg = "Connection failed. Server offline?"
# print(f"❌ Connection Error: {msg}")
# self.connection_status_changed.emit(False, f"❌ {msg}")
# except Exception as e:
# msg = f"An unexpected error occurred: {e}"
# print(f"❌ {msg}")
# self.connection_status_changed.emit(False, f"❌ Error")
# def run(self):
# """
# Connects to the broker and starts the network loop.
# Handles all common connection errors gracefully.
# """
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# try:
# # 1. Attempt to connect
# # print(f"Attempting to connect to {self.broker}:{self.port}...")
# self.client.connect(self.broker, self.port, 120)
# # 2. Run the blocking network loop
# # This will run until self.client.disconnect() is called
# self.client.loop_forever()
# except socket.gaierror:
# msg = "Host not found. Check the broker address or your internet connection."
# print(f"❌ {msg}")
# self.connection_error.emit(msg) # Report error to the main window
# except (socket.error, ConnectionRefusedError):
# msg = "Connection refused. Is the server offline or the port incorrect?"
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# except TimeoutError:
# msg = "Connection timed out. The server is not responding."
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# except Exception as e:
# # Catch any other unexpected errors during connection or loop
# msg = f"An unexpected error occurred: {e}"
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# # def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
# # """Callback function for when the broker responds to a subscription request."""
# # if reason_code_list[0].is_failure:
# # print(f"❌ Broker rejected subscription: {reason_code_list[0]}")
# # else:
# # print(f"✅ Broker accepted subscription with QoS: {reason_code_list[0].value}")
# # --- (The rest of the file remains the same) ---
# @pyqtSlot()
# def disconnect_from_broker(self):
# """Stops the MQTT client's network loop."""
# if self.client:
# self.client.loop_stop()
# self.client.disconnect()
# print("Stopping MQTT network loop.")
# def subscribe_to_topic(self, topic): # Add qos parameter
# print(f"Subscribing to topic: {topic}")
# self.client.subscribe(topic)
# def publish_message(self, topic, payload):
# self.client.publish(topic, payload)
# def cleanup(self):
# print("Stopping MQTT network loop.")
# self.client.loop_stop()
# In core/mqtt_client.py
import socket
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
import paho.mqtt.client as mqtt
import uuid
class MqttClient(QObject):
# Sends connection state (bool) and a message (str)
# --- MODIFIED SIGNAL: Now sends a bool and a string ---
connection_status_changed = pyqtSignal(bool, str)
# Generic signals for success or failure/disconnection
message_received = pyqtSignal(str, bytes)
connection_error = pyqtSignal(str)
stop_logging_signal = pyqtSignal()
connected = pyqtSignal()
disconnected = pyqtSignal()
stop_logging_signal = pyqtSignal()
# Sends topic (str) and payload (bytes) when a message is received
message_received = pyqtSignal(str, bytes)
def __init__(self, broker, port, user, password, client_id):
super().__init__()
self.broker = broker
self.port = port
self._is_connected = False
self._reported_bad_creds = False
self._suppress_next_disconnect_notice = False
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
if user and password:
self.client.username_pw_set(user, password)
@ -162,55 +28,118 @@ class MqttClient(QObject):
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# self.client.on_subscribe = self.on_subscribe
def on_connect(self, client, userdata, flags, rc, properties):
def on_connect(self, client, userdata, flags, rc, properties=None, *args, **kwargs):
if rc == 0:
# success
self._is_connected = True
self._ever_connected = True
self._suppress_next_disconnect_notice = False
print("Connection to MQTT Broker successful!")
self.connection_status_changed.emit(True, "✅ Connected")
self.connected.emit()
else:
print(f"Failed to connect, return code {rc}\n")
self.connection_status_changed.emit(False, f"❌ Connection failed (Code: {rc})")
self.disconnected.emit()
# auth or other failure — log ONCE, and suppress the auto "Disconnected" message that will follow
msg = "Bad user name or password" if rc == 5 else f"Connection failed (Code: {rc})"
print(f"Failed to connect: {msg}")
self.connection_status_changed.emit(False, msg)
# make sure we do NOT show a "Disconnected" notice right after this
self._suppress_next_disconnect_notice = True
# stop any retry loop immediately
try:
client.disconnect()
client.loop_stop()
except Exception:
pass
def on_disconnect(self, client, userdata, rc, properties=None, *args, **kwargs):
self._is_connected = False
if self._suppress_next_disconnect_notice or not self._ever_connected:
self._suppress_next_disconnect_notice = False
return
def on_disconnect(self, client, userdata, flags, rc, properties):
print("Disconnected from MQTT Broker.")
self.connection_status_changed.emit(False, "💔 Disconnected")
self.disconnected.emit()
self.stop_logging_signal.emit()
self.connection_status_changed.emit(False, "Disconnected")
def on_message(self, client, userdata, msg):
# print(f"Received {len(msg.payload)} bytes of binary data from topic `{msg.topic}`")
self.message_received.emit(msg.topic, msg.payload)
@pyqtSlot()
# --- MODIFIED connect_to_broker METHOD ---
def connect_to_broker(self):
print(f"Attempting to connect to {self.broker}:{self.port}...")
try:
self.client.connect(self.broker, self.port, 60)
self.client.connect(self.broker, self.port, 120)
self.client.loop_start()
except socket.gaierror:
msg = "Host not found. Check broker address or your internet connection."
msg = "Host not found. Check internet."
print(f"❌ Connection Error: {msg}")
self.connection_status_changed.emit(False, msg)
except (socket.error, ConnectionRefusedError, TimeoutError):
msg = "Connection failed. Is the server offline or the port incorrect?"
self.connection_status_changed.emit(False, f"{msg}")
except (socket.error, TimeoutError):
msg = "Connection failed. Server offline?"
print(f"❌ Connection Error: {msg}")
self.connection_status_changed.emit(False, msg)
self.connection_status_changed.emit(False, f"{msg}")
except Exception as e:
msg = f"An unexpected error occurred: {e}"
print(f"{msg}")
self.connection_status_changed.emit(False, f"Error: {e}")
self.connection_status_changed.emit(False, f"❌ Error")
def run(self):
"""
Connects to the broker and starts the network loop.
Handles all common connection errors gracefully.
"""
print(f"Attempting to connect to {self.broker}:{self.port}...")
try:
# 1. Attempt to connect
# print(f"Attempting to connect to {self.broker}:{self.port}...")
self.client.connect(self.broker, self.port, 120)
# 2. Run the blocking network loop
# This will run until self.client.disconnect() is called
self.client.loop_forever()
except socket.gaierror:
msg = "Host not found. Check the broker address or your internet connection."
print(f"{msg}")
self.connection_error.emit(msg) # Report error to the main window
except (socket.error, ConnectionRefusedError):
msg = "Connection refused. Is the server offline or the port incorrect?"
print(f"{msg}")
self.connection_error.emit(msg)
except TimeoutError:
msg = "Connection timed out. The server is not responding."
print(f"{msg}")
self.connection_error.emit(msg)
except Exception as e:
# Catch any other unexpected errors during connection or loop
msg = f"An unexpected error occurred: {e}"
print(f"{msg}")
self.connection_error.emit(msg)
@pyqtSlot()
def disconnect_from_broker(self):
"""Stops the MQTT client's network loop."""
if self.client:
self.client.loop_stop()
self.client.disconnect()
print("Stopping MQTT network loop.")
def subscribe_to_topic(self, topic):
def subscribe_to_topic(self, topic): # Add qos parameter
print(f"Subscribing to topic: {topic}")
self.client.subscribe(topic)
def publish_message(self, topic, payload):
self.client.publish(topic, payload)
def cleanup(self):
print("Stopping MQTT network loop.")
self.client.loop_stop()

26
core/versioning.py Normal file
View File

@ -0,0 +1,26 @@
# core/versioning.py
from pathlib import Path
import os, sys
DEFAULT_VERSION = "v0.0.0"
def _bundle_root() -> Path:
# When frozen by PyInstaller, files are unpacked in sys._MEIPASS
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
return Path(sys._MEIPASS)
# Dev/run-from-source: project root = two levels up from this file (adjust if needed)
return Path(__file__).resolve().parents[1]
def get_version() -> str:
# 1) ENV override (useful for CI)
env = os.getenv("APP_VERSION")
if env:
return env.strip()
# 2) VERSION.txt (works in dev and frozen)
root = _bundle_root()
ver_file = root / "VERSION.txt"
if ver_file.exists():
return ver_file.read_text(encoding="utf-8").strip()
return DEFAULT_VERSION

View File

@ -1,29 +1,98 @@
# import subprocess
# import sys
# def generate_executable():
# """Prompts for a version and generates a single-file executable."""
# # 1. Ask the user for the version number
# version = ""
# while not version:
# version = input("Enter the version for the executable (e.g., 4.1): ")
# if not version:
# print("Version cannot be empty. Please try again.")
# executable_name = f"SwapStationDashboard_v{version}"
# print(f"Generating executable with name: {executable_name}")
# # Check if pyinstaller is installed
# try:
# subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True)
# except (subprocess.CalledProcessError, FileNotFoundError):
# print("PyInstaller is not found. Please install it using: pip install pyinstaller")
# return
# print("Starting executable generation...")
# # 2. Use the version to create the command
# command = [
# sys.executable,
# "-m", "PyInstaller",
# f"--name={executable_name}",
# "--onefile",
# "--icon=assets/icon.ico",
# "--add-data=logo;logo",
# "--add-data=assets;assets",
# "--add-data=proto;proto",
# "--hidden-import=paho.mqtt",
# "--hidden-import=google.protobuf",
# "--hidden-import=PyQt6",
# "--hidden-import=PyQt6.Qt6",
# "--hidden-import=PyQt6.sip",
# "--hidden-import=setuptools",
# "main.py"
# ]
# try:
# # 3. Execute the command
# subprocess.run(command, check=True)
# print("\n✅ Executable generated successfully!")
# print(f"Look for '{executable_name}.exe' in the 'dist' folder.")
# except subprocess.CalledProcessError as e:
# print("\n❌ An error occurred during executable generation.")
# print(f"Command failed with return code: {e.returncode}")
# except FileNotFoundError:
# print("\n❌ Error: The 'main.py' file was not found. Please run this script from the project's root directory.")
# if __name__ == "__main__":
# generate_executable()
import subprocess
import sys
from pathlib import Path
def generate_executable():
"""Prompts for a version and generates a single-file executable."""
# 1. Ask the user for the version number
"""Prompts for a version, writes VERSION.txt, and builds a single-file exe that shows the same version inside the app."""
# --- Ask version ---
version = ""
while not version:
version = input("Enter the version for the executable (e.g., 4.1): ")
version = input("Enter the version for the executable (e.g., 4.1): ").strip()
if not version:
print("Version cannot be empty. Please try again.")
executable_name = f"SwapStationDashboard_v{version}"
print(f"Generating executable with name: {executable_name}")
# Check if pyinstaller is installed
# Normalize how you want to display it in the app:
display_version = f"v{version}"
# --- Paths ---
project_root = Path(__file__).resolve().parent
version_file = project_root / "VERSION.txt"
executable_name = f"SwapStationDashboard_{display_version}"
# --- Persist version for the app ---
version_file.write_text(display_version, encoding="utf-8")
print(f"📦 Wrote {version_file} with '{display_version}'")
# --- Check PyInstaller ---
try:
subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("PyInstaller is not found. Please install it using: pip install pyinstaller")
return
print("Starting executable generation...")
# 2. Use the version to create the command
print(f"🚀 Building: {executable_name}")
# NOTE: On Windows, --add-data uses 'src;dst' (semicolon). (You already follow this.)
command = [
sys.executable,
"-m", "PyInstaller",
@ -33,20 +102,20 @@ def generate_executable():
"--add-data=logo;logo",
"--add-data=assets;assets",
"--add-data=proto;proto",
"--add-data=VERSION.txt;.", # <-- bundle VERSION.txt at app root inside the exe
"--hidden-import=paho.mqtt",
"--hidden-import=google.protobuf",
"--hidden-import=PyQt6",
"--hidden-import=PyQt6.Qt6",
"--hidden-import=PyQt6.sip",
"--hidden-import=setuptools",
"main.py"
"main.py",
]
try:
# 3. Execute the command
subprocess.run(command, check=True)
print("\n✅ Executable generated successfully!")
print(f"Look for '{executable_name}.exe' in the 'dist' folder.")
print(f"📁 Find '{executable_name}.exe' in the 'dist' folder.")
except subprocess.CalledProcessError as e:
print("\n❌ An error occurred during executable generation.")
print(f"Command failed with return code: {e.returncode}")
@ -54,4 +123,4 @@ def generate_executable():
print("\n❌ Error: The 'main.py' file was not found. Please run this script from the project's root directory.")
if __name__ == "__main__":
generate_executable()
generate_executable()

View File

@ -6,7 +6,7 @@ import time
import uuid
from functools import partial
from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot
from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot, QMetaObject
from PyQt6.QtGui import QIcon, QFont, QPixmap, QCloseEvent
from PyQt6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget,
@ -17,6 +17,7 @@ from PyQt6.QtWidgets import (
from PyQt6.QtSvgWidgets import QSvgWidget
from google.protobuf.json_format import MessageToDict
from math import floor
from core.versioning import get_version
# Make sure your proto import is correct for your project structure
@ -43,7 +44,9 @@ class MainWindow(QMainWindow):
super().__init__()
# self.setWindowIcon(QIcon("logo/v_logo.png"))
self.scale_factor = scale_factor
self.setWindowTitle("Battery Swap Station Dashboard v4.2")
self.app_version = get_version()
self.setWindowTitle(f"Swap Station Dashboard {self.app_version}")
self.setWindowIcon(QIcon(resource_path("assets/icon.ico")))
self.settings = QSettings("VECMOCON", "BatterySwapDashboard")
@ -355,7 +358,7 @@ class MainWindow(QMainWindow):
title_box.addWidget(subtitle)
header.addLayout(title_box, 1)
badge = QLabel("Version 4.2")
badge = QLabel(f"Version {self.app_version}")
badge.setObjectName("badge")
header.addWidget(badge, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
@ -1358,15 +1361,14 @@ class MainWindow(QMainWindow):
self.update_swap_buttons_state()
def connect_to_mqtt(self):
if self.mqtt_thread and self.mqtt_thread.isRunning():
print("Cleaning up previous MQTT thread...")
if self.mqtt_client:
self.mqtt_client.disconnect_from_broker()
# self.mqtt_client.cleanup()
self.mqtt_thread.quit()
self.mqtt_thread.wait(1000)
if self.save_logs_checkbox.isChecked():
self.start_csv_logger()
self.mqtt_thread.wait(1000) # Use a timeout to prevent freezing
self.reset_dashboard_ui()
broker = self.broker_input.text()
user = self.username_input.text()
@ -1401,9 +1403,15 @@ class MainWindow(QMainWindow):
self.mqtt_thread = QThread()
self.mqtt_client = MqttClient(broker, port, user, password, client_id)
self.mqtt_client.moveToThread(self.mqtt_thread)
self.mqtt_client.stop_logging_signal.connect(self.csv_logger.stop_logging)
# Connect signals
self.mqtt_client.connection_status_changed.connect(self.on_connection_status_changed)
self.mqtt_client.message_received.connect(self.on_message_received)
if self.save_logs_checkbox.isChecked():
self.mqtt_client.connected.connect(self.start_csv_logger)
self.mqtt_client.disconnected.connect(self.stop_csv_logger)
self.mqtt_thread.started.connect(self.mqtt_client.connect_to_broker)
self.mqtt_thread.start()
@ -1448,7 +1456,9 @@ class MainWindow(QMainWindow):
def stop_csv_logger(self):
if self.logger_thread and self.logger_thread.isRunning():
self.csv_logger.stop_logging()
# Run stop_logging() in the logger thread
QMetaObject.invokeMethod(self.csv_logger, "stop_logging",
Qt.ConnectionType.QueuedConnection)
self.logger_thread.quit()
self.logger_thread.wait()
self.csv_logger = None
@ -1501,32 +1511,38 @@ class MainWindow(QMainWindow):
else:
QMessageBox.critical(self, "Connection Failed", message)
QMessageBox.critical(self, "Connection Status", message)
self.status_bar_device_id_label.setText("Device ID: --- |") # Clear the device ID label on disconnect
self.status_bar_timestamp_label.setText("Last Update: ---") # Clear the timestamp label on disconnect
def update_main_dashboard(self, data):
try:
ts = datetime.datetime.fromtimestamp(data.get('ts', datetime.datetime.now().timestamp())).strftime('%Y-%m-%d %H:%M:%S')
device_id = data.get("deviceId", "---- ---- ---- ---- ----")
self.device_id_display_field.setText(device_id)
self.last_recv_ts_field.setText(ts)
self.device_id_display_field.setText(data.get("deviceId", "---- ---- ---- ---- ----"))
slot_payloads = data.get("slotLevelPayload", [])
for i, slot_data in enumerate(slot_payloads):
if i < len(self.chamber_widgets):
self.chamber_widgets[i].update_data(slot_data)
# print("Updating chamber", i+1, slot_data)
if (i+1) in self.swap_buttons:
is_present = slot_data.get("batteryPresent") == 1
self.swap_buttons[i+1].setStyleSheet("background-color: #2ecc71;" if is_present else "")
# --- THIS IS THE CORRECTED LOGIC ---
slot_number = i + 1
if slot_number in self.swap_buttons:
# First, check if the button is already part of the user's selection
if slot_number not in self.swap_sequence:
# If NOT selected, update its color based on battery presence
is_present = slot_data.get("batteryPresent") == 1
button = self.swap_buttons[slot_number]
# Set to green if present, otherwise clear the style
button.setStyleSheet("background-color: #2ecc71;" if is_present else "")
sdc_value = data.get("stationDiagnosticCode", 0)
self.sdc_field.setText(str(sdc_value))
self.update_diagnostic_alarms(sdc_value)
backup_status = data.get("backupSupplyStatus", 0) # Default to 0 if not present
backup_status = data.get("backupSupplyStatus", 0)
if backup_status == 1:
self.backup_supply_indicator.setText("Backup ON")
self.backup_supply_indicator.setStyleSheet(
@ -1540,7 +1556,6 @@ class MainWindow(QMainWindow):
}
"""
)
# self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #28a745; }")
else:
self.backup_supply_indicator.setText("Backup OFF")
self.backup_supply_indicator.setStyleSheet(
@ -1554,7 +1569,6 @@ class MainWindow(QMainWindow):
}
"""
)
# self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #dc3545; }")
except Exception as e:
print(f"Error updating dashboard: {e}")
@ -1564,50 +1578,68 @@ class MainWindow(QMainWindow):
"""Formats and prints the periodic data to the terminal as a clean table."""
try:
current_time = datetime.datetime.fromtimestamp(decoded_payload.ts).strftime('%Y-%m-%d %H:%M:%S')
device_id = data_dict.get("deviceId", "N/A")
device_id = data_dict.get("deviceId", " ")
backup_supply = "ON" if decoded_payload.backupSupplyStatus == 1 else "OFF"
station_sdc = decoded_payload.stationDiagnosticCode
# --- Main Information ---
print("\n\033[1m" + "="*50 + " PERIODIC DATA " + "="*50 + "\033[0m")
print("\n\033[1m" + "="*56 + " PERIODIC DATA " + "="*56 + "\033[0m")
print(f"\033[1m Timestamp:\033[0m {current_time} | \033[1mDevice ID:\033[0m {device_id}")
print(f"\033[1m Backup Supply:\033[0m {backup_supply} | \033[1mStation SDC:\033[0m {station_sdc}")
print("-" * 120)
print("-" * 127)
# --- Table Header ---
header = "| {:^7} | {:^18} | {:^8} | {:^8} | {:^7} | {:^10} | {:^10} | {:^10} | {:^10} | {:^10} |"
header = "| {:^7} | {:^18} | {:^9} | {:^9} | {:^6} | {:^9} | {:^9} | {:^10} | {:^10} | {:^9} |"
print(header.format("Chamber", "Battery ID", "Present", "Charging", "SOC", "Voltage", "Current", "Slot Temp", "Bat Temp", "Door"))
print("-" * 120)
print("-" * 127)
# --- Table Rows ---
row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}°C | {:>8}°C | {:^10} |"
row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}°C | {:>8}°C | {:^9} |"
for i, chamber_data in enumerate(data_dict.get("slotLevelPayload", []), start=1):
is_present = chamber_data.get("batteryPresent") == 1
is_charging = chamber_data.get("chargingStatus") == 1
is_door_open = chamber_data.get("doorStatus") == 1
# Get and format the Slot Temperature
slot_temp_raw = chamber_data.get('slotTemperature', 'N/A')
slot_temp_celsius = f"{slot_temp_raw / 10:.1f}" if isinstance(slot_temp_raw, int) else "N/A"
slot_temp_raw = chamber_data.get('slotTemperature', ' ')
slot_temp_celsius = f"{slot_temp_raw / 10:.1f}" if isinstance(slot_temp_raw, int) else " "
# Get and format the Battery Temperature
battery_temp_raw = chamber_data.get('batteryMaxTemp', 'N/A')
battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else "N/A"
battery_temp_raw = chamber_data.get('batteryMaxTemp', ' ')
battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else " "
# Voltage (mV -> V)
volt_raw = chamber_data.get('voltage', chamber_data.get('batVoltage', None))
if isinstance(volt_raw, int):
volt_str = f"{volt_raw/1000:.1f}"
elif isinstance(volt_raw, float):
volt_str = f"{volt_raw:.1f}"
else:
volt_str = "0.0"
# Current (mA -> A)
curr_raw = chamber_data.get('current', None)
if isinstance(curr_raw, int):
curr_str = f"{curr_raw/1000:.1f}"
elif isinstance(curr_raw, float):
curr_str = f"{curr_raw:.1f}"
else:
curr_str = "0.0"
print(row_format.format(
i,
chamber_data.get('batteryIdentification', 'N/A'),
chamber_data.get('batteryIdentification', ' '),
"" if is_present else "",
"" if is_charging else "",
chamber_data.get('soc', 'N/A'),
chamber_data.get('batVoltage', 'N/A'),
chamber_data.get('current', 'N/A'),
chamber_data.get('soc', 0), # show 0 if missing
volt_str, # <-- fixed
curr_str, # <-- scaled
slot_temp_celsius,
battery_temp_celsius,
"Open" if is_door_open else "Closed"
))
print("=" * 120 + "\n")
print("=" * 127 + "\n")
except Exception as e:
print(f"Error printing periodic log to terminal: {e}")
@ -1626,13 +1658,12 @@ class MainWindow(QMainWindow):
if isinstance(w, (QLineEdit, QPushButton, QCheckBox, QComboBox)):
w.setEnabled(enabled)
@pyqtSlot() # This new slot handles the disconnected signal
@pyqtSlot()
def handle_disconnection(self):
print("Main window sees disconnection, stopping logger if active.")
if self.csv_logger and self.csv_logger.timer.isActive():
self.csv_logger.stop_logging()
# You might also want to update UI elements here
QMetaObject.invokeMethod(self.csv_logger, "stop_logging",
Qt.ConnectionType.QueuedConnection)
self.connect_button.setText("Connect")
self.connection_status_label.setText("Disconnected")
@ -1641,6 +1672,9 @@ class MainWindow(QMainWindow):
Handles the window's close event to ensure a clean shutdown.
"""
print("--- Close event triggered. Shutting down gracefully... ---")
if self.csv_logger:
self.stop_csv_logger()
if self.mqtt_thread and self.mqtt_thread.isRunning():
print(" > Stopping MQTT client...")

View File

@ -111,10 +111,10 @@ def get_light_theme_styles(scale=1.0):
font-weight: bold;
border-radius: {int(4*scale)}px;
}}
#ChamberOpenDoorButton {{ background-color: #607d8b; }}
#ChamberChgOnButton {{ background-color: #52be80; }}
#ChamberChgOffButton {{ background-color: #cd6155; }}
#ChamberOpenDoorButton:hover {{ background-color: #485c64; }}
#ChamberOpenDoorButton {{ background-color: #d4d4d4; }}
#ChamberChgOnButton {{ background-color: #d4d4d4; }}
#ChamberChgOffButton {{ background-color: #d4d4d4; }}
#ChamberOpenDoorButton:hover {{ background-color: #1aa89c; }}
#ChamberChgOnButton:hover {{ background-color: #04d45d; }}
#ChamberChgOffButton:hover {{ background-color: #d42318; }}
@ -334,10 +334,10 @@ def get_dark_theme_styles(scale=1.0):
font-weight: bold;
border-radius: {int(4*scale)}px;
}}
#ChamberOpenDoorButton {{ background-color: #607d8b; }}
#ChamberChgOnButton {{ background-color: #52be80; }}
#ChamberChgOffButton {{ background-color: #cd6155; }}
#ChamberOpenDoorButton:hover {{ background-color: #485c64; }}
#ChamberOpenDoorButton {{ background-color: #5c5c5c; }}
#ChamberChgOnButton {{ background-color: #5c5c5c; }}
#ChamberChgOffButton {{ background-color: #5c5c5c; }}
#ChamberOpenDoorButton:hover {{ background-color: #1aa89c; }}
#ChamberChgOnButton:hover {{ background-color: #04d45d; }}
#ChamberChgOffButton:hover {{ background-color: #d42318; }}