回答

收藏

[评测分享] 【STM32H735-DK 测评】基于lwip的mqtt移植

#板卡评测 #板卡评测 247718 人阅读 | 0 人回复 | 2024-03-19

【目的】
移植mqtt实现连接mqtt服务器,实现数据的收发。
【实验步骤】
1、在前的lwip移植的基础之上,添加mqtt的代码,【STM32H735-DK 测评】手工配置LWIP - 板卡试用 - 与非网 (eefocus.com)
2、在Core/inc创建bsp_mqtt.h内空如下:


  1. <font size="4">/*
  2. * bsp_mqtt.h
  3. *
  4. *  Created on: Mar 19, 2024
  5. *      Author: liujianhua
  6. */

  7. #ifndef INC_BSP_MQTT_H_
  8. #define INC_BSP_MQTT_H_

  9. void bsp_mqtt_init(void);

  10. #endif /* INC_BSP_MQTT_H_ */
  11. </font>
复制代码
其实就是声明bsp_mqtt_init函数。
3、在Core/src目录下面创建bsp_mqtt.c。
在野火的教程中,他是从mqtt官网下载的源码进行移植,需要创建两个线程来实现mqtt的订阅与发布,这次是直接什么stmcubemax下的mqtt来实现,所以不需要创建任务,直接以回调的方式来实现。
其内容如下:
  1. /*
  2. * bsp_mqtt.c
  3. *
  4. *  Created on: Mar 19, 2024
  5. *      Author: liujianhua
  6. */



  7. /*-----------------------------------------------------------
  8. * Includes files
  9. *----------------------------------------------------------*/

  10. /* lib includes. */
  11. #include <string.h>

  12. /* segger rtt includes. */
  13. #include "main.h"
  14. //#include "bsp_mqtt.h"

  15. /* FreeRTOS includes. */
  16. #include "FreeRTOS.h"
  17. #include "semphr.h"

  18. /* lwip includes. */
  19. #include "lwip/apps/mqtt.h"
  20. #include "lwip/ip4_addr.h"

  21. static err_t bsp_mqtt_connect(void);

  22. #define USE_MQTT_MUTEX //使用发送数据的互斥锁,多个任务有发送才必须
  23. #ifdef USE_MQTT_MUTEX
  24. static SemaphoreHandle_t s__mqtt_publish_mutex = NULL;
  25. #endif /* USE_MQTT_MUTEX */

  26. static mqtt_client_t *s__mqtt_client_instance = NULL; //mqtt连接句柄,这里一定要设置全局变量,防止 lwip 底层重复申请空间

  27. //MQTT 数据结构体
  28. struct mqtt_recv_buffer
  29. {
  30.     char recv_buffer[1024];  //储存接收的buffer
  31.     uint16_t recv_len;         //记录已接收多少个字节的数据,MQTT的数据分包来的
  32.     uint16_t recv_total;       //MQTT接收数据的回调函数会有个总的大小
  33. };

  34. //结构体初始化
  35. struct mqtt_recv_buffer s__mqtt_recv_buffer_g = {
  36.     .recv_len = 0,
  37.     .recv_total = 0,
  38. };



  39. static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos);


  40. /* ===========================================
  41.                  接收回调函数
  42. ============================================== */

  43. /*!
  44. * @brief mqtt 接收数据处理函数接口,需要在应用层进行处理
  45. *        执行条件:mqtt连接成功
  46. *
  47. * @param [in1] : 用户提供的回调参数指针
  48. * @param [in2] : 接收的数据指针
  49. * @param [in3] : 接收数据长度
  50. * @retval: 处理的结果
  51. */
  52. __weak int mqtt_rec_data_process(void* arg, char *rec_buf, uint64_t buf_len)
  53. {
  54.     printf("recv_buffer = %s\n", rec_buf);
  55.     return 0;
  56. }


  57. /*!
  58. * @brief MQTT 接收到数据的回调函数
  59. *        执行条件:MQTT 连接成功
  60. *
  61. * @param [in1] : 用户提供的回调参数指针
  62. * @param [in2] : MQTT 收到的分包数据指针
  63. * @param [in3] : MQTT 分包数据长度
  64. * @param [in4] : MQTT 数据包的标志位
  65. * @retval: None
  66. */
  67. static void bsp_mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
  68. {
  69.     if( (data == NULL) || (len == 0) )
  70.     {
  71.         printf("mqtt_client_incoming_data_cb: condition error @entry\n");
  72.         return;
  73.     }

  74.     if(s__mqtt_recv_buffer_g.recv_len + len < sizeof(s__mqtt_recv_buffer_g.recv_buffer))
  75.     {
  76.         //
  77.         snprintf(&s__mqtt_recv_buffer_g.recv_buffer[s__mqtt_recv_buffer_g.recv_len], len, "%s", data);
  78.         s__mqtt_recv_buffer_g.recv_len += len;
  79.     }

  80.     if ( (flags & MQTT_DATA_FLAG_LAST) == MQTT_DATA_FLAG_LAST )
  81.     {
  82.         //处理数据
  83.         mqtt_rec_data_process(arg , s__mqtt_recv_buffer_g.recv_buffer, s__mqtt_recv_buffer_g.recv_len);

  84.         //已接收字节计数归0
  85.         s__mqtt_recv_buffer_g.recv_len = 0;

  86.         //清空接收buffer
  87.         memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer));
  88.     }


  89.     printf("mqtt_client_incoming_data_cb:reveiving incomming data.\n");
  90. }


  91. /*!
  92. * @brief MQTT 接收到数据的回调函数
  93. *        执行条件:MQTT 连接成功
  94. *
  95. * @param [in] : 用户提供的回调参数指针
  96. * @param [in] : MQTT 收到数据的topic
  97. * @param [in] : MQTT 收到数据的总长度
  98. * @retval: None
  99. */
  100. static void bsp_mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
  101. {
  102.     if( (topic == NULL) || (tot_len == 0) )
  103.     {
  104.         printf("bsp_mqtt_incoming_publish_cb: condition error @entry\n");
  105.         return;
  106.     }

  107.         printf("bsp_mqtt_incoming_publish_cb: topic = %s.\n",topic);
  108.         printf("bsp_mqtt_incoming_publish_cb: tot_len = %d.\n",tot_len);
  109.         s__mqtt_recv_buffer_g.recv_total = tot_len;    //需要接收的总字节
  110.         s__mqtt_recv_buffer_g.recv_len = 0;            //已接收字节计数归0

  111.     //清空接收buffer
  112.     memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer));
  113. }


  114. /* ===========================================
  115.                  连接状态回调函数
  116. ============================================== */

  117. /*!
  118. * @brief MQTT 连接成功的处理函数,需要的话在应用层定义
  119. *
  120. * @param [in1] : MQTT 连接句柄
  121. * @param [in2] : MQTT 连接参数指针
  122. *
  123. * @retval: None
  124. */
  125. __weak void mqtt_conn_suc_proc(mqtt_client_t *client, void *arg)
  126. {
  127.     char test_sub_topic[] = "/public/TEST/AidenHinGwenWong_sub";
  128.     bsp_mqtt_subscribe(client,test_sub_topic,0);
  129. }

  130. /*!
  131. * @brief MQTT 处理失败调用的函数
  132. *
  133. * @param [in1] : MQTT 连接句柄
  134. * @param [in2] : MQTT 连接参数指针
  135. *
  136. * @retval: None
  137. */
  138. __weak void mqtt_error_process_callback(mqtt_client_t * client, void *arg)
  139. {

  140. }

  141. /*!
  142. * @brief MQTT 连接状态的回调函数
  143. *
  144. * @param [in] : MQTT 连接句柄
  145. * @param [in] : 用户提供的回调参数指针
  146. * @param [in] : MQTT 连接状态
  147. * @retval: None
  148. */
  149. static void bsp_mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
  150. {
  151.     if( client == NULL )
  152.     {
  153.         printf("bsp_mqtt_connection_cb: condition error @entry\n");
  154.         return;
  155.     }

  156.     if ( status == MQTT_CONNECT_ACCEPTED ) //Successfully connected
  157.     {
  158.                 printf("bsp_mqtt_connection_cb: Successfully connected\n");

  159.         // 注册接收数据的回调函数
  160.                 mqtt_set_inpub_callback(client, bsp_mqtt_incoming_publish_cb, bsp_mqtt_incoming_data_cb, arg);

  161.         //成功处理函数
  162.                 mqtt_conn_suc_proc(client, arg);
  163.     }
  164.         else
  165.         {
  166.                 printf("bsp_mqtt_connection_cb: Fail connected, status = %s\n", lwip_strerr(status) );
  167.         //错误处理
  168.                 mqtt_error_process_callback(client, arg);
  169.         }
  170. }


  171. /*!
  172. * @brief 连接到 mqtt 服务器
  173. *        执行条件:无
  174. *
  175. * @param [in] : None
  176. *
  177. * @retval: 连接状态,如果返回不是 ERR_OK 则需要重新连接
  178. */
  179. static err_t bsp_mqtt_connect(void)
  180. {
  181.     printf("bsp_mqtt_connect: Enter!\n");
  182.         err_t ret;

  183.     struct mqtt_connect_client_info_t  mqtt_connect_info = {
  184.                 "AidenHinGwenWong_MQTT_Test",  /* 这里需要修改,以免在同一个服务器两个相同ID会发生冲突 */
  185.                 NULL,   /* MQTT 服务器用户名 */
  186.                 NULL,   /* MQTT 服务器密码 */
  187.                 60,     /* 与 MQTT 服务器保持连接时间,时间超过未发送数据会断开 */
  188.                 "/public/TEST/AidenHinGwenWong_pub",/* MQTT遗嘱的消息发送topic */
  189.                 "Offline_pls_check", /* MQTT遗嘱的消息,断开服务器的时候会发送 */
  190.                 0,  /* MQTT遗嘱的消息 Qos */
  191.                 0   /* MQTT遗嘱的消息 Retain */
  192.         };

  193.     ip_addr_t server_ip;
  194.     ip4_addr_set_u32(&server_ip, ipaddr_addr("192.168.3.156"));  //MQTT服务器IP

  195.     uint16_t server_port = 1883;  //注意这里是 MQTT 的 TCP 连接方式的端口号!!!!

  196.     if (s__mqtt_client_instance == NULL)
  197.     {
  198.         // 句柄==NULL 才申请空间,否则无需重复申请
  199.             s__mqtt_client_instance = mqtt_client_new();
  200.     }

  201.         if (s__mqtt_client_instance == NULL)
  202.         {
  203.         //防止申请失败
  204.                 printf("bsp_mqtt_connect: s__mqtt_client_instance malloc fail @@!!!\n");
  205.                 return ERR_MEM;
  206.         }

  207.     //进行连接,注意:如果需要带入 arg ,arg必须是全局变量,局部变量指针会被回收,大坑!!!!!
  208.     ret = mqtt_client_connect(s__mqtt_client_instance, &server_ip, server_port, bsp_mqtt_connection_cb, NULL, &mqtt_connect_info);

  209.     /******************
  210.     小提示:连接错误不需要做任何操作,mqtt_client_connect 中注册的回调函数里面做判断并进行对应的操作
  211.     *****************/

  212.     printf("bsp_mqtt_connect: connect to mqtt %s\n", lwip_strerr(ret));

  213.         return ret;
  214. }




  215. /* ===========================================
  216.                  发送接口、回调函数
  217. ============================================== */

  218. /*!
  219. * @brief MQTT 发送数据的回调函数
  220. *        执行条件:MQTT 连接成功
  221. *
  222. * @param [in] : 用户提供的回调参数指针
  223. * @param [in] : MQTT 发送的结果:成功或者可能的错误
  224. * @retval: None
  225. */
  226. static void mqtt_client_pub_request_cb(void *arg, err_t result)
  227. {

  228.     mqtt_client_t *client = (mqtt_client_t *)arg;
  229.     if (result != ERR_OK)
  230.     {
  231.         printf("mqtt_client_pub_request_cb: c002: Publish FAIL, result = %s\n", lwip_strerr(result));

  232.                 //错误处理
  233.                 mqtt_error_process_callback(client, arg);
  234.     }
  235.         else
  236.         {
  237.         printf("mqtt_client_pub_request_cb: c005: Publish complete!\n");
  238.         }
  239. }



  240. /*!
  241. * @brief 发送消息到服务器
  242. *        执行条件:无
  243. *
  244. * @param [in1] : mqtt 连接句柄
  245. * @param [in2] : mqtt 发送 topic 指针
  246. * @param [in3] : 发送数据包指针
  247. * @param [in4] : 数据包长度
  248. * @param [in5] : qos
  249. * @param [in6] : retain
  250. * @retval: 发送状态
  251. * @note: 有可能发送不成功但是现实返回值是 0 ,需要判断回调函数 mqtt_client_pub_request_cb 是否 result == ERR_OK
  252. */
  253. err_t bsp_mqtt_publish(mqtt_client_t *client, char *pub_topic, char *pub_buf, uint16_t data_len, uint8_t qos, uint8_t retain)
  254. {
  255.         if ( (client == NULL) || (pub_topic == NULL) || (pub_buf == NULL) || (data_len == 0) || (qos > 2) || (retain > 1) )
  256.         {
  257.                 printf("bsp_mqtt_publish: input error@@" );
  258.                 return ERR_VAL;
  259.         }

  260.     //判断是否连接状态
  261.     if(mqtt_client_is_connected(client) != pdTRUE)
  262.     {
  263.                 printf("bsp_mqtt_publish: client is not connected\n");
  264.         return ERR_CONN;
  265.     }

  266.         err_t err;
  267. #ifdef USE_MQTT_MUTEX

  268.     // 创建 mqtt 发送互斥锁
  269.     if (s__mqtt_publish_mutex == NULL)
  270.     {
  271.                 printf("bsp_mqtt_publish: create mqtt mutex ! \n" );
  272.         s__mqtt_publish_mutex = xSemaphoreCreateMutex();
  273.     }

  274.     if (xSemaphoreTake(s__mqtt_publish_mutex, portMAX_DELAY) == pdPASS)
  275. #endif /* USE_MQTT_MUTEX */

  276.     {
  277.             err = mqtt_publish(client, pub_topic, pub_buf, data_len, qos, retain, mqtt_client_pub_request_cb, (void*)client);
  278.             printf("bsp_mqtt_publish: mqtt_publish err = %s\n", lwip_strerr(err) );

  279. #ifdef USE_MQTT_MUTEX
  280.         printf("bsp_mqtt_publish: mqtt_publish xSemaphoreTake\n");
  281.         xSemaphoreGive(s__mqtt_publish_mutex);
  282. #endif /* USE_MQTT_MUTEX */

  283.     }
  284.         return err;
  285. }

  286. /* ===========================================
  287.                  MQTT 订阅接口函数
  288. ============================================== */


  289. /*!
  290. * @brief MQTT 订阅的回调函数
  291. *        执行条件:MQTT 连接成功
  292. *
  293. * @param [in] : 用户提供的回调参数指针
  294. * @param [in] : MQTT 订阅结果
  295. * @retval: None
  296. */
  297. static void bsp_mqtt_request_cb(void *arg, err_t err)
  298. {
  299.     if ( arg == NULL )
  300.     {
  301.         printf("bsp_mqtt_request_cb: input error@@\n");
  302.         return;
  303.     }

  304.     mqtt_client_t *client = (mqtt_client_t *)arg;

  305.     if ( err != ERR_OK )
  306.     {
  307.         printf("bsp_mqtt_request_cb: FAIL sub, sub again, err = %s\n", lwip_strerr(err));

  308.                 //错误处理
  309.                 mqtt_error_process_callback(client, arg);
  310.     }
  311.         else
  312.         {
  313.                 printf("bsp_mqtt_request_cb: sub SUCCESS!\n");
  314.         }
  315. }

  316. /*!
  317. * @brief mqtt 订阅
  318. *        执行条件:连接成功
  319. *
  320. * @param [in1] : mqtt 连接句柄
  321. * @param [in2] : mqtt 发送 topic 指针
  322. * @param [in5] : qos
  323. * @retval: 订阅状态
  324. */
  325. static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos)
  326. {
  327.     printf("bsp_mqtt_subscribe: Enter\n");

  328.         if( ( mqtt_client == NULL) || ( sub_topic == NULL) || ( qos > 2 ) )
  329.         {
  330.         printf("bsp_mqtt_subscribe: input error@@\n");
  331.                 return ERR_VAL;
  332.         }

  333.         if ( mqtt_client_is_connected(mqtt_client) != pdTRUE )
  334.         {
  335.                 printf("bsp_mqtt_subscribe: mqtt is not connected, return ERR_CLSD.\n");
  336.                 return ERR_CLSD;
  337.         }

  338.         err_t err;
  339.         err = mqtt_subscribe(mqtt_client, sub_topic, qos, bsp_mqtt_request_cb, (void *)mqtt_client);  // subscribe and call back.

  340.         if (err != ERR_OK)
  341.         {
  342.                 printf("bsp_mqtt_subscribe: mqtt_subscribe Fail, return:%s \n", lwip_strerr(err));
  343.         }
  344.         else
  345.         {
  346.                 printf("bsp_mqtt_subscribe: mqtt_subscribe SUCCESS, reason: %s\n", lwip_strerr(err));
  347.         }

  348.         return err;
  349. }


  350. /* ===========================================
  351.                  初始化接口函数
  352. ============================================== */

  353. /*!
  354. * @brief 封装 MQTT 初始化接口
  355. *        执行条件:无
  356. *
  357. * @retval: 无
  358. */
  359. void bsp_mqtt_init(void)
  360. {
  361.     printf("Mqtt init...");

  362.     // 连接服务器
  363.     bsp_mqtt_connect();

  364.     // 发送消息到服务器
  365.     char message_test[] = "Hello mqtt server";
  366.     for(int i = 0; i < 10; i++)
  367.     {
  368.         bsp_mqtt_publish(s__mqtt_client_instance,"/public/TEST/AidenHinGwenWong_pub",message_test,sizeof(message_test),1,0);
  369.         vTaskDelay(1000);
  370.     }

  371. }
复制代码
【代码分析】:
整个代码不长,不到500行,在void bsp_mqtt_init(void)函数中,调用bsp_mqtt_connect函数。在bsp_mqtt_connect中,首先创建一个连接mqtt的结构体 mqtt_connect_client_info_t,在mqtt.h中的定义如下:
  1. * Client information and connection parameters */
  2. struct mqtt_connect_client_info_t {
  3.   /** Clinet ID */
  4.   const char *client_id;
  5.   /** 如果服务器需要验证用户,则填写用户名, 如果不需要测为 NULL  */
  6.   const char* client_user;
  7.   /** 如果服务器需要验证用户,则填写密码, 如果不需要测为 NULL */
  8.   const char* client_pass;
  9.   /** <span style="background-color: rgb(255, 255, 255); padding-left: 2px;"><span style="color: rgb(0, 0, 0); font-family: &quot;Courier New&quot;; font-size: 10pt; white-space: pre;"><span style="color:#3f7f5f;">与 MQTT 服务器保持连接时间,时间超过未发送数据会断开</span></span></span>, 0 禁用保持活动功能*/
  10.   u16_t keep_alive;
  11.   /** w<span style="background-color: rgb(255, 255, 255); padding-left: 2px;"><span style="color: rgb(0, 0, 0); font-family: &quot;Courier New&quot;; font-size: 10pt; white-space: pre;"><span style="color:#3f7f5f;"> MQTT遗嘱的消息发送topic</span></span></span>, set to NULL if will is not to be used,
  12. */
  13.   const char* will_topic;
  14.   /** will_msg, see will_topic */
  15.   const char* will_msg;
  16.   /** will_qos, see will_topic */
  17.   u8_t will_qos;
  18.   /** will_retain, see will_topic */
  19.   u8_t will_retain;
  20. #if LWIP_ALTCP && LWIP_ALTCP_TLS
  21.   /** TLS configuration for secure connections */
  22.   struct altcp_tls_config *tls_config;
  23. #endif
  24. };
复制代码
我在上面注释了。接下来的使用ip4_addr_set_u32来设置服务器的IP地址。同时指定mqtt的服务端口。
接下来使用mqtt_client_connect来连接服务器,并注册连接中的回调函数。从而开始mqtt的服务。
接连功成功后,注册了bsp_mqtt_incoming_publish_cb接收回调函数、以及订阅成功的函数mqtt_conn_suc_proc。同时也注册了错误处理的回调函数。这样就不需要再创建线程去调用了。直接使用lwip的线程来处理报文。
最后是接收的数据回调做了简单打印功能。
到此mqtt的代码编写结事,我们把bsp_mqtt_init放到LWIPINIt后面执行,就行了。
  1. /* USER CODE END Header_StartDefaultTask */
  2. void StartDefaultTask(void *argument)
  3. {
  4.   /* init code for LWIP */
  5.   MX_LWIP_Init();
  6.   /* USER CODE BEGIN StartDefaultTask */
  7. // TCP_Client_Init();
  8. // TCP_Echo_Init();
  9.   bsp_mqtt_init();
  10.   /* Infinite loop */
  11.   for(;;)
  12.   {
  13.     osDelay(1);
  14.   }
  15.   /* USER CODE END StartDefaultTask */
  16. }
复制代码
【实验现象】
我们在本地创建了mqtt服务器,端口为1883。启动器后,开发板可以顺利的连接到服务回:

同时我们开启一个mqtt客户端,并打开串口调试助手查看信息:

串口助手中查看到我们开发板接收到的信息:

这样就成功的实现了MQTT的移植,并实现了其功能。
【注意】
在刚刚移植好后,会在连一会后报错。
Assertion “sys_timeout: timeout != NULL, pool MEMP_SYS_TIMEOUT is empty” failed at line 216 in src/core/timers.c然后就需要重启好几次才能连上,后面找到资料,需要修改一下参数:

这样修改后,就不会报错了。
【总结】
这个移植,相比于野火的mqtt移植要简单许多,使用tcpclien、tcpserver都要比他简单。
【感谢】
在移植过程中学习到了两篇文章在此感谢:
《【嵌入式实战】一文拿下 STM32 Lwip MQTT(超详细)》作者:HinGwenWoong
《STM32+LWIP协议栈实现MQTT协议并挂载到EMQ_X_CLOUD平台》作者:爱小羊
分享到:
回复

使用道具 举报

您需要登录后才可以回帖 注册/登录

本版积分规则

8782 积分
42 主题
+ 关注
热门推荐
关闭

站长推荐上一条 /3 下一条