Compare commits

..

No commits in common. "68f6de695fbd1c33b1c74ecf81b7013a2fb9f8fc" and "72846262486693037d99546b22928ccc232219bb" have entirely different histories.

7 changed files with 225 additions and 285 deletions

View File

@ -1 +0,0 @@
v4.3

View File

@ -149,7 +149,6 @@ class CsvLogger(QObject):
print(f"❌ An unexpected error occurred in the logger thread: {e}")
continue
@pyqtSlot()
def stop_logging(self):
self.timer.stop()
self._process_queue()

View File

@ -1,26 +1,160 @@
# # In core/mqtt_client.py
# import socket
# from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
# import paho.mqtt.client as mqtt
# import uuid
# class MqttClient(QObject):
# # --- MODIFIED SIGNAL: Now sends a bool and a string ---
# connection_status_changed = pyqtSignal(bool, str)
# message_received = pyqtSignal(str, bytes)
# connection_error = pyqtSignal(str)
# stop_logging_signal = pyqtSignal()
# connected = pyqtSignal()
# disconnected = pyqtSignal()
# def __init__(self, broker, port, user, password, client_id):
# super().__init__()
# self.broker = broker
# self.port = port
# self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
# if user and password:
# self.client.username_pw_set(user, password)
# self.client.on_connect = self.on_connect
# self.client.on_disconnect = self.on_disconnect
# self.client.on_message = self.on_message
# # self.client.on_subscribe = self.on_subscribe
# def on_connect(self, client, userdata, flags, rc, properties):
# if rc == 0:
# print("Connection to MQTT Broker successful!")
# self.connection_status_changed.emit(True, "✅ Connected")
# self.connected.emit()
# # The app is connected, so we should NOT emit disconnected signals here.
# else:
# print(f"Failed to connect, return code {rc}\n")
# self.connection_status_changed.emit(False, f"❌ Connection failed (Code: {rc})")
# self.disconnected.emit() # Connection failed, so we are disconnected
# def on_disconnect(self, client, userdata, flags, rc, properties):
# print("Disconnected from MQTT Broker.")
# # Correctly emit signals for a disconnection
# # Change the icon in the line below from 🔌 to 🔴 ❌ 🚫 💔
# self.connection_status_changed.emit(False, "💔 Disconnected")
# self.disconnected.emit()
# self.stop_logging_signal.emit() # It's appropriate to stop logging here
# def on_message(self, client, userdata, msg):
# # print(f"Received {len(msg.payload)} bytes of binary data from topic `{msg.topic}`")
# self.message_received.emit(msg.topic, msg.payload)
# # --- MODIFIED connect_to_broker METHOD ---
# def connect_to_broker(self):
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# try:
# self.client.connect(self.broker, self.port, 120)
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# self.client.loop_start()
# except socket.gaierror:
# msg = "Host not found. Check internet."
# print(f"❌ Connection Error: {msg}")
# self.connection_status_changed.emit(False, f"❌ {msg}")
# except (socket.error, TimeoutError):
# msg = "Connection failed. Server offline?"
# print(f"❌ Connection Error: {msg}")
# self.connection_status_changed.emit(False, f"❌ {msg}")
# except Exception as e:
# msg = f"An unexpected error occurred: {e}"
# print(f"❌ {msg}")
# self.connection_status_changed.emit(False, f"❌ Error")
# def run(self):
# """
# Connects to the broker and starts the network loop.
# Handles all common connection errors gracefully.
# """
# print(f"Attempting to connect to {self.broker}:{self.port}...")
# try:
# # 1. Attempt to connect
# # print(f"Attempting to connect to {self.broker}:{self.port}...")
# self.client.connect(self.broker, self.port, 120)
# # 2. Run the blocking network loop
# # This will run until self.client.disconnect() is called
# self.client.loop_forever()
# except socket.gaierror:
# msg = "Host not found. Check the broker address or your internet connection."
# print(f"❌ {msg}")
# self.connection_error.emit(msg) # Report error to the main window
# except (socket.error, ConnectionRefusedError):
# msg = "Connection refused. Is the server offline or the port incorrect?"
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# except TimeoutError:
# msg = "Connection timed out. The server is not responding."
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# except Exception as e:
# # Catch any other unexpected errors during connection or loop
# msg = f"An unexpected error occurred: {e}"
# print(f"❌ {msg}")
# self.connection_error.emit(msg)
# # def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
# # """Callback function for when the broker responds to a subscription request."""
# # if reason_code_list[0].is_failure:
# # print(f"❌ Broker rejected subscription: {reason_code_list[0]}")
# # else:
# # print(f"✅ Broker accepted subscription with QoS: {reason_code_list[0].value}")
# # --- (The rest of the file remains the same) ---
# @pyqtSlot()
# def disconnect_from_broker(self):
# """Stops the MQTT client's network loop."""
# if self.client:
# self.client.loop_stop()
# self.client.disconnect()
# print("Stopping MQTT network loop.")
# def subscribe_to_topic(self, topic): # Add qos parameter
# print(f"Subscribing to topic: {topic}")
# self.client.subscribe(topic)
# def publish_message(self, topic, payload):
# self.client.publish(topic, payload)
# def cleanup(self):
# print("Stopping MQTT network loop.")
# self.client.loop_stop()
# In core/mqtt_client.py
import socket
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
import paho.mqtt.client as mqtt
import uuid
class MqttClient(QObject):
# --- MODIFIED SIGNAL: Now sends a bool and a string ---
# Sends connection state (bool) and a message (str)
connection_status_changed = pyqtSignal(bool, str)
message_received = pyqtSignal(str, bytes)
connection_error = pyqtSignal(str)
stop_logging_signal = pyqtSignal()
# Generic signals for success or failure/disconnection
connected = pyqtSignal()
disconnected = pyqtSignal()
stop_logging_signal = pyqtSignal()
# Sends topic (str) and payload (bytes) when a message is received
message_received = pyqtSignal(str, bytes)
def __init__(self, broker, port, user, password, client_id):
super().__init__()
self.broker = broker
self.port = port
self._is_connected = False
self._reported_bad_creds = False
self._suppress_next_disconnect_notice = False
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
if user and password:
self.client.username_pw_set(user, password)
@ -28,118 +162,55 @@ class MqttClient(QObject):
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# self.client.on_subscribe = self.on_subscribe
def on_connect(self, client, userdata, flags, rc, properties=None, *args, **kwargs):
def on_connect(self, client, userdata, flags, rc, properties):
if rc == 0:
# success
self._is_connected = True
self._ever_connected = True
self._suppress_next_disconnect_notice = False
print("Connection to MQTT Broker successful!")
self.connection_status_changed.emit(True, "✅ Connected")
self.connected.emit()
else:
# auth or other failure — log ONCE, and suppress the auto "Disconnected" message that will follow
msg = "Bad user name or password" if rc == 5 else f"Connection failed (Code: {rc})"
print(f"Failed to connect: {msg}")
self.connection_status_changed.emit(False, msg)
# make sure we do NOT show a "Disconnected" notice right after this
self._suppress_next_disconnect_notice = True
# stop any retry loop immediately
try:
client.disconnect()
client.loop_stop()
except Exception:
pass
def on_disconnect(self, client, userdata, rc, properties=None, *args, **kwargs):
self._is_connected = False
if self._suppress_next_disconnect_notice or not self._ever_connected:
self._suppress_next_disconnect_notice = False
return
print(f"Failed to connect, return code {rc}\n")
self.connection_status_changed.emit(False, f"❌ Connection failed (Code: {rc})")
self.disconnected.emit()
def on_disconnect(self, client, userdata, flags, rc, properties):
print("Disconnected from MQTT Broker.")
self.connection_status_changed.emit(False, "💔 Disconnected")
self.disconnected.emit()
self.connection_status_changed.emit(False, "Disconnected")
self.stop_logging_signal.emit()
def on_message(self, client, userdata, msg):
# print(f"Received {len(msg.payload)} bytes of binary data from topic `{msg.topic}`")
self.message_received.emit(msg.topic, msg.payload)
# --- MODIFIED connect_to_broker METHOD ---
@pyqtSlot()
def connect_to_broker(self):
print(f"Attempting to connect to {self.broker}:{self.port}...")
try:
self.client.connect(self.broker, self.port, 120)
self.client.connect(self.broker, self.port, 60)
self.client.loop_start()
except socket.gaierror:
msg = "Host not found. Check internet."
msg = "Host not found. Check broker address or your internet connection."
print(f"❌ Connection Error: {msg}")
self.connection_status_changed.emit(False, f"{msg}")
except (socket.error, TimeoutError):
msg = "Connection failed. Server offline?"
self.connection_status_changed.emit(False, msg)
except (socket.error, ConnectionRefusedError, TimeoutError):
msg = "Connection failed. Is the server offline or the port incorrect?"
print(f"❌ Connection Error: {msg}")
self.connection_status_changed.emit(False, f"{msg}")
self.connection_status_changed.emit(False, msg)
except Exception as e:
msg = f"An unexpected error occurred: {e}"
print(f"{msg}")
self.connection_status_changed.emit(False, f"❌ Error")
def run(self):
"""
Connects to the broker and starts the network loop.
Handles all common connection errors gracefully.
"""
print(f"Attempting to connect to {self.broker}:{self.port}...")
try:
# 1. Attempt to connect
# print(f"Attempting to connect to {self.broker}:{self.port}...")
self.client.connect(self.broker, self.port, 120)
# 2. Run the blocking network loop
# This will run until self.client.disconnect() is called
self.client.loop_forever()
except socket.gaierror:
msg = "Host not found. Check the broker address or your internet connection."
print(f"{msg}")
self.connection_error.emit(msg) # Report error to the main window
except (socket.error, ConnectionRefusedError):
msg = "Connection refused. Is the server offline or the port incorrect?"
print(f"{msg}")
self.connection_error.emit(msg)
except TimeoutError:
msg = "Connection timed out. The server is not responding."
print(f"{msg}")
self.connection_error.emit(msg)
except Exception as e:
# Catch any other unexpected errors during connection or loop
msg = f"An unexpected error occurred: {e}"
print(f"{msg}")
self.connection_error.emit(msg)
self.connection_status_changed.emit(False, f"Error: {e}")
@pyqtSlot()
def disconnect_from_broker(self):
"""Stops the MQTT client's network loop."""
if self.client:
self.client.loop_stop()
self.client.disconnect()
print("Stopping MQTT network loop.")
def subscribe_to_topic(self, topic): # Add qos parameter
def subscribe_to_topic(self, topic):
print(f"Subscribing to topic: {topic}")
self.client.subscribe(topic)
def publish_message(self, topic, payload):
self.client.publish(topic, payload)
def cleanup(self):
print("Stopping MQTT network loop.")
self.client.loop_stop()

View File

@ -1,26 +0,0 @@
# core/versioning.py
from pathlib import Path
import os, sys
DEFAULT_VERSION = "v0.0.0"
def _bundle_root() -> Path:
# When frozen by PyInstaller, files are unpacked in sys._MEIPASS
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
return Path(sys._MEIPASS)
# Dev/run-from-source: project root = two levels up from this file (adjust if needed)
return Path(__file__).resolve().parents[1]
def get_version() -> str:
# 1) ENV override (useful for CI)
env = os.getenv("APP_VERSION")
if env:
return env.strip()
# 2) VERSION.txt (works in dev and frozen)
root = _bundle_root()
ver_file = root / "VERSION.txt"
if ver_file.exists():
return ver_file.read_text(encoding="utf-8").strip()
return DEFAULT_VERSION

View File

@ -1,98 +1,29 @@
# import subprocess
# import sys
# def generate_executable():
# """Prompts for a version and generates a single-file executable."""
# # 1. Ask the user for the version number
# version = ""
# while not version:
# version = input("Enter the version for the executable (e.g., 4.1): ")
# if not version:
# print("Version cannot be empty. Please try again.")
# executable_name = f"SwapStationDashboard_v{version}"
# print(f"Generating executable with name: {executable_name}")
# # Check if pyinstaller is installed
# try:
# subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True)
# except (subprocess.CalledProcessError, FileNotFoundError):
# print("PyInstaller is not found. Please install it using: pip install pyinstaller")
# return
# print("Starting executable generation...")
# # 2. Use the version to create the command
# command = [
# sys.executable,
# "-m", "PyInstaller",
# f"--name={executable_name}",
# "--onefile",
# "--icon=assets/icon.ico",
# "--add-data=logo;logo",
# "--add-data=assets;assets",
# "--add-data=proto;proto",
# "--hidden-import=paho.mqtt",
# "--hidden-import=google.protobuf",
# "--hidden-import=PyQt6",
# "--hidden-import=PyQt6.Qt6",
# "--hidden-import=PyQt6.sip",
# "--hidden-import=setuptools",
# "main.py"
# ]
# try:
# # 3. Execute the command
# subprocess.run(command, check=True)
# print("\n✅ Executable generated successfully!")
# print(f"Look for '{executable_name}.exe' in the 'dist' folder.")
# except subprocess.CalledProcessError as e:
# print("\n❌ An error occurred during executable generation.")
# print(f"Command failed with return code: {e.returncode}")
# except FileNotFoundError:
# print("\n❌ Error: The 'main.py' file was not found. Please run this script from the project's root directory.")
# if __name__ == "__main__":
# generate_executable()
import subprocess
import sys
from pathlib import Path
def generate_executable():
"""Prompts for a version, writes VERSION.txt, and builds a single-file exe that shows the same version inside the app."""
# --- Ask version ---
"""Prompts for a version and generates a single-file executable."""
# 1. Ask the user for the version number
version = ""
while not version:
version = input("Enter the version for the executable (e.g., 4.1): ").strip()
version = input("Enter the version for the executable (e.g., 4.1): ")
if not version:
print("Version cannot be empty. Please try again.")
# Normalize how you want to display it in the app:
display_version = f"v{version}"
# --- Paths ---
project_root = Path(__file__).resolve().parent
version_file = project_root / "VERSION.txt"
executable_name = f"SwapStationDashboard_{display_version}"
# --- Persist version for the app ---
version_file.write_text(display_version, encoding="utf-8")
print(f"📦 Wrote {version_file} with '{display_version}'")
# --- Check PyInstaller ---
executable_name = f"SwapStationDashboard_v{version}"
print(f"Generating executable with name: {executable_name}")
# Check if pyinstaller is installed
try:
subprocess.run([sys.executable, "-m", "PyInstaller", "--version"], check=True, capture_output=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("PyInstaller is not found. Please install it using: pip install pyinstaller")
return
print(f"🚀 Building: {executable_name}")
# NOTE: On Windows, --add-data uses 'src;dst' (semicolon). (You already follow this.)
print("Starting executable generation...")
# 2. Use the version to create the command
command = [
sys.executable,
"-m", "PyInstaller",
@ -102,20 +33,20 @@ def generate_executable():
"--add-data=logo;logo",
"--add-data=assets;assets",
"--add-data=proto;proto",
"--add-data=VERSION.txt;.", # <-- bundle VERSION.txt at app root inside the exe
"--hidden-import=paho.mqtt",
"--hidden-import=google.protobuf",
"--hidden-import=PyQt6",
"--hidden-import=PyQt6.Qt6",
"--hidden-import=PyQt6.sip",
"--hidden-import=setuptools",
"main.py",
"main.py"
]
try:
# 3. Execute the command
subprocess.run(command, check=True)
print("\n✅ Executable generated successfully!")
print(f"📁 Find '{executable_name}.exe' in the 'dist' folder.")
print(f"Look for '{executable_name}.exe' in the 'dist' folder.")
except subprocess.CalledProcessError as e:
print("\n❌ An error occurred during executable generation.")
print(f"Command failed with return code: {e.returncode}")
@ -123,4 +54,4 @@ def generate_executable():
print("\n❌ Error: The 'main.py' file was not found. Please run this script from the project's root directory.")
if __name__ == "__main__":
generate_executable()
generate_executable()

View File

@ -6,7 +6,7 @@ import time
import uuid
from functools import partial
from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot, QMetaObject
from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot
from PyQt6.QtGui import QIcon, QFont, QPixmap, QCloseEvent
from PyQt6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget,
@ -17,7 +17,6 @@ from PyQt6.QtWidgets import (
from PyQt6.QtSvgWidgets import QSvgWidget
from google.protobuf.json_format import MessageToDict
from math import floor
from core.versioning import get_version
# Make sure your proto import is correct for your project structure
@ -44,9 +43,7 @@ class MainWindow(QMainWindow):
super().__init__()
# self.setWindowIcon(QIcon("logo/v_logo.png"))
self.scale_factor = scale_factor
self.app_version = get_version()
self.setWindowTitle(f"Swap Station Dashboard {self.app_version}")
self.setWindowTitle("Battery Swap Station Dashboard v4.2")
self.setWindowIcon(QIcon(resource_path("assets/icon.ico")))
self.settings = QSettings("VECMOCON", "BatterySwapDashboard")
@ -358,7 +355,7 @@ class MainWindow(QMainWindow):
title_box.addWidget(subtitle)
header.addLayout(title_box, 1)
badge = QLabel(f"Version {self.app_version}")
badge = QLabel("Version 4.2")
badge.setObjectName("badge")
header.addWidget(badge, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
@ -1361,14 +1358,15 @@ class MainWindow(QMainWindow):
self.update_swap_buttons_state()
def connect_to_mqtt(self):
if self.mqtt_thread and self.mqtt_thread.isRunning():
print("Cleaning up previous MQTT thread...")
if self.mqtt_client:
self.mqtt_client.disconnect_from_broker()
# self.mqtt_client.cleanup()
self.mqtt_thread.quit()
self.mqtt_thread.wait(1000) # Use a timeout to prevent freezing
self.mqtt_thread.wait(1000)
if self.save_logs_checkbox.isChecked():
self.start_csv_logger()
self.reset_dashboard_ui()
broker = self.broker_input.text()
user = self.username_input.text()
@ -1403,15 +1401,9 @@ class MainWindow(QMainWindow):
self.mqtt_thread = QThread()
self.mqtt_client = MqttClient(broker, port, user, password, client_id)
self.mqtt_client.moveToThread(self.mqtt_thread)
# Connect signals
self.mqtt_client.stop_logging_signal.connect(self.csv_logger.stop_logging)
self.mqtt_client.connection_status_changed.connect(self.on_connection_status_changed)
self.mqtt_client.message_received.connect(self.on_message_received)
if self.save_logs_checkbox.isChecked():
self.mqtt_client.connected.connect(self.start_csv_logger)
self.mqtt_client.disconnected.connect(self.stop_csv_logger)
self.mqtt_thread.started.connect(self.mqtt_client.connect_to_broker)
self.mqtt_thread.start()
@ -1456,9 +1448,7 @@ class MainWindow(QMainWindow):
def stop_csv_logger(self):
if self.logger_thread and self.logger_thread.isRunning():
# Run stop_logging() in the logger thread
QMetaObject.invokeMethod(self.csv_logger, "stop_logging",
Qt.ConnectionType.QueuedConnection)
self.csv_logger.stop_logging()
self.logger_thread.quit()
self.logger_thread.wait()
self.csv_logger = None
@ -1511,38 +1501,32 @@ class MainWindow(QMainWindow):
else:
QMessageBox.critical(self, "Connection Status", message)
QMessageBox.critical(self, "Connection Failed", message)
self.status_bar_device_id_label.setText("Device ID: --- |") # Clear the device ID label on disconnect
self.status_bar_timestamp_label.setText("Last Update: ---") # Clear the timestamp label on disconnect
def update_main_dashboard(self, data):
try:
ts = datetime.datetime.fromtimestamp(data.get('ts', datetime.datetime.now().timestamp())).strftime('%Y-%m-%d %H:%M:%S')
device_id = data.get("deviceId", "---- ---- ---- ---- ----")
self.device_id_display_field.setText(device_id)
self.last_recv_ts_field.setText(ts)
self.device_id_display_field.setText(data.get("deviceId", "---- ---- ---- ---- ----"))
slot_payloads = data.get("slotLevelPayload", [])
for i, slot_data in enumerate(slot_payloads):
if i < len(self.chamber_widgets):
self.chamber_widgets[i].update_data(slot_data)
# --- THIS IS THE CORRECTED LOGIC ---
slot_number = i + 1
if slot_number in self.swap_buttons:
# First, check if the button is already part of the user's selection
if slot_number not in self.swap_sequence:
# If NOT selected, update its color based on battery presence
is_present = slot_data.get("batteryPresent") == 1
button = self.swap_buttons[slot_number]
# Set to green if present, otherwise clear the style
button.setStyleSheet("background-color: #2ecc71;" if is_present else "")
# print("Updating chamber", i+1, slot_data)
if (i+1) in self.swap_buttons:
is_present = slot_data.get("batteryPresent") == 1
self.swap_buttons[i+1].setStyleSheet("background-color: #2ecc71;" if is_present else "")
sdc_value = data.get("stationDiagnosticCode", 0)
self.sdc_field.setText(str(sdc_value))
self.update_diagnostic_alarms(sdc_value)
backup_status = data.get("backupSupplyStatus", 0)
backup_status = data.get("backupSupplyStatus", 0) # Default to 0 if not present
if backup_status == 1:
self.backup_supply_indicator.setText("Backup ON")
self.backup_supply_indicator.setStyleSheet(
@ -1556,6 +1540,7 @@ class MainWindow(QMainWindow):
}
"""
)
# self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #28a745; }")
else:
self.backup_supply_indicator.setText("Backup OFF")
self.backup_supply_indicator.setStyleSheet(
@ -1569,6 +1554,7 @@ class MainWindow(QMainWindow):
}
"""
)
# self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #dc3545; }")
except Exception as e:
print(f"Error updating dashboard: {e}")
@ -1578,68 +1564,50 @@ class MainWindow(QMainWindow):
"""Formats and prints the periodic data to the terminal as a clean table."""
try:
current_time = datetime.datetime.fromtimestamp(decoded_payload.ts).strftime('%Y-%m-%d %H:%M:%S')
device_id = data_dict.get("deviceId", " ")
device_id = data_dict.get("deviceId", "N/A")
backup_supply = "ON" if decoded_payload.backupSupplyStatus == 1 else "OFF"
station_sdc = decoded_payload.stationDiagnosticCode
# --- Main Information ---
print("\n\033[1m" + "="*56 + " PERIODIC DATA " + "="*56 + "\033[0m")
print("\n\033[1m" + "="*50 + " PERIODIC DATA " + "="*50 + "\033[0m")
print(f"\033[1m Timestamp:\033[0m {current_time} | \033[1mDevice ID:\033[0m {device_id}")
print(f"\033[1m Backup Supply:\033[0m {backup_supply} | \033[1mStation SDC:\033[0m {station_sdc}")
print("-" * 127)
print("-" * 120)
# --- Table Header ---
header = "| {:^7} | {:^18} | {:^9} | {:^9} | {:^6} | {:^9} | {:^9} | {:^10} | {:^10} | {:^9} |"
header = "| {:^7} | {:^18} | {:^8} | {:^8} | {:^7} | {:^10} | {:^10} | {:^10} | {:^10} | {:^10} |"
print(header.format("Chamber", "Battery ID", "Present", "Charging", "SOC", "Voltage", "Current", "Slot Temp", "Bat Temp", "Door"))
print("-" * 127)
print("-" * 120)
# --- Table Rows ---
row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}°C | {:>8}°C | {:^9} |"
row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}°C | {:>8}°C | {:^10} |"
for i, chamber_data in enumerate(data_dict.get("slotLevelPayload", []), start=1):
is_present = chamber_data.get("batteryPresent") == 1
is_charging = chamber_data.get("chargingStatus") == 1
is_door_open = chamber_data.get("doorStatus") == 1
# Get and format the Slot Temperature
slot_temp_raw = chamber_data.get('slotTemperature', ' ')
slot_temp_celsius = f"{slot_temp_raw / 10:.1f}" if isinstance(slot_temp_raw, int) else " "
slot_temp_raw = chamber_data.get('slotTemperature', 'N/A')
slot_temp_celsius = f"{slot_temp_raw / 10:.1f}" if isinstance(slot_temp_raw, int) else "N/A"
# Get and format the Battery Temperature
battery_temp_raw = chamber_data.get('batteryMaxTemp', ' ')
battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else " "
# Voltage (mV -> V)
volt_raw = chamber_data.get('voltage', chamber_data.get('batVoltage', None))
if isinstance(volt_raw, int):
volt_str = f"{volt_raw/1000:.1f}"
elif isinstance(volt_raw, float):
volt_str = f"{volt_raw:.1f}"
else:
volt_str = "0.0"
# Current (mA -> A)
curr_raw = chamber_data.get('current', None)
if isinstance(curr_raw, int):
curr_str = f"{curr_raw/1000:.1f}"
elif isinstance(curr_raw, float):
curr_str = f"{curr_raw:.1f}"
else:
curr_str = "0.0"
battery_temp_raw = chamber_data.get('batteryMaxTemp', 'N/A')
battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else "N/A"
print(row_format.format(
i,
chamber_data.get('batteryIdentification', ' '),
chamber_data.get('batteryIdentification', 'N/A'),
"" if is_present else "",
"" if is_charging else "",
chamber_data.get('soc', 0), # show 0 if missing
volt_str, # <-- fixed
curr_str, # <-- scaled
chamber_data.get('soc', 'N/A'),
chamber_data.get('batVoltage', 'N/A'),
chamber_data.get('current', 'N/A'),
slot_temp_celsius,
battery_temp_celsius,
"Open" if is_door_open else "Closed"
))
print("=" * 127 + "\n")
print("=" * 120 + "\n")
except Exception as e:
print(f"Error printing periodic log to terminal: {e}")
@ -1658,12 +1626,13 @@ class MainWindow(QMainWindow):
if isinstance(w, (QLineEdit, QPushButton, QCheckBox, QComboBox)):
w.setEnabled(enabled)
@pyqtSlot()
@pyqtSlot() # This new slot handles the disconnected signal
def handle_disconnection(self):
print("Main window sees disconnection, stopping logger if active.")
if self.csv_logger and self.csv_logger.timer.isActive():
QMetaObject.invokeMethod(self.csv_logger, "stop_logging",
Qt.ConnectionType.QueuedConnection)
self.csv_logger.stop_logging()
# You might also want to update UI elements here
self.connect_button.setText("Connect")
self.connection_status_label.setText("Disconnected")
@ -1672,9 +1641,6 @@ class MainWindow(QMainWindow):
Handles the window's close event to ensure a clean shutdown.
"""
print("--- Close event triggered. Shutting down gracefully... ---")
if self.csv_logger:
self.stop_csv_logger()
if self.mqtt_thread and self.mqtt_thread.isRunning():
print(" > Stopping MQTT client...")

View File

@ -111,10 +111,10 @@ def get_light_theme_styles(scale=1.0):
font-weight: bold;
border-radius: {int(4*scale)}px;
}}
#ChamberOpenDoorButton {{ background-color: #d4d4d4; }}
#ChamberChgOnButton {{ background-color: #d4d4d4; }}
#ChamberChgOffButton {{ background-color: #d4d4d4; }}
#ChamberOpenDoorButton:hover {{ background-color: #1aa89c; }}
#ChamberOpenDoorButton {{ background-color: #607d8b; }}
#ChamberChgOnButton {{ background-color: #52be80; }}
#ChamberChgOffButton {{ background-color: #cd6155; }}
#ChamberOpenDoorButton:hover {{ background-color: #485c64; }}
#ChamberChgOnButton:hover {{ background-color: #04d45d; }}
#ChamberChgOffButton:hover {{ background-color: #d42318; }}
@ -334,10 +334,10 @@ def get_dark_theme_styles(scale=1.0):
font-weight: bold;
border-radius: {int(4*scale)}px;
}}
#ChamberOpenDoorButton {{ background-color: #5c5c5c; }}
#ChamberChgOnButton {{ background-color: #5c5c5c; }}
#ChamberChgOffButton {{ background-color: #5c5c5c; }}
#ChamberOpenDoorButton:hover {{ background-color: #1aa89c; }}
#ChamberOpenDoorButton {{ background-color: #607d8b; }}
#ChamberChgOnButton {{ background-color: #52be80; }}
#ChamberChgOffButton {{ background-color: #cd6155; }}
#ChamberOpenDoorButton:hover {{ background-color: #485c64; }}
#ChamberChgOnButton:hover {{ background-color: #04d45d; }}
#ChamberChgOffButton:hover {{ background-color: #d42318; }}