import datetime
import json
import os
import sys
import time
import uuid
from functools import partial
from PyQt6.QtCore import pyqtSignal, QThread, Qt, QPropertyAnimation, QEasingCurve, QSettings, pyqtSlot
from PyQt6.QtGui import QIcon, QFont, QPixmap, QCloseEvent
from PyQt6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget,
QGroupBox, QFormLayout, QLineEdit, QPushButton, QLabel, QSpacerItem,
QSizePolicy, QGridLayout, QMessageBox, QComboBox, QPlainTextEdit,
QCheckBox, QFileDialog, QLayout, QFrame, QSizePolicy, QGraphicsOpacityEffect, QVBoxLayout, QTextBrowser, QScrollArea, QGraphicsDropShadowEffect
)
from PyQt6.QtSvgWidgets import QSvgWidget
from google.protobuf.json_format import MessageToDict
from math import floor
# Make sure your proto import is correct for your project structure
from proto.vec_payload_chgSt_pb2 import mainPayload as periodicData, eventPayload, rpcRequest, jobType_e, eventType_e, languageType_e
from .styles import get_light_theme_styles, get_dark_theme_styles
from .widgets import ChamberWidget
from core.mqtt_client import MqttClient
from core.csv_logger import CsvLogger
def resource_path(relative_path):
    """Return *relative_path* joined onto the application's resource root.

    The root is ``sys._MEIPASS`` when that attribute exists (PyInstaller sets
    it to the folder it unpacks into); otherwise it is the absolute form of
    the current working directory.
    """
    if hasattr(sys, "_MEIPASS"):
        root = sys._MEIPASS
    else:
        root = os.path.abspath(".")
    return os.path.join(root, relative_path)
class MainWindow(QMainWindow):
log_data_signal = pyqtSignal(list)
def __init__(self, scale_factor=1.0):
    """Create the window, its state, the status bar and the five tabs.

    Args:
        scale_factor: Multiplier stored on the instance; the UI builders use
            it to size fonts and widgets.
    """
    super().__init__()
    self.scale_factor = scale_factor

    # Window chrome.
    self.setWindowTitle("Battery Swap Station Dashboard v4.2")
    self.setWindowIcon(QIcon(resource_path("assets/icon.ico")))

    # Persistent settings plus runtime state filled in later.
    self.settings = QSettings("VECMOCON", "BatterySwapDashboard")
    self.is_dark_theme = True
    self.mqtt_thread = None
    self.mqtt_client = None
    self.swap_sequence = []
    self.logger_thread = None
    self.csv_logger = None
    self.swap_button_clicks = dict.fromkeys(range(1, 10), 0)
    self.animation = None
    self.instance_log_area = None

    # Lookup tables used by the main tab.
    self.DIAGNOSTIC_ERRORS = [
        "Lock Power Cut Failure",
        "Main Power Cut Failure",
        "Relayboard Can Failure",
        "DB Can Failure",
        "MB Can Reception Failure",
        "Smoke Alarm",
        "Water Alarm",
        "Phase Failure",
        "Earth Leakage",
    ]
    self.AUDIO_LANGUAGES = {
        "English": 1,
        "Hindi": 2,
        "Kannada": 3,
        "Telugu": 4,
        "Tamil": 5,
    }

    # Central widget: status bar on top, tab widget underneath.
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.main_layout = QVBoxLayout(self.central_widget)
    self.main_layout.setContentsMargins(0, 0, 0, 0)
    self.main_layout.setSpacing(0)
    self.create_status_bar()

    self.tabs = QTabWidget()
    self.main_layout.addWidget(self.tabs)
    for attr_name, tab_title in (
        ("config_tab", "Config"),
        ("main_tab", "Main"),
        ("logs_tab", "Logs"),
        ("help_tab", "Help"),
        ("about_tab", "About"),
    ):
        page = QWidget()
        setattr(self, attr_name, page)
        self.tabs.addTab(page, tab_title)

    # Fill each tab, then restore saved settings and apply the theme.
    for build_tab in (
        self.setup_config_ui,
        self.setup_main_ui,
        self.setup_logs_ui,
        self.setup_help_ui,
        self.setup_about_ui,
    ):
        build_tab()
    self.load_settings()
    self._apply_theme()
def setup_help_ui(self):
    """Build (or rebuild) the Help tab.

    The tab holds a frameless scroll area containing one card
    (``helpCard``) with a title, a subtitle, four sections (Quick Start,
    Tips, Important Warnings, Troubleshooting) and a contact footer.
    Widgets only receive object names here; no stylesheet is set in this
    method.

    The method can be called repeatedly: the tab's root layout is created
    once and, on later calls, emptied and refilled. ``toggle_theme`` calls
    it after every theme switch.

    The section bodies are rich-text bullet lists. The previous literals
    were unterminated strings, so they are rebuilt here as well-formed
    HTML.
    """

    def bullet_list(*items):
        """Render *items* as an HTML unordered list."""
        return "<ul>" + "".join(f"<li>{item}</li>" for item in items) + "</ul>"

    # Get or create the root layout (never replace the layout object itself).
    root = self.help_tab.layout()
    if root is None:
        root = QVBoxLayout()
        root.setContentsMargins(12, 12, 12, 12)
        root.setSpacing(10)
        self.help_tab.setLayout(root)
    else:
        # Drop whatever a previous call put in the layout.
        while root.count():
            it = root.takeAt(0)
            w = it.widget()
            if w:
                w.setParent(None)
                w.deleteLater()

    # Scroll host.
    scroll = QScrollArea()
    scroll.setWidgetResizable(True)
    scroll.setFrameShape(QFrame.Shape.NoFrame)
    host = QWidget()
    host_lay = QVBoxLayout(host)
    host_lay.setContentsMargins(0, 0, 0, 0)
    host_lay.setSpacing(12)

    # Card.
    card = QFrame()
    card.setObjectName("helpCard")
    card_lay = QVBoxLayout(card)
    card_lay.setContentsMargins(18, 16, 18, 16)
    card_lay.setSpacing(12)

    # Header.
    title = QLabel("Help & User Guide")
    title.setObjectName("helpTitle")
    subtitle = QLabel("Follow these steps to get connected, monitor the station, and troubleshoot issues.")
    subtitle.setObjectName("helpSubtitle")
    card_lay.addWidget(title)
    card_lay.addWidget(subtitle)
    div1 = QFrame()
    div1.setObjectName("helpDivider")
    card_lay.addWidget(div1)

    # Quick Start.
    qs_title = QLabel("Quick Start")
    qs_title.setObjectName("sectionTitle")
    qs = QLabel(
        bullet_list(
            "Configure: Open Config, fill MQTT details, click Connect.",
            "Verify: Status bar should show Connected.",
            "Monitor: Use Main to view live slots.",
            "Swap: Start only on a slot marked Available.",
            "Logs: See history and export in Logs.",
        )
    )
    qs.setOpenExternalLinks(True)
    qs.setObjectName("bodyText")
    card_lay.addWidget(qs_title)
    card_lay.addWidget(qs)

    # Tips.
    tip_box = QFrame()
    tip_box.setObjectName("tipBox")
    tip_lay = QVBoxLayout(tip_box)
    tip_lay.setContentsMargins(12, 10, 12, 10)
    tip_h = QLabel("💡 Tips")
    tip_h.setObjectName("tipTitle")
    tip_b = QLabel(
        bullet_list(
            "Use a stable, low-latency network for best live updates.",
            "Keep Device ID consistent with the configured topic.",
            "Use Logs → Export before clearing or reinstalling.",
        )
    )
    tip_b.setOpenExternalLinks(True)
    tip_b.setObjectName("tipText")
    tip_lay.addWidget(tip_h)
    tip_lay.addWidget(tip_b)
    card_lay.addWidget(tip_box)

    # Warnings.
    warn_box = QFrame()
    warn_box.setObjectName("warnBox")
    warn_lay = QVBoxLayout(warn_box)
    warn_lay.setContentsMargins(12, 10, 12, 10)
    warn_h = QLabel("⚠️ Important Warnings")
    warn_h.setObjectName("warnTitle")
    warn_b = QLabel(
        bullet_list(
            "Disconnected state means no live data. Check server, network, and credentials.",
            "Start swaps only on slots marked Available to avoid conflicts.",
            "Commands during disconnect may be lost. Reconnect before critical actions.",
        )
    )
    warn_b.setObjectName("warnText")
    warn_lay.addWidget(warn_h)
    warn_lay.addWidget(warn_b)
    card_lay.addWidget(warn_box)

    # Troubleshooting.
    tr_title = QLabel("Troubleshooting")
    tr_title.setObjectName("sectionTitle")
    tr = QLabel(
        bullet_list(
            "Cannot connect: Verify broker/port, username/password, and firewall.",
            "No messages: Check Device ID and topic version; ensure the device is publishing.",
            "UI not updating: See Logs for errors; restart after changing config.",
            "Wrong station showing: Confirm the Device ID matches the station you expect.",
        )
    )
    tr.setObjectName("bodyText")
    card_lay.addWidget(tr_title)
    card_lay.addWidget(tr)

    # Footer.
    div2 = QFrame()
    div2.setObjectName("helpDivider")
    card_lay.addWidget(div2)
    foot = QLabel(
        "Need help? Email "
        "<a href='mailto:kirubakaran@vecmocon.com'>"
        "kirubakaran@vecmocon.com</a>"
    )
    # The text starts with plain words, so state the format explicitly
    # instead of relying on rich-text auto-detection.
    foot.setTextFormat(Qt.TextFormat.RichText)
    foot.setOpenExternalLinks(True)
    foot.setObjectName("bodyText")
    card_lay.addWidget(foot)

    # Assemble.
    host_lay.addWidget(card)
    host_lay.addStretch(1)
    scroll.setWidget(host)
    root.addWidget(scroll)
def _about_stylesheet() -> str:
return """
/* Card */
#aboutCard {
background: #121417;
border: 1px solid #2a2f36;
border-radius: 16px;
}
/* Title + subtitle */
#title {
font-size: 22px;
font-weight: 700;
color: #e8edf2;
letter-spacing: 0.2px;
}
#subtitle {
font-size: 13px;
font-weight: 500;
color: #9aa4b2;
}
/* Divider */
#divider {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #1b1f24, stop:0.5 #2a2f36, stop:1 #1b1f24);
min-height: 1px;
max-height: 1px;
border: none;
margin: 6px 0 8px 0;
}
/* Small label on the left column */
#kvLabel {
color: #aab4c0;
font-weight: 600;
}
/* Pill badge */
#badge {
color: #eaf3ff;
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #2563eb, stop:1 #7c3aed);
border-radius: 999px;
padding: 5px 12px;
font-weight: 700;
letter-spacing: 0.3px;
}
/* Links */
QLabel { color: #d7dde6; }
QLabel:hover { text-decoration: none; }
QLabel[link='true'] { color: #6aa8ff; }
QLabel[link='true']:hover { color: #8fc2ff; text-decoration: underline; }
/* Footer */
#footer {
color: #7e8895;
font-size: 12px;
}
"""
def setup_about_ui(self):
    """Build (or rebuild) the About tab.

    The tab shows one centred card (``aboutCard``) containing a header row
    (logo, title, subtitle, version badge), a divider, a key/value grid and
    a copyright footer. Like ``setup_help_ui`` it can be called repeatedly:
    an existing root layout is emptied and refilled.

    Rows added with ``is_link=True`` are wrapped in real ``<a href>``
    anchors. Previously the value was plain text, so enabling
    ``setOpenExternalLinks`` gave the user nothing to click.
    """
    # Get or create the root layout.
    root = self.about_tab.layout()
    if root is None:
        root = QVBoxLayout()
        root.setContentsMargins(24, 24, 24, 24)
        root.setAlignment(Qt.AlignmentFlag.AlignTop | Qt.AlignmentFlag.AlignHCenter)
        self.about_tab.setLayout(root)
    else:
        # Clear items left by a previous call.
        while root.count():
            item = root.takeAt(0)
            w = item.widget()
            if w:
                w.setParent(None)
                w.deleteLater()

    # --- card ---
    card = QFrame()
    card.setObjectName("aboutCard")
    card.setMinimumWidth(560)
    card.setMaximumWidth(760)
    card.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Maximum)
    shadow = QGraphicsDropShadowEffect(blurRadius=28, xOffset=0, yOffset=12)
    shadow.setColor(Qt.GlobalColor.black)
    card.setGraphicsEffect(shadow)
    card_layout = QVBoxLayout(card)
    card_layout.setContentsMargins(28, 22, 28, 22)
    card_layout.setSpacing(14)

    # --- header row (logo + titles + badge) ---
    header = QHBoxLayout()
    header.setSpacing(12)
    # Optional logo: shown only when logo/v_logo.png loads; the label keeps
    # its fixed 44x44 footprint either way.
    logo_label = QLabel()
    logo_path = resource_path("logo/v_logo.png")
    pix = QPixmap(logo_path)
    if not pix.isNull():
        logo_label.setPixmap(
            pix.scaled(
                40,
                40,
                Qt.AspectRatioMode.KeepAspectRatio,
                Qt.TransformationMode.SmoothTransformation,
            )
        )
    logo_label.setFixedSize(44, 44)
    header.addWidget(logo_label, 0, Qt.AlignmentFlag.AlignTop)

    title_box = QVBoxLayout()
    title = QLabel("About This Application")
    title.setObjectName("title")
    subtitle = QLabel("Battery Swap Station Dashboard")
    subtitle.setObjectName("subtitle")
    title_box.addWidget(title)
    title_box.addWidget(subtitle)
    header.addLayout(title_box, 1)

    badge = QLabel("Version 4.2")
    badge.setObjectName("badge")
    header.addWidget(badge, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignVCenter)
    card_layout.addLayout(header)

    # --- divider ---
    divider = QFrame()
    divider.setObjectName("divider")
    card_layout.addWidget(divider)

    # --- key/value grid ---
    grid = QGridLayout()
    grid.setHorizontalSpacing(18)
    grid.setVerticalSpacing(10)

    def add_row(r: int, key: str, value: str, is_link: bool = False):
        """Add one ``key: value`` row at grid row *r*.

        When *is_link* is true, *value* is treated as rich text (expected
        to contain an anchor) and the label opens links externally.
        """
        k = QLabel(key)
        k.setObjectName("kvLabel")
        v = QLabel(value)
        v.setWordWrap(False)
        if is_link:
            v.setTextFormat(Qt.TextFormat.RichText)
            # Links + selection + keyboard navigation.
            v.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction)
            v.setOpenExternalLinks(True)
            v.setProperty("link", True)
        else:
            v.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
        grid.addWidget(k, r, 0, Qt.AlignmentFlag.AlignRight | Qt.AlignmentFlag.AlignTop)
        grid.addWidget(v, r, 1, Qt.AlignmentFlag.AlignLeft)

    add_row(0, "Company", "VECMOCON TECHNOLOGIES")
    add_row(1, "Developed by", "Kirubakaran S")
    add_row(
        2,
        "Support",
        "<a href='mailto:kirubakaran@vecmocon.com'>kirubakaran@vecmocon.com</a>",
        is_link=True,
    )
    add_row(
        3,
        "Website",
        "<a href='https://www.vecmocon.com'>www.vecmocon.com</a>",
        is_link=True,
    )
    card_layout.addLayout(grid)

    # --- small gap + footer ---
    spacer = QFrame()
    spacer.setFixedHeight(4)
    card_layout.addWidget(spacer)
    footer = QLabel("© 2025 VECMOCON TECHNOLOGIES. All rights reserved.")
    footer.setObjectName("footer")
    card_layout.addWidget(footer, 0, Qt.AlignmentFlag.AlignHCenter)

    root.addWidget(card, 0, Qt.AlignmentFlag.AlignHCenter)
def load_settings(self):
    """Fill the Config tab inputs from the persisted ``QSettings`` store.

    For every key, the value already shown in the matching input is used as
    the fallback, so a missing key leaves the field unchanged. Previously
    the fallbacks were literal placeholders such as ``"DEFAULT_PORT"``,
    which replaced the field text with a string that is not a usable value.

    Stored values are converted with ``str`` before being written to the
    inputs, because ``setText`` needs a string.
    """
    bindings = (
        ("broker_address", self.broker_input),
        ("port", self.port_input),
        ("username", self.username_input),
        ("password", self.password_input),
        ("client_id", self.client_id_input),
        ("version", self.version_input),
        ("device_id", self.device_id_input),
    )
    for key, line_edit in bindings:
        fallback = line_edit.text()
        stored = self.settings.value(key, fallback)
        line_edit.setText(fallback if stored is None else str(stored))
    self.settings.sync()
# --- START: New Decoding Logic Integration ---
def _read_varint(self, b: bytes, i: int):
"""Helper to read a varint from a raw byte buffer."""
shift = 0
val = 0
while True:
if i >= len(b): raise ValueError("truncated varint")
c = b[i]
i += 1
val |= (c & 0x7F) << shift
if not (c & 0x80): break
shift += 7
if shift > 64: raise ValueError("varint too long")
return val, i
def _skip_field(self, b: bytes, i: int, wt: int):
"""Helper to skip a field in the buffer based on its wire type."""
if wt == 0: # VARINT
_, i = self._read_varint(b, i)
return i
if wt == 1: # 64-BIT
return i + 8
if wt == 2: # LENGTH-DELIMITED
ln, i = self._read_varint(b, i)
return i + ln
if wt == 5: # 32-BIT
return i + 4
raise ValueError(f"unsupported wire type to skip: {wt}")
def _extract_field3_varint(self, b: bytes):
"""Manually parses the byte string to find the integer value of field number 3."""
i = 0
n = len(b)
while i < n:
key, i2 = self._read_varint(b, i)
wt = key & 0x7
fn = key >> 3
i = i2
if fn == 3 and wt == 0:
v, _ = self._read_varint(b, i)
return v
i = self._skip_field(b, i, wt)
return None
def _decode_event_payload(self, payload_bytes: bytes) -> str:
    """Decode a serialized ``eventPayload`` into an indented JSON string.

    Steps:
      1. Parse the bytes with the generated message class and convert the
         result to a dict (proto field names preserved).
      2. Read field 3 straight from the raw bytes and, when present, let it
         override ``eventType``. Numbers the enum cannot name become
         ``UNKNOWN_ENUM_VALUE_<n>``.
      3. Give ``eventData`` a fixed set of keys by filling in defaults.
      4. Emit ``ts``, ``deviceId``, ``eventType``, ``sessionId`` and
         ``eventData`` in that order; any other top-level key is dropped.
    """
    parsed = eventPayload()
    parsed.ParseFromString(payload_bytes)
    as_dict = MessageToDict(parsed, preserving_proto_field_name=True)

    # The value found on the wire takes precedence over the parsed one.
    raw_enum = self._extract_field3_varint(payload_bytes)
    if raw_enum is not None:
        try:
            enum_label = eventType_e.Name(raw_enum)
        except ValueError:
            enum_label = f"UNKNOWN_ENUM_VALUE_{raw_enum}"
        if enum_label:
            as_dict["eventType"] = enum_label

    # Defaults keep the logged structure the same for every event.
    event_data = as_dict.setdefault("eventData", {})
    for field_name, default in (
        ("nfcData", None),
        ("batteryIdentification", ""),
        ("activityFailureReason", 0),
        ("swapAbortReason", "ABORT_UNKNOWN"),
        ("swapTime", 0),
        ("faultCode", 0),
        ("doorStatus", 0),
        ("slotId", 0),
    ):
        event_data.setdefault(field_name, default)

    key_order = ("ts", "deviceId", "eventType", "sessionId", "eventData")
    ordered = {key: as_dict.get(key) for key in key_order}
    return json.dumps(ordered, indent=2, ensure_ascii=False)
# --- END: New Decoding Logic Integration ---
def _apply_theme(self):
    """Apply the stylesheet and theme-button glyph for ``is_dark_theme``.

    In dark mode the button shows a sun and offers the light theme; in
    light mode it shows a moon and offers the dark theme. The timestamp
    label colour and the Config tab styles are refreshed afterwards.

    The button glyphs were stored as mis-decoded bytes (mojibake); they are
    restored here to the sun / crescent-moon emoji.
    """
    if self.is_dark_theme:
        self.theme_button.setText("☀️")
        self.theme_button.setToolTip("Switch to Light Theme")
        self.setStyleSheet(get_dark_theme_styles(self.scale_factor))
    else:
        self.theme_button.setText("🌙")
        self.theme_button.setToolTip("Switch to Dark Theme")
        self.setStyleSheet(get_light_theme_styles(self.scale_factor))
    # Set after the window-level sheet so the label keeps this colour in
    # both themes.
    self.timestamp_label.setStyleSheet("color: #ecf0f1; background-color: transparent;")
    self.apply_config_tab_styles()
def _on_fade_finished(self, overlay):
"""Safely delete the overlay widget after the animation is done."""
overlay.deleteLater()
self.animation = None # Clear the animation reference
def toggle_theme(self):
    """Switch between dark and light themes with a cross-fade.

    A snapshot of the current UI is laid over the central widget, the new
    theme is applied underneath, and the snapshot's opacity is animated
    from 1.0 to 0.0 over 500 ms. A toggle requested while a fade is still
    running is ignored.
    """
    # Do not start a second fade while one is in progress.
    if self.animation and self.animation.state() == self.animation.State.Running:
        return

    # 1. Snapshot the current UI.
    pixmap = self.central_widget.grab()

    # 2. Show the snapshot in an overlay on top of the central widget.
    overlay = QLabel(self.central_widget)
    overlay.setPixmap(pixmap)
    # The overlay is a child of central_widget, so it must be placed in
    # that widget's own coordinates: rect(), not geometry(), which is
    # expressed relative to central_widget's parent.
    overlay.setGeometry(self.central_widget.rect())
    overlay.show()
    overlay.raise_()

    # 3. Apply the new theme underneath and rebuild the theme-aware tabs.
    self.is_dark_theme = not self.is_dark_theme
    self._apply_theme()
    self.setup_about_ui()
    self.setup_help_ui()

    # 4. Fade the overlay out.
    opacity_effect = QGraphicsOpacityEffect(overlay)
    overlay.setGraphicsEffect(opacity_effect)
    self.animation = QPropertyAnimation(opacity_effect, b"opacity")
    self.animation.setDuration(500)  # milliseconds
    self.animation.setStartValue(1.0)
    self.animation.setEndValue(0.0)
    self.animation.setEasingCurve(QEasingCurve.Type.InOutQuad)

    # 5. Remove the overlay once the fade completes; the lambda carries the
    #    overlay to the cleanup slot.
    self.animation.finished.connect(lambda: self._on_fade_finished(overlay))

    # 6. Start; the animation object is deleted when it stops.
    self.animation.start(self.animation.DeletionPolicy.DeleteWhenStopped)
def create_status_bar(self):
    """Build the top status bar and add it to ``main_layout``.

    Left: application name and the "Last Update" timestamp label.
    Centre: the company SVG logo scaled to a fixed height.
    Right: Connect / Disconnect buttons and the theme toggle.

    Stores ``timestamp_label``, ``connect_button``, ``disconnect_button``
    and ``theme_button`` on the instance and wires their click handlers.
    """
    scale = self.scale_factor

    bar = QWidget()
    bar.setStyleSheet(f"background-color: #2c3e50; padding: {int(6*scale)}px;")
    bar_layout = QHBoxLayout(bar)
    bar_layout.setContentsMargins(10, 0, 10, 0)

    # --- left: name + timestamp ---
    left = QHBoxLayout()
    name_label = QLabel("BSS Dashboard")
    name_label.setFont(QFont("Arial", max(9, int(11 * scale)), QFont.Weight.Bold))
    name_label.setStyleSheet("color: #ecf0f1; background-color: transparent;")
    self.timestamp_label = QLabel("Last Update: N/A")
    self.timestamp_label.setFont(QFont("Arial", max(8, int(9 * scale))))
    left.addWidget(name_label)
    left.addWidget(self.timestamp_label)
    left.addStretch()

    # --- centre: logo, scaled to the target height keeping aspect ratio ---
    svg_logo = QSvgWidget(resource_path("logo/vec_logo_svg.svg"))
    svg_logo.setStyleSheet("background: transparent;")
    natural = svg_logo.renderer().defaultSize()
    logo_h = max(24, int(36 * scale))
    if natural.height() > 0:
        logo_w = int(natural.width() * (logo_h / natural.height()))
    else:
        # No usable intrinsic size: fall back to a square.
        logo_w = logo_h
    svg_logo.setFixedSize(logo_w, logo_h)

    # --- right: connection buttons + theme toggle ---
    right = QHBoxLayout()
    right.addStretch()
    self.connect_button = QPushButton("Connect")
    self.disconnect_button = QPushButton("Disconnect")
    font_px = max(10, int(12 * scale))
    shared_css = (
        "\n"
        "QPushButton {\n"
        f"font-size: {font_px}px;\n"
        "font-weight: bold;\n"
        "padding: 4px 14px;\n"
        "background-color: #3498db;\n"
        "}\n"
    )
    for button in (self.connect_button, self.disconnect_button):
        button.setStyleSheet(shared_css)
    self.connect_button.setObjectName("ConnectButton")
    self.disconnect_button.setObjectName("DisconnectButton")
    self.disconnect_button.setEnabled(False)

    # NOTE(review): this initial label looks mis-encoded (mojibake); confirm
    # the intended glyph.
    self.theme_button = QPushButton("π")
    toggle_side = max(28, int(35 * scale))
    self.theme_button.setFixedSize(toggle_side, toggle_side)
    self.theme_button.setFont(QFont("Arial", max(10, int(14 * scale))))
    self.theme_button.setStyleSheet("border: none; background-color: transparent; color: white;")
    self.theme_button.clicked.connect(self.toggle_theme)

    for widget in (self.connect_button, self.disconnect_button, self.theme_button):
        right.addWidget(widget)

    # --- assemble: three equally-stretched columns ---
    bar_layout.addLayout(left, 1)
    bar_layout.addWidget(svg_logo, 1)
    bar_layout.addLayout(right, 1)
    self.main_layout.addWidget(bar)

    self.connect_button.clicked.connect(self.connect_to_mqtt)
    self.disconnect_button.clicked.connect(self.disconnect_from_mqtt)
def apply_config_tab_styles(self):
    """Set the Config tab stylesheet for the current scale and theme.

    Group-box titles use ``max(10, int(12 * scale_factor))`` pt and the
    labels, line edits, buttons and check boxes use
    ``max(9, int(10 * scale_factor))`` pt. Their text colour is
    ``#f0f0f0`` in the dark theme and ``#000`` otherwise.
    """
    group_pt = max(10, int(12 * self.scale_factor))
    widget_pt = max(9, int(10 * self.scale_factor))
    if self.is_dark_theme:
        foreground = "#f0f0f0"
    else:
        foreground = "#000"
    stylesheet = (
        "\n"
        f"QGroupBox {{ font-size: {group_pt}pt; font-weight: bold; }}\n"
        "QLabel, QLineEdit, QPushButton, QCheckBox {\n"
        f"font-size: {widget_pt}pt;\n"
        f"color: {foreground};\n"
        "}\n"
    )
    self.config_tab.setStyleSheet(stylesheet)
def setup_config_ui(self):
    """Build the Config tab.

    Three group boxes are stacked top-aligned:

    * MQTT Server Credentials: broker, port, username, password (masked).
    * Topic Information: client id, version, device id, plus three
      read-only fields that preview the generated topics and refresh
      whenever one of the inputs changes.
    * Save Logs to CSV: an enable check box and the output directory.

    Changes from the previous revision: the "Generated Topis" label typo is
    corrected, and an unused default-filename local was removed.
    """
    layout = QVBoxLayout(self.config_tab)
    layout.setAlignment(Qt.AlignmentFlag.AlignTop)
    layout.setSpacing(20)
    layout.setContentsMargins(20, 20, 20, 20)
    self.apply_config_tab_styles()

    # --- MQTT server credentials ---
    mqtt_group = QGroupBox("MQTT Server Credentials")
    form_layout = QFormLayout()
    self.broker_input = QLineEdit("mqtt.vecmocon.com")
    self.port_input = QLineEdit("1883")
    self.username_input = QLineEdit("mqtt_username")
    self.password_input = QLineEdit("mqtt_password")
    self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
    form_layout.addRow("Broker Address:", self.broker_input)
    form_layout.addRow("Port:", self.port_input)
    form_layout.addRow("Username:", self.username_input)
    form_layout.addRow("Password:", self.password_input)
    mqtt_group.setLayout(form_layout)

    # --- topic inputs ---
    topic_group = QGroupBox("Topic Information")
    form_layout_topic = QFormLayout()
    self.client_id_input = QLineEdit("batterySmartStation")
    self.version_input = QLineEdit("100")
    self.device_id_input = QLineEdit("V16000862287077265957")
    form_layout_topic.addRow("Client ID:", self.client_id_input)
    form_layout_topic.addRow("Version:", self.version_input)
    form_layout_topic.addRow("Device ID:", self.device_id_input)

    # --- read-only preview of the generated topics ---
    line_separator = QFrame()
    line_separator.setFrameShape(QFrame.Shape.HLine)
    line_separator.setFrameShadow(QFrame.Shadow.Sunken)
    form_layout_topic.addRow(line_separator)
    # NOTE(review): the trailing glyph looks mis-encoded (mojibake); confirm
    # the intended emoji.
    form_layout_topic.addRow(QLabel("Generated Topics π‘"))
    self.periodic_topic_display = QLineEdit()
    self.periodic_topic_display.setReadOnly(True)
    self.periodic_topic_display.setObjectName("TopicDisplay")
    self.events_topic_display = QLineEdit()
    self.events_topic_display.setReadOnly(True)
    self.events_topic_display.setObjectName("TopicDisplay")
    self.rpcRequest_topic_display = QLineEdit()
    self.rpcRequest_topic_display.setReadOnly(True)
    self.rpcRequest_topic_display.setObjectName("TopicDisplay")
    form_layout_topic.addRow("Periodic Topic:", self.periodic_topic_display)
    form_layout_topic.addRow("Events Topic:", self.events_topic_display)
    form_layout_topic.addRow("RPC Request Topic:", self.rpcRequest_topic_display)
    topic_group.setLayout(form_layout_topic)

    # Keep the preview in sync with the three inputs.
    self.client_id_input.textChanged.connect(self._update_topic_display)
    self.version_input.textChanged.connect(self._update_topic_display)
    self.device_id_input.textChanged.connect(self._update_topic_display)

    # --- CSV logging ---
    logs_group = QGroupBox("Save Logs to CSV")
    logs_form_layout = QFormLayout()
    self.save_logs_checkbox = QCheckBox("Enable Logging")
    self.save_logs_checkbox.setChecked(True)
    log_dir_layout = QHBoxLayout()
    script_dir = self.get_base_path()
    default_log_dir = os.path.join(script_dir, "logs")
    self.log_dir_input = QLineEdit(default_log_dir)
    browse_btn = QPushButton("Browse")
    # The directory field is display-only.
    # NOTE(review): the Browse button stays enabled and can still change
    # the field through select_log_directory; confirm that is intended.
    self.log_dir_input.setEnabled(False)
    self.log_dir_input.setToolTip("This is now set automatically to a 'logs' folder next to the application.")
    browse_btn.clicked.connect(self.select_log_directory)
    log_dir_layout.addWidget(self.log_dir_input)
    log_dir_layout.addWidget(browse_btn)
    logs_form_layout.addRow(self.save_logs_checkbox)
    logs_form_layout.addRow("Output Directory:", log_dir_layout)
    logs_group.setLayout(logs_form_layout)

    layout.addWidget(mqtt_group)
    layout.addWidget(topic_group)
    layout.addWidget(logs_group)

    # Populate the preview once with the initial input values.
    self._update_topic_display()
def select_log_directory(self):
    """Ask for a directory and, unless cancelled, show it in ``log_dir_input``."""
    chosen = QFileDialog.getExistingDirectory(self, "Select Log Directory")
    if not chosen:
        # Empty result: leave the current value untouched.
        return
    self.log_dir_input.setText(chosen)
def _update_topic_display(self):
"""Constructs and displays the full MQTT topics based on user input."""
client_id = self.client_id_input.text()
version = self.version_input.text()
device_id = self.device_id_input.text()
# Construct the topic strings
periodic_topic = f"VEC/{client_id}/{version}/{device_id}/PERIODIC"
events_topic = f"VEC/{client_id}/{version}/{device_id}/EVENTS"
rpcRequest_topic = f"VEC/{client_id}/{version}/{device_id}/RPC/REQUEST"
# Update the read-only display fields
self.periodic_topic_display.setText(periodic_topic)
self.events_topic_display.setText(events_topic)
self.rpcRequest_topic_display.setText(rpcRequest_topic)
def setup_main_ui(self):
"""Build the Main tab: top bar, 3x3 chamber grid and the right-hand panel.

The right-hand panel stacks four group boxes: System Diagnostics, Swap
Process, Audio Command and Instance Log. Widgets that other methods need
are stored on the instance (``last_recv_ts_field``,
``backup_supply_indicator``, ``chamber_widgets``, ``sdc_field``,
``diag_labels``, ``swap_display``, ``swap_buttons``, ``start_swap_btn``,
``abort_swap_btn``, ``instance_log_area``, ``audio_combo``).
"""
page_layout = QVBoxLayout(self.main_tab)
page_layout.setContentsMargins(
int(8 * self.scale_factor), int(8 * self.scale_factor),
int(8 * self.scale_factor), int(8 * self.scale_factor)
)
# --- top bar: last-received timestamp, backup indicator, action buttons ---
self.top_bar_frame = QFrame()
self.top_bar_frame.setObjectName("topBarFrame")
top_bar_layout = QHBoxLayout()
ts_label = QLabel("LAST RECV TS:")
ts_label.setObjectName("TimestampTitleLabel")
top_bar_layout.addWidget(ts_label)
self.last_recv_ts_field = QLineEdit("No Data")
self.last_recv_ts_field.setReadOnly(True)
self.last_recv_ts_field.setObjectName("TimestampDataField")
top_bar_layout.addWidget(self.last_recv_ts_field)
top_bar_layout.addSpacerItem(QSpacerItem(20, 20, QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Minimum))
# top_bar_layout.addWidget(QLabel("Backup Supply:"))
# Fixed-size badge; this is its initial (gray outline) appearance.
self.backup_supply_indicator = QLabel("Backup")
self.backup_supply_indicator.setFixedSize(80, 25)
self.backup_supply_indicator.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.backup_supply_indicator.setStyleSheet(
"""
QLabel {
background-color: transparent;
border: 2px solid gray;
color: #E0E0E0;
border-radius: 5px;
font-weight: bold;
}
"""
)
top_bar_layout.addWidget(self.backup_supply_indicator)
# NOTE(review): the leading glyph in this label looks mis-encoded (mojibake) — confirm the intended symbol.
refresh_btn = QPushButton("β³ Refresh")
refresh_btn.setObjectName("RefreshButton")
refresh_btn.clicked.connect(self.reset_dashboard_ui)
top_bar_layout.addWidget(refresh_btn)
reset_btn = QPushButton("Station Reset")
reset_btn.setObjectName("ResetButton")
reset_btn.clicked.connect(self.confirm_station_reset)
top_bar_layout.addWidget(reset_btn)
# --- chamber grid: nine ChamberWidget instances laid out 3 per row ---
main_content_layout = QHBoxLayout()
self.chamber_widgets = []
grid_widget = QWidget()
grid_layout = QGridLayout(grid_widget)
grid_layout.setSpacing(max(5, int(8 * self.scale_factor)))
for i in range(9):
chamber = ChamberWidget(f"CHAMBER - {i+1}", self.scale_factor)
chamber_num = i + 1
# partial() binds the 1-based chamber number (and the on/off flag) for each handler.
chamber.open_door_requested.connect(partial(self.handle_open_door, chamber_num))
chamber.chg_on_requested.connect(partial(self.handle_charger_control, chamber_num, True))
chamber.chg_off_requested.connect(partial(self.handle_charger_control, chamber_num, False))
self.chamber_widgets.append(chamber)
row, col = divmod(i, 3)
grid_layout.addWidget(chamber, row, col)
# --- right-hand panel ---
diag_panel = QWidget()
diag_panel_layout = QVBoxLayout(diag_panel)
# System Diagnostics: SDC value field plus one label per entry in DIAGNOSTIC_ERRORS.
alarms_group = QGroupBox("System Diagnostics")
alarms_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold))
alarms_layout = QVBoxLayout(alarms_group)
alarms_layout.setSpacing(max(5, int(8 * self.scale_factor)))
sdc_layout = QHBoxLayout()
sdc_layout.addWidget(QLabel("SDC Value:"))
self.sdc_field = self._create_main_status_field()
sdc_layout.addWidget(self.sdc_field)
alarms_layout.addLayout(sdc_layout)
# Maps each diagnostic text to its label; every label starts with property alarm="inactive".
self.diag_labels = {}
for error_text in self.DIAGNOSTIC_ERRORS:
label = QLabel(error_text)
label.setProperty("alarm", "inactive")
label.setAlignment(Qt.AlignmentFlag.AlignCenter)
label.setFont(QFont("Arial", max(8, int(10 * self.scale_factor))))
alarms_layout.addWidget(label)
self.diag_labels[error_text] = label
# Swap Process: read-only sequence display, 3x3 keypad (1-9), Start/Abort buttons.
swap_group = QGroupBox("Swap Process")
swap_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold))
swap_layout = QVBoxLayout(swap_group)
swap_layout.setSpacing(max(4, int(6 * self.scale_factor)))
swap_layout.setContentsMargins(
int(8 * self.scale_factor), int(8 * self.scale_factor),
int(8 * self.scale_factor), int(8 * self.scale_factor)
)
# Fixed-height spacer placed first in the group's layout.
top_pad = QSpacerItem(
0,
max(16, int(22 * self.scale_factor)),
QSizePolicy.Policy.Minimum,
QSizePolicy.Policy.Fixed
)
swap_layout.addItem(top_pad)
swap_display_layout = QHBoxLayout()
self.swap_display = QLineEdit()
self.swap_display.setReadOnly(True)
self.swap_display.setPlaceholderText("Click to build sequence...")
swap_display_layout.addWidget(self.swap_display)
swap_layout.addLayout(swap_display_layout)
swap_grid = QGridLayout()
swap_grid.setSpacing(int(2 * self.scale_factor))
swap_grid.setContentsMargins(0, 0, 0, 0)
swap_grid.setSizeConstraint(QLayout.SizeConstraint.SetFixedSize)
# Maps keypad number (1-9) to its button; all share the on_swap_button_clicked slot.
self.swap_buttons = {}
btn_size = max(40, int(60 * self.scale_factor))
for i in range(1, 10):
btn = QPushButton(str(i))
btn.setFont(QFont("Arial", max(12, int(14*self.scale_factor))))
btn.setFixedSize(btn_size, btn_size)
btn.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
btn.clicked.connect(self.on_swap_button_clicked)
self.swap_buttons[i] = btn
row, col = divmod(i-1, 3)
swap_grid.addWidget(btn, row, col, Qt.AlignmentFlag.AlignCenter)
# Stretches on both sides centre the fixed-size keypad horizontally.
grid_container_layout = QHBoxLayout()
grid_container_layout.addStretch(1)
grid_container_layout.addLayout(swap_grid)
grid_container_layout.addStretch(1)
swap_layout.addLayout(grid_container_layout)
swap_layout.addStretch(1)
button_row_layout = QHBoxLayout()
button_row_layout.setSpacing(max(4, int(6 * self.scale_factor)))
self.start_swap_btn = QPushButton("Start Swap")
self.start_swap_btn.setObjectName("StartSwapButton")
self.start_swap_btn.clicked.connect(self.start_swap)
self.abort_swap_btn = QPushButton("Abort Swap")
self.abort_swap_btn.setObjectName("AbortSwapButton")
self.abort_swap_btn.clicked.connect(self.abort_swap)
for btn in (self.start_swap_btn, self.abort_swap_btn):
btn.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
btn.setMinimumHeight(max(28, int(36 * self.scale_factor)))
button_row_layout.addWidget(self.start_swap_btn)
button_row_layout.addWidget(self.abort_swap_btn)
swap_layout.addLayout(button_row_layout)
# Called only after the keypad and Start/Abort buttons have been created.
self.update_swap_buttons_state()
# Instance Log: read-only text area written by log_to_instance_view.
log_group = QGroupBox("Instance Log")
log_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold))
log_layout = QVBoxLayout(log_group)
self.instance_log_area = QPlainTextEdit()
self.instance_log_area.setReadOnly(True)
self.instance_log_area.setObjectName("InstanceLog")
log_layout.addWidget(self.instance_log_area)
# Audio Command: language selector (names from AUDIO_LANGUAGES) and a send button.
audio_group = QGroupBox("Audio Command")
audio_group.setFont(QFont("Arial", max(9, int(11*self.scale_factor)), QFont.Weight.Bold))
audio_layout = QHBoxLayout(audio_group)
self.audio_combo = QComboBox()
self.audio_combo.addItems(self.AUDIO_LANGUAGES.keys())
# NOTE(review): this button label looks mis-encoded (mojibake) — confirm the intended symbol.
send_audio_btn = QPushButton("β€")
send_audio_btn.setObjectName("SendAudioButton")
# btn_size is reused here with a smaller value than the keypad buttons above.
btn_size = max(28, int(35 * self.scale_factor))
send_audio_btn.setFixedSize(btn_size, btn_size)
send_audio_btn.clicked.connect(self.send_audio_command)
audio_layout.addWidget(self.audio_combo)
audio_layout.addWidget(send_audio_btn)
# Vertical stretch factors: diagnostics 2, swap 3, audio 1, log 4.
diag_panel_layout.addWidget(alarms_group, 2)
diag_panel_layout.addWidget(swap_group, 3)
diag_panel_layout.addWidget(audio_group, 1)
diag_panel_layout.addWidget(log_group, 4)
# The chamber grid takes the spare horizontal space (stretch 1 vs 0 for the panel).
main_content_layout.addWidget(grid_widget, 1)
main_content_layout.addWidget(diag_panel, 0)
self.top_bar_frame.setLayout(top_bar_layout)
page_layout.addWidget(self.top_bar_frame)
page_layout.addLayout(main_content_layout)
def log_to_instance_view(self, message: str):
    """Append a time-stamped line to the Instance Log on the main tab.

    The line has the form ``[HH:MM:SS] message``. Nothing happens when the
    log widget has not been created yet.

    The log is capped at 100 lines through the widget's own
    ``maximumBlockCount``. The earlier manual trim selected the first block
    and removed its text, which leaves that block's separator behind, so
    the block count did not shrink and the log kept growing.
    """
    max_lines = 100
    area = self.instance_log_area
    if area is None:
        return
    if area.maximumBlockCount() != max_lines:
        # Set once; afterwards the widget discards the oldest lines itself.
        area.setMaximumBlockCount(max_lines)
    timestamp = datetime.datetime.now().strftime("%H:%M:%S")
    area.appendPlainText(f"[{timestamp}] {message}")
def setup_logs_ui(self):
    """Build the Logs tab: two side-by-side read-only log panels.

    The left panel (``request_log_area``) is for outgoing requests and the
    right panel (``event_log_area``) for received events. Both text areas
    use the object name ``LogPanel``.
    """

    def build_panel(caption):
        """Return ``(group_box, text_area)`` for one titled log panel."""
        group = QGroupBox(caption)
        group_layout = QVBoxLayout(group)
        text_area = QPlainTextEdit()
        text_area.setReadOnly(True)
        text_area.setObjectName("LogPanel")
        group_layout.addWidget(text_area)
        return group, text_area

    row = QHBoxLayout(self.logs_tab)
    request_group, self.request_log_area = build_panel(
        "Request Logs (Dashboard -> MQTT Server)"
    )
    event_group, self.event_log_area = build_panel(
        "Event Logs (MQTT Server -> Dashboard)"
    )
    row.addWidget(request_group)
    row.addWidget(event_group)
def _create_main_status_field(self):
    """Return a new read-only QLineEdit."""
    status_field = QLineEdit()
    status_field.setReadOnly(True)
    return status_field
def log_request(self, topic, payload_str, log_type="INFO"):
    """Write one structured JSON entry to the request-log panel.

    ``payload_str`` is stored under "payload" when it parses as JSON and
    verbatim under "message" otherwise. A dashed separator line follows
    every entry.
    """
    now_text = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    entry = {}
    entry["timestamp"] = now_text[:-3]  # trim microseconds to milliseconds
    entry["log_type"] = log_type
    entry["topic"] = topic
    try:
        parsed = json.loads(payload_str)
    except json.JSONDecodeError:
        entry["message"] = payload_str
    else:
        entry["payload"] = parsed
    panel = self.request_log_area
    panel.appendPlainText(json.dumps(entry, indent=2))
    panel.appendPlainText("-" * 50 + "\n")
def log_event(self, topic, json_payload):
    """Log a received event as a JSON object to the event-log panel (UI only).

    Args:
        topic: MQTT topic the event arrived on.
        json_payload: Event body as a JSON string. Text that is not valid
            JSON is recorded verbatim under "message" instead of raising,
            matching what log_request does. This matters because this
            method is also called from an exception handler in
            on_message_received.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    log_entry = {
        "timestamp": timestamp,
        "topic": topic,
    }
    try:
        log_entry["payload"] = json.loads(json_payload)
    except json.JSONDecodeError:
        log_entry["message"] = json_payload
    final_log_string = json.dumps(log_entry, indent=2)
    self.event_log_area.appendPlainText(final_log_string)
    self.event_log_area.appendPlainText("-" * 50 + "\n")
def confirm_station_reset(self):
    """Ask for confirmation, then send a JOBTYPE_REBOOT RPC request.

    Declining the dialog only writes a note to the request log.
    """
    reply = QMessageBox.question(
        self,
        'Confirm Reset',
        "Are you sure you want to reset the station?",
        QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
        QMessageBox.StandardButton.No,  # "No" is the default button
    )
    if reply != QMessageBox.StandardButton.Yes:
        self.log_request("INFO", "Station Reset command cancelled by user.")
        return
    print("Requesting Station Reset...")
    # Read the clock once so ts and jobId always carry the same second.
    now = int(time.time())
    request = rpcRequest(
        ts=now,
        jobId=f"job_{now}",
        jobType=jobType_e.JOBTYPE_REBOOT,
    )
    self._send_rpc_request(request)
# --- Incoming MQTT message handling ---
def on_message_received(self, topic, payload):
    """Decode an incoming MQTT message and route it by its last topic segment.

    * ``PERIODIC``: printed to the terminal and pushed to the dashboard.
    * ``EVENTS``: written to the event log and the instance log.
    * ``REQUEST``: written to the request log and the instance log.

    While the "save logs" checkbox is ticked, every decoded message is also
    emitted on ``log_data_signal`` as
    ``[received_at, topic, decoded_dict, raw_payload]``.

    Args:
        topic: Full MQTT topic string.
        payload: Raw message body; parsed as protobuf and hex-dumped via
            ``payload.hex()`` when decoding fails.
    """
    now = datetime.datetime.now()
    self.timestamp_label.setText(f"Last Update: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    try:
        msg_type = topic.split('/')[-1]
        if msg_type == 'PERIODIC':
            decoded_payload = periodicData()
            decoded_payload.ParseFromString(payload)
            data_dict = MessageToDict(decoded_payload, preserving_proto_field_name=True)
            self._log_periodic_to_terminal(decoded_payload, data_dict)
            self.update_main_dashboard(data_dict)
            if self.save_logs_checkbox.isChecked():
                self.log_data_signal.emit([now, topic, data_dict, payload])
        elif msg_type == 'EVENTS':
            json_payload = self._decode_event_payload(payload)
            self.log_event(topic, json_payload)
            # Parse once and reuse the result for CSV logging and the instance log.
            try:
                event_data = json.loads(json_payload)
            except json.JSONDecodeError:
                self.log_to_instance_view("Received unparseable event data")
            else:
                if self.save_logs_checkbox.isChecked():
                    self.log_data_signal.emit([now, topic, event_data, payload])
                event_type = event_data.get("eventType", "Unknown Event")
                self.log_to_instance_view(f"Event Received: {event_type}")
        elif msg_type == 'REQUEST':
            try:
                decoded_payload = rpcRequest()
                decoded_payload.ParseFromString(payload)
                data_dict = MessageToDict(decoded_payload, preserving_proto_field_name=True)
                if self.save_logs_checkbox.isChecked():
                    self.log_data_signal.emit([now, topic, data_dict, payload])
                self.log_request(topic, json.dumps(data_dict, indent=2), log_type="INCOMING_RPC")
                job_type = data_dict.get("jobType", "Unknown Job")
                self.log_to_instance_view(f"RPC Request Received: {job_type}")
            except Exception as e:
                # Decoding problems on this topic are reported in the request panel.
                print(f"Error decoding RPC Request from topic '{topic}': {e}")
                # json.dumps escapes quotes/backslashes in the exception text;
                # an f-string template would produce invalid JSON for them.
                error_msg = json.dumps({
                    "error": f"RPC DECODING FAILED: {e}",
                    "raw_hex": payload.hex(),
                })
                self.log_request(topic, error_msg, log_type="DECODE_ERROR")
        else:
            print(f"Received message on unhandled topic: {topic}")
    except Exception as e:
        print(f"Error processing message from topic '{topic}': {e}")
        # Surface the failure in the UI; built with json.dumps so the entry
        # is always valid JSON regardless of what the exception text contains.
        error_msg = json.dumps({
            "error": f"DECODING FAILED: {e}",
            "raw_hex": payload.hex(),
        })
        self.log_event(topic, error_msg)
# --- Swap sequence controls, RPC commands and connection management ---
def on_swap_button_clicked(self):
    """Advance the clicked chamber button through its click cycle.

    The first and the second click each append the chamber number to the
    swap sequence (with different button styling); the third click removes
    every occurrence of that chamber and resets the button.
    """
    button = self.sender()
    # The chamber number is the first line of the button caption.
    chamber_num = int(button.text().split('\n')[0])
    clicks = self.swap_button_clicks[chamber_num] + 1
    self.swap_button_clicks[chamber_num] = clicks

    if clicks >= 3:
        self.swap_sequence = [slot for slot in self.swap_sequence if slot != chamber_num]
        self.swap_button_clicks[chamber_num] = 0
        button.setText(str(chamber_num))
        button.setStyleSheet("")
    elif clicks == 2:
        self.swap_sequence.append(chamber_num)
        button.setText(str(chamber_num))
        button.setStyleSheet("background-color: qlineargradient(x1:0, y1:0, x2:1, y2:0, stop:0 #1f7a9a, stop:0.5 #1f7a9a, stop:0.51 #1f7a9a, stop:1 #1f7a9a);")
    elif clicks == 1:
        self.swap_sequence.append(chamber_num)
        button.setStyleSheet("background-color: #05abd0;")

    self.update_swap_display()
    self.update_swap_buttons_state()
def update_swap_display(self):
    """Show the current swap sequence in the display field (blank when empty)."""
    if self.swap_sequence:
        display_text = str(self.swap_sequence)
    else:
        display_text = ""
    self.swap_display.setText(display_text)
def update_swap_buttons_state(self):
    """Enable the start-swap button only while a swap sequence is queued."""
    has_sequence = True if self.swap_sequence else False
    self.start_swap_btn.setEnabled(has_sequence)
def start_swap(self):
    """Send a JOBTYPE_SWAP_START RPC carrying the queued swap sequence.

    Shows a warning and does nothing when the sequence is empty. After the
    request is handed to _send_rpc_request the whole selection state is
    reset, the same way abort_swap and reset_dashboard_ui reset it.
    """
    if not self.swap_sequence:
        QMessageBox.warning(self, "Empty Sequence", "Cannot start an empty swap sequence.")
        return
    print(f"Starting swap with sequence: {self.swap_sequence}")
    # Read the clock once so ts and jobId always carry the same second.
    now = int(time.time())
    request = rpcRequest(
        ts=now,
        jobId=f"job_{now}",
        jobType=jobType_e.JOBTYPE_SWAP_START,
    )
    request.rpcData.slotsData.extend(self.swap_sequence)
    self._send_rpc_request(request)

    self.swap_sequence.clear()
    self.update_swap_display()
    # Clearing only the sequence would leave stale click counters (the next
    # click on a used chamber would count as its 2nd/3rd click) and an
    # enabled start button with nothing to send.
    self.swap_button_clicks = {i: 0 for i in range(1, 10)}
    for i, btn in self.swap_buttons.items():
        btn.setText(str(i))
        btn.setStyleSheet("")
    self.update_swap_buttons_state()
def abort_swap(self):
    """Send a JOBTYPE_TRANSACTION_ABORT RPC and clear every swap selection."""
    print("Requesting to abort swap...")
    abort_request = rpcRequest(
        ts=int(time.time()),
        jobId=f"job_{int(time.time())}",
        jobType=jobType_e.JOBTYPE_TRANSACTION_ABORT,
    )
    self._send_rpc_request(abort_request)

    # Forget the queued sequence and put all swap buttons back to default.
    self.swap_sequence.clear()
    self.update_swap_display()
    self.swap_button_clicks = dict.fromkeys(range(1, 10), 0)
    for slot_no, button in self.swap_buttons.items():
        button.setText(str(slot_no))
        button.setStyleSheet("")
    self.update_swap_buttons_state()
def send_audio_command(self):
    """Send a JOBTYPE_LANGUAGE_UPDATE RPC for the language picked in the combo box.

    The combo text must be a key of ``AUDIO_LANGUAGES`` and must map to a
    ``LANGUAGE_TYPE_<NAME>`` value of ``languageType_e``; otherwise an error
    is written to the instance log and nothing is sent. The "sent"
    confirmation is shown only when the MQTT client was connected.
    """
    language_name = self.audio_combo.currentText()
    language_code = self.AUDIO_LANGUAGES.get(language_name)
    if not language_code:
        self.log_to_instance_view(f"Error: Could not find code for language '{language_name}'.")
        return
    # Derive the enum member name from the display name,
    # e.g. "English" becomes "LANGUAGE_TYPE_ENGLISH".
    enum_name = f"LANGUAGE_TYPE_{language_name.upper()}"
    try:
        language_enum = languageType_e.Value(enum_name)
    except ValueError:
        self.log_to_instance_view(f"Error: Invalid language enum '{enum_name}' not found in .proto file.")
        return
    request = rpcRequest(
        ts=int(time.time()),
        jobId=f"lang_{int(time.time())}",
        jobType=jobType_e.JOBTYPE_LANGUAGE_UPDATE,
        languageType=language_enum,
    )
    # _send_rpc_request returns nothing, so note the connection state first:
    # when the client is not connected it shows its own warning and does not
    # publish, and claiming success afterwards would be misleading.
    was_connected = bool(self.mqtt_client and self.mqtt_client.client.is_connected())
    self._send_rpc_request(request)
    if not was_connected:
        return
    self.log_to_instance_view(f"Sent RPC to set language to {language_name} (Job ID: {request.jobId})")
    QMessageBox.information(self, "RPC Sent", f"Request to change language to {language_name} has been sent.")
def _send_rpc_request(self, request_payload):
    """Serialize an RPC request, log it, and publish it on the RPC/REQUEST topic.

    When the MQTT client is missing or not connected, a warning dialog is
    shown and nothing is logged or published.
    """
    client = self.mqtt_client
    if not (client and client.client.is_connected()):
        QMessageBox.warning(self, "Not Connected", "Cannot send command. MQTT client is not connected.")
        return

    device_id = self.device_id_input.text()
    version = self.version_input.text()
    topic = f"VEC/{self.client_id_input.text()}/{version}/{device_id}/RPC/REQUEST"

    wire_bytes = request_payload.SerializeToString()
    as_dict = MessageToDict(request_payload, preserving_proto_field_name=True)

    # Log the human-readable form first, then publish the binary form.
    self.log_request(topic, json.dumps(as_dict), log_type="OUTGOING_RPC")
    self.log_to_instance_view(f"Command Sent: {as_dict.get('jobType', 'Unknown Job')}")
    self.mqtt_client.publish_message(topic, wire_bytes)
def handle_open_door(self, chamber_num):
    """Send a JOBTYPE_GATE_OPEN_CLOSE RPC with state 1 for the given chamber."""
    print(f"Requesting to open door for chamber {chamber_num}...")
    door_request = rpcRequest(
        ts=int(time.time()),
        jobId=f"job_{int(time.time())}",
        jobType=jobType_e.JOBTYPE_GATE_OPEN_CLOSE,
    )
    slot = door_request.slotInfo
    slot.slotId = chamber_num
    slot.state = 1
    self._send_rpc_request(door_request)
def handle_charger_control(self, chamber_num, state):
    """Send a JOBTYPE_CHARGER_ENABLE_DISABLE RPC for the given chamber.

    A truthy ``state`` requests ON (slot state 1), a falsy one OFF (slot state 0).
    """
    if state:
        action, slot_state = "ON", 1
    else:
        action, slot_state = "OFF", 0
    print(f"Requesting to turn charger {action} for chamber {chamber_num}...")
    charger_request = rpcRequest(
        ts=int(time.time()),
        jobId=f"job_{int(time.time())}",
        jobType=jobType_e.JOBTYPE_CHARGER_ENABLE_DISABLE,
    )
    slot = charger_request.slotInfo
    slot.slotId = chamber_num
    slot.state = slot_state
    self._send_rpc_request(charger_request)
def reset_dashboard_ui(self):
    """Return every dashboard element to its empty/default presentation."""
    self.log_request("INFO", "Dashboard UI cleared by user.")

    # Station-level fields and per-chamber widgets.
    self.last_recv_ts_field.setText("No Data")
    self.sdc_field.setText("")
    for chamber_widget in self.chamber_widgets:
        chamber_widget.reset_to_default()
    self.update_diagnostic_alarms(0)

    # Swap selection: sequence, click counters and button visuals.
    self.swap_sequence.clear()
    self.update_swap_display()
    self.swap_button_clicks = dict.fromkeys(range(1, 10), 0)
    for slot_no, button in self.swap_buttons.items():
        button.setText(str(slot_no))
        button.setStyleSheet("")
    self.update_swap_buttons_state()
def connect_to_mqtt(self):
    """Persist the connection form and start a new MQTT client on its own thread.

    Order of work: validate the port, tear down an MQTT thread that is still
    running, (re)start the CSV logger when "save logs" is ticked, clear the
    dashboard, save the form values through QSettings, then create the
    client, wire its signals and start the thread.
    """
    # Validate first so that a bad port leaves nothing half-started
    # (in particular no orphaned CSV logger thread).
    try:
        port = int(self.port_input.text())
    except ValueError:
        self.timestamp_label.setText("Error: Port must be a number.")
        return

    if self.mqtt_thread and self.mqtt_thread.isRunning():
        print("Cleaning up previous MQTT thread...")
        if self.mqtt_client:
            self.mqtt_client.disconnect_from_broker()
            self.mqtt_client.cleanup()
        self.mqtt_thread.quit()
        self.mqtt_thread.wait()

    if self.save_logs_checkbox.isChecked():
        # A logger left over from an earlier attempt must be shut down
        # before a new one replaces it, or its thread keeps running.
        if self.csv_logger:
            self.stop_csv_logger()
        self.start_csv_logger()
    self.reset_dashboard_ui()

    broker = self.broker_input.text()
    user = self.username_input.text()
    password = self.password_input.text()
    client_id = self.client_id_input.text()
    version = self.version_input.text()
    device_id = self.device_id_input.text()

    # Save the form so it can be restored on the next start.
    # NOTE(review): the password is stored as-is through QSettings.
    self.settings.setValue("broker_address", broker)
    self.settings.setValue("port", str(port))  # stored as a string
    self.settings.setValue("username", user)
    self.settings.setValue("password", password)
    self.settings.setValue("client_id", client_id)
    self.settings.setValue("version", version)
    self.settings.setValue("device_id", device_id)
    self.settings.sync()  # force a write immediately
    print("✅ Configuration saved.")

    # The MQTT session id is a fresh random value; it is unrelated to the
    # "client_id" form field, which is only used in topic names.
    mqtt_client_id = f"SwapStationDashboard-{uuid.uuid4()}"
    self.mqtt_thread = QThread()
    self.mqtt_client = MqttClient(broker, port, user, password, mqtt_client_id)
    self.mqtt_client.moveToThread(self.mqtt_thread)
    # csv_logger is None whenever logging is switched off; connecting its
    # method unconditionally would raise AttributeError.
    if self.csv_logger is not None:
        self.mqtt_client.stop_logging_signal.connect(self.csv_logger.stop_logging)
    self.mqtt_client.connection_status_changed.connect(self.on_connection_status_changed)
    self.mqtt_client.message_received.connect(self.on_message_received)
    self.mqtt_thread.started.connect(self.mqtt_client.connect_to_broker)
    self.mqtt_thread.start()
def disconnect_from_mqtt(self):
    """Stop the CSV logger (if any), then ask the MQTT client (if any) to disconnect."""
    if self.csv_logger:
        self.stop_csv_logger()
    if self.mqtt_client:
        self.mqtt_client.disconnect_from_broker()
def start_csv_logger(self):
    """Create a CsvLogger on its own QThread, writing under ``<base path>/logs``.

    The base path comes from get_base_path(); the ``logs`` directory is
    created when missing and every call uses a new timestamped session
    folder name.
    """
    log_root = os.path.join(self.get_base_path(), 'logs')
    os.makedirs(log_root, exist_ok=True)

    started_at = datetime.datetime.now()
    session_name = f"session_{started_at.strftime('%Y-%m-%d_%H-%M-%S')}"

    self.logger_thread = QThread()
    self.csv_logger = CsvLogger(log_root, session_name)
    worker, worker_thread = self.csv_logger, self.logger_thread
    worker.moveToThread(worker_thread)

    # Rows arrive through log_data_signal; logging starts with the thread.
    self.log_data_signal.connect(worker.log_data)
    worker_thread.started.connect(worker.start_logging)
    worker_thread.start()
def get_base_path(self):
    """Return the application's base directory.

    For a frozen (packaged) build this is the folder of the executable;
    otherwise it is the folder containing this source file.
    """
    is_frozen = getattr(sys, 'frozen', False)
    anchor = sys.executable if is_frozen else os.path.abspath(__file__)
    return os.path.dirname(anchor)
def stop_csv_logger(self):
    """Stop the CSV logger, shut its thread down if it is running, and drop both."""
    thread = self.logger_thread
    if thread and thread.isRunning():
        self.csv_logger.stop_logging()
        thread.quit()
        thread.wait()
    # References are cleared whether or not a thread was running.
    self.csv_logger = None
    self.logger_thread = None
def on_connection_status_changed(self, is_connected, message):
    """React to a connection status update from the MQTT client.

    Toggles the connect/disconnect buttons and the config inputs, shows
    ``message`` in the status label and the instance log, and, once
    connected, subscribes to the device's PERIODIC, EVENTS and RPC/REQUEST
    topics and switches to the main tab.

    Args:
        is_connected: True when the client is connected.
        message: Status text to display.
    """
    self.connect_button.setEnabled(not is_connected)
    self.disconnect_button.setEnabled(is_connected)
    self.set_config_inputs_enabled(not is_connected)
    self.timestamp_label.setText(message)
    # The instance log gets the text without leading/trailing status emoji
    # and padding spaces.
    self.log_to_instance_view(message.strip("✅❌🔌🔴 "))
    if not is_connected:
        return

    client_id = self.client_id_input.text()
    version = self.version_input.text()
    device_id = self.device_id_input.text()
    base_topic = f"VEC/{client_id}/{version}/{device_id}"
    for suffix in ("PERIODIC", "EVENTS", "RPC/REQUEST"):
        self.mqtt_client.subscribe_to_topic(f"{base_topic}/{suffix}")
    self.tabs.setCurrentWidget(self.main_tab)
def update_main_dashboard(self, data):
    """Refresh the main dashboard from one decoded periodic message.

    Args:
        data: Dict form of the periodic message. Keys read here: ``ts``,
            ``slotLevelPayload``, ``stationDiagnosticCode`` and
            ``backupSupplyStatus``; missing keys fall back to defaults.

    Any exception is caught and printed so that one bad message cannot
    break message handling.
    """
    try:
        # MessageToDict renders 64-bit integer fields as strings, so numeric
        # values are coerced before use instead of assuming int.
        raw_ts = data.get('ts', datetime.datetime.now().timestamp())
        ts = datetime.datetime.fromtimestamp(float(raw_ts)).strftime('%Y-%m-%d %H:%M:%S')
        self.last_recv_ts_field.setText(ts)

        slot_payloads = data.get("slotLevelPayload", [])
        # zip stops at the shorter side: payload entries beyond the available
        # chamber widgets are ignored.
        for slot_no, (chamber, slot_data) in enumerate(zip(self.chamber_widgets, slot_payloads), start=1):
            chamber.update_data(slot_data)
            if slot_no in self.swap_buttons:
                is_present = slot_data.get("batteryPresent") == 1
                self.swap_buttons[slot_no].setStyleSheet("background-color: #2ecc71;" if is_present else "")

        sdc_value = int(data.get("stationDiagnosticCode", 0))
        self.sdc_field.setText(str(sdc_value))
        self.update_diagnostic_alarms(sdc_value)

        # Same label styling for both states; only caption and colour differ.
        if data.get("backupSupplyStatus", 0) == 1:
            caption, color, color_name = "Backup ON", "#28a745", "Green"
        else:
            caption, color, color_name = "Backup OFF", "#dc3545", "Red"
        self.backup_supply_indicator.setText(caption)
        self.backup_supply_indicator.setStyleSheet(
            f"""
            QLabel {{
                background-color: transparent;
                border: 2px solid {color}; /* {color_name} */
                color: {color};
                border-radius: 5px;
                font-weight: bold;
            }}
            """
        )
    except Exception as e:
        print(f"Error updating dashboard: {e}")
def _log_periodic_to_terminal(self, decoded_payload, data_dict):
"""Formats and prints the periodic data to the terminal as a clean table."""
try:
current_time = datetime.datetime.fromtimestamp(decoded_payload.ts).strftime('%Y-%m-%d %H:%M:%S')
device_id = data_dict.get("deviceId", "N/A")
backup_supply = "ON" if decoded_payload.backupSupplyStatus == 1 else "OFF"
station_sdc = decoded_payload.stationDiagnosticCode
# --- Main Information ---
print("\n\033[1m" + "="*50 + " PERIODIC DATA " + "="*50 + "\033[0m")
print(f"\033[1m Timestamp:\033[0m {current_time} | \033[1mDevice ID:\033[0m {device_id}")
print(f"\033[1m Backup Supply:\033[0m {backup_supply} | \033[1mStation SDC:\033[0m {station_sdc}")
print("-" * 120)
# --- Table Header ---
header = "| {:^7} | {:^18} | {:^8} | {:^8} | {:^7} | {:^10} | {:^10} | {:^10} | {:^10} | {:^10} |"
print(header.format("Chamber", "Battery ID", "Present", "Charging", "SOC", "Voltage", "Current", "Slot Temp", "Bat Temp", "Door"))
print("-" * 120)
# --- Table Rows ---
row_format = "| {:^7} | {:<18} | {:^8} | {:^8} | {:>5}% | {:>8}V | {:>8}A | {:>8}Β°C | {:>8}Β°C | {:^10} |"
for i, chamber_data in enumerate(data_dict.get("slotLevelPayload", []), start=1):
is_present = chamber_data.get("batteryPresent") == 1
is_charging = chamber_data.get("chargingStatus") == 1
is_door_open = chamber_data.get("doorStatus") == 1
# Get and format the Slot Temperature
slot_temp_raw = chamber_data.get('slotTemperature', 'N/A')
slot_temp_celsius = f"{slot_temp_raw / 10:.1f}" if isinstance(slot_temp_raw, int) else "N/A"
# Get and format the Battery Temperature
battery_temp_raw = chamber_data.get('batteryMaxTemp', 'N/A')
battery_temp_celsius = f"{battery_temp_raw / 10:.1f}" if isinstance(battery_temp_raw, int) else "N/A"
print(row_format.format(
i,
chamber_data.get('batteryIdentification', 'N/A'),
"β
" if is_present else "β",
"β
" if is_charging else "β",
chamber_data.get('soc', 'N/A'),
chamber_data.get('batVoltage', 'N/A'),
chamber_data.get('current', 'N/A'),
slot_temp_celsius,
battery_temp_celsius,
"Open" if is_door_open else "Closed"
))
print("=" * 120 + "\n")
except Exception as e:
print(f"Error printing periodic log to terminal: {e}")
def update_diagnostic_alarms(self, sdc_value):
    """Set each diagnostic label's "alarm" property from the bits of ``sdc_value``.

    Bit ``i`` of ``sdc_value`` corresponds to ``DIAGNOSTIC_ERRORS[i]``. After
    the property changes the label is unpolished and polished again so the
    style is re-evaluated.
    """
    for bit, error_text in enumerate(self.DIAGNOSTIC_ERRORS):
        alarm_state = "active" if (sdc_value >> bit) & 1 else "inactive"
        label = self.diag_labels[error_text]
        label.setProperty("alarm", alarm_state)
        widget_style = label.style()
        widget_style.unpolish(label)
        widget_style.polish(label)
def set_config_inputs_enabled(self, enabled):
    """Enable or disable every line edit, button, check box and combo box in the config tab."""
    input_types = (QLineEdit, QPushButton, QCheckBox, QComboBox)
    for child in self.config_tab.findChildren(QWidget):
        if not isinstance(child, input_types):
            continue
        child.setEnabled(enabled)
@pyqtSlot()
def handle_disconnection(self):
    """Stop an active CSV logger and reset the connection widgets.

    Declared as a Qt slot; presumably wired to a "disconnected" signal
    elsewhere in the file (the connection is not made in this method).
    """
    print("Main window sees disconnection, stopping logger if active.")
    logger = self.csv_logger
    # Only a logger whose timer is still running needs to be stopped.
    if logger and logger.timer.isActive():
        logger.stop_logging()
    self.connect_button.setText("Connect")
    self.connection_status_label.setText("Disconnected")
def closeEvent(self, event: QCloseEvent):
    """Shut down the worker threads before the window closes.

    The MQTT client is asked to disconnect and its thread is given five
    seconds to finish before it is terminated; a CSV logger thread that is
    still running is stopped as well. The event is always accepted.

    Args:
        event: The close event delivered by Qt.
    """
    print("--- Close event triggered. Shutting down gracefully... ---")
    if self.mqtt_thread and self.mqtt_thread.isRunning():
        print(" > Stopping MQTT client...")
        # Tell the client to disconnect (which will stop its loop).
        if self.mqtt_client:
            self.mqtt_client.disconnect_from_broker()
        print(" > Quitting and waiting for MQTT thread...")
        self.mqtt_thread.quit()
        if not self.mqtt_thread.wait(5000):
            print(" > Warning: Thread did not terminate in time.")
            self.mqtt_thread.terminate()
            # terminate() only requests termination; wait until the thread
            # has really ended before its QThread object goes away.
            self.mqtt_thread.wait()
    # handle_disconnection calls stop_logging() but never quits
    # logger_thread, so a logger thread may still be running here.
    if self.logger_thread and self.logger_thread.isRunning():
        print(" > Stopping CSV logger thread...")
        self.stop_csv_logger()
    print("--- Shutdown complete. ---")
    event.accept()