mqtt_client.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client.c
3  * @brief MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2026 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 2.6.4
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 //Default PUBLISH packet parameters
47 
48 
49 /**
50  * @brief Initialize MQTT client context
51  * @param[in] context Pointer to the MQTT client context
52  * @return Error code
53  **/
54 
56 {
57 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
58  error_t error;
59 #endif
60 
61  //Make sure the MQTT client context is valid
62  if(context == NULL)
64 
65  //Clear MQTT client context
66  osMemset(context, 0, sizeof(MqttClientContext));
67 
68 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
69  //Initialize TLS session state
70  error = tlsInitSessionState(&context->tlsSession);
71  //Any error to report?
72  if(error)
73  return error;
74 #endif
75 
76  //Default protocol version
77  context->settings.version = MQTT_VERSION_3_1_1;
78  //Default transport protocol
79  context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
80  //Default keep-alive time interval
81  context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
82  //Default communication timeout
83  context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
84 
85 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
86  //Default resource name (for WebSocket connections only)
87  osStrcpy(context->settings.uri, "/");
88 #endif
89 
90  //Initialize MQTT client state
91  context->state = MQTT_CLIENT_STATE_DISCONNECTED;
92  //Initialize packet identifier
93  context->packetId = 0;
94 
95  //Successful initialization
96  return NO_ERROR;
97 }
98 
99 
100 /**
101  * @brief Initialize callback structure
102  * @param[in] callbacks Pointer to a structure that contains callback functions
103  **/
104 
106 {
107  //Initialize callback structure
108  osMemset(callbacks, 0, sizeof(MqttClientCallbacks));
109 }
110 
111 
112 /**
113  * @brief Register MQTT client callbacks
114  * @param[in] context Pointer to the MQTT client context
115  * @param[in] callbacks Pointer to a structure that contains callback functions
116  * @return Error code
117  **/
118 
120  const MqttClientCallbacks *callbacks)
121 {
122  //Make sure the MQTT client context is valid
123  if(context == NULL)
125 
126  //Attach callback functions
127  context->callbacks = *callbacks;
128 
129  //Successful processing
130  return NO_ERROR;
131 }
132 
133 
134 /**
135  * @brief Set the MQTT protocol version to be used
136  * @param[in] context Pointer to the MQTT client context
137  * @param[in] version MQTT protocol version (3.1 or 3.1.1)
138  * @return Error code
139  **/
140 
142 {
143  //Make sure the MQTT client context is valid
144  if(context == NULL)
146 
147  //Save the MQTT protocol version to be used
148  context->settings.version = version;
149 
150  //Successful processing
151  return NO_ERROR;
152 }
153 
154 
155 /**
156  * @brief Set the transport protocol to be used
157  * @param[in] context Pointer to the MQTT client context
158  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
159  * WebSocket, or secure WebSocket)
160  * @return Error code
161  **/
162 
164  MqttTransportProtocol transportProtocol)
165 {
166  //Make sure the MQTT client context is valid
167  if(context == NULL)
169 
170  //Save the transport protocol to be used
171  context->settings.transportProtocol = transportProtocol;
172 
173  //Successful processing
174  return NO_ERROR;
175 }
176 
177 
178 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
179 
180 /**
181  * @brief Register TLS initialization callback function
182  * @param[in] context Pointer to the MQTT client context
183  * @param[in] callback TLS initialization callback function
184  * @return Error code
185  **/
186 
188  MqttClientTlsInitCallback callback)
189 {
190  //Check parameters
191  if(context == NULL || callback == NULL)
193 
194  //Save callback function
195  context->callbacks.tlsInitCallback = callback;
196 
197  //Successful processing
198  return NO_ERROR;
199 }
200 
201 #endif
202 
203 
204 /**
205  * @brief Register publish callback function
206  * @param[in] context Pointer to the MQTT client context
207  * @param[in] callback Callback function to be called when a PUBLISH message
208  * is received
209  * @return Error code
210  **/
211 
213  MqttClientPublishCallback callback)
214 {
215  //Make sure the MQTT client context is valid
216  if(context == NULL)
218 
219  //Save callback function
220  context->callbacks.publishCallback = callback;
221 
222  //Successful processing
223  return NO_ERROR;
224 }
225 
226 
227 /**
228  * @brief Set communication timeout
229  * @param[in] context Pointer to the MQTT client context
230  * @param[in] timeout Timeout value, in seconds
231  * @return Error code
232  **/
233 
235 {
236  //Make sure the MQTT client context is valid
237  if(context == NULL)
239 
240  //Save timeout value
241  context->settings.timeout = timeout;
242 
243  //Successful processing
244  return NO_ERROR;
245 }
246 
247 
248 /**
249  * @brief Set keep-alive value
250  * @param[in] context Pointer to the MQTT client context
251  * @param[in] keepAlive Maximum time interval that is permitted to elapse
252  * between the point at which the client finishes transmitting one control
253  * packet and the point it starts sending the next
254  * @return Error code
255  **/
256 
257 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
258 {
259  //Make sure the MQTT client context is valid
260  if(context == NULL)
262 
263  //Save keep-alive value
264  context->settings.keepAlive = keepAlive;
265 
266  //Successful processing
267  return NO_ERROR;
268 }
269 
270 
271 /**
272  * @brief Set the domain name of the server (for virtual hosting)
273  * @param[in] context Pointer to the MQTT client context
274  * @param[in] host NULL-terminated string containing the hostname
275  * @return Error code
276  **/
277 
279 {
280  //Check parameters
281  if(context == NULL || host == NULL)
283 
284  //Make sure the length of the hostname is acceptable
286  return ERROR_INVALID_LENGTH;
287 
288 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
289  //Save hostname (for WebSocket connections only)
290  osStrcpy(context->settings.host, host);
291 #endif
292 
293  //Successful processing
294  return NO_ERROR;
295 }
296 
297 
298 /**
299  * @brief Set the name of the resource being requested
300  * @param[in] context Pointer to the MQTT client context
301  * @param[in] uri NULL-terminated string that contains the resource name
302  * @return Error code
303  **/
304 
306 {
307  //Check parameters
308  if(context == NULL || uri == NULL)
310 
311  //Make sure the length of the resource name is acceptable
313  return ERROR_INVALID_LENGTH;
314 
315 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
316  //Save resource name (for WebSocket connections only)
317  osStrcpy(context->settings.uri, uri);
318 #endif
319 
320  //Successful processing
321  return NO_ERROR;
322 }
323 
324 
325 /**
326  * @brief Set client identifier
327  * @param[in] context Pointer to the MQTT client context
328  * @param[in] clientId NULL-terminated string containing the client identifier
329  * @return Error code
330  **/
331 
333  const char_t *clientId)
334 {
335  //Check parameters
336  if(context == NULL || clientId == NULL)
338 
339  //Make sure the length of the client identifier is acceptable
341  return ERROR_INVALID_LENGTH;
342 
343  //Save client identifier
344  osStrcpy(context->settings.clientId, clientId);
345 
346  //Successful processing
347  return NO_ERROR;
348 }
349 
350 
351 /**
352  * @brief Set authentication information
353  * @param[in] context Pointer to the MQTT client context
354  * @param[in] username NULL-terminated string containing the user name to be used
355  * @param[in] password NULL-terminated string containing the password to be used
356  * @return Error code
357  **/
358 
360  const char_t *username, const char_t *password)
361 {
362  //Check parameters
363  if(context == NULL || username == NULL || password == NULL)
365 
366  //Make sure the length of the user name is acceptable
367  if(osStrlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
368  return ERROR_INVALID_LENGTH;
369 
370  //Make sure the length of the password is acceptable
371  if(osStrlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
372  return ERROR_INVALID_LENGTH;
373 
374  //Save user name
375  osStrcpy(context->settings.username, username);
376  //Save password
377  osStrcpy(context->settings.password, password);
378 
379  //Successful processing
380  return NO_ERROR;
381 }
382 
383 
384 /**
385  * @brief Specify the Will message
386  * @param[in] context Pointer to the MQTT client context
387  * @param[in] topic Will topic name
388  * @param[in] message Will message
389  * @param[in] length Length of the Will message
390  * @param[in] qos QoS level to be used when publishing the Will message
391  * @param[in] retain This flag specifies if the Will message is to be retained
392  * @return Error code
393  **/
394 
396  const void *message, size_t length, MqttQosLevel qos, bool_t retain)
397 {
398  MqttClientWillMessage *willMessage;
399 
400  //Check parameters
401  if(context == NULL || topic == NULL)
403 
404  //Make sure the length of the Will topic is acceptable
406  return ERROR_INVALID_LENGTH;
407 
408  //Point to the Will message
409  willMessage = &context->settings.willMessage;
410 
411  //Save Will topic
412  osStrcpy(willMessage->topic, topic);
413 
414  //Any message payload
415  if(length > 0)
416  {
417  //Sanity check
418  if(message == NULL)
420 
421  //Make sure the length of the Will message payload is acceptable
423  return ERROR_INVALID_LENGTH;
424 
425  //Save Will message payload
426  osMemcpy(willMessage->payload, message, length);
427  }
428 
429  //Length of the Will message payload
430  willMessage->length = length;
431  //QoS level to be used when publishing the Will message
432  willMessage->qos = qos;
433  //This flag specifies if the Will message is to be retained
434  willMessage->retain = retain;
435 
436  //Successful processing
437  return NO_ERROR;
438 }
439 
440 
441 /**
442  * @brief Bind the MQTT client to a particular network interface
443  * @param[in] context Pointer to the MQTT client context
444  * @param[in] interface Network interface to be used
445  * @return Error code
446  **/
447 
449  NetInterface *interface)
450 {
451  //Make sure the MQTT client context is valid
452  if(context == NULL)
454 
455  //Explicitly associate the MQTT client with the specified interface
456  context->interface = interface;
457 
458  //Successful processing
459  return NO_ERROR;
460 }
461 
462 
463 /**
464  * @brief Establish connection with the MQTT server
465  * @param[in] context Pointer to the MQTT client context
466  * @param[in] serverIpAddr IP address of the MQTT server to connect to
467  * @param[in] serverPort TCP port number that will be used to establish the
468  * connection
469  * @param[in] cleanSession If this flag is set, then the client and server
470  * must discard any previous session and start a new one
471  * @return Error code
472  **/
473 
475  const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
476 {
477  error_t error;
478 
479  //Check parameters
480  if(context == NULL || serverIpAddr == NULL)
482 
483  //Initialize status code
484  error = NO_ERROR;
485 
486  //Establish network connection
487  while(!error)
488  {
489  //Check current state
490  if(context->state == MQTT_CLIENT_STATE_DISCONNECTED)
491  {
492  //Open network connection
493  error = mqttClientOpenConnection(context);
494 
495  //Check status code
496  if(!error)
497  {
498  //Debug message
499  TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
500  ipAddrToString(serverIpAddr, NULL), serverPort);
501 
502  //The network connection is open
504  //Save current time
505  context->startTime = osGetSystemTime();
506  }
507  }
508  else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
509  {
510  //Establish network connection
511  error = mqttClientEstablishConnection(context, serverIpAddr,
512  serverPort);
513 
514  //Check status code
515  if(!error)
516  {
517  //Debug message
518  TRACE_INFO("MQTT: Connected to server\r\n");
519 
520  //The network connection is established
522  }
523  }
524  else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
525  {
526  //Format CONNECT packet
527  error = mqttClientFormatConnect(context, cleanSession);
528 
529  //Check status code
530  if(!error)
531  {
532  //Debug message
533  TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
534  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
535 
536  //Save the type of the MQTT packet to be sent
537  context->packetType = MQTT_PACKET_TYPE_CONNECT;
538  //Point to the beginning of the packet
539  context->packetPos = 0;
540 
541  //Send CONNECT packet
543  }
544  }
545  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
546  {
547  //Send more data
548  error = mqttClientProcessEvents(context, context->settings.timeout);
549  }
550  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
551  {
552  //Wait for CONNACK packet
553  error = mqttClientProcessEvents(context, context->settings.timeout);
554  }
555  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
556  {
557  //Receive more data
558  error = mqttClientProcessEvents(context, context->settings.timeout);
559  }
560  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
561  {
562  //Save TLS session
563  error = mqttClientSaveSession(context);
564 
565  //Check status code
566  if(!error)
567  {
568  //Reset packet type
569  context->packetType = MQTT_PACKET_TYPE_INVALID;
570  //A CONNACK packet has been received
572  }
573  }
574  else if(context->state == MQTT_CLIENT_STATE_IDLE)
575  {
576  //The MQTT client is connected
577  break;
578  }
579  else
580  {
581  //Invalid state
582  error = ERROR_NOT_CONNECTED;
583  }
584  }
585 
586  //Check status code
587  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
588  {
589  //Check whether the timeout has elapsed
590  error = mqttClientCheckTimeout(context);
591  }
592 
593  //Failed to establish connection with the MQTT server?
594  if(error != NO_ERROR && error != ERROR_WOULD_BLOCK)
595  {
596  //Clean up side effects
597  mqttClientCloseConnection(context);
598  //Update MQTT client state
600  }
601 
602  //Return status code
603  return error;
604 }
605 
606 
607 /**
608  * @brief Set packet identifier
609  * @param[in] context Pointer to the MQTT client context
610  * @param[out] packetId Packet identifier to be used
611  * @return Error code
612  **/
613 
614 error_t mqttClientSetPacketId(MqttClientContext *context, uint16_t packetId)
615 {
616  //Check parameters
617  if(context == NULL || packetId == 0)
619 
620  //Restore packet identifier
621  if(context->packetId > 1)
622  {
623  context->packetId = packetId - 1;
624  }
625  else
626  {
627  context->packetId = UINT16_MAX;
628  }
629 
630  //Successful processing
631  return NO_ERROR;
632 }
633 
634 
635 /**
636  * @brief Publish message
637  * @param[in] context Pointer to the MQTT client context
638  * @param[in] topic Topic name
639  * @param[in] message Message payload
640  * @param[in] length Length of the message payload
641  * @param[in] qos QoS level to be used when publishing the message
642  * @param[in] retain This flag specifies if the message is to be retained
643  * @param[out] packetId Packet identifier used to send the PUBLISH packet
644  * @return Error code
645  **/
646 
648  const void *message, size_t length, MqttQosLevel qos, bool_t retain,
649  uint16_t *packetId)
650 {
651  MqttPublishInfo publishInfo;
652 
653  //Check parameters
654  if(context == NULL || topic == NULL)
656 
657  //Set PUBLISH packet parameters
658  publishInfo.dup = FALSE;
659  publishInfo.qos = qos;
660  publishInfo.retain = retain;
661  publishInfo.topicName = topic;
662  publishInfo.payload = message;
663  publishInfo.payloadLen = length;
664  publishInfo.fragOffset = 0;
665  publishInfo.fragLen = 0;
666  publishInfo.packetId = packetId;
667 
668  //Publish message
669  return mqttClientPublishEx(context, &publishInfo);
670 }
671 
672 
673 /**
674  * @brief Publish message
675  * @param[in] context Pointer to the MQTT client context
676  * @param[in] publishInfo PUBLISH packet parameters
677  * @return Error code
678  **/
679 
681  MqttPublishInfo *publishInfo)
682 {
683  error_t error;
684  size_t n;
685 
686  //Check parameters
687  if(context == NULL || publishInfo == NULL)
689 
690  if(publishInfo->topicName == NULL)
692 
693  if(publishInfo->payload == NULL && publishInfo->payloadLen != 0)
695 
696  //Initialize status code
697  error = NO_ERROR;
698 
699  //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
700  while(!error)
701  {
702  //Check current state
703  if(context->state == MQTT_CLIENT_STATE_IDLE)
704  {
705  //Check for transmission completion
706  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
707  {
708  //Format PUBLISH packet
709  error = mqttClientFormatPublish(context, publishInfo);
710 
711  //Check status code
712  if(!error)
713  {
714  //Save the packet identifier used to send the PUBLISH packet
715  if(publishInfo->packetId != NULL)
716  {
717  *publishInfo->packetId = context->packetId;
718  }
719 
720  //Debug message
721  TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n",
722  context->packetLen);
723 
724  //Dump the contents of the PUBLISH packet
725  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
726 
727  //Save the type of the MQTT packet to be sent
728  context->packetType = MQTT_PACKET_TYPE_PUBLISH;
729  //Point to the beginning of the packet
730  context->packetPos = 0;
731 
732  //Send PUBLISH packet
734  //Save the time at which the packet was sent
735  context->startTime = osGetSystemTime();
736  }
737  }
738  else
739  {
740  //Reset packet type
741  context->packetType = MQTT_PACKET_TYPE_INVALID;
742  //We are done
743  break;
744  }
745  }
746  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
747  {
748  //Any remaining data to be sent?
749  if(context->packetPos < context->packetLen)
750  {
751  //Send more data
752  error = mqttClientSendData(context, context->packet + context->packetPos,
753  context->packetLen - context->packetPos, &n, 0);
754 
755  //Advance data pointer
756  context->packetPos += n;
757  }
758  else
759  {
760  //Save the time at which the data was sent
761  context->keepAliveTimestamp = osGetSystemTime();
762 
763  //Update MQTT client state
765  }
766  }
767  else if(context->state == MQTT_CLIENT_STATE_SENDING_PAYLOAD)
768  {
769  //Fragmented payload?
770  if(publishInfo->fragLen > 0)
771  {
772  //Any remaining data to be sent?
773  if(context->payloadPos < publishInfo->payloadLen)
774  {
775  if(context->fragPos < publishInfo->fragLen)
776  {
777  //Send more data
778  error = mqttClientSendData(context,
779  (uint8_t *) publishInfo->payload + context->fragPos,
780  publishInfo->fragLen - context->fragPos, &n, 0);
781 
782  //Advance data pointer
783  context->fragPos += n;
784  }
785  else
786  {
787  //The transmission of the fragment is complete
788  context->payloadPos += publishInfo->fragLen;
789  context->fragPos = 0;
790 
791  //Check whether the fragment is the final one or not
792  if(context->payloadPos < publishInfo->payloadLen)
793  break;
794  }
795  }
796  else
797  {
798  //The transmission of the payload is complete
799  context->payloadPos = 0;
800  context->fragPos = 0;
801 
802  //Save the time at which the data was sent
803  context->keepAliveTimestamp = osGetSystemTime();
804 
805  //Update MQTT client state
807  }
808  }
809  else
810  {
811  //Any remaining data to be sent?
812  if(context->payloadPos < publishInfo->payloadLen)
813  {
814  //Send more data
815  error = mqttClientSendData(context,
816  (uint8_t *) publishInfo->payload + context->payloadPos,
817  publishInfo->payloadLen - context->payloadPos, &n, 0);
818 
819  //Advance data pointer
820  context->payloadPos += n;
821  }
822  else
823  {
824  //The transmission of the payload is complete
825  context->payloadPos = 0;
826  context->fragPos = 0;
827 
828  //Save the time at which the data was sent
829  context->keepAliveTimestamp = osGetSystemTime();
830 
831  //Update MQTT client state
833  }
834  }
835  }
836  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
837  {
838  //The last parameter is optional
839  if(publishInfo->packetId != NULL)
840  {
841  //Do not wait for PUBACK/PUBCOMP packet
843  }
844  else
845  {
846  //Check QoS level
847  if(publishInfo->qos == MQTT_QOS_LEVEL_0)
848  {
849  //No response is sent by the receiver and no retry is performed by the sender
851  }
852  else
853  {
854  //Wait for PUBACK/PUBCOMP packet
855  error = mqttClientProcessEvents(context, context->settings.timeout);
856  }
857  }
858  }
859  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
860  {
861  //Receive more data
862  error = mqttClientProcessEvents(context, context->settings.timeout);
863  }
864  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
865  {
866  //A PUBACK/PUBCOMP packet has been received
868  }
869  else
870  {
871  //Invalid state
872  error = ERROR_NOT_CONNECTED;
873  }
874  }
875 
876  //Check status code
877  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
878  {
879  //Check whether the timeout has elapsed
880  error = mqttClientCheckTimeout(context);
881  }
882 
883  //Return status code
884  return error;
885 }
886 
887 
888 /**
889  * @brief Subscribe to topic
890  * @param[in] context Pointer to the MQTT client context
891  * @param[in] topic Topic filter
892  * @param[in] qos Maximum QoS level at which the server can send application
893  * messages to the client
894  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
895  * @return Error code
896  **/
897 
899  MqttQosLevel qos, uint16_t *packetId)
900 {
901  error_t error;
902 
903  //Check parameters
904  if(context == NULL || topic == NULL)
906 
907  //Initialize status code
908  error = NO_ERROR;
909 
910  //Send SUBSCRIBE packet and wait for SUBACK packet to be received
911  while(!error)
912  {
913  //Check current state
914  if(context->state == MQTT_CLIENT_STATE_IDLE)
915  {
916  //Check for transmission completion
917  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
918  {
919  //Format SUBSCRIBE packet
920  error = mqttClientFormatSubscribe(context, topic, qos);
921 
922  //Check status code
923  if(!error)
924  {
925  //Save the packet identifier used to send the SUBSCRIBE packet
926  if(packetId != NULL)
927  {
928  *packetId = context->packetId;
929  }
930 
931  //Debug message
932  TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
933  context->packetLen);
934 
935  //Dump the contents of the SUBSCRIBE packet
936  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
937 
938  //Save the type of the MQTT packet to be sent
939  context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
940  //Point to the beginning of the packet
941  context->packetPos = 0;
942 
943  //Send SUBSCRIBE packet
945  //Save the time at which the packet was sent
946  context->startTime = osGetSystemTime();
947  }
948  }
949  else
950  {
951  //Reset packet type
952  context->packetType = MQTT_PACKET_TYPE_INVALID;
953  //We are done
954  break;
955  }
956  }
957  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
958  {
959  //Send more data
960  error = mqttClientProcessEvents(context, context->settings.timeout);
961  }
962  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
963  {
964  //The last parameter is optional
965  if(packetId != NULL)
966  {
967  //Do not wait for SUBACK packet
969  }
970  else
971  {
972  //Wait for SUBACK packet
973  error = mqttClientProcessEvents(context, context->settings.timeout);
974  }
975  }
976  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
977  {
978  //Receive more data
979  error = mqttClientProcessEvents(context, context->settings.timeout);
980  }
981  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
982  {
983  //A SUBACK packet has been received
985  }
986  else
987  {
988  //Invalid state
989  error = ERROR_NOT_CONNECTED;
990  }
991  }
992 
993  //Check status code
994  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
995  {
996  //Check whether the timeout has elapsed
997  error = mqttClientCheckTimeout(context);
998  }
999 
1000  //Return status code
1001  return error;
1002 }
1003 
1004 
1005 /**
1006  * @brief Unsubscribe from topic
1007  * @param[in] context Pointer to the MQTT client context
1008  * @param[in] topic Topic filter
1009  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
1010  * @return Error code
1011  **/
1012 
1014  uint16_t *packetId)
1015 {
1016  error_t error;
1017 
1018  //Check parameters
1019  if(context == NULL || topic == NULL)
1020  return ERROR_INVALID_PARAMETER;
1021 
1022  //Initialize status code
1023  error = NO_ERROR;
1024 
1025  //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
1026  while(!error)
1027  {
1028  //Check current state
1029  if(context->state == MQTT_CLIENT_STATE_IDLE)
1030  {
1031  //Check for transmission completion
1032  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
1033  {
1034  //Format UNSUBSCRIBE packet
1035  error = mqttClientFormatUnsubscribe(context, topic);
1036 
1037  //Check status code
1038  if(!error)
1039  {
1040  //Save the packet identifier used to send the UNSUBSCRIBE packet
1041  if(packetId != NULL)
1042  {
1043  *packetId = context->packetId;
1044  }
1045 
1046  //Debug message
1047  TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
1048  context->packetLen);
1049 
1050  //Dump the contents of the UNSUBSCRIBE packet
1051  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1052 
1053  //Save the type of the MQTT packet to be sent
1054  context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
1055  //Point to the beginning of the packet
1056  context->packetPos = 0;
1057 
1058  //Send UNSUBSCRIBE packet
1060  //Save the time at which the packet was sent
1061  context->startTime = osGetSystemTime();
1062  }
1063  }
1064  else
1065  {
1066  //Reset packet type
1067  context->packetType = MQTT_PACKET_TYPE_INVALID;
1068  //We are done
1069  break;
1070  }
1071  }
1072  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1073  {
1074  //Send more data
1075  error = mqttClientProcessEvents(context, context->settings.timeout);
1076  }
1077  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1078  {
1079  //The last parameter is optional
1080  if(packetId != NULL)
1081  {
1082  //Do not wait for UNSUBACK packet
1084  }
1085  else
1086  {
1087  //Wait for UNSUBACK packet
1088  error = mqttClientProcessEvents(context, context->settings.timeout);
1089  }
1090  }
1091  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1092  {
1093  //Receive more data
1094  error = mqttClientProcessEvents(context, context->settings.timeout);
1095  }
1096  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
1097  {
1098  //An UNSUBACK packet has been received
1100  }
1101  else
1102  {
1103  //Invalid state
1104  error = ERROR_NOT_CONNECTED;
1105  }
1106  }
1107 
1108  //Check status code
1109  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1110  {
1111  //Check whether the timeout has elapsed
1112  error = mqttClientCheckTimeout(context);
1113  }
1114 
1115  //Return status code
1116  return error;
1117 }
1118 
1119 
1120 /**
1121  * @brief Send ping request
1122  * @param[in] context Pointer to the MQTT client context
1123  * @param[out] rtt Round-trip time (optional parameter)
1124  * @return Error code
1125  **/
1126 
1128 {
1129  error_t error;
1130 
1131  //Make sure the MQTT client context is valid
1132  if(context == NULL)
1133  return ERROR_INVALID_PARAMETER;
1134 
1135  //Initialize status code
1136  error = NO_ERROR;
1137 
1138  //Send PINGREQ packet and wait for PINGRESP packet to be received
1139  while(!error)
1140  {
1141  //Check current state
1142  if(context->state == MQTT_CLIENT_STATE_IDLE)
1143  {
1144  //Check for transmission completion
1145  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
1146  {
1147  //Format PINGREQ packet
1148  error = mqttClientFormatPingReq(context);
1149 
1150  //Check status code
1151  if(!error)
1152  {
1153  //Debug message
1154  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n",
1155  context->packetLen);
1156 
1157  //Dump the contents of the PINGREQ packet
1158  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1159 
1160  //Save the type of the MQTT packet to be sent
1161  context->packetType = MQTT_PACKET_TYPE_PINGREQ;
1162  //Point to the beginning of the packet
1163  context->packetPos = 0;
1164 
1165  //Send PINGREQ packet
1167  //Save the time at which the packet was sent
1168  context->startTime = osGetSystemTime();
1169  }
1170  }
1171  else
1172  {
1173  //Reset packet type
1174  context->packetType = MQTT_PACKET_TYPE_INVALID;
1175  //We are done
1176  break;
1177  }
1178  }
1179  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1180  {
1181  //Send more data
1182  error = mqttClientProcessEvents(context, context->settings.timeout);
1183  }
1184  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1185  {
1186  //The last parameter is optional
1187  if(rtt != NULL)
1188  {
1189  //Wait for PINGRESP packet
1190  error = mqttClientProcessEvents(context, context->settings.timeout);
1191  }
1192  else
1193  {
1194  //Do not wait for PINGRESP packet
1196  }
1197  }
1198  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1199  {
1200  //Receive more data
1201  error = mqttClientProcessEvents(context, context->settings.timeout);
1202  }
1203  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
1204  {
1205  //The last parameter is optional
1206  if(rtt != NULL)
1207  {
1208  //Compute round-trip time
1209  *rtt = osGetSystemTime() - context->startTime;
1210  }
1211 
1212  //A PINGRESP packet has been received
1214  }
1215  else
1216  {
1217  //Invalid state
1218  error = ERROR_NOT_CONNECTED;
1219  }
1220  }
1221 
1222  //Check status code
1223  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1224  {
1225  //Check whether the timeout has elapsed
1226  error = mqttClientCheckTimeout(context);
1227  }
1228 
1229  //Return status code
1230  return error;
1231 }
1232 
1233 
1234 /**
1235  * @brief Process MQTT client events
1236  * @param[in] context Pointer to the MQTT client context
1237  * @param[in] timeout Maximum time to wait before returning
1238  * @return Error code
1239  **/
1240 
1242 {
1243  error_t error;
1244 
1245  //Make sure the MQTT client context is valid
1246  if(context == NULL)
1247  return ERROR_INVALID_PARAMETER;
1248 
1249  //Process MQTT client events
1250  error = mqttClientProcessEvents(context, timeout);
1251 
1252  //Check status code
1253  if(error == ERROR_TIMEOUT)
1254  {
1255  //Catch exception
1256  error = NO_ERROR;
1257  }
1258 
1259  //Return status code
1260  return error;
1261 }
1262 
1263 
1264 /**
1265  * @brief Gracefully disconnect from the MQTT server
1266  * @param[in] context Pointer to the MQTT client context
1267  * @return Error code
1268  **/
1269 
1271 {
1272  error_t error;
1273 
1274  //Make sure the MQTT client context is valid
1275  if(context == NULL)
1276  return ERROR_INVALID_PARAMETER;
1277 
1278  //Initialize status code
1279  error = NO_ERROR;
1280 
1281  //Send DISCONNECT packet and shutdown network connection
1282  while(!error)
1283  {
1284  //Check current state
1285  if(context->state == MQTT_CLIENT_STATE_IDLE)
1286  {
1287  //Format DISCONNECT packet
1288  error = mqttClientFormatDisconnect(context);
1289 
1290  //Check status code
1291  if(!error)
1292  {
1293  //Debug message
1294  TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
1295  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1296 
1297  //Save the type of the MQTT packet to be sent
1298  context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
1299  //Point to the beginning of the packet
1300  context->packetPos = 0;
1301 
1302  //Send DISCONNECT packet
1304  //Save the time at which the packet was sent
1305  context->startTime = osGetSystemTime();
1306  }
1307  }
1308  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1309  {
1310  //Send more data
1311  error = mqttClientProcessEvents(context, context->settings.timeout);
1312  }
1313  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1314  {
1315  //Debug message
1316  TRACE_INFO("MQTT: Shutting down connection...\r\n");
1317 
1318  //After sending a DISCONNECT packet the client must not send any
1319  //more control packets on that network connection
1321  }
1322  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
1323  {
1324  //Properly dispose the network connection
1325  error = mqttClientShutdownConnection(context);
1326 
1327  //Check status code
1328  if(!error)
1329  {
1330  //The MQTT client is disconnected
1332  }
1333  }
1334  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTED)
1335  {
1336  //The MQTT client is disconnected
1337  break;
1338  }
1339  else
1340  {
1341  //Invalid state
1342  error = ERROR_NOT_CONNECTED;
1343  }
1344  }
1345 
1346  //Check status code
1347  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1348  {
1349  //Check whether the timeout has elapsed
1350  error = mqttClientCheckTimeout(context);
1351  }
1352 
1353  //Failed to gracefully disconnect from the MQTT server?
1354  if(error != NO_ERROR && error != ERROR_WOULD_BLOCK)
1355  {
1356  //Close connection
1357  mqttClientCloseConnection(context);
1358  //Update MQTT client state
1360  }
1361 
1362  //Return status code
1363  return error;
1364 }
1365 
1366 
1367 /**
1368  * @brief Close the connection with the MQTT server
1369  * @param[in] context Pointer to the MQTT client context
1370  * @return Error code
1371  **/
1372 
1374 {
1375  //Make sure the MQTT client context is valid
1376  if(context == NULL)
1377  return ERROR_INVALID_PARAMETER;
1378 
1379  //Close connection
1380  mqttClientCloseConnection(context);
1381  //Update MQTT client state
1383 
1384  //Successful processing
1385  return NO_ERROR;
1386 }
1387 
1388 
1389 /**
1390  * @brief Release MQTT client context
1391  * @param[in] context Pointer to the MQTT client context
1392  **/
1393 
1395 {
1396  //Make sure the MQTT client context is valid
1397  if(context != NULL)
1398  {
1399  //Close connection
1400  mqttClientCloseConnection(context);
1401 
1402 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
1403  //Release TLS session state
1404  tlsFreeSessionState(&context->tlsSession);
1405 #endif
1406 
1407  //Clear MQTT client context
1408  osMemset(context, 0, sizeof(MqttClientContext));
1409  }
1410 }
1411 
1412 #endif
void mqttClientCloseConnection(MqttClientContext *context)
Close network connection.
error_t mqttClientEstablishConnection(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort)
Establish network connection.
const void * payload
Definition: mqtt_client.h:359
int bool_t
Definition: compiler_port.h:63
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
@ MQTT_PACKET_TYPE_DISCONNECT
Client is disconnecting.
Definition: mqtt_common.h:114
error_t(* MqttClientTlsInitCallback)(MqttClientContext *context, TlsContext *tlsContext)
TLS initialization callback.
Definition: mqtt_client.h:254
@ ERROR_WOULD_BLOCK
Definition: error.h:96
error_t mqttClientSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Subscribe to topic.
Definition: mqtt_client.c:898
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
IP network address.
Definition: ip.h:90
MqttTransportProtocol
Transport protocol.
Definition: mqtt_common.h:74
error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Send ping request.
Definition: mqtt_client.c:1127
uint8_t message[]
Definition: chap.h:154
@ MQTT_PACKET_TYPE_PUBLISH
Publish message.
Definition: mqtt_common.h:103
error_t mqttClientSendData(MqttClientContext *context, const void *data, size_t length, size_t *written, uint_t flags)
Send data using the relevant transport protocol.
error_t mqttClientOpenConnection(MqttClientContext *context)
Open network connection.
error_t mqttClientPublishEx(MqttClientContext *context, MqttPublishInfo *publishInfo)
Publish message.
Definition: mqtt_client.c:680
char_t * ipAddrToString(const IpAddr *ipAddr, char_t *str)
Convert a binary IP address to a string representation.
Definition: ip.c:810
@ MQTT_CLIENT_STATE_DISCONNECTED
Definition: mqtt_client.h:161
const MqttPublishInfo MQTT_DEFAULT_PUBLISH_INFO
Definition: mqtt_client.c:46
error_t mqttClientConnect(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Establish connection with the MQTT server.
Definition: mqtt_client.c:474
uint8_t qos
Definition: mqtt_common.h:181
#define osStrlen(s)
Definition: os_port.h:171
uint8_t version
Definition: coap_common.h:177
uint8_t payload[MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN]
Will message payload.
Definition: mqtt_client.h:267
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:165
void tlsFreeSessionState(TlsSessionState *session)
Properly dispose a session state.
Definition: tls.c:3126
MQTT client callback functions.
Definition: mqtt_client.h:279
error_t mqttClientInit(MqttClientContext *context)
Initialize MQTT client context.
Definition: mqtt_client.c:55
error_t mqttClientFormatConnect(MqttClientContext *context, bool_t cleanSession)
Format CONNECT packet.
@ MQTT_CLIENT_STATE_PACKET_RECEIVED
Definition: mqtt_client.h:170
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
error_t mqttClientPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:647
#define FALSE
Definition: os_port.h:46
error_t mqttClientDisconnect(MqttClientContext *context)
Gracefully disconnect from the MQTT server.
Definition: mqtt_client.c:1270
@ ERROR_INVALID_PARAMETER
Invalid parameter.
Definition: error.h:47
#define MQTT_CLIENT_MAX_USERNAME_LEN
Definition: mqtt_client.h:96
#define osMemcpy(dest, src, length)
Definition: os_port.h:147
Helper functions for MQTT client.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
error_t
Error codes.
Definition: error.h:43
Transport protocol abstraction layer.
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:167
@ MQTT_QOS_LEVEL_0
At most once delivery.
Definition: mqtt_common.h:88
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:87
@ MQTT_PACKET_TYPE_PINGREQ
Ping request.
Definition: mqtt_common.h:112
#define MQTT_CLIENT_MAX_WILL_TOPIC_LEN
Definition: mqtt_client.h:110
@ MQTT_TRANSPORT_PROTOCOL_TCP
Definition: mqtt_common.h:75
error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Set the name of the resource being requested.
Definition: mqtt_client.c:305
#define NetInterface
Definition: net.h:40
error_t mqttClientSetTransportProtocol(MqttClientContext *context, MqttTransportProtocol transportProtocol)
Set the transport protocol to be used.
Definition: mqtt_client.c:163
@ ERROR_INVALID_LENGTH
Definition: error.h:111
error_t mqttClientFormatUnsubscribe(MqttClientContext *context, const char_t *topic)
Format UNSUBSCRIBE packet.
error_t mqttClientClose(MqttClientContext *context)
Close the connection with the MQTT server.
Definition: mqtt_client.c:1373
error_t mqttClientBindToInterface(MqttClientContext *context, NetInterface *interface)
Bind the MQTT client to a particular network interface.
Definition: mqtt_client.c:448
@ MQTT_PACKET_TYPE_CONNECT
Client request to connect to server.
Definition: mqtt_common.h:101
@ MQTT_PACKET_TYPE_SUBSCRIBE
Client subscribe request.
Definition: mqtt_common.h:108
MQTT packet parsing and formatting.
void mqttClientDeinit(MqttClientContext *context)
Release MQTT client context.
Definition: mqtt_client.c:1394
#define MQTT_CLIENT_MAX_URI_LEN
Definition: mqtt_client.h:82
#define TRACE_INFO(...)
Definition: debug.h:105
uint8_t length
Definition: tcp.h:375
MqttVersion
MQTT protocol level.
Definition: mqtt_common.h:63
error_t mqttClientRegisterCallbacks(MqttClientContext *context, const MqttClientCallbacks *callbacks)
Register MQTT client callbacks.
Definition: mqtt_client.c:119
bool_t retain
Specifies if the Will message is to be retained.
Definition: mqtt_client.h:270
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:169
#define MQTT_CLIENT_DEFAULT_KEEP_ALIVE
Definition: mqtt_client.h:61
error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Set keep-alive value.
Definition: mqtt_client.c:257
@ MQTT_PACKET_TYPE_UNSUBSCRIBE
Unsubscribe request.
Definition: mqtt_common.h:110
error_t mqttClientSaveSession(MqttClientContext *context)
Save TLS session.
error_t mqttClientFormatDisconnect(MqttClientContext *context)
Format DISCONNECT packet.
error_t mqttClientShutdownConnection(MqttClientContext *context)
Shutdown network connection.
size_t length
Length of the Will message payload.
Definition: mqtt_client.h:268
uint32_t systime_t
System time.
error_t mqttClientSetVersion(MqttClientContext *context, MqttVersion version)
Set the MQTT protocol version to be used.
Definition: mqtt_client.c:141
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:164
@ ERROR_TIMEOUT
Definition: error.h:95
char char_t
Definition: compiler_port.h:55
error_t mqttClientTask(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
Definition: mqtt_client.c:1241
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:120
error_t mqttClientSetTimeout(MqttClientContext *context, systime_t timeout)
Set communication timeout.
Definition: mqtt_client.c:234
@ ERROR_NOT_CONNECTED
Definition: error.h:80
@ MQTT_CLIENT_STATE_CONNECTING
Definition: mqtt_client.h:162
uint8_t n
#define MQTT_CLIENT_MAX_ID_LEN
Definition: mqtt_client.h:89
@ MQTT_CLIENT_STATE_SENDING_PAYLOAD
Definition: mqtt_client.h:166
error_t mqttClientCheckTimeout(MqttClientContext *context)
Determine whether a timeout error has occurred.
void(* MqttClientPublishCallback)(MqttClientContext *context, const char_t *topic, const uint8_t *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain, uint16_t packetId)
PUBLISH message received callback.
Definition: mqtt_client.h:187
error_t mqttClientSetPacketId(MqttClientContext *context, uint16_t packetId)
Set packet identifier.
Definition: mqtt_client.c:614
error_t mqttClientFormatSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos)
Format SUBSCRIBE packet.
@ MQTT_CLIENT_STATE_DISCONNECTING
Definition: mqtt_client.h:171
@ MQTT_CLIENT_STATE_CONNECTED
Definition: mqtt_client.h:163
@ MQTT_VERSION_3_1_1
MQTT version 3.1.1.
Definition: mqtt_common.h:65
PUBLISH packet parameters.
Definition: mqtt_client.h:354
#define MQTT_CLIENT_MAX_PASSWORD_LEN
Definition: mqtt_client.h:103
#define MqttClientContext
Definition: mqtt_client.h:147
error_t mqttClientFormatPublish(MqttClientContext *context, MqttPublishInfo *publishInfo)
Format PUBLISH packet.
MqttQosLevel qos
QoS level to be used when publishing the Will message.
Definition: mqtt_client.h:269
char_t clientId[]
void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Initialize callback structure.
Definition: mqtt_client.c:105
error_t mqttClientRegisterPublishCallback(MqttClientContext *context, MqttClientPublishCallback callback)
Register publish callback function.
Definition: mqtt_client.c:212
#define MQTT_CLIENT_DEFAULT_TIMEOUT
Definition: mqtt_client.h:68
#define MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN
Definition: mqtt_client.h:117
error_t mqttClientUnsubscribe(MqttClientContext *context, const char_t *topic, uint16_t *packetId)
Unsubscribe from topic.
Definition: mqtt_client.c:1013
uint16_t * packetId
Definition: mqtt_client.h:363
#define PRIuSIZE
#define osMemset(p, value, length)
Definition: os_port.h:141
TCP/IP stack core.
#define MQTT_CLIENT_MAX_HOST_LEN
Definition: mqtt_client.h:75
error_t tlsInitSessionState(TlsSessionState *session)
Initialize session state.
Definition: tls.c:2983
#define osStrcpy(s1, s2)
Definition: os_port.h:213
error_t mqttClientSetAuthInfo(MqttClientContext *context, const char_t *username, const char_t *password)
Set authentication information.
Definition: mqtt_client.c:359
MqttQosLevel qos
Definition: mqtt_client.h:356
error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Set the domain name of the server (for virtual hosting)
Definition: mqtt_client.c:278
error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Specify the Will message.
Definition: mqtt_client.c:395
char_t topic[MQTT_CLIENT_MAX_WILL_TOPIC_LEN+1]
Will topic name.
Definition: mqtt_client.h:266
@ NO_ERROR
Success.
Definition: error.h:44
const char_t * topicName
Definition: mqtt_client.h:358
Debugging facilities.
error_t mqttClientRegisterTlsInitCallback(MqttClientContext *context, MqttClientTlsInitCallback callback)
Register TLS initialization callback function.
Definition: mqtt_client.c:187
systime_t osGetSystemTime(void)
Retrieve system time.
MQTT client.
error_t mqttClientSetIdentifier(MqttClientContext *context, const char_t *clientId)
Set client identifier.
Definition: mqtt_client.c:332