import datetime import json import os import sys import time from functools import partial from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot from PyQt6.QtGui import QIcon, QFont, QPixmap, QCloseEvent from PyQt6.QtWidgets import ( QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget, QGroupBox, QFormLayout, QLineEdit, QPushButton, QLabel, QSpacerItem, QSizePolicy, QGridLayout, QMessageBox, QComboBox, QPlainTextEdit, QCheckBox, QFileDialog, QLayout, QFrame, QSizePolicy, QGraphicsOpacityEffect, QVBoxLayout, QTextBrowser, QScrollArea, QGraphicsDropShadowEffect ) from PyQt6.QtSvgWidgets import QSvgWidget from google.protobuf.json_format import MessageToDict from math import floor # Make sure your proto import is correct for your project structure from proto.vec_payload_chgSt_pb2 import mainPayload as periodicData, eventPayload, rpcRequest, jobType_e, eventType_e, languageType_e from .styles import get_light_theme_styles, get_dark_theme_styles from .widgets import ChamberWidget from core.mqtt_client import MqttClient from core.csv_logger import CsvLogger def resource_path(relative_path): """ Get absolute path to resource, works for dev and for PyInstaller """ try: # PyInstaller creates a temp folder and stores path in _MEIPASS base_path = sys._MEIPASS except Exception: base_path = os.path.abspath(".") return os.path.join(base_path, relative_path) class MainWindow(QMainWindow): log_data_signal = pyqtSignal(list) def __init__(self, scale_factor=1.0): super().__init__() # self.setWindowIcon(QIcon("logo/v_logo.png")) self.scale_factor = scale_factor self.setWindowTitle("Battery Swap Station Dashboard") self.setWindowIcon(QIcon(resource_path("assets/icon.ico"))) self.settings = QSettings("VECMOCON", "BatterySwapDashboard") self.is_dark_theme = True self.mqtt_thread = None self.mqtt_client = None self.swap_sequence = [] self.logger_thread = None self.csv_logger = None self.swap_button_clicks = {i: 0 for i in range(1, 10)} self.animation = None self.instance_log_area = None self.DIAGNOSTIC_ERRORS = [ "Lock Power Cut Failure", "Main Power Cut Failure", "Relayboard Can Failure", "DB Can Failure", "MB Can Reception Failure", "Smoke Alarm", "Water Alarm", "Phase Failure", "Earth Leakage" ] self.AUDIO_LANGUAGES = {"English": 1, "Hindi": 2, "Kannada": 3, "Telugu": 4, "Tamil": 5} self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QVBoxLayout(self.central_widget) self.main_layout.setContentsMargins(0, 0, 0, 0) self.main_layout.setSpacing(0) self.create_status_bar() self.tabs = QTabWidget() self.main_layout.addWidget(self.tabs) self.config_tab = QWidget() self.main_tab = QWidget() self.logs_tab = QWidget() self.help_tab = QWidget() self.about_tab = QWidget() self.tabs.addTab(self.config_tab, "Config") self.tabs.addTab(self.main_tab, "Main") self.tabs.addTab(self.logs_tab, "Logs") self.tabs.addTab(self.help_tab, "Help") self.tabs.addTab(self.about_tab, "About") self.setup_config_ui() self.setup_main_ui() self.setup_logs_ui() self.setup_help_ui() self.setup_about_ui() self.load_settings() self._apply_theme() def setup_help_ui(self): """ Help page (theme-driven): - Card-style container with title - Sections: Quick Start, Tips, Warnings, Troubleshooting - Scrollable host """ # get or create root layout (never replace/delete the layout itself) root = self.help_tab.layout() if root is None: root = QVBoxLayout() root.setContentsMargins(12, 12, 12, 12) root.setSpacing(10) self.help_tab.setLayout(root) else: # clear previous contents while root.count(): it = root.takeAt(0) w = it.widget() if w: w.setParent(None) w.deleteLater() # scroll host scroll = QScrollArea() scroll.setWidgetResizable(True) scroll.setFrameShape(QFrame.Shape.NoFrame) host = QWidget() host_lay = QVBoxLayout(host) host_lay.setContentsMargins(0, 0, 0, 0) host_lay.setSpacing(12) # card (styled by theme via objectNames) card = QFrame() card.setObjectName("helpCard") card_lay = QVBoxLayout(card) card_lay.setContentsMargins(18, 16, 18, 16) card_lay.setSpacing(12) # header title = QLabel("Help & User Guide"); title.setObjectName("helpTitle") subtitle = QLabel("Follow these steps to get connected, monitor the station, and troubleshoot issues.") subtitle.setObjectName("helpSubtitle") card_lay.addWidget(title) card_lay.addWidget(subtitle) div1 = QFrame(); div1.setObjectName("helpDivider") card_lay.addWidget(div1) # Quick Start qs_title = QLabel("Quick Start"); qs_title.setObjectName("sectionTitle") qs = QLabel( "" ) qs.setOpenExternalLinks(True) qs.setObjectName("bodyText") card_lay.addWidget(qs_title) card_lay.addWidget(qs) # Tips tip_box = QFrame(); tip_box.setObjectName("tipBox") tip_lay = QVBoxLayout(tip_box); tip_lay.setContentsMargins(12, 10, 12, 10) tip_h = QLabel("πŸ’‘ Tips"); tip_h.setObjectName("tipTitle") tip_b = QLabel( "" ) tip_b.setOpenExternalLinks(True) tip_b.setObjectName("tipText") tip_lay.addWidget(tip_h) tip_lay.addWidget(tip_b) card_lay.addWidget(tip_box) # Warnings warn_box = QFrame(); warn_box.setObjectName("warnBox") warn_lay = QVBoxLayout(warn_box); warn_lay.setContentsMargins(12, 10, 12, 10) warn_h = QLabel("⚠️ Important Warnings"); warn_h.setObjectName("warnTitle") warn_b = QLabel( "" ) warn_b.setObjectName("warnText") warn_lay.addWidget(warn_h) warn_lay.addWidget(warn_b) card_lay.addWidget(warn_box) # Troubleshooting tr_title = QLabel("Troubleshooting"); tr_title.setObjectName("sectionTitle") tr = QLabel( "" ) tr.setObjectName("bodyText") card_lay.addWidget(tr_title) card_lay.addWidget(tr) # footer div2 = QFrame(); div2.setObjectName("helpDivider") card_lay.addWidget(div2) foot = QLabel("Need help? Email " "" "kirubakaran@vecmocon.com") foot.setOpenExternalLinks(True) foot.setObjectName("bodyText") card_lay.addWidget(foot) # assemble host_lay.addWidget(card) host_lay.addStretch(1) scroll.setWidget(host) root.addWidget(scroll) def _about_stylesheet() -> str: return """ /* Card */ #aboutCard { background: #121417; border: 1px solid #2a2f36; border-radius: 16px; } /* Title + subtitle */ #title { font-size: 22px; font-weight: 700; color: #e8edf2; letter-spacing: 0.2px; } #subtitle { font-size: 13px; font-weight: 500; color: #9aa4b2; } /* Divider */ #divider { background: qlineargradient(x1:0, y1:0, x2:1, y2:0, stop:0 #1b1f24, stop:0.5 #2a2f36, stop:1 #1b1f24); min-height: 1px; max-height: 1px; border: none; margin: 6px 0 8px 0; } /* Small label on the left column */ #kvLabel { color: #aab4c0; font-weight: 600; } /* Pill badge */ #badge { color: #eaf3ff; background: qlineargradient(x1:0, y1:0, x2:1, y2:0, stop:0 #2563eb, stop:1 #7c3aed); border-radius: 999px; padding: 5px 12px; font-weight: 700; letter-spacing: 0.3px; } /* Links */ QLabel { color: #d7dde6; } QLabel:hover { text-decoration: none; } QLabel[link='true'] { color: #6aa8ff; } QLabel[link='true']:hover { color: #8fc2ff; text-decoration: underline; } /* Footer */ #footer { color: #7e8895; font-size: 12px; } """ def setup_about_ui(self): # get or create root layout root = self.about_tab.layout() if root is None: root = QVBoxLayout() root.setContentsMargins(24, 24, 24, 24) root.setAlignment(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignHCenter) self.about_tab.setLayout(root) else: # clear existing items while root.count(): item = root.takeAt(0) w = item.widget() if w: w.setParent(None) w.deleteLater() # --- card --- card = QFrame() card.setObjectName("aboutCard") card.setMinimumWidth(560) card.setMaximumWidth(760) card.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Maximum) shadow = QGraphicsDropShadowEffect(blurRadius=28, xOffset=0, yOffset=12) shadow.setColor(Qt.GlobalColor.black) card.setGraphicsEffect(shadow) card_layout = QVBoxLayout(card) card_layout.setContentsMargins(28, 22, 28, 22) card_layout.setSpacing(14) # --- header row (logo + titles + badge) --- header = QHBoxLayout() header.setSpacing(12) # optional logo (put a 48px logo at ./logo/app.png if you have one) logo_label = QLabel() logo_path = resource_path("logo/v_logo.png") pix = QPixmap(logo_path) if not pix.isNull(): logo_label.setPixmap(pix.scaled(40, 40, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation)) logo_label.setFixedSize(44, 44) header.addWidget(logo_label, 0, Qt.AlignmentFlag.AlignTop) title_box = QVBoxLayout() title = QLabel("About This Application") title.setObjectName("title") subtitle = QLabel("Battery Swap Station Dashboard") subtitle.setObjectName("subtitle") title_box.addWidget(title) title_box.addWidget(subtitle) header.addLayout(title_box, 1) badge = QLabel("Version 4.0") badge.setObjectName("badge") header.addWidget(badge, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter) card_layout.addLayout(header) # --- divider --- divider = QFrame() divider.setObjectName("divider") card_layout.addWidget(divider) # --- key–value grid --- grid = QGridLayout() grid.setHorizontalSpacing(18) grid.setVerticalSpacing(10) def add_row(r: int, key: str, value: str, is_link: bool = False): k = QLabel(key) k.setObjectName("kvLabel") v = QLabel(value) v.setWordWrap(False) # instead of True if is_link: v.setTextFormat(Qt.TextFormat.RichText) v.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction) # links + selection + keyboard nav v.setOpenExternalLinks(True) v.setProperty("link", True) else: v.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse) grid.addWidget(k, r, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignTop) grid.addWidget(v, r, 1, Qt.AlignmentFlag.AlignLeft) add_row(0, "Company", "VECMOCON TECHNOLOGIES") add_row(1, "Developed by", "Kirubakaran S") add_row(2, "Support", "kirubakaran@vecmocon.com", is_link=True) add_row(3, "Website", "www.vecmocon.com", is_link=True) card_layout.addLayout(grid) # --- tiny space + footer --- spacer = QFrame(); spacer.setFixedHeight(4) card_layout.addWidget(spacer) footer = QLabel("Β© 2025 VECMOCON TECHNOLOGIES. All rights reserved.") footer.setObjectName("footer") card_layout.addWidget(footer, 0, Qt.AlignmentFlag.AlignHCenter) root.addWidget(card, 0, Qt.AlignmentFlag.AlignHCenter) def load_settings(self): broker = self.settings.value("broker_address", "DEFAULT_BROKER") port = self.settings.value("port", "DEFAULT_PORT") username = self.settings.value("username", "DEFAULT_USERNAME") password = self.settings.value("password", "DEFAULT_PASSWORD") client_id = self.settings.value("client_id", "DEFAULT_CLIENT_ID") version = self.settings.value("version", "DEFAULT_VERSION") device_id = self.settings.value("device_id", "DEFAULT_DEVICE_ID") self.broker_input.setText(broker) self.port_input.setText(port) self.username_input.setText(username) self.password_input.setText(password) self.client_id_input.setText(client_id) self.version_input.setText(version) self.device_id_input.setText(device_id) self.settings.sync() # --- START: New Decoding Logic Integration --- def _read_varint(self, b: bytes, i: int): """Helper to read a varint from a raw byte buffer.""" shift = 0 val = 0 while True: if i >= len(b): raise ValueError("truncated varint") c = b[i] i += 1 val |= (c & 0x7F) << shift if not (c & 0x80): break shift += 7 if shift > 64: raise ValueError("varint too long") return val, i def _skip_field(self, b: bytes, i: int, wt: int): """Helper to skip a field in the buffer based on its wire type.""" if wt == 0: # VARINT _, i = self._read_varint(b, i) return i if wt == 1: # 64-BIT return i + 8 if wt == 2: # LENGTH-DELIMITED ln, i = self._read_varint(b, i) return i + ln if wt == 5: # 32-BIT return i + 4 raise ValueError(f"unsupported wire type to skip: {wt}") def _extract_field3_varint(self, b: bytes): """Manually parses the byte string to find the integer value of field number 3.""" i = 0 n = len(b) while i < n: key, i2 = self._read_varint(b, i) wt = key & 0x7 fn = key >> 3 i = i2 if fn == 3 and wt == 0: v, _ = self._read_varint(b, i) return v i = self._skip_field(b, i, wt) return None def _decode_event_payload(self, payload_bytes: bytes) -> str: """ Decodes an event payload robustly, ensuring the correct eventType is used. """ # 1. Standard parsing to get a base dictionary msg = eventPayload() msg.ParseFromString(payload_bytes) d = MessageToDict(msg, preserving_proto_field_name=True) # 2. Manually extract the true enum value from the raw bytes (Authoritative value) wire_num = self._extract_field3_varint(payload_bytes) wire_name = None if wire_num is not None: try: # Find the string name corresponding to the integer value wire_name = eventType_e.Name(wire_num) except ValueError: # If the number is valid but not in our .proto file, use the number itself wire_name = f"UNKNOWN_ENUM_VALUE_{wire_num}" # 3. Always prefer the manually extracted "wire value" if wire_name: d["eventType"] = wire_name # 4. Ensure consistent structure with default values ed = d.setdefault("eventData", {}) ed.setdefault("nfcData", None) ed.setdefault("batteryIdentification", "") ed.setdefault("activityFailureReason", 0) ed.setdefault("swapAbortReason", "ABORT_UNKNOWN") ed.setdefault("swapTime", 0) ed.setdefault("faultCode", 0) ed.setdefault("doorStatus", 0) ed.setdefault("slotId", 0) # 5. Reorder for clean logs and return as a formatted JSON string ordered = { "ts": d.get("ts"), "deviceId": d.get("deviceId"), "eventType": d.get("eventType"), "sessionId": d.get("sessionId"), "eventData": d.get("eventData"), } return json.dumps(ordered, indent=2, ensure_ascii=False) # --- END: New Decoding Logic Integration --- def _apply_theme(self): if self.is_dark_theme: self.theme_button.setText("β˜€οΈ") self.theme_button.setToolTip("Switch to Light Theme") self.setStyleSheet(get_dark_theme_styles(self.scale_factor)) else: self.theme_button.setText("πŸŒ™") self.theme_button.setToolTip("Switch to Dark Theme") self.setStyleSheet(get_light_theme_styles(self.scale_factor)) self.timestamp_label.setStyleSheet("color: #ecf0f1; background-color: transparent;") self.apply_config_tab_styles() def _on_fade_finished(self, overlay): """Safely delete the overlay widget after the animation is done.""" overlay.deleteLater() self.animation = None # Clear the animation reference def toggle_theme(self): """Toggles the UI theme with a smooth cross-fade animation.""" # Prevent starting a new animation if one is already running if self.animation and self.animation.state() == self.animation.State.Running: return # 1. Take a "screenshot" of the current UI pixmap = self.central_widget.grab() # 2. Create a temporary overlay label with that screenshot overlay = QLabel(self.central_widget) overlay.setPixmap(pixmap) overlay.setGeometry(self.central_widget.geometry()) overlay.show() overlay.raise_() # 3. Immediately apply the new theme underneath the overlay self.is_dark_theme = not self.is_dark_theme self._apply_theme() self.setup_about_ui() self.setup_help_ui() # 4. Set up the opacity effect and animation for the overlay opacity_effect = QGraphicsOpacityEffect(overlay) overlay.setGraphicsEffect(opacity_effect) self.animation = QPropertyAnimation(opacity_effect, b"opacity") self.animation.setDuration(500) # Animation duration in milliseconds self.animation.setStartValue(1.0) self.animation.setEndValue(0.0) self.animation.setEasingCurve(QEasingCurve.Type.InOutQuad) # 5. Connect the animation's finish signal to our cleanup method # We use a lambda to pass the overlay widget to the cleanup function self.animation.finished.connect(lambda: self._on_fade_finished(overlay)) # 6. Start the fade-out animation self.animation.start(self.animation.DeletionPolicy.DeleteWhenStopped) def create_status_bar(self): status_bar_widget = QWidget() status_bar_widget.setStyleSheet(f"background-color: #2c3e50; padding: {int(6*self.scale_factor)}px;") status_bar_layout = QHBoxLayout(status_bar_widget) status_bar_layout.setContentsMargins(10, 0, 10, 0) left_layout = QHBoxLayout() logo_label = QLabel("BSS Dashboard") logo_label.setFont(QFont("Arial", max(9, int(11 * self.scale_factor)), QFont.Weight.Bold)) logo_label.setStyleSheet("color: #ecf0f1; background-color: transparent;") self.timestamp_label = QLabel("Last Update: N/A") self.timestamp_label.setFont(QFont("Arial", max(8, int(9 * self.scale_factor)))) left_layout.addWidget(logo_label) left_layout.addWidget(self.timestamp_label) left_layout.addStretch() logo_path = resource_path("logo/vec_logo_svg.svg") company_logo = QSvgWidget(logo_path) company_logo.setStyleSheet("background: transparent;") ds = company_logo.renderer().defaultSize() target_h = max(24, int(36 * self.scale_factor)) target_w = int(ds.width() * (target_h / ds.height())) if ds.height() > 0 else target_h company_logo.setFixedSize(target_w, target_h) right_layout = QHBoxLayout() right_layout.addStretch() self.connect_button = QPushButton("Connect") self.disconnect_button = QPushButton("Disconnect") button_font_size = max(10, int(12 * self.scale_factor)) button_stylesheet = f""" QPushButton {{ font-size: {button_font_size}px; font-weight: bold; padding: 4px 14px; background-color: #3498db; }} """ self.connect_button.setStyleSheet(button_stylesheet) self.disconnect_button.setStyleSheet(button_stylesheet) self.connect_button.setObjectName("ConnectButton") self.disconnect_button.setObjectName("DisconnectButton") self.disconnect_button.setEnabled(False) self.theme_button = QPushButton("πŸŒ™") btn_size = max(28, int(35 * self.scale_factor)) self.theme_button.setFixedSize(btn_size, btn_size) self.theme_button.setFont(QFont("Arial", max(10, int(14 * self.scale_factor)))) self.theme_button.setStyleSheet("border: none; background-color: transparent; color: white;") self.theme_button.clicked.connect(self.toggle_theme) right_layout.addWidget(self.connect_button) right_layout.addWidget(self.disconnect_button) right_layout.addWidget(self.theme_button) status_bar_layout.addLayout(left_layout, 1) status_bar_layout.addWidget(company_logo, 1) status_bar_layout.addLayout(right_layout, 1) self.main_layout.addWidget(status_bar_widget) self.connect_button.clicked.connect(self.connect_to_mqtt) self.disconnect_button.clicked.connect(self.disconnect_from_mqtt) def apply_config_tab_styles(self): font_size_group = max(10, int(12 * self.scale_factor)) font_size_widgets = max(9, int(10 * self.scale_factor)) text_color = "#f0f0f0" if self.is_dark_theme else "#000" self.config_tab.setStyleSheet(f""" QGroupBox {{ font-size: {font_size_group}pt; font-weight: bold; }} QLabel, QLineEdit, QPushButton, QCheckBox {{ font-size: {font_size_widgets}pt; color: {text_color}; }} """) def setup_config_ui(self): layout = QVBoxLayout(self.config_tab) layout.setAlignment(Qt.AlignmentFlag.AlignTop) layout.setSpacing(20) layout.setContentsMargins(20, 20, 20, 20) self.apply_config_tab_styles() mqtt_group = QGroupBox("MQTT Server Credentials") form_layout = QFormLayout() self.broker_input = QLineEdit("mqtt.vecmocon.com") self.port_input = QLineEdit("1883") self.username_input = QLineEdit("mqtt_username") self.password_input = QLineEdit("mqtt_password") self.password_input.setEchoMode(QLineEdit.EchoMode.Password) form_layout.addRow("Broker Address:", self.broker_input) form_layout.addRow("Port:", self.port_input) form_layout.addRow("Username:", self.username_input) form_layout.addRow("Password:", self.password_input) mqtt_group.setLayout(form_layout) topic_group = QGroupBox("Topic Information") form_layout_topic = QFormLayout() self.client_id_input = QLineEdit("batterySmartStation") self.version_input = QLineEdit("100") self.device_id_input = QLineEdit("V16000862287077265957") form_layout_topic.addRow("Client ID:", self.client_id_input) form_layout_topic.addRow("Version:", self.version_input) form_layout_topic.addRow("Device ID:", self.device_id_input) # --- NEW: Add display fields for the generated topics --- line_separator = QFrame() line_separator.setFrameShape(QFrame.Shape.HLine) line_separator.setFrameShadow(QFrame.Shadow.Sunken) form_layout_topic.addRow(line_separator) # form_layout_topic.addRow(QLabel("")) # Separator form_layout_topic.addRow(QLabel("Generated Topis πŸ“‘")) self.periodic_topic_display = QLineEdit() self.periodic_topic_display.setReadOnly(True) self.periodic_topic_display.setObjectName("TopicDisplay") self.events_topic_display = QLineEdit() self.events_topic_display.setReadOnly(True) self.events_topic_display.setObjectName("TopicDisplay") self.rpcRequest_topic_display = QLineEdit() self.rpcRequest_topic_display.setReadOnly(True) self.rpcRequest_topic_display.setObjectName("TopicDisplay") form_layout_topic.addRow("Periodic Topic:", self.periodic_topic_display) form_layout_topic.addRow("Events Topic:", self.events_topic_display) form_layout_topic.addRow("RPC Request Topic:", self.rpcRequest_topic_display) topic_group.setLayout(form_layout_topic) # --- NEW: Connect input field changes to the update method --- self.client_id_input.textChanged.connect(self._update_topic_display) self.version_input.textChanged.connect(self._update_topic_display) self.device_id_input.textChanged.connect(self._update_topic_display) logs_group = QGroupBox("Save Logs to CSV") # ... (the rest of the method is the same) logs_form_layout = QFormLayout() self.save_logs_checkbox = QCheckBox("Enable Logging") self.save_logs_checkbox.setChecked(True) default_log_filename = f"Station_Mqtt_Dashboard_log_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv" # self.log_filename_input = QLineEdit(default_log_filename) log_dir_layout = QHBoxLayout() script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) default_log_dir = os.path.join(script_dir, "logs") self.log_dir_input = QLineEdit(default_log_dir) browse_btn = QPushButton("Browse") browse_btn.clicked.connect(self.select_log_directory) log_dir_layout.addWidget(self.log_dir_input) log_dir_layout.addWidget(browse_btn) logs_form_layout.addRow(self.save_logs_checkbox) # logs_form_layout.addRow("Log Filename:", self.log_filename_input) logs_form_layout.addRow("Output Directory:", log_dir_layout) logs_group.setLayout(logs_form_layout) layout.addWidget(mqtt_group) layout.addWidget(topic_group) layout.addWidget(logs_group) # --- NEW: Call the update function once to set initial values --- self._update_topic_display() def select_log_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Log Directory") if directory: self.log_dir_input.setText(directory) def _update_topic_display(self): """Constructs and displays the full MQTT topics based on user input.""" client_id = self.client_id_input.text() version = self.version_input.text() device_id = self.device_id_input.text() # Construct the topic strings periodic_topic = f"VEC/{client_id}/{version}/{device_id}/PERIODIC" events_topic = f"VEC/{client_id}/{version}/{device_id}/EVENTS" rpcRequest_topic = f"VEC/{client_id}/{version}/{device_id}/RPC/REQUEST" # Update the read-only display fields self.periodic_topic_display.setText(periodic_topic) self.events_topic_display.setText(events_topic) self.rpcRequest_topic_display.setText(rpcRequest_topic) def setup_main_ui(self): page_layout = QVBoxLayout(self.main_tab) page_layout.setContentsMargins( int(8 * self.scale_factor), int(8 * self.scale_factor), int(8 * self.scale_factor), int(8 * self.scale_factor) ) self.top_bar_frame = QFrame() self.top_bar_frame.setObjectName("topBarFrame") top_bar_layout = QHBoxLayout() ts_label = QLabel("LAST RECV TS:") ts_label.setObjectName("TimestampTitleLabel") top_bar_layout.addWidget(ts_label) self.last_recv_ts_field = QLineEdit("No Data") self.last_recv_ts_field.setReadOnly(True) self.last_recv_ts_field.setObjectName("TimestampDataField") top_bar_layout.addWidget(self.last_recv_ts_field) top_bar_layout.addSpacerItem(QSpacerItem(20, 20, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Minimum)) # top_bar_layout.addWidget(QLabel("Backup Supply:")) self.backup_supply_indicator = QLabel("N/A") self.backup_supply_indicator.setFixedSize(80, 25) self.backup_supply_indicator.setAlignment(Qt.AlignmentFlag.AlignCenter) self.backup_supply_indicator.setStyleSheet( """ QLabel { background-color: transparent; border: 2px solid gray; color: #E0E0E0; border-radius: 5px; font-weight: bold; } """ ) top_bar_layout.addWidget(self.backup_supply_indicator) refresh_btn = QPushButton("⟳ Refresh") refresh_btn.setObjectName("RefreshButton") refresh_btn.clicked.connect(self.reset_dashboard_ui) top_bar_layout.addWidget(refresh_btn) reset_btn = QPushButton("Station Reset") reset_btn.setObjectName("ResetButton") reset_btn.clicked.connect(self.confirm_station_reset) top_bar_layout.addWidget(reset_btn) main_content_layout = QHBoxLayout() self.chamber_widgets = [] grid_widget = QWidget() grid_layout = QGridLayout(grid_widget) grid_layout.setSpacing(max(5, int(8 * self.scale_factor))) for i in range(9): chamber = ChamberWidget(f"CHAMBER - {i+1}", self.scale_factor) chamber_num = i + 1 chamber.open_door_requested.connect(partial(self.handle_open_door, chamber_num)) chamber.chg_on_requested.connect(partial(self.handle_charger_control, chamber_num, True)) chamber.chg_off_requested.connect(partial(self.handle_charger_control, chamber_num, False)) self.chamber_widgets.append(chamber) row, col = divmod(i, 3) grid_layout.addWidget(chamber, row, col) diag_panel = QWidget() diag_panel_layout = QVBoxLayout(diag_panel) alarms_group = QGroupBox("System Diagnostics") alarms_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold)) alarms_layout = QVBoxLayout(alarms_group) alarms_layout.setSpacing(max(5, int(8 * self.scale_factor))) sdc_layout = QHBoxLayout() sdc_layout.addWidget(QLabel("SDC Value:")) self.sdc_field = self._create_main_status_field() sdc_layout.addWidget(self.sdc_field) alarms_layout.addLayout(sdc_layout) self.diag_labels = {} for error_text in self.DIAGNOSTIC_ERRORS: label = QLabel(error_text) label.setProperty("alarm", "inactive") label.setAlignment(Qt.AlignmentFlag.AlignCenter) label.setFont(QFont("Arial", max(8, int(10 * self.scale_factor)))) alarms_layout.addWidget(label) self.diag_labels[error_text] = label swap_group = QGroupBox("Swap Process") swap_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold)) swap_layout = QVBoxLayout(swap_group) swap_layout.setSpacing(max(4, int(6 * self.scale_factor))) swap_layout.setContentsMargins( int(8 * self.scale_factor), int(8 * self.scale_factor), int(8 * self.scale_factor), int(8 * self.scale_factor) ) top_pad = QSpacerItem( 0, max(16, int(22 * self.scale_factor)), QSizePolicy.Policy.Minimum, QSizePolicy.Policy.Fixed ) swap_layout.addItem(top_pad) swap_display_layout = QHBoxLayout() self.swap_display = QLineEdit() self.swap_display.setReadOnly(True) self.swap_display.setPlaceholderText("Click to build sequence...") swap_display_layout.addWidget(self.swap_display) swap_layout.addLayout(swap_display_layout) swap_grid = QGridLayout() swap_grid.setSpacing(int(2 * self.scale_factor)) swap_grid.setContentsMargins(0, 0, 0, 0) swap_grid.setSizeConstraint(QLayout.SizeConstraint.SetFixedSize) self.swap_buttons = {} btn_size = max(40, int(60 * self.scale_factor)) for i in range(1, 10): btn = QPushButton(str(i)) btn.setFont(QFont("Arial", max(12, int(14*self.scale_factor)))) btn.setFixedSize(btn_size, btn_size) btn.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed) btn.clicked.connect(self.on_swap_button_clicked) self.swap_buttons[i] = btn row, col = divmod(i-1, 3) swap_grid.addWidget(btn, row, col, Qt.AlignmentFlag.AlignCenter) grid_container_layout = QHBoxLayout() grid_container_layout.addStretch(1) grid_container_layout.addLayout(swap_grid) grid_container_layout.addStretch(1) swap_layout.addLayout(grid_container_layout) swap_layout.addStretch(1) button_row_layout = QHBoxLayout() button_row_layout.setSpacing(max(4, int(6 * self.scale_factor))) self.start_swap_btn = QPushButton("Start Swap") self.start_swap_btn.setObjectName("StartSwapButton") self.start_swap_btn.clicked.connect(self.start_swap) self.abort_swap_btn = QPushButton("Abort Swap") self.abort_swap_btn.setObjectName("AbortSwapButton") self.abort_swap_btn.clicked.connect(self.abort_swap) for btn in (self.start_swap_btn, self.abort_swap_btn): btn.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) btn.setMinimumHeight(max(28, int(36 * self.scale_factor))) button_row_layout.addWidget(self.start_swap_btn) button_row_layout.addWidget(self.abort_swap_btn) swap_layout.addLayout(button_row_layout) self.update_swap_buttons_state() log_group = QGroupBox("Instance Log") log_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold)) log_layout = QVBoxLayout(log_group) self.instance_log_area = QPlainTextEdit() self.instance_log_area.setReadOnly(True) self.instance_log_area.setObjectName("InstanceLog") log_layout.addWidget(self.instance_log_area) audio_group = QGroupBox("Audio Command") audio_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold)) audio_layout = QHBoxLayout(audio_group) self.audio_combo = QComboBox() self.audio_combo.addItems(self.AUDIO_LANGUAGES.keys()) send_audio_btn = QPushButton("➀") send_audio_btn.setObjectName("SendAudioButton") btn_size = max(28, int(35 * self.scale_factor)) send_audio_btn.setFixedSize(btn_size, btn_size) send_audio_btn.clicked.connect(self.send_audio_command) audio_layout.addWidget(self.audio_combo) audio_layout.addWidget(send_audio_btn) diag_panel_layout.addWidget(alarms_group, 2) diag_panel_layout.addWidget(swap_group, 3) diag_panel_layout.addWidget(audio_group, 1) diag_panel_layout.addWidget(log_group, 4) main_content_layout.addWidget(grid_widget, 1) main_content_layout.addWidget(diag_panel, 0) self.top_bar_frame.setLayout(top_bar_layout) page_layout.addWidget(self.top_bar_frame) page_layout.addLayout(main_content_layout) def log_to_instance_view(self, message: str): """Adds a formatted message to the instance log on the main tab.""" if not self.instance_log_area: return timestamp = datetime.datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {message}" self.instance_log_area.appendPlainText(formatted_message) # Keep the log from growing forever (max 100 lines) if self.instance_log_area.blockCount() > 100: cursor = self.instance_log_area.textCursor() cursor.movePosition(cursor.MoveOperation.Start) cursor.select(cursor.SelectionType.BlockUnderCursor) cursor.removeSelectedText() def setup_logs_ui(self): layout = QHBoxLayout(self.logs_tab) # --- Setup the Request Logs area --- request_group = QGroupBox("Request Logs (Dashboard -> MQTT Server)") request_layout = QVBoxLayout(request_group) self.request_log_area = QPlainTextEdit() self.request_log_area.setReadOnly(True) self.request_log_area.setObjectName("LogPanel") request_layout.addWidget(self.request_log_area) # --- Setup the Event Logs area --- event_group = QGroupBox("Event Logs (MQTT Server -> Dashboard)") event_layout = QVBoxLayout(event_group) self.event_log_area = QPlainTextEdit() self.event_log_area.setReadOnly(True) self.event_log_area.setObjectName("LogPanel") event_layout.addWidget(self.event_log_area) layout.addWidget(request_group) layout.addWidget(event_group) def _create_main_status_field(self): field = QLineEdit() field.setReadOnly(True) return field def log_request(self, topic, payload_str, log_type="INFO"): """Logs an event to the UI as a structured JSON object.""" timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # Add the new "log_type" to the entry for clarity log_entry = { "timestamp": timestamp, "log_type": log_type, # <-- NEW "topic": topic } try: log_entry["payload"] = json.loads(payload_str) except json.JSONDecodeError: log_entry["message"] = payload_str final_log_string = json.dumps(log_entry, indent=2) self.request_log_area.appendPlainText(final_log_string) self.request_log_area.appendPlainText("-" * 50 + "\n") def log_event(self, topic, json_payload): """Logs a received event as a clean JSON object to the UI ONLY.""" timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] log_entry = { "timestamp": timestamp, "topic": topic, "payload": json.loads(json_payload) } final_log_string = json.dumps(log_entry, indent=2) self.event_log_area.appendPlainText(final_log_string) self.event_log_area.appendPlainText("-" * 50 + "\n") # if self.save_logs_checkbox.isChecked(): # self.log_data_signal.emit([timestamp, topic, json_payload.replace('\n', ' ')]) def confirm_station_reset(self): reply = QMessageBox.question(self, 'Confirm Reset', "Are you sure you want to reset the station?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, QMessageBox.StandardButton.No) if reply == QMessageBox.StandardButton.Yes: print("Requesting Stationm Reset...") request = rpcRequest( ts=int(time.time()), jobId=f"job_{int(time.time())}", jobType=jobType_e.JOBTYPE_REBOOT ) self._send_rpc_request(request) else: self.log_request("INFO", "Station Reset command cancelled by user.") # --- THIS IS THE CRITICAL METHOD TO UPDATE --- def on_message_received(self, topic, payload): now = datetime.datetime.now() self.timestamp_label.setText(f"Last Update: {now.strftime('%Y-%m-%d %H:%M:%S')}") try: msg_type = topic.split('/')[-1] if msg_type == 'PERIODIC': decoded_payload = periodicData() decoded_payload.ParseFromString(payload) data_dict = MessageToDict(decoded_payload, preserving_proto_field_name=True) self._log_periodic_to_terminal(decoded_payload, data_dict) self.update_main_dashboard(data_dict) if self.save_logs_checkbox.isChecked(): log_payload_str = json.dumps(data_dict) self.log_data_signal.emit([now, topic, data_dict, payload]) elif msg_type == 'EVENTS': # This part handles the UI log, as before json_payload = self._decode_event_payload(payload) self.log_event(topic, json_payload) # This new block handles the CSV logging correctly if self.save_logs_checkbox.isChecked(): event_data_dict = json.loads(json_payload) # Emit the signal with all 4 required items self.log_data_signal.emit([now, topic, event_data_dict, payload]) # This part updates the instance log, as before try: data = json.loads(json_payload) event_type = data.get("eventType", "Unknown Event") self.log_to_instance_view(f"Event Received: {event_type}") except json.JSONDecodeError: self.log_to_instance_view("Received unparseable event data") elif msg_type == 'REQUEST': try: # 1. Use the correct rpcRequest protobuf object for parsing decoded_payload = rpcRequest() decoded_payload.ParseFromString(payload) # 2. Convert the parsed message to a dictionary and then to a JSON string data_dict = MessageToDict(decoded_payload, preserving_proto_field_name=True) json_payload = json.dumps(data_dict, indent=2) # This new block handles the CSV logging correctly if self.save_logs_checkbox.isChecked(): request_data_dict = json.loads(json_payload) # Emit the signal with all 4 required items self.log_data_signal.emit([now, topic, request_data_dict, payload]) # 3. Log the INCOMING request to the correct panel self.log_request(topic, json_payload, log_type="INCOMING_RPC") # 4. (Optional) Log to the instance view on the main tab job_type = data_dict.get("jobType", "Unknown Job") self.log_to_instance_view(f"RPC Request Received: {job_type}") except Exception as e: # Handle potential decoding errors for this specific topic print(f"Error decoding RPC Request from topic '{topic}': {e}") error_msg = f'{{"error": "RPC DECODING FAILED: {e}", "raw_hex": "{payload.hex()}"}}' self.log_request(topic, error_msg, log_type="DECODE_ERROR") else: print(f"Received message on unhandled topic: {topic}") except Exception as e: print(f"Error processing message from topic '{topic}': {e}") # Log the failure to the UI for better debugging self.log_event(topic, f'{{"error": "DECODING FAILED: {e}", "raw_hex": "{payload.hex()}"}}') # --- (The rest of your methods like `on_swap_button_clicked`, etc., remain here) --- def on_swap_button_clicked(self): sender = self.sender() chamber_num = int(sender.text().split('\n')[0]) self.swap_button_clicks[chamber_num] += 1 click_count = self.swap_button_clicks[chamber_num] if click_count == 1: self.swap_sequence.append(chamber_num) sender.setStyleSheet("background-color: #27ae60;") elif click_count == 2: self.swap_sequence.append(chamber_num) sender.setText(f"{chamber_num}") sender.setStyleSheet("background-color: qlineargradient(x1:0, y1:0, x2:1, y2:0, stop:0 #27ae60, stop:0.5 #27ae60, stop:0.51 #2ecc71, stop:1 #2ecc71);") elif click_count >= 3: self.swap_sequence = [num for num in self.swap_sequence if num != chamber_num] self.swap_button_clicks[chamber_num] = 0 sender.setText(str(chamber_num)) sender.setStyleSheet("") self.update_swap_display() self.update_swap_buttons_state() def update_swap_display(self): self.swap_display.setText(str(self.swap_sequence) if self.swap_sequence else "") def update_swap_buttons_state(self): is_sequence_present = bool(self.swap_sequence) self.start_swap_btn.setEnabled(is_sequence_present) def start_swap(self): if not self.swap_sequence: QMessageBox.warning(self, "Empty Sequence", "Cannot start an empty swap sequence.") return print(f"Starting swap with sequence: {self.swap_sequence}") request = rpcRequest( ts=int(time.time()), jobId=f"job_{int(time.time())}", jobType=jobType_e.JOBTYPE_SWAP_START ) request.rpcData.slotsData.extend(self.swap_sequence) self._send_rpc_request(request) self.swap_sequence.clear() self.update_swap_display() def abort_swap(self): print("Requesting to abort swap...") request = rpcRequest( ts=int(time.time()), jobId=f"job_{int(time.time())}", jobType=jobType_e.JOBTYPE_TRANSACTION_ABORT ) self._send_rpc_request(request) self.swap_sequence.clear() self.update_swap_display() self.swap_button_clicks = {i: 0 for i in range(1, 10)} for i, btn in self.swap_buttons.items(): btn.setText(str(i)) btn.setStyleSheet("") self.update_swap_buttons_state() def send_audio_command(self): """ Constructs and sends an RPC request to change the station's language. """ language_name = self.audio_combo.currentText() language_code = self.AUDIO_LANGUAGES.get(language_name) if not language_code: self.log_to_instance_view(f"Error: Could not find code for language '{language_name}'.") return # Dynamically find the correct languageType enum value from the .proto file # (e.g., "English" becomes "LANGUAGE_TYPE_ENGLISH") enum_name = f"LANGUAGE_TYPE_{language_name.upper()}" try: language_enum = languageType_e.Value(enum_name) except ValueError: self.log_to_instance_view(f"Error: Invalid language enum '{enum_name}' not found in .proto file.") return # Create the rpcRequest object, similar to your abort_swap function request = rpcRequest( ts=int(time.time()), jobId=f"lang_{int(time.time())}", jobType=jobType_e.JOBTYPE_LANGUAGE_UPDATE, languageType=language_enum # Add the specific language payload ) # Use your existing helper method to send the request self._send_rpc_request(request) # Log to the UI and show a confirmation pop-up self.log_to_instance_view(f"Sent RPC to set language to {language_name} (Job ID: {request.jobId})") QMessageBox.information(self, "RPC Sent", f"Request to change language to {language_name} has been sent.") def _send_rpc_request(self, request_payload): if not self.mqtt_client or not self.mqtt_client.client.is_connected(): QMessageBox.warning(self, "Not Connected", "Cannot send command. MQTT client is not connected.") return device_id = self.device_id_input.text() version = self.version_input.text() topic = f"VEC/{self.client_id_input.text()}/{version}/{device_id}/RPC/REQUEST" serialized_payload = request_payload.SerializeToString() data_dict = MessageToDict(request_payload, preserving_proto_field_name=True) json_payload = json.dumps(data_dict) self.log_request(topic, json_payload, log_type="OUTGOING_RPC") job_type = data_dict.get("jobType", "Unknown Job") self.log_to_instance_view(f"Command Sent: {job_type}") self.mqtt_client.publish_message(topic, serialized_payload) def handle_open_door(self, chamber_num): print(f"Requesting to open door for chamber {chamber_num}...") request = rpcRequest( ts=int(time.time()), jobId=f"job_{int(time.time())}", jobType=jobType_e.JOBTYPE_GATE_OPEN_CLOSE ) request.slotInfo.slotId = chamber_num request.slotInfo.state = 1 self._send_rpc_request(request) def handle_charger_control(self, chamber_num, state): action = "ON" if state else "OFF" print(f"Requesting to turn charger {action} for chamber {chamber_num}...") request = rpcRequest( ts=int(time.time()), jobId=f"job_{int(time.time())}", jobType=jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE ) request.slotInfo.slotId = chamber_num request.slotInfo.state = 1 if state else 0 self._send_rpc_request(request) def reset_dashboard_ui(self): self.log_request("INFO", "Dashboard UI cleared by user.") self.last_recv_ts_field.setText("No Data") self.sdc_field.setText("") for chamber in self.chamber_widgets: chamber.reset_to_default() self.update_diagnostic_alarms(0) self.swap_sequence.clear() self.update_swap_display() self.swap_button_clicks = {i: 0 for i in range(1, 10)} for i, btn in self.swap_buttons.items(): btn.setText(str(i)) btn.setStyleSheet("") self.update_swap_buttons_state() def connect_to_mqtt(self): if self.mqtt_thread and self.mqtt_thread.isRunning(): print("Cleaning up previous MQTT thread...") if self.mqtt_client: self.mqtt_client.disconnect_from_broker() self.mqtt_client.cleanup() self.mqtt_thread.quit() self.mqtt_thread.wait() if self.save_logs_checkbox.isChecked(): self.start_csv_logger() broker = self.broker_input.text() user = self.username_input.text() password = self.password_input.text() client_id = self.client_id_input.text() version = self.version_input.text() device_id = self.device_id_input.text() try: port = int(self.port_input.text()) except ValueError: self.timestamp_label.setText("Error: Port must be a number.") return # ========================================================== # ===== ADD THIS BLOCK TO SAVE YOUR SETTINGS =============== # ========================================================== self.settings.setValue("broker_address", broker) self.settings.setValue("port", str(port)) # Save port as a string self.settings.setValue("username", user) self.settings.setValue("password", password) self.settings.setValue("client_id", client_id) self.settings.setValue("version",version) self.settings.setValue("device_id", device_id) # Add any other settings you want to save self.settings.sync() # Force a write to disk immediately print("βœ… Configuration saved.") # ========================================================== self.mqtt_thread = QThread() self.mqtt_client = MqttClient(broker, port, user, password, client_id) self.mqtt_client.moveToThread(self.mqtt_thread) self.mqtt_client.stop_logging_signal.connect(self.csv_logger.stop_logging) self.mqtt_client.connection_status_changed.connect(self.on_connection_status_changed) self.mqtt_client.message_received.connect(self.on_message_received) self.mqtt_thread.started.connect(self.mqtt_client.connect_to_broker) self.mqtt_thread.start() def disconnect_from_mqtt(self): if self.csv_logger: self.stop_csv_logger() if self.mqtt_client: self.mqtt_client.disconnect_from_broker() def start_csv_logger(self): base_log_dir = self.log_dir_input.text() session_folder_name = f"session_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" self.logger_thread = QThread() # Pass BOTH the base directory and the session name to the logger self.csv_logger = CsvLogger(base_log_dir, session_folder_name) self.csv_logger.moveToThread(self.logger_thread) self.log_data_signal.connect(self.csv_logger.log_data) self.logger_thread.started.connect(self.csv_logger.start_logging) self.logger_thread.start() def stop_csv_logger(self): if self.logger_thread and self.logger_thread.isRunning(): self.csv_logger.stop_logging() self.logger_thread.quit() self.logger_thread.wait() self.csv_logger = None self.logger_thread = None def on_connection_status_changed(self, is_connected, message): """Handles connection status updates from the MQTT client.""" self.connect_button.setEnabled(not is_connected) self.disconnect_button.setEnabled(is_connected) self.set_config_inputs_enabled(not is_connected) # Set the text of the label to the message from the client self.timestamp_label.setText(message) self.log_to_instance_view(message.strip("βœ…βŒπŸ”ŒπŸ”΄ ")) if is_connected: client_id = self.client_id_input.text() version = self.version_input.text() device_id = self.device_id_input.text() periodic_topic = f"VEC/{client_id}/{version}/{device_id}/PERIODIC" events_topic = f"VEC/{client_id}/{version}/{device_id}/EVENTS" rpc_request_topic = f"VEC/{client_id}/{version}/{device_id}/RPC/REQUEST" # self.log_request(periodic_topic, "Subscribing to topic") self.mqtt_client.subscribe_to_topic(periodic_topic) # self.log_request(events_topic, "Subscribing to topic") self.mqtt_client.subscribe_to_topic(events_topic) # self.log_request(rpc_request_topic, "Subscribing to topic") self.mqtt_client.subscribe_to_topic(rpc_request_topic) self.tabs.setCurrentWidget(self.main_tab) def update_main_dashboard(self, data): try: ts = datetime.datetime.fromtimestamp(data.get('ts', datetime.datetime.now().timestamp())).strftime('%Y-%m-%d %H:%M:%S') self.last_recv_ts_field.setText(ts) slot_payloads = data.get("slotLevelPayload", []) for i, slot_data in enumerate(slot_payloads): if i < len(self.chamber_widgets): self.chamber_widgets[i].update_data(slot_data) if (i+1) in self.swap_buttons: is_present = slot_data.get("batteryPresent") == 1 self.swap_buttons[i+1].setStyleSheet("background-color: #2ecc71;" if is_present else "") sdc_value = data.get("stationDiagnosticCode", 0) self.sdc_field.setText(str(sdc_value)) self.update_diagnostic_alarms(sdc_value) backup_status = data.get("backupSupplyStatus", 0) # Default to 0 if not present if backup_status == 1: self.backup_supply_indicator.setText("Backup ON") self.backup_supply_indicator.setStyleSheet( """ QLabel { background-color: transparent; border: 2px solid #28a745; /* Green */ color: #28a745; border-radius: 5px; font-weight: bold; } """ ) # self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #28a745; }") else: self.backup_supply_indicator.setText("Backup OFF") self.backup_supply_indicator.setStyleSheet( """ QLabel { background-color: transparent; border: 2px solid #dc3545; /* Red */ color: #dc3545; border-radius: 5px; font-weight: bold; } """ ) # self.top_bar_frame.setStyleSheet("#topBarFrame { border: 1px solid #dc3545; }") except Exception as e: print(f"Error updating dashboard: {e}") def _log_periodic_to_terminal(self, decoded_payload, data_dict): """Formats and prints the periodic data to the terminal as a clean table.""" try: current_time = datetime.datetime.fromtimestamp(decoded_payload.ts).strftime('%Y-%m-%d %H:%M:%S') device_id = data_dict.get("deviceId", "N/A") # --- Main Information --- print("\n\033[1m" + "="*50 + " PERIODIC DATA " + "="*50 + "\033[0m") print(f"\033[1m Timestamp:\033[0m {current_time} | \033[1mDevice ID:\033[0m {device_id}") print(f"\033[1m Backup Supply:\033[0m {decoded_payload.backupSupplyStatus} | \033[1mStation SDC:\033[0m {decoded_payload.stationDiagnosticCode}") print("-" * 120) # --- Table Header --- header = "| {:^7} | {:^18} | {:^8} | {:^8} | {:^7} | {:^10} | {:^10} | {:^12} | {:^10} |" print(header.format("Chamber", "Battery ID", "Present", "Charging", "SOC", "Voltage", "Current", "Temp (Β°C)", "Door")) print("-" * 120) # --- Table Rows --- row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8} V | {:>8} A | {:>10}Β°C | {:^10} |" for i, chamber in enumerate(data_dict.get("slotLevelPayload", []), start=1): print(row_format.format( i, chamber.get('batteryIdentification', 'N/A'), "βœ…" if chamber.get("batteryPresent") == 1 else "❌", "βœ…" if chamber.get("chargingStatus") == 1 else "❌", chamber.get('soc', 'N/A'), chamber.get('batVoltage', 'N/A'), chamber.get('current', 'N/A'), chamber.get('batteryTemp', 'N/A'), "OPEN" if chamber.get("doorStatus") == 1 else "CLOSED" )) print("=" * 120 + "\n") except Exception as e: print(f"Error printing periodic log to terminal: {e}") def update_diagnostic_alarms(self, sdc_value): for i, error_text in enumerate(self.DIAGNOSTIC_ERRORS): is_alarm_active = (sdc_value >> i) & 1 label = self.diag_labels[error_text] if is_alarm_active: label.setProperty("alarm", "active") else: label.setProperty("alarm", "inactive") label.style().unpolish(label) label.style().polish(label) def set_config_inputs_enabled(self, enabled): for w in self.config_tab.findChildren(QWidget): if isinstance(w, (QLineEdit, QPushButton, QCheckBox, QComboBox)): w.setEnabled(enabled) @pyqtSlot() # This new slot handles the disconnected signal def handle_disconnection(self): print("Main window sees disconnection, stopping logger if active.") if self.csv_logger and self.csv_logger.timer.isActive(): self.csv_logger.stop_logging() # You might also want to update UI elements here self.connect_button.setText("Connect") self.connection_status_label.setText("Disconnected") def closeEvent(self, event: QCloseEvent): """ Handles the window's close event to ensure a clean shutdown. """ print("--- Close event triggered. Shutting down gracefully... ---") if self.mqtt_thread and self.mqtt_thread.isRunning(): print(" > Stopping MQTT client...") # Tell the client to disconnect (which will stop its loop) if self.mqtt_client: self.mqtt_client.disconnect_from_broker() print(" > Quitting and waiting for MQTT thread...") self.mqtt_thread.quit() if not self.mqtt_thread.wait(5000): print(" > Warning: Thread did not terminate in time.") self.mqtt_thread.terminate() # The handle_disconnection slot will have already stopped the logger # if it was running. No need to call it again here. print("--- Shutdown complete. ---") event.accept()