@@ -145,6 +145,7 @@ static int callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
145145 // 如果是重复的,则释放待处理列表中的 topic 内存并跳过
146146 if (is_duplicate ) {
147147 free ((void * )topic_map -> topic ); // 释放重复 topic 的内存
148+ topic_map -> topic = NULL ;
148149 continue ;
149150 }
150151
@@ -153,9 +154,10 @@ static int callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
153154 (ctx -> sub_topic_count + 1 ) * sizeof (iot_mqtt_topic_map_t ));
154155 if (new_sub_topic_maps == NULL ) {
155156 // realloc 失败,保留原有 sub_topic_maps
156- lwsl_err ("%s: realloc sub_topic_maps failed\n" , __func__ );
157+ lwsl_err ("%s: realloc sub_topic_maps failed\\ n" , __func__ );
157158 // 释放当前 topic_map 的 topic 内存,因为它无法被添加
158159 free ((void * )topic_map -> topic );
160+ topic_map -> topic = NULL ;
159161 // 这里不直接返回错误,允许处理剩余的 topic,但记录下内存问题
160162 continue ; // 继续处理下一个 topic
161163 }
@@ -354,8 +356,9 @@ static int callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
354356 case LWS_CALLBACK_TIMER :
355357 lwsl_debug ("timer\n" );
356358 if (ctx -> is_connected ) {
357- lws_set_timer_usecs (wsi , 10 * 1000000 );
359+ lws_set_timer_usecs (wsi , ctx -> config -> ping_interval * 1000000 );
358360 lws_callback_on_writable (wsi );
361+ lws_cancel_service (ctx -> context );
359362 }
360363 break ;
361364
@@ -464,13 +467,46 @@ int iot_mqtt_init(iot_mqtt_ctx_t *ctx, iot_mqtt_config_t *config) {
464467}
465468
466469void iot_mqtt_deinit (iot_mqtt_ctx_t * ctx ) {
470+ if (!ctx ) {
471+ return ;
472+ }
467473 aws_string_destroy_secure (ctx -> config -> username );
468474 aws_string_destroy_secure (ctx -> config -> password );
469475
470476 if (ctx -> context ) {
471477 lws_context_destroy (ctx -> context );
472478 }
473479
480+ // Free subscribed topics
481+ for (size_t i = 0 ; i < ctx -> sub_topic_count ; i ++ ) {
482+ free ((void * )ctx -> sub_topic_maps [i ].topic );
483+ }
484+ free (ctx -> sub_topic_maps );
485+ ctx -> sub_topic_maps = NULL ;
486+ ctx -> sub_topic_count = 0 ;
487+
488+ // Free pending subscription list
489+ if (ctx -> pending_sub_list ) {
490+ for (size_t i = 0 ; i < ctx -> pending_sub_list -> count ; i ++ ) {
491+ free ((void * )ctx -> pending_sub_list -> topic_maps [i ].topic );
492+ }
493+ free (ctx -> pending_sub_list -> topic_maps );
494+ free (ctx -> pending_sub_list );
495+ ctx -> pending_sub_list = NULL ;
496+ }
497+
498+ // Free pending publish list
499+ if (ctx -> pending_pub_list ) {
500+ for (size_t i = 0 ; i < ctx -> pending_pub_list -> count ; i ++ ) {
501+ free ((void * )ctx -> pending_pub_list -> publish_params [i ].topic );
502+ free ((void * )ctx -> pending_pub_list -> publish_params [i ].payload );
503+ }
504+ free (ctx -> pending_pub_list -> publish_params );
505+ free (ctx -> pending_pub_list -> sent_list );
506+ free (ctx -> pending_pub_list );
507+ ctx -> pending_pub_list = NULL ;
508+ }
509+
474510 lws_pthread_mutex_destroy (& ctx -> sub_topic_mutex );
475511 lws_pthread_mutex_destroy (& ctx -> pub_topic_mutex );
476512}
@@ -527,19 +563,34 @@ static void _iot_mqtt_append_sub_topic(iot_mqtt_ctx_t *ctx, iot_mqtt_topic_map_t
527563 // 将topic_map存入待订阅列表
528564 if (ctx -> pending_sub_list == NULL ) {
529565 ctx -> pending_sub_list = (iot_mqtt_pending_sub_list_t * )malloc (sizeof (iot_mqtt_pending_sub_list_t ));
566+ if (ctx -> pending_sub_list == NULL ) {
567+ lws_pthread_mutex_unlock (& ctx -> sub_topic_mutex );
568+ return ;
569+ }
530570 ctx -> pending_sub_list -> topic_maps = NULL ;
531571 ctx -> pending_sub_list -> count = 0 ;
532572 }
533573 // copy topic_map
534- iot_mqtt_topic_map_t * topic_map_copy = (iot_mqtt_topic_map_t * )malloc (sizeof (iot_mqtt_topic_map_t ));
535- memset (topic_map_copy , 0 , sizeof (iot_mqtt_topic_map_t ));
536- topic_map_copy -> topic = strdup (topic_map -> topic );
537- topic_map_copy -> message_callback = topic_map -> message_callback ;
538- topic_map_copy -> event_callback = topic_map -> event_callback ;
539- topic_map_copy -> user_data = topic_map -> user_data ;
540- topic_map_copy -> qos = topic_map -> qos ;
541- ctx -> pending_sub_list -> topic_maps = (iot_mqtt_topic_map_t * )realloc (ctx -> pending_sub_list -> topic_maps , (ctx -> pending_sub_list -> count + 1 ) * sizeof (iot_mqtt_topic_map_t ));
542- ctx -> pending_sub_list -> topic_maps [ctx -> pending_sub_list -> count ] = * topic_map_copy ;
574+ iot_mqtt_topic_map_t topic_map_copy ;
575+ memset (& topic_map_copy , 0 , sizeof (iot_mqtt_topic_map_t ));
576+ topic_map_copy .topic = strdup (topic_map -> topic );
577+ if (topic_map_copy .topic == NULL ) {
578+ lws_pthread_mutex_unlock (& ctx -> sub_topic_mutex );
579+ return ;
580+ }
581+ topic_map_copy .message_callback = topic_map -> message_callback ;
582+ topic_map_copy .event_callback = topic_map -> event_callback ;
583+ topic_map_copy .user_data = topic_map -> user_data ;
584+ topic_map_copy .qos = topic_map -> qos ;
585+
586+ iot_mqtt_topic_map_t * new_maps = (iot_mqtt_topic_map_t * )realloc (ctx -> pending_sub_list -> topic_maps , (ctx -> pending_sub_list -> count + 1 ) * sizeof (iot_mqtt_topic_map_t ));
587+ if (new_maps == NULL ) {
588+ free ((void * )topic_map_copy .topic );
589+ lws_pthread_mutex_unlock (& ctx -> sub_topic_mutex );
590+ return ;
591+ }
592+ ctx -> pending_sub_list -> topic_maps = new_maps ;
593+ ctx -> pending_sub_list -> topic_maps [ctx -> pending_sub_list -> count ] = topic_map_copy ;
543594 ctx -> pending_sub_list -> count ++ ;
544595 lws_pthread_mutex_unlock (& ctx -> sub_topic_mutex );
545596}
@@ -574,31 +625,37 @@ int iot_mqtt_publish(iot_mqtt_ctx_t *ctx, const char *topic, const uint8_t *payl
574625 int ret = VOLC_OK ;
575626
576627 // 组装pub_param,放入pending_pub_list
577- lws_mqtt_publish_param_t * pub_param = malloc (sizeof (lws_mqtt_publish_param_t ));
578- memset (pub_param , 0 , sizeof (lws_mqtt_publish_param_t ));
579- pub_param -> topic = strdup (topic );
580- pub_param -> topic_len = strlen (topic );
581- void * payload_byte = malloc (len );
582- if (payload_byte == NULL ) {
583- // 内存分配失败,记录错误信息
584- lwsl_err ("%s: Memory allocation failed for payload\n" , __func__ );
585- // 可以选择返回错误码或进行其他处理
628+ lws_mqtt_publish_param_t pub_param ;
629+ memset (& pub_param , 0 , sizeof (lws_mqtt_publish_param_t ));
630+ pub_param .topic = strdup (topic );
631+ if (!pub_param .topic ) {
632+ lwsl_err ("%s: Memory allocation failed for topic\n" , __func__ );
586633 return VOLC_ERR_MALLOC ;
587634 }
588- memcpy (payload_byte , payload , len );
589- pub_param -> payload = payload_byte ;
590- pub_param -> payload_len = len ;
591- pub_param -> qos = qos ;
635+ pub_param .topic_len = strlen (topic );
636+
637+ if (len > 0 ) {
638+ pub_param .payload = malloc (len );
639+ if (pub_param .payload == NULL ) {
640+ lwsl_err ("%s: Memory allocation failed for payload\n" , __func__ );
641+ free ((void * )pub_param .topic );
642+ return VOLC_ERR_MALLOC ;
643+ }
644+ memcpy ((void * )pub_param .payload , payload , len );
645+ } else {
646+ pub_param .payload = NULL ;
647+ }
648+
649+ pub_param .payload_len = len ;
650+ pub_param .qos = qos ;
592651
593652 lws_pthread_mutex_lock (& ctx -> pub_topic_mutex );
594653 if (ctx -> pending_pub_list == NULL ) {
595654 ctx -> pending_pub_list = (iot_mqtt_pending_pub_list_t * )malloc (sizeof (iot_mqtt_pending_pub_list_t ));
596655 if (ctx -> pending_pub_list == NULL ) {
597- // 内存分配失败,记录错误信息
598656 lwsl_err ("%s: Memory allocation failed for pending_pub_list\n" , __func__ );
599- // 可以选择返回错误码或进行其他处理
600657 ret = VOLC_ERR_MALLOC ;
601- goto finish ;
658+ goto finish_with_free ;
602659 }
603660 ctx -> pending_pub_list -> publish_params = NULL ;
604661 ctx -> pending_pub_list -> sent_list = NULL ;
@@ -609,17 +666,38 @@ int iot_mqtt_publish(iot_mqtt_ctx_t *ctx, const char *topic, const uint8_t *payl
609666 if (ctx -> pending_pub_list -> count > 10 ) {
610667 ret = VOLC_ERR_MQTT_PUB ;
611668 lwsl_err ("%s: publish failed\n" , __func__ );
612- lws_pthread_mutex_unlock (& ctx -> pub_topic_mutex );
613- goto finish ;
669+ goto finish_with_free ;
614670 }
615671
616- ctx -> pending_pub_list -> publish_params = (lws_mqtt_publish_param_t * )realloc (ctx -> pending_pub_list -> publish_params , (ctx -> pending_pub_list -> count + 1 ) * sizeof (lws_mqtt_publish_param_t ));
617- ctx -> pending_pub_list -> publish_params [ctx -> pending_pub_list -> count ] = * pub_param ;
618- ctx -> pending_pub_list -> sent_list = (bool * )realloc (ctx -> pending_pub_list -> sent_list , (ctx -> pending_pub_list -> count + 1 ) * sizeof (bool ));
672+ lws_mqtt_publish_param_t * new_params = (lws_mqtt_publish_param_t * )realloc (ctx -> pending_pub_list -> publish_params , (ctx -> pending_pub_list -> count + 1 ) * sizeof (lws_mqtt_publish_param_t ));
673+ if (new_params == NULL ) {
674+ ret = VOLC_ERR_MALLOC ;
675+ goto finish_with_free ;
676+ }
677+ ctx -> pending_pub_list -> publish_params = new_params ;
678+
679+ bool * new_sent_list = (bool * )realloc (ctx -> pending_pub_list -> sent_list , (ctx -> pending_pub_list -> count + 1 ) * sizeof (bool ));
680+ if (new_sent_list == NULL ) {
681+ ret = VOLC_ERR_MALLOC ;
682+ goto finish_with_free ;
683+ }
684+ ctx -> pending_pub_list -> sent_list = new_sent_list ;
685+
686+ ctx -> pending_pub_list -> publish_params [ctx -> pending_pub_list -> count ] = pub_param ;
619687 ctx -> pending_pub_list -> sent_list [ctx -> pending_pub_list -> count ] = false;
620688 ctx -> pending_pub_list -> count ++ ;
621689
622- finish :
690+ lws_pthread_mutex_unlock (& ctx -> pub_topic_mutex );
691+ if (ctx -> is_connected ) {
692+ lws_callback_on_writable (ctx -> wsi );
693+ lws_cancel_service (ctx -> context );
694+ }
695+
696+ return ret ;
697+
698+ finish_with_free :
699+ free ((void * )pub_param .topic );
700+ free ((void * )pub_param .payload );
623701 lws_pthread_mutex_unlock (& ctx -> pub_topic_mutex );
624702 if (ctx -> is_connected ) {
625703 lws_callback_on_writable (ctx -> wsi );
0 commit comments