#include "mqtt_manager.h" #include #include #include "esp_log.h" #include "mqtt_client.h" #include "esp_system.h" #include "esp_mac.h" #include "rpc_job_parser.h" #include "ftp_manager.h" #include "fota_manager.h" #include "device_config.h" #include "freertos/FreeRTOS.h" #include "freertos/task.h" static const char *TAG = "MQTT_MANAGER"; static esp_mqtt_client_handle_t s_client = NULL; /* Topics: VEC/Vectr12X/v362/{MAC}/RPC/REQUEST/RESPONSE/PERIODIC */ static char s_rpc_req_topic[128]; static char s_rpc_resp_topic[128]; static char s_periodic_topic[128]; // <-- NEW static bool s_periodic_task_started = false; // <-- NEW /* Job enums from your definition */ typedef enum { JOB_RESULT_UNKNOWN = 0, JOB_RESULT_SUCCESS = 1, JOB_RESULT_REJECTED = 2, JOB_RESULT_TIMEOUT = 3, } jobResult_e; typedef enum { JOB_STATUS_IDLE = 0, JOB_STATUS_PENDING = 1, JOB_STATUS_EXECUTING = 2, JOB_STATUS_EXECUTED = 3, } jobStatus_e; static void reboot_task(void *arg) { ESP_LOGW(TAG, "Rebooting system for FOTA..."); vTaskDelay(pdMS_TO_TICKS(1500)); esp_restart(); } static void periodic_publish_task(void *arg) { const char *msg = "hello_again"; while (1) { if (s_client) { int msg_id = esp_mqtt_client_publish( s_client, s_periodic_topic, msg, 0, // length 0 -> strlen(msg) inside ESP-IDF 0, // QoS 0 0 // retain = 0 ); ESP_LOGI(TAG, "FOTA PASS_2! Periodic publish (msg_id=%d): %s", msg_id, msg); } vTaskDelay(pdMS_TO_TICKS(5000)); // 5 seconds } } /* Helper: publish response payload */ void mqtt_send_response(const char *payload) { if (!s_client) { ESP_LOGW(TAG, "MQTT client not ready, cannot send response"); return; } int msg_id = esp_mqtt_client_publish( s_client, s_rpc_resp_topic, payload, 0, /* len = 0 => strlen() inside esp-idf */ 1, /* QoS 1 */ 0 /* retain */ ); ESP_LOGI(TAG, "Published RPC response (msg_id=%d): %s", msg_id, payload); } /* Helper: build and send job status JSON */ static void send_job_status(const ota_job_t *job, jobStatus_e status, jobResult_e result) { char json[256]; /* Current epoch timestamp; you can replace with your own RTC if needed */ time_t now = time(NULL); /* NOTE: DEVICE_ID comes from device_config.h */ int len = snprintf(json, sizeof(json), "{" "\"ts\":%ld," "\"deviceId\":\"%s\"," "\"jobId\":\"%s\"," "\"jobStatus\":%d," "\"jobResult\":%d" "}", (long)now, DEVICE_ID, job->jobId, (int)status, (int)result ); if (len <= 0 || len >= (int)sizeof(json)) { ESP_LOGW(TAG, "Job status JSON truncated or error"); } mqtt_send_response(json); } /* MQTT event handler */ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data; switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); /* Subscribe to RPC request topic */ esp_mqtt_client_subscribe(s_client, s_rpc_req_topic, 1); ESP_LOGI(TAG, "Subscribed to: %s", s_rpc_req_topic); /* NEW: start periodic hello publisher once */ if (!s_periodic_task_started) { BaseType_t res = xTaskCreate( periodic_publish_task, "periodic_pub", 4096, NULL, 5, NULL ); if (res == pdPASS) { s_periodic_task_started = true; ESP_LOGI(TAG, "Started periodic publish task"); } else { ESP_LOGE(TAG, "Failed to start periodic publish task"); } } break; case MQTT_EVENT_DISCONNECTED: ESP_LOGW(TAG, "MQTT_EVENT_DISCONNECTED"); break; case MQTT_EVENT_DATA: { ESP_LOGI(TAG, "MQTT_EVENT_DATA: %.*s", event->data_len, event->data); /* Ensure payload is null-terminated for JSON parsing */ char *payload = calloc(event->data_len + 1, 1); if (!payload) { ESP_LOGE(TAG, "OOM in MQTT_EVENT_DATA"); return; } memcpy(payload, event->data, event->data_len); payload[event->data_len] = '\0'; ota_job_t job = {0}; if (!rpc_job_parse(payload, &job)) { ESP_LOGW(TAG, "Failed to parse RPC job JSON"); free(payload); return; } ESP_LOGI(TAG, "Parsed job: status=%d, deviceType=%d, otaType=%d, jobId=%s", job.l_ota_status, job.deviceType, job.otaType, job.jobId); /* l_ota_status == 3 => Download from FTP */ if (job.l_ota_status == 3) { ESP_LOGI(TAG, "OTA status 3: Download request from FTP"); /* Notify PENDING (or EXECUTING) + SUCCESS (ack receipt) */ send_job_status(&job, JOB_STATUS_PENDING, JOB_RESULT_SUCCESS); ftp_config_t cfg = { .server = FTP_SERVER_IP, .port = FTP_SERVER_PORT, .username = FTP_USERNAME, .password = FTP_PASSWORD, .remote_path = FTP_REMOTE_PATH, .local_path = FTP_LOCAL_PATH, }; ftp_err_t ret = ftp_manager_download(&cfg, NULL); if (ret == FTP_OK) { ESP_LOGI(TAG, "FTP download success"); send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_SUCCESS); } else { ESP_LOGE(TAG, "FTP download failed, err=%d", ret); send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_REJECTED); } } /* l_ota_status == 6 => Apply FOTA & reboot */ else if (job.l_ota_status == 6) { ESP_LOGI(TAG, "OTA status 6: Apply FOTA & reboot"); send_job_status(&job, JOB_STATUS_EXECUTING, JOB_RESULT_SUCCESS); fota_err_t res = fota_manager_apply(FTP_LOCAL_PATH); if (res == FOTA_OK) { ESP_LOGI(TAG, "FOTA apply success, scheduling reboot"); send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_SUCCESS); xTaskCreate( reboot_task, "reboot_task", 4096, NULL, 5, NULL ); } else { ESP_LOGE(TAG, "FOTA apply failed, err=%d", res); send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_REJECTED); } } else { ESP_LOGW(TAG, "Unhandled l_ota_status=%d", job.l_ota_status); } free(payload); break; } default: break; } } /* Build MAC string and topics: VEC/Vectr12X/v362/{MAC}/RPC/... */ static void build_rpc_topics(void) { uint8_t mac[6]; esp_read_mac(mac, ESP_MAC_WIFI_STA); char mac_str[13] = {0}; /* 12 hex chars + null */ snprintf(mac_str, sizeof(mac_str), "%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); snprintf(s_rpc_req_topic, sizeof(s_rpc_req_topic), "VEC/Vectr12X/v362/%s/RPC/REQUEST", mac_str); snprintf(s_rpc_resp_topic, sizeof(s_rpc_resp_topic), "VEC/Vectr12X/v362/%s/RPC/RESPONSE", mac_str); /* NEW: periodic topic */ snprintf(s_periodic_topic, sizeof(s_periodic_topic), "VEC/Vectr12X/v362/%s/PERIODIC", mac_str); ESP_LOGI(TAG, "RPC REQUEST topic: %s", s_rpc_req_topic); ESP_LOGI(TAG, "RPC RESPONSE topic: %s", s_rpc_resp_topic); ESP_LOGI(TAG, "PERIODIC topic: %s", s_periodic_topic); } /* Public init called from app_main() */ void mqtt_manager_init(void) { build_rpc_topics(); esp_mqtt_client_config_t mqtt_cfg = { .broker = { .address = { .uri = MQTT_BROKER_URL, // e.g. "mqtt://192.168.0.10" // .port = MQTT_BROKER_PORT, // e.g. 1883 }, }, .credentials = { .username = MQTT_USERNAME, .authentication = { .password = MQTT_PASSWORD, }, }, .session.keepalive = 60, .network.reconnect_timeout_ms = 10000, .network.timeout_ms = 10000, }; s_client = esp_mqtt_client_init(&mqtt_cfg); if (!s_client) { ESP_LOGE(TAG, "Failed to create MQTT client"); return; } esp_mqtt_client_register_event( s_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL ); esp_mqtt_client_start(s_client); ESP_LOGI(TAG, "MQTT client started"); }