159 lines
7.1 KiB
Python
159 lines
7.1 KiB
Python
# In core/csv_logger.py
|
|
|
|
import csv
|
|
import json
|
|
import os
|
|
import queue
|
|
from datetime import datetime
|
|
from PyQt6.QtCore import QObject, pyqtSlot, QTimer
|
|
from google.protobuf.json_format import MessageToDict
|
|
from proto.vec_payload_chgSt_pb2 import eventPayload, rpcRequest
|
|
|
|
class CsvLogger(QObject):
    """Queue-backed CSV logger that writes one file per (device, file group).

    Items are handed over through :meth:`log_data`, which only enqueues them.
    A ``QTimer`` owned by this object periodically drains the queue and
    appends rows to ``<base_log_directory>/<device_id>/<session_name>/<group>.csv``
    where ``<group>`` is ``PERIODIC`` for topics ending in ``/PERIODIC`` and
    ``EVENTS_RPC`` for everything else.
    """

    # Column suffixes written for every slot, in output order. This is the
    # single source of truth for both the PERIODIC header and the padding
    # width of absent slots, so the two cannot drift apart.
    SLOT_FIELDS = (
        "BatteryID", "BatteryPresent", "ChargerPresent", "DoorStatus", "DoorLockStatus",
        "Voltage_V", "Current_A", "SOC_Percent", "BatteryTemp_C", "SlotTemp_C",
        "BatteryFaultCode", "ChargerFaultCode", "BatteryMode", "ChargerMode",
    )

    def __init__(self, base_log_directory, session_name):
        """Create the base log directory and prepare an idle logger.

        Args:
            base_log_directory: Root directory under which per-device
                folders are created.
            session_name: Sub-folder name used for every file of this session.
        """
        super().__init__()
        self.base_log_directory = base_log_directory
        self.session_name = session_name
        os.makedirs(self.base_log_directory, exist_ok=True)

        self.files = {}    # (device_id, file_group) -> open file handle
        self.writers = {}  # (device_id, file_group) -> csv.writer on that handle
        self.queue = queue.Queue()
        self.timer = QTimer(self)
        self.timer.timeout.connect(self._process_queue)
        self.num_slots = 9  # number of slot column groups in a PERIODIC row

    def start_logging(self):
        """Start the timer that drains the queue (interval value: 100)."""
        self.timer.start(100)
        print(f"✅ CSV logging service started for session: {self.session_name}")

    def _get_writer(self, device_id, file_group):
        """Return the csv writer for ``(device_id, file_group)``, creating it on demand.

        On first use the target directory and file are created and the header
        row is written. The file is opened in ``'w'`` mode, so an existing
        file with the same path is truncated.

        Returns:
            The ``csv.writer`` instance, or ``None`` if the file could not be
            created or its header could not be written.
        """
        writer_key = (device_id, file_group)
        cached = self.writers.get(writer_key)
        if cached is not None:
            return cached

        file_handle = None
        try:
            session_dir = os.path.join(self.base_log_directory, device_id, self.session_name)
            os.makedirs(session_dir, exist_ok=True)
            filepath = os.path.join(session_dir, f"{file_group}.csv")

            file_handle = open(filepath, 'w', newline='', encoding='utf-8')
            writer = csv.writer(file_handle)

            if file_group == 'PERIODIC':
                # Wide header: fixed columns, then one column group per slot.
                slot_header = [
                    f"Slot{i}_{field}"
                    for i in range(1, self.num_slots + 1)
                    for field in self.SLOT_FIELDS
                ]
                header = (
                    ["Timestamp", "DeviceID", "StationDiagnosticCode"]
                    + slot_header
                    + ["RawHexPayload"]
                )
            else:  # Header for EVENTS_RPC
                header = ["Timestamp", "Topic", "Payload_JSON", "RawHexPayload"]

            writer.writerow(header)
        except Exception as e:
            # Do not leave a half-initialised file behind: close it and keep
            # it out of the caches so the next call retries from scratch
            # instead of appending rows to a headerless file.
            if file_handle is not None:
                try:
                    file_handle.close()
                except OSError:
                    pass
            print(f"❌ Failed to create CSV writer for {writer_key}: {e}")
            return None

        # Register only once the header has been written successfully.
        self.files[writer_key] = file_handle
        self.writers[writer_key] = writer
        print(f"---> New log file created: {filepath}")
        return writer

    @pyqtSlot(list)
    def log_data(self, data_list):
        """Enqueue one log item; the actual file write happens on a timer tick.

        Args:
            data_list: Expected to unpack into exactly four values:
                ``(timestamp_obj, topic, data, raw_payload)``.
        """
        self.queue.put(data_list)

    def _process_queue(self):
        """Drain every queued item into its CSV file, then flush touched files.

        A failure on one item is reported and does not stop the drain.
        """
        touched = set()
        try:
            while True:
                # get_nowait() never blocks, unlike empty() followed by get().
                try:
                    item = self.queue.get_nowait()
                except queue.Empty:
                    break

                try:
                    writer_key = self._write_item(item)
                except Exception as e:
                    # Catch-all so a single bad item cannot stop the logger.
                    print(f"❌ An unexpected error occurred in the logger thread: {e}")
                    continue

                if writer_key is not None:
                    touched.add(writer_key)
        finally:
            # One flush per file per drain instead of one per row.
            for writer_key in touched:
                if file_handle := self.files.get(writer_key):
                    try:
                        file_handle.flush()
                    except Exception as e:
                        print(f"❌ An unexpected error occurred in the logger thread: {e}")

    def _write_item(self, item):
        """Write one queued item as a CSV row.

        Returns:
            The ``(device_id, file_group)`` key of the file written to, or
            ``None`` when the item was skipped (malformed item, topic with
            fewer than five ``/``-separated parts, or no writer available).
        """
        # Only the unpacking is guarded here, so that errors raised further
        # down are not misreported as a malformed queue item.
        try:
            timestamp_obj, topic, data, raw_payload = item
        except (TypeError, ValueError):
            try:
                count = len(item)
            except TypeError:
                count = "an unsized object"
            print(f"❌ Error: Malformed item in log queue. Expected 4 values, but got {count}. Item: {item}")
            return None

        parts = topic.split('/')
        if len(parts) < 5:
            return None
        device_id = parts[3]  # fourth topic segment is used as the device id
        file_group = 'PERIODIC' if topic.endswith('/PERIODIC') else 'EVENTS_RPC'

        writer = self._get_writer(device_id, file_group)
        if not writer:
            return None

        if file_group == 'PERIODIC':
            row = self._build_periodic_row(device_id, timestamp_obj, data, raw_payload)
        else:
            row = self._build_event_row(timestamp_obj, topic, data, raw_payload)
        writer.writerow(row)
        return (device_id, file_group)

    def _build_periodic_row(self, device_id, timestamp_obj, data, raw_payload):
        """Build the single wide PERIODIC row for one payload.

        ``data`` is read with ``.get`` for ``"ts"``, ``"stationDiagnosticCode"``
        and ``"slotLevelPayload"``; ``raw_payload`` must provide ``.hex()``.
        """
        # Prefer the payload's own "ts" (passed to datetime.fromtimestamp).
        # If it is missing or not convertible, fall back to the timestamp that
        # accompanied the item rather than dropping the whole row.
        try:
            sample_time = datetime.fromtimestamp(data.get("ts"))
        except (TypeError, ValueError, OverflowError, OSError):
            sample_time = timestamp_obj

        row = [
            sample_time.strftime("%Y-%m-%d %H:%M:%S"),
            device_id,
            data.get("stationDiagnosticCode", "N/A"),
        ]

        # Emit at most num_slots column groups; slots beyond that are ignored.
        reported = data.get("slotLevelPayload", [])[:self.num_slots]
        for slot in reported:
            row.extend(self._slot_columns(slot))

        # Pad absent slots with empty cells so every row matches the header.
        missing = self.num_slots - len(reported)
        row.extend([''] * (missing * len(self.SLOT_FIELDS)))

        row.append(raw_payload.hex())
        return row

    @staticmethod
    def _slot_columns(slot):
        """Return one slot's cells, ordered exactly like ``SLOT_FIELDS``."""
        # NOTE(review): the divisors together with the *_V / *_A / *_C header
        # names suggest raw values in milli-units and tenths of a degree;
        # confirm against the payload definition.
        return [
            slot.get('batteryIdentification', ''),
            "TRUE" if slot.get("batteryPresent") == 1 else "FALSE",
            "TRUE" if slot.get("chargerPresent") == 1 else "FALSE",
            "OPEN" if slot.get("doorStatus") == 1 else "CLOSED",
            "LOCKED" if slot.get("doorLockStatus") == 1 else "UNLOCKED",
            slot.get('voltage', 0) / 1000.0,
            slot.get('current', 0) / 1000.0,
            slot.get('soc', 0),
            slot.get('batteryMaxTemp', 0) / 10.0,
            slot.get('slotTemperature', 0) / 10.0,
            slot.get('batteryFaultCode', 0),
            slot.get('chargerFaultCode', 0),
            slot.get('batteryMode', 0),
            slot.get('chargerMode', 0),
        ]

    @staticmethod
    def _build_event_row(timestamp_obj, topic, data, raw_payload):
        """Build an EVENTS_RPC row: timestamp, topic, JSON payload, raw hex."""
        return [
            # "%f" yields six digits; dropping the last three leaves milliseconds.
            timestamp_obj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
            topic,
            json.dumps(data),
            raw_payload.hex(),
        ]

    def stop_logging(self):
        """Stop the timer, write out anything still queued, and close all files."""
        self.timer.stop()
        self._process_queue()
        for file in self.files.values():
            # Keep closing the remaining files even if one of them fails.
            try:
                file.close()
            except OSError as e:
                print(f"❌ Failed to close log file {file.name}: {e}")
        self.files.clear()
        self.writers.clear()
        print(f"🛑 CSV logging stopped for session: {self.session_name}")