mqtt_client.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client.c
3  * @brief MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2019 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 1.9.6
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Initialize MQTT client context
48  * @param[in] context Pointer to the MQTT client context
49  * @return Error code
50  **/
51 
53 {
54 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
55  error_t error;
56 #endif
57 
58  //Make sure the MQTT client context is valid
59  if(context == NULL)
61 
62  //Clear MQTT client context
63  memset(context, 0, sizeof(MqttClientContext));
64 
65 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
66  //Initialize TLS session state
67  error = tlsInitSessionState(&context->tlsSession);
68  //Any error to report?
69  if(error)
70  return error;
71 #endif
72 
73  //Default protocol version
74  context->settings.version = MQTT_VERSION_3_1_1;
75  //Default transport protocol
76  context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
77  //Default keep-alive time interval
78  context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
79  //Default communication timeout
80  context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
81 
82 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
83  //Default resource name (for WebSocket connections only)
84  strcpy(context->settings.uri, "/");
85 #endif
86 
87  //Initialize state machine
88  context->state = MQTT_CLIENT_STATE_CLOSED;
89  //Initialize packet identifier
90  context->packetId = 0;
91 
92  //Successful initialization
93  return NO_ERROR;
94 }
95 
96 
97 /**
98  * @brief Initialize callback structure
99  * @param[in] callbacks Pointer to a structure that contains callback functions
100  **/
101 
103 {
104  //Initialize callback structure
105  memset(callbacks, 0, sizeof(MqttClientCallbacks));
106 }
107 
108 
109 /**
110  * @brief Register MQTT client callbacks
111  * @param[in] context Pointer to the MQTT client context
112  * @param[in] callbacks Pointer to a structure that contains callback functions
113  * @return Error code
114  **/
115 
117  const MqttClientCallbacks *callbacks)
118 {
119  //Make sure the MQTT client context is valid
120  if(context == NULL)
122 
123  //Attach callback functions
124  context->callbacks = *callbacks;
125 
126  //Successful processing
127  return NO_ERROR;
128 }
129 
130 
131 /**
132  * @brief Set the MQTT protocol version to be used
133  * @param[in] context Pointer to the MQTT client context
134  * @param[in] version MQTT protocol version (3.1 or 3.1.1)
135  * @return Error code
136  **/
137 
139 {
140  //Make sure the MQTT client context is valid
141  if(context == NULL)
143 
144  //Save the MQTT protocol version to be used
145  context->settings.version = version;
146 
147  //Successful processing
148  return NO_ERROR;
149 }
150 
151 
152 /**
153  * @brief Set the transport protocol to be used
154  * @param[in] context Pointer to the MQTT client context
155  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
156  * WebSocket, or secure WebSocket)
157  * @return Error code
158  **/
159 
161  MqttTransportProtocol transportProtocol)
162 {
163  //Make sure the MQTT client context is valid
164  if(context == NULL)
166 
167  //Save the transport protocol to be used
168  context->settings.transportProtocol = transportProtocol;
169 
170  //Successful processing
171  return NO_ERROR;
172 }
173 
174 
175 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
176 
177 /**
178  * @brief Register TLS initialization callback function
179  * @param[in] context Pointer to the MQTT- client context
180  * @param[in] callback TLS initialization callback function
181  * @return Error code
182  **/
183 
185  MqttClientTlsInitCallback callback)
186 {
187  //Check parameters
188  if(context == NULL || callback == NULL)
190 
191  //Save callback function
192  context->callbacks.tlsInitCallback = callback;
193 
194  //Successful processing
195  return NO_ERROR;
196 }
197 
198 #endif
199 
200 
201 /**
202  * @brief Register publish callback function
203  * @param[in] context Pointer to the MQTT client context
204  * @param[in] callback Callback function to be called when a PUBLISH message
205  * is received
206  * @return Error code
207  **/
208 
210  MqttClientPublishCallback callback)
211 {
212  //Make sure the MQTT client context is valid
213  if(context == NULL)
215 
216  //Save callback function
217  context->callbacks.publishCallback = callback;
218 
219  //Successful processing
220  return NO_ERROR;
221 }
222 
223 
224 /**
225  * @brief Set communication timeout
226  * @param[in] context Pointer to the MQTT client context
227  * @param[in] timeout Timeout value, in seconds
228  * @return Error code
229  **/
230 
232 {
233  //Make sure the MQTT client context is valid
234  if(context == NULL)
236 
237  //Save timeout value
238  context->settings.timeout = timeout;
239 
240  //Successful processing
241  return NO_ERROR;
242 }
243 
244 
245 /**
246  * @brief Set keep-alive value
247  * @param[in] context Pointer to the MQTT client context
248  * @param[in] keepAlive Maximum time interval that is permitted to elapse
249  * between the point at which the client finishes transmitting one control
250  * packet and the point it starts sending the next
251  * @return Error code
252  **/
253 
254 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
255 {
256  //Make sure the MQTT client context is valid
257  if(context == NULL)
259 
260  //Save keep-alive value
261  context->settings.keepAlive = keepAlive;
262 
263  //Successful processing
264  return NO_ERROR;
265 }
266 
267 
268 /**
269  * @brief Set the domain name of the server (for virtual hosting)
270  * @param[in] context Pointer to the MQTT client context
271  * @param[in] host NULL-terminated string containing the hostname
272  * @return Error code
273  **/
274 
276 {
277  //Check parameters
278  if(context == NULL || host == NULL)
280 
281  //Make sure the length of the hostname is acceptable
282  if(strlen(host) > MQTT_CLIENT_MAX_HOST_LEN)
283  return ERROR_INVALID_LENGTH;
284 
285 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
286  //Save hostname (for WebSocket connections only)
287  strcpy(context->settings.host, host);
288 #endif
289 
290  //Successful processing
291  return NO_ERROR;
292 }
293 
294 
295 /**
296  * @brief Set the name of the resource being requested
297  * @param[in] context Pointer to the MQTT client context
298  * @param[in] uri NULL-terminated string that contains the resource name
299  * @return Error code
300  **/
301 
303 {
304  //Check parameters
305  if(context == NULL || uri == NULL)
307 
308  //Make sure the length of the resource name is acceptable
309  if(strlen(uri) > MQTT_CLIENT_MAX_URI_LEN)
310  return ERROR_INVALID_LENGTH;
311 
312 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
313  //Save resource name (for WebSocket connections only)
314  strcpy(context->settings.uri, uri);
315 #endif
316 
317  //Successful processing
318  return NO_ERROR;
319 }
320 
321 
322 /**
323  * @brief Set client identifier
324  * @param[in] context Pointer to the MQTT client context
325  * @param[in] clientId NULL-terminated string containing the client identifier
326  * @return Error code
327  **/
328 
330  const char_t *clientId)
331 {
332  //Check parameters
333  if(context == NULL || clientId == NULL)
335 
336  //Make sure the length of the client identifier is acceptable
337  if(strlen(clientId) > MQTT_CLIENT_MAX_ID_LEN)
338  return ERROR_INVALID_LENGTH;
339 
340  //Save client identifier
341  strcpy(context->settings.clientId, clientId);
342 
343  //Successful processing
344  return NO_ERROR;
345 }
346 
347 
348 /**
349  * @brief Set authentication information
350  * @param[in] context Pointer to the MQTT client context
351  * @param[in] username NULL-terminated string containing the user name to be used
352  * @param[in] password NULL-terminated string containing the password to be used
353  * @return Error code
354  **/
355 
357  const char_t *username, const char_t *password)
358 {
359  //Check parameters
360  if(context == NULL || username == NULL || password == NULL)
362 
363  //Make sure the length of the user name is acceptable
364  if(strlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
365  return ERROR_INVALID_LENGTH;
366 
367  //Save user name
368  strcpy(context->settings.username, username);
369 
370  //Make sure the length of the password is acceptable
371  if(strlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
372  return ERROR_INVALID_LENGTH;
373 
374  //Save password
375  strcpy(context->settings.password, password);
376 
377  //Successful processing
378  return NO_ERROR;
379 }
380 
381 
382 /**
383  * @brief Specify the Will message
384  * @param[in] context Pointer to the MQTT client context
385  * @param[in] topic Will topic name
386  * @param[in] message Will message
387  * @param[in] length Length of the Will message
388  * @param[in] qos QoS level to be used when publishing the Will message
389  * @param[in] retain This flag specifies if the Will message is to be retained
390  * @return Error code
391  **/
392 
394  const void *message, size_t length, MqttQosLevel qos, bool_t retain)
395 {
396  MqttClientWillMessage *willMessage;
397 
398  //Check parameters
399  if(context == NULL || topic == NULL)
401 
402  //Make sure the length of the Will topic is acceptable
403  if(strlen(topic) > MQTT_CLIENT_MAX_WILL_TOPIC_LEN)
404  return ERROR_INVALID_LENGTH;
405 
406  //Point to the Will message
407  willMessage = &context->settings.willMessage;
408 
409  //Save Will topic
410  strcpy(willMessage->topic, topic);
411 
412  //Any message payload
413  if(length > 0)
414  {
415  //Sanity check
416  if(message == NULL)
418 
419  //Make sure the length of the Will message payload is acceptable
421  return ERROR_INVALID_LENGTH;
422 
423  //Save Will message payload
424  memcpy(willMessage->payload, message, length);
425  }
426 
427  //Length of the Will message payload
428  willMessage->length = length;
429  //QoS level to be used when publishing the Will message
430  willMessage->qos = qos;
431  //This flag specifies if the Will message is to be retained
432  willMessage->retain = retain;
433 
434  //Successful processing
435  return NO_ERROR;
436 }
437 
438 
439 /**
440  * @brief Bind the MQTT client to a particular network interface
441  * @param[in] context Pointer to the MQTT client context
442  * @param[in] interface Network interface to be used
443  * @return Error code
444  **/
445 
447  NetInterface *interface)
448 {
449  //Make sure the MQTT client context is valid
450  if(context == NULL)
452 
453  //Explicitly associate the MQTT client with the specified interface
454  context->interface = interface;
455 
456  //Successful processing
457  return NO_ERROR;
458 }
459 
460 
461 /**
462  * @brief Establish connection with the MQTT server
463  * @param[in] context Pointer to the MQTT client context
464  * @param[in] serverIpAddr IP address of the MQTT server to connect to
465  * @param[in] serverPort TCP port number that will be used to establish the
466  * connection
467  * @param[in] cleanSession If this flag is set, then the client and server
468  * must discard any previous session and start a new one
469  * @return Error code
470  **/
471 
473  const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
474 {
475  error_t error;
476 
477  //Check parameters
478  if(context == NULL || serverIpAddr == NULL)
480 
481  //Initialize status code
482  error = NO_ERROR;
483 
484  //Establish network connection
485  while(context->state != MQTT_CLIENT_STATE_IDLE)
486  {
487  //Check current state
488  if(context->state == MQTT_CLIENT_STATE_CLOSED)
489  {
490  //Open network connection
491  error = mqttClientOpenConnection(context);
492 
493  //Check status code
494  if(!error)
495  {
496  //Debug message
497  TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
498  ipAddrToString(serverIpAddr, NULL), serverPort);
499 
500  //The network connection is open
502  }
503  else
504  {
505  //Clean up side effects
506  mqttClientCloseConnection(context);
507  }
508  }
509  else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
510  {
511  //Establish network connection
512  error = mqttClientEstablishConnection(context,
513  serverIpAddr, serverPort);
514 
515  //Check status code
516  if(!error)
517  {
518  //Debug message
519  TRACE_INFO("MQTT: Connected to server\r\n");
520 
521  //The network connection is established
523  }
524  }
525  else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
526  {
527  //Format CONNECT packet
528  error = mqttClientFormatConnect(context, cleanSession);
529 
530  //Check status code
531  if(!error)
532  {
533  //Debug message
534  TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
535  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
536 
537  //Save the type of the MQTT packet to be sent
538  context->packetType = MQTT_PACKET_TYPE_CONNECT;
539  //Point to the beginning of the packet
540  context->packetPos = 0;
541 
542  //Send CONNECT packet
544  }
545  }
546  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
547  {
548  //Send more data
549  error = mqttClientProcessEvents(context, context->settings.timeout);
550  }
551  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
552  {
553  //Wait for CONNACK packet
554  error = mqttClientProcessEvents(context, context->settings.timeout);
555  }
556  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
557  {
558  //Receive more data
559  error = mqttClientProcessEvents(context, context->settings.timeout);
560  }
561  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
562  {
563  //Reset packet type
564  context->packetType = MQTT_PACKET_TYPE_INVALID;
565  //A CONNACK packet has been received
567  }
568  else
569  {
570  //Invalid state
571  error = ERROR_NOT_CONNECTED;
572  }
573 
574  //Any error to report?
575  if(error)
576  {
577 #if (NET_RTOS_SUPPORT == DISABLED)
578  //Timeout error?
579  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
580  break;
581 #endif
582  //Close connection
583  mqttClientCloseConnection(context);
584  //The connection is closed
586  //Exit immediately
587  break;
588  }
589  }
590 
591  //Return status code
592  return error;
593 }
594 
595 
596 /**
597  * @brief Publish message
598  * @param[in] context Pointer to the MQTT client context
599  * @param[in] topic Topic name
600  * @param[in] message Message payload
601  * @param[in] length Length of the message payload
602  * @param[in] qos QoS level to be used when publishing the message
603  * @param[in] retain This flag specifies if the message is to be retained
604  * @param[out] packetId Packet identifier used to send the PUBLISH packet
605  * @return Error code
606  **/
607 
609  const char_t *topic, const void *message, size_t length,
610  MqttQosLevel qos, bool_t retain, uint16_t *packetId)
611 {
612  error_t error;
613 
614  //Check parameters
615  if(context == NULL || topic == NULL)
617  if(message == NULL && length != 0)
619 
620  //Initialize status code
621  error = NO_ERROR;
622 
623  //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
624  while(!error)
625  {
626  //Check current state
627  if(context->state == MQTT_CLIENT_STATE_IDLE)
628  {
629  //Check for transmission completion
630  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
631  {
632  //Format PUBLISH packet
633  error = mqttClientFormatPublish(context, topic, message,
634  length, qos, retain);
635 
636  //Check status code
637  if(!error)
638  {
639  //Save the packet identifier used to send the PUBLISH packet
640  if(packetId != NULL)
641  *packetId = context->packetId;
642 
643  //Debug message
644  TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n",
645  context->packetLen);
646 
647  //Dump the contents of the PUBLISH packet
648  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
649 
650  //Save the type of the MQTT packet to be sent
651  context->packetType = MQTT_PACKET_TYPE_PUBLISH;
652  //Point to the beginning of the packet
653  context->packetPos = 0;
654 
655  //Send PUBLISH packet
657  }
658  }
659  else
660  {
661  //Reset packet type
662  context->packetType = MQTT_PACKET_TYPE_INVALID;
663  //We are done
664  break;
665  }
666  }
667  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
668  {
669  //Send more data
670  error = mqttClientProcessEvents(context, context->settings.timeout);
671  }
672  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
673  {
674  //The last parameter is optional
675  if(packetId != NULL)
676  {
677  //Do not wait for PUBACK/PUBCOMP packet
679  }
680  else
681  {
682  //Check QoS level
683  if(qos == MQTT_QOS_LEVEL_0)
684  {
685  //No response is sent by the receiver and no retry is performed by the sender
687  }
688  else
689  {
690  //Wait for PUBACK/PUBCOMP packet
691  error = mqttClientProcessEvents(context, context->settings.timeout);
692  }
693  }
694  }
695  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
696  {
697  //Receive more data
698  error = mqttClientProcessEvents(context, context->settings.timeout);
699  }
700  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
701  {
702  //A PUBACK/PUBCOMP packet has been received
704  }
705  else
706  {
707  //Invalid state
708  error = ERROR_NOT_CONNECTED;
709  }
710  }
711 
712  //Return status code
713  return error;
714 }
715 
716 
717 /**
718  * @brief Subscribe to topic
719  * @param[in] context Pointer to the MQTT client context
720  * @param[in] topic Topic filter
721  * @param[in] qos Maximum QoS level at which the server can send application
722  * messages to the client
723  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
724  * @return Error code
725  **/
726 
728  const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
729 {
730  error_t error;
731 
732  //Check parameters
733  if(context == NULL || topic == NULL)
735 
736  //Initialize status code
737  error = NO_ERROR;
738 
739  //Send SUBSCRIBE packet and wait for SUBACK packet to be received
740  while(!error)
741  {
742  //Check current state
743  if(context->state == MQTT_CLIENT_STATE_IDLE)
744  {
745  //Check for transmission completion
746  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
747  {
748  //Format SUBSCRIBE packet
749  error = mqttClientFormatSubscribe(context, topic, qos);
750 
751  //Check status code
752  if(!error)
753  {
754  //Save the packet identifier used to send the SUBSCRIBE packet
755  if(packetId != NULL)
756  *packetId = context->packetId;
757 
758  //Debug message
759  TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
760  context->packetLen);
761 
762  //Dump the contents of the SUBSCRIBE packet
763  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
764 
765  //Save the type of the MQTT packet to be sent
766  context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
767  //Point to the beginning of the packet
768  context->packetPos = 0;
769 
770  //Send SUBSCRIBE packet
772  }
773  }
774  else
775  {
776  //Reset packet type
777  context->packetType = MQTT_PACKET_TYPE_INVALID;
778  //We are done
779  break;
780  }
781  }
782  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
783  {
784  //Send more data
785  error = mqttClientProcessEvents(context, context->settings.timeout);
786  }
787  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
788  {
789  //The last parameter is optional
790  if(packetId != NULL)
791  {
792  //Do not wait for SUBACK packet
794  }
795  else
796  {
797  //Wait for SUBACK packet
798  error = mqttClientProcessEvents(context, context->settings.timeout);
799  }
800  }
801  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
802  {
803  //Receive more data
804  error = mqttClientProcessEvents(context, context->settings.timeout);
805  }
806  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
807  {
808  //A SUBACK packet has been received
810  }
811  else
812  {
813  //Invalid state
814  error = ERROR_NOT_CONNECTED;
815  }
816  }
817 
818  //Return status code
819  return error;
820 }
821 
822 
823 /**
824  * @brief Unsubscribe from topic
825  * @param[in] context Pointer to the MQTT client context
826  * @param[in] topic Topic filter
827  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
828  * @return Error code
829  **/
830 
832  const char_t *topic, uint16_t *packetId)
833 {
834  error_t error;
835 
836  //Check parameters
837  if(context == NULL || topic == NULL)
839 
840  //Initialize status code
841  error = NO_ERROR;
842 
843  //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
844  while(!error)
845  {
846  //Check current state
847  if(context->state == MQTT_CLIENT_STATE_IDLE)
848  {
849  //Check for transmission completion
850  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
851  {
852  //Format UNSUBSCRIBE packet
853  error = mqttClientFormatUnsubscribe(context, topic);
854 
855  //Check status code
856  if(!error)
857  {
858  //Save the packet identifier used to send the UNSUBSCRIBE packet
859  if(packetId != NULL)
860  *packetId = context->packetId;
861 
862  //Debug message
863  TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
864  context->packetLen);
865 
866  //Dump the contents of the UNSUBSCRIBE packet
867  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
868 
869  //Save the type of the MQTT packet to be sent
870  context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
871  //Point to the beginning of the packet
872  context->packetPos = 0;
873 
874  //Send UNSUBSCRIBE packet
876  }
877  }
878  else
879  {
880  //Reset packet type
881  context->packetType = MQTT_PACKET_TYPE_INVALID;
882  //We are done
883  break;
884  }
885  }
886  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
887  {
888  //Send more data
889  error = mqttClientProcessEvents(context, context->settings.timeout);
890  }
891  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
892  {
893  //The last parameter is optional
894  if(packetId != NULL)
895  {
896  //Do not wait for UNSUBACK packet
898  }
899  else
900  {
901  //Wait for UNSUBACK packet
902  error = mqttClientProcessEvents(context, context->settings.timeout);
903  }
904  }
905  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
906  {
907  //Receive more data
908  error = mqttClientProcessEvents(context, context->settings.timeout);
909  }
910  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
911  {
912  //An UNSUBACK packet has been received
914  }
915  else
916  {
917  //Invalid state
918  error = ERROR_NOT_CONNECTED;
919  }
920  }
921 
922  //Return status code
923  return error;
924 }
925 
926 
927 /**
928  * @brief Send ping request
929  * @param[in] context Pointer to the MQTT client context
930  * @param[out] rtt Round-trip time (optional parameter)
931  * @return Error code
932  **/
933 
935 {
936  error_t error;
937 
938  //Make sure the MQTT client context is valid
939  if(context == NULL)
941 
942  //Initialize status code
943  error = NO_ERROR;
944 
945  //Send PINGREQ packet and wait for PINGRESP packet to be received
946  while(!error)
947  {
948  //Check current state
949  if(context->state == MQTT_CLIENT_STATE_IDLE)
950  {
951  //Check for transmission completion
952  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
953  {
954  //Format PINGREQ packet
955  error = mqttClientFormatPingReq(context);
956 
957  //Check status code
958  if(!error)
959  {
960  //Debug message
961  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n",
962  context->packetLen);
963 
964  //Dump the contents of the PINGREQ packet
965  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
966 
967  //Save the type of the MQTT packet to be sent
968  context->packetType = MQTT_PACKET_TYPE_PINGREQ;
969  //Point to the beginning of the packet
970  context->packetPos = 0;
971 
972  //Send PINGREQ packet
974 
975  //Save the time at which the request was sent
976  if(rtt != NULL)
977  context->pingTimestamp = osGetSystemTime();
978  }
979  }
980  else
981  {
982  //Reset packet type
983  context->packetType = MQTT_PACKET_TYPE_INVALID;
984  //We are done
985  break;
986  }
987  }
988  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
989  {
990  //Send more data
991  error = mqttClientProcessEvents(context, context->settings.timeout);
992  }
993  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
994  {
995  //The last parameter is optional
996  if(rtt != NULL)
997  {
998  //Wait for PINGRESP packet
999  error = mqttClientProcessEvents(context, context->settings.timeout);
1000  }
1001  else
1002  {
1003  //Do not wait for PINGRESP packet
1005  }
1006  }
1007  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1008  {
1009  //Receive more data
1010  error = mqttClientProcessEvents(context, context->settings.timeout);
1011  }
1012  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
1013  {
1014  //The last parameter is optional
1015  if(rtt != NULL)
1016  {
1017  //Compute round-trip time
1018  *rtt = osGetSystemTime() - context->pingTimestamp;
1019  }
1020 
1021  //A PINGRESP packet has been received
1023  }
1024  else
1025  {
1026  //Invalid state
1027  error = ERROR_NOT_CONNECTED;
1028  }
1029  }
1030 
1031  //Return status code
1032  return error;
1033 }
1034 
1035 
1036 /**
1037  * @brief Process MQTT client events
1038  * @param[in] context Pointer to the MQTT client context
1039  * @param[in] timeout Maximum time to wait before returning
1040  * @return Error code
1041  **/
1042 
1044 {
1045  error_t error;
1046 
1047  //Make sure the MQTT client context is valid
1048  if(context == NULL)
1049  return ERROR_INVALID_PARAMETER;
1050 
1051  //Process MQTT client events
1052  error = mqttClientProcessEvents(context, timeout);
1053 
1054  //Check status code
1055  if(error == ERROR_TIMEOUT)
1056  {
1057  //Catch exception
1058  error = NO_ERROR;
1059  }
1060 
1061  //Return status code
1062  return error;
1063 }
1064 
1065 
1066 /**
1067  * @brief Gracefully disconnect from the MQTT server
1068  * @param[in] context Pointer to the MQTT client context
1069  * @return Error code
1070  **/
1071 
1073 {
1074  error_t error;
1075 
1076  //Make sure the MQTT client context is valid
1077  if(context == NULL)
1078  return ERROR_INVALID_PARAMETER;
1079 
1080  //Initialize status code
1081  error = NO_ERROR;
1082 
1083  //Send DISCONNECT packet and shutdown network connection
1084  while(context->state != MQTT_CLIENT_STATE_DISCONNECTED)
1085  {
1086  //Check current state
1087  if(context->state == MQTT_CLIENT_STATE_IDLE)
1088  {
1089  //Format DISCONNECT packet
1090  error = mqttClientFormatDisconnect(context);
1091 
1092  //Check status code
1093  if(!error)
1094  {
1095  //Debug message
1096  TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
1097  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1098 
1099  //Save the type of the MQTT packet to be sent
1100  context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
1101  //Point to the beginning of the packet
1102  context->packetPos = 0;
1103 
1104  //Send DISCONNECT packet
1106  }
1107  }
1108  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1109  {
1110  //Send more data
1111  error = mqttClientProcessEvents(context, context->settings.timeout);
1112  }
1113  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1114  {
1115  //Debug message
1116  TRACE_INFO("MQTT: Shutting down connection...\r\n");
1117 
1118  //After sending a DISCONNECT packet the client must not send any
1119  //more control packets on that network connection
1121  }
1122  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
1123  {
1124  //Properly dispose the network connection
1125  error = mqttClientShutdownConnection(context);
1126 
1127  //Check status code
1128  if(!error)
1129  {
1130  //The MQTT client is disconnected
1132  }
1133  }
1134  else
1135  {
1136  //Invalid state
1137  error = ERROR_NOT_CONNECTED;
1138  }
1139 
1140  //Any error to report?
1141  if(error)
1142  break;
1143  }
1144 
1145  //Return status code
1146  return error;
1147 }
1148 
1149 
1150 /**
1151  * @brief Close the connection with the MQTT server
1152  * @param[in] context Pointer to the MQTT client context
1153  * @return Error code
1154  **/
1155 
1157 {
1158  //Make sure the MQTT client context is valid
1159  if(context == NULL)
1160  return ERROR_INVALID_PARAMETER;
1161 
1162  //Close connection
1163  mqttClientCloseConnection(context);
1164  //The connection is closed
1166 
1167  //Network connection successfully closed
1168  return NO_ERROR;
1169 }
1170 
1171 
1172 /**
1173  * @brief Release MQTT client context
1174  * @param[in] context Pointer to the MQTT client context
1175  **/
1176 
1178 {
1179  //Make sure the MQTT client context is valid
1180  if(context != NULL)
1181  {
1182  //Close connection
1183  mqttClientCloseConnection(context);
1184 
1185 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
1186  //Release TLS session state
1187  tlsFreeSessionState(&context->tlsSession);
1188 #endif
1189 
1190  //Clear MQTT client context
1191  memset(context, 0, sizeof(MqttClientContext));
1192  }
1193 }
1194 
1195 #endif
uint8_t length
Definition: dtls_misc.h:149
void mqttClientCloseConnection(MqttClientContext *context)
Close network connection.
error_t mqttClientEstablishConnection(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort)
Establish network connection.
int bool_t
Definition: compiler_port.h:49
Client is disconnecting.
Definition: mqtt_common.h:114
error_t(* MqttClientTlsInitCallback)(MqttClientContext *context, TlsContext *tlsContext)
TLS initialization callback.
Definition: mqtt_client.h:249
error_t mqttClientSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Subscribe to topic.
Definition: mqtt_client.c:727
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
IP network address.
Definition: ip.h:71
MqttTransportProtocol
Transport protocol.
Definition: mqtt_common.h:73
error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Send ping request.
Definition: mqtt_client.c:934
Publish message.
Definition: mqtt_common.h:103
error_t mqttClientOpenConnection(MqttClientContext *context)
Open network connection.
uint16_t version
Definition: dtls_misc.h:172
char_t * ipAddrToString(const IpAddr *ipAddr, char_t *str)
Convert a binary IP address to a string representation.
Definition: ip.c:726
error_t mqttClientConnect(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Establish connection with the MQTT server.
Definition: mqtt_client.c:472
uint8_t qos
Definition: mqtt_common.h:179
uint8_t payload[MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN]
Will message payload.
Definition: mqtt_client.h:262
void tlsFreeSessionState(TlsSessionState *session)
Properly dispose a session state.
Definition: tls.c:2711
MQTT client callback functions.
Definition: mqtt_client.h:273
error_t mqttClientInit(MqttClientContext *context)
Initialize MQTT client context.
Definition: mqtt_client.c:52
error_t mqttClientFormatConnect(MqttClientContext *context, bool_t cleanSession)
Format CONNECT packet.
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
error_t mqttClientPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:608
error_t mqttClientDisconnect(MqttClientContext *context)
Gracefully disconnect from the MQTT server.
Definition: mqtt_client.c:1072
Invalid parameter.
Definition: error.h:47
#define MQTT_CLIENT_MAX_USERNAME_LEN
Definition: mqtt_client.h:96
Helper functions for MQTT client.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
error_t
Error codes.
Definition: error.h:42
Transport protocol abstraction layer.
At most once delivery.
Definition: mqtt_common.h:88
error_t mqttClientFormatPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Format PUBLISH packet.
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:86
uint8_t retain
Definition: mqtt_common.h:178
#define MQTT_CLIENT_MAX_WILL_TOPIC_LEN
Definition: mqtt_client.h:110
error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Set the name of the resource being requested.
Definition: mqtt_client.c:302
#define NetInterface
Definition: net.h:36
error_t mqttClientSetTransportProtocol(MqttClientContext *context, MqttTransportProtocol transportProtocol)
Set the transport protocol to be used.
Definition: mqtt_client.c:160
error_t mqttClientFormatUnsubscribe(MqttClientContext *context, const char_t *topic)
Format UNSUBSCRIBE packet.
error_t mqttClientClose(MqttClientContext *context)
Close the connection with the MQTT server.
Definition: mqtt_client.c:1156
error_t mqttClientBindToInterface(MqttClientContext *context, NetInterface *interface)
Bind the MQTT client to a particular network interface.
Definition: mqtt_client.c:446
Client request to connect to server.
Definition: mqtt_common.h:101
Client subscribe request.
Definition: mqtt_common.h:108
MQTT packet parsing and formatting.
void mqttClientDeinit(MqttClientContext *context)
Release MQTT client context.
Definition: mqtt_client.c:1177
#define MQTT_CLIENT_MAX_URI_LEN
Definition: mqtt_client.h:82
#define TRACE_INFO(...)
Definition: debug.h:94
MqttVersion
MQTT protocol level.
Definition: mqtt_common.h:62
error_t mqttClientRegisterCallbacks(MqttClientContext *context, const MqttClientCallbacks *callbacks)
Register MQTT client callbacks.
Definition: mqtt_client.c:116
bool_t retain
Specifies if the Will message is to be retained.
Definition: mqtt_client.h:265
#define MQTT_CLIENT_DEFAULT_KEEP_ALIVE
Definition: mqtt_client.h:61
error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Set keep-alive value.
Definition: mqtt_client.c:254
Unsubscribe request.
Definition: mqtt_common.h:110
error_t mqttClientFormatDisconnect(MqttClientContext *context)
Format DISCONNECT packet.
error_t mqttClientShutdownConnection(MqttClientContext *context)
Shutdown network connection.
size_t length
Length of the Will message payload.
Definition: mqtt_client.h:263
error_t mqttClientSetVersion(MqttClientContext *context, MqttVersion version)
Set the MQTT protocol version to be used.
Definition: mqtt_client.c:138
char char_t
Definition: compiler_port.h:43
error_t mqttClientTask(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
Definition: mqtt_client.c:1043
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:107
error_t mqttClientSetTimeout(MqttClientContext *context, systime_t timeout)
Set communication timeout.
Definition: mqtt_client.c:231
#define MQTT_CLIENT_MAX_ID_LEN
Definition: mqtt_client.h:89
uint8_t cleanSession
void(* MqttClientPublishCallback)(MqttClientContext *context, const char_t *topic, const uint8_t *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain, uint16_t packetId)
PUBLISH message received callback.
Definition: mqtt_client.h:182
uint8_t message[]
Definition: chap.h:152
error_t mqttClientFormatSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos)
Format SUBSCRIBE packet.
MQTT version 3.1.1.
Definition: mqtt_common.h:65
#define MQTT_CLIENT_MAX_PASSWORD_LEN
Definition: mqtt_client.h:103
#define MqttClientContext
Definition: mqtt_client.h:142
MqttQosLevel qos
QoS level to be used when publishing the Will message.
Definition: mqtt_client.h:264
char_t clientId[]
void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Initialize callback structure.
Definition: mqtt_client.c:102
error_t mqttClientRegisterPublishCallback(MqttClientContext *context, MqttClientPublishCallback callback)
Register publish callback function.
Definition: mqtt_client.c:209
#define MQTT_CLIENT_DEFAULT_TIMEOUT
Definition: mqtt_client.h:68
#define MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN
Definition: mqtt_client.h:117
error_t mqttClientUnsubscribe(MqttClientContext *context, const char_t *topic, uint16_t *packetId)
Unsubscribe from topic.
Definition: mqtt_client.c:831
#define PRIuSIZE
Definition: compiler_port.h:78
TCP/IP stack core.
#define MQTT_CLIENT_MAX_HOST_LEN
Definition: mqtt_client.h:75
error_t tlsInitSessionState(TlsSessionState *session)
Initialize session state.
Definition: tls.c:2406
error_t mqttClientSetAuthInfo(MqttClientContext *context, const char_t *username, const char_t *password)
Set authentication information.
Definition: mqtt_client.c:356
error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Set the domain name of the server (for virtual hosting)
Definition: mqtt_client.c:275
uint32_t systime_t
Definition: compiler_port.h:46
error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Specify the Will message.
Definition: mqtt_client.c:393
char_t topic[MQTT_CLIENT_MAX_WILL_TOPIC_LEN+1]
Will topic name.
Definition: mqtt_client.h:261
Success.
Definition: error.h:44
Debugging facilities.
error_t mqttClientRegisterTlsInitCallback(MqttClientContext *context, MqttClientTlsInitCallback callback)
Register TLS initialization callback function.
Definition: mqtt_client.c:184
systime_t osGetSystemTime(void)
Retrieve system time.
MQTT client.
error_t mqttClientSetIdentifier(MqttClientContext *context, const char_t *clientId)
Set client identifier.
Definition: mqtt_client.c:329