esp32-s3_fota_test_wifi/components/mqtt_manager/mqtt_manager.c

321 lines
8.9 KiB
C

#include "mqtt_manager.h"
#include <string.h>
#include <time.h>
#include "esp_log.h"
#include "mqtt_client.h"
#include "esp_system.h"
#include "esp_mac.h"
#include "rpc_job_parser.h"
#include "ftp_manager.h"
#include "fota_manager.h"
#include "device_config.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
static const char *TAG = "MQTT_MANAGER";
static esp_mqtt_client_handle_t s_client = NULL;
/* Topics: VEC/Vectr12X/v362/{MAC}/RPC/REQUEST/RESPONSE/PERIODIC */
static char s_rpc_req_topic[128];
static char s_rpc_resp_topic[128];
static char s_periodic_topic[128]; // <-- NEW
static bool s_periodic_task_started = false; // <-- NEW
/* Job enums from your definition */
typedef enum {
JOB_RESULT_UNKNOWN = 0,
JOB_RESULT_SUCCESS = 1,
JOB_RESULT_REJECTED = 2,
JOB_RESULT_TIMEOUT = 3,
} jobResult_e;
typedef enum {
JOB_STATUS_IDLE = 0,
JOB_STATUS_PENDING = 1,
JOB_STATUS_EXECUTING = 2,
JOB_STATUS_EXECUTED = 3,
} jobStatus_e;
static void reboot_task(void *arg)
{
ESP_LOGW(TAG, "Rebooting system for FOTA...");
vTaskDelay(pdMS_TO_TICKS(1500));
esp_restart();
}
static void periodic_publish_task(void *arg)
{
const char *msg = "hello_again";
while (1) {
if (s_client) {
int msg_id = esp_mqtt_client_publish(
s_client,
s_periodic_topic,
msg,
0, // length 0 -> strlen(msg) inside ESP-IDF
0, // QoS 0
0 // retain = 0
);
ESP_LOGI(TAG, "FOTA PASS_2! Periodic publish (msg_id=%d): %s", msg_id, msg);
}
vTaskDelay(pdMS_TO_TICKS(5000)); // 5 seconds
}
}
/* Helper: publish response payload */
void mqtt_send_response(const char *payload)
{
if (!s_client) {
ESP_LOGW(TAG, "MQTT client not ready, cannot send response");
return;
}
int msg_id = esp_mqtt_client_publish(
s_client,
s_rpc_resp_topic,
payload,
0, /* len = 0 => strlen() inside esp-idf */
1, /* QoS 1 */
0 /* retain */
);
ESP_LOGI(TAG, "Published RPC response (msg_id=%d): %s", msg_id, payload);
}
/* Helper: build and send job status JSON */
static void send_job_status(const ota_job_t *job,
jobStatus_e status,
jobResult_e result)
{
char json[256];
/* Current epoch timestamp; you can replace with your own RTC if needed */
time_t now = time(NULL);
/* NOTE: DEVICE_ID comes from device_config.h */
int len = snprintf(json, sizeof(json),
"{"
"\"ts\":%ld,"
"\"deviceId\":\"%s\","
"\"jobId\":\"%s\","
"\"jobStatus\":%d,"
"\"jobResult\":%d"
"}",
(long)now,
DEVICE_ID,
job->jobId,
(int)status,
(int)result
);
if (len <= 0 || len >= (int)sizeof(json)) {
ESP_LOGW(TAG, "Job status JSON truncated or error");
}
mqtt_send_response(json);
}
/* MQTT event handler */
static void mqtt_event_handler(void *handler_args,
esp_event_base_t base,
int32_t event_id,
void *event_data)
{
esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data;
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
/* Subscribe to RPC request topic */
esp_mqtt_client_subscribe(s_client, s_rpc_req_topic, 1);
ESP_LOGI(TAG, "Subscribed to: %s", s_rpc_req_topic);
/* NEW: start periodic hello publisher once */
if (!s_periodic_task_started) {
BaseType_t res = xTaskCreate(
periodic_publish_task,
"periodic_pub",
4096,
NULL,
5,
NULL
);
if (res == pdPASS) {
s_periodic_task_started = true;
ESP_LOGI(TAG, "Started periodic publish task");
} else {
ESP_LOGE(TAG, "Failed to start periodic publish task");
}
}
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGW(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_DATA: {
ESP_LOGI(TAG, "MQTT_EVENT_DATA: %.*s", event->data_len, event->data);
/* Ensure payload is null-terminated for JSON parsing */
char *payload = calloc(event->data_len + 1, 1);
if (!payload) {
ESP_LOGE(TAG, "OOM in MQTT_EVENT_DATA");
return;
}
memcpy(payload, event->data, event->data_len);
payload[event->data_len] = '\0';
ota_job_t job = {0};
if (!rpc_job_parse(payload, &job)) {
ESP_LOGW(TAG, "Failed to parse RPC job JSON");
free(payload);
return;
}
ESP_LOGI(TAG,
"Parsed job: status=%d, deviceType=%d, otaType=%d, jobId=%s",
job.l_ota_status, job.deviceType, job.otaType, job.jobId);
/* l_ota_status == 3 => Download from FTP */
if (job.l_ota_status == 3) {
ESP_LOGI(TAG, "OTA status 3: Download request from FTP");
/* Notify PENDING (or EXECUTING) + SUCCESS (ack receipt) */
send_job_status(&job, JOB_STATUS_PENDING, JOB_RESULT_SUCCESS);
ftp_config_t cfg = {
.server = FTP_SERVER_IP,
.port = FTP_SERVER_PORT,
.username = FTP_USERNAME,
.password = FTP_PASSWORD,
.remote_path = FTP_REMOTE_PATH,
.local_path = FTP_LOCAL_PATH,
};
ftp_err_t ret = ftp_manager_download(&cfg, NULL);
if (ret == FTP_OK) {
ESP_LOGI(TAG, "FTP download success");
send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_SUCCESS);
} else {
ESP_LOGE(TAG, "FTP download failed, err=%d", ret);
send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_REJECTED);
}
}
/* l_ota_status == 6 => Apply FOTA & reboot */
else if (job.l_ota_status == 6) {
ESP_LOGI(TAG, "OTA status 6: Apply FOTA & reboot");
send_job_status(&job, JOB_STATUS_EXECUTING, JOB_RESULT_SUCCESS);
fota_err_t res = fota_manager_apply(FTP_LOCAL_PATH);
if (res == FOTA_OK) {
ESP_LOGI(TAG, "FOTA apply success, scheduling reboot");
send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_SUCCESS);
xTaskCreate(
reboot_task,
"reboot_task",
4096,
NULL,
5,
NULL
);
} else {
ESP_LOGE(TAG, "FOTA apply failed, err=%d", res);
send_job_status(&job, JOB_STATUS_EXECUTED, JOB_RESULT_REJECTED);
}
} else {
ESP_LOGW(TAG, "Unhandled l_ota_status=%d", job.l_ota_status);
}
free(payload);
break;
}
default:
break;
}
}
/* Build MAC string and topics: VEC/Vectr12X/v362/{MAC}/RPC/... */
static void build_rpc_topics(void)
{
uint8_t mac[6];
esp_read_mac(mac, ESP_MAC_WIFI_STA);
char mac_str[13] = {0}; /* 12 hex chars + null */
snprintf(mac_str, sizeof(mac_str),
"%02X%02X%02X%02X%02X%02X",
mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
snprintf(s_rpc_req_topic, sizeof(s_rpc_req_topic),
"VEC/Vectr12X/v362/%s/RPC/REQUEST", mac_str);
snprintf(s_rpc_resp_topic, sizeof(s_rpc_resp_topic),
"VEC/Vectr12X/v362/%s/RPC/RESPONSE", mac_str);
/* NEW: periodic topic */
snprintf(s_periodic_topic, sizeof(s_periodic_topic),
"VEC/Vectr12X/v362/%s/PERIODIC", mac_str);
ESP_LOGI(TAG, "RPC REQUEST topic: %s", s_rpc_req_topic);
ESP_LOGI(TAG, "RPC RESPONSE topic: %s", s_rpc_resp_topic);
ESP_LOGI(TAG, "PERIODIC topic: %s", s_periodic_topic);
}
/* Public init called from app_main() */
void mqtt_manager_init(void)
{
build_rpc_topics();
esp_mqtt_client_config_t mqtt_cfg = {
.broker = {
.address = {
.uri = MQTT_BROKER_URL, // e.g. "mqtt://192.168.0.10"
// .port = MQTT_BROKER_PORT, // e.g. 1883
},
},
.credentials = {
.username = MQTT_USERNAME,
.authentication = {
.password = MQTT_PASSWORD,
},
},
.session.keepalive = 60,
.network.reconnect_timeout_ms = 10000,
.network.timeout_ms = 10000,
};
s_client = esp_mqtt_client_init(&mqtt_cfg);
if (!s_client) {
ESP_LOGE(TAG, "Failed to create MQTT client");
return;
}
esp_mqtt_client_register_event(
s_client,
ESP_EVENT_ANY_ID,
mqtt_event_handler,
NULL
);
esp_mqtt_client_start(s_client);
ESP_LOGI(TAG, "MQTT client started");
}