mqtt_client.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client.c
3  * @brief MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2019 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 1.9.6
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Initialize MQTT client context
48  * @param[in] context Pointer to the MQTT client context
49  * @return Error code
50  **/
51 
53 {
54 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
55  error_t error;
56 #endif
57 
58  //Make sure the MQTT client context is valid
59  if(context == NULL)
61 
62  //Clear MQTT client context
63  memset(context, 0, sizeof(MqttClientContext));
64 
65 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
66  //Initialize TLS session state
67  error = tlsInitSessionState(&context->tlsSession);
68  //Any error to report?
69  if(error)
70  return error;
71 #endif
72 
73  //Default protocol version
74  context->settings.version = MQTT_VERSION_3_1_1;
75  //Default transport protocol
76  context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
77  //Default keep-alive time interval
78  context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
79  //Default communication timeout
80  context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
81 
82 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
83  //Default resource name (for WebSocket connections only)
84  strcpy(context->settings.uri, "/");
85 #endif
86 
87  //Initialize state machine
88  context->state = MQTT_CLIENT_STATE_CLOSED;
89  //Initialize packet identifier
90  context->packetId = 0;
91 
92  //Successful initialization
93  return NO_ERROR;
94 }
95 
96 
97 /**
98  * @brief Initialize callback structure
99  * @param[in] callbacks Pointer to a structure that contains callback functions
100  * @return Error code
101  **/
102 
104 {
105  //Initialize callback structure
106  memset(callbacks, 0, sizeof(MqttClientCallbacks));
107 }
108 
109 
110 /**
111  * @brief Register MQTT client callbacks
112  * @param[in] context Pointer to the MQTT client context
113  * @param[in] callbacks Pointer to a structure that contains callback functions
114  * @return Error code
115  **/
116 
118  const MqttClientCallbacks *callbacks)
119 {
120  //Make sure the MQTT client context is valid
121  if(context == NULL)
123 
124  //Attach callback functions
125  context->callbacks = *callbacks;
126 
127  //Successful processing
128  return NO_ERROR;
129 }
130 
131 
132 /**
133  * @brief Set the MQTT protocol version to be used
134  * @param[in] context Pointer to the MQTT client context
135  * @param[in] version MQTT protocol version (3.1 or 3.1.1)
136  * @return Error code
137  **/
138 
140 {
141  //Make sure the MQTT client context is valid
142  if(context == NULL)
144 
145  //Save the MQTT protocol version to be used
146  context->settings.version = version;
147 
148  //Successful processing
149  return NO_ERROR;
150 }
151 
152 
153 /**
154  * @brief Set the transport protocol to be used
155  * @param[in] context Pointer to the MQTT client context
156  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
157  * WebSocket, or secure WebSocket)
158  * @return Error code
159  **/
160 
162  MqttTransportProtocol transportProtocol)
163 {
164  //Make sure the MQTT client context is valid
165  if(context == NULL)
167 
168  //Save the transport protocol to be used
169  context->settings.transportProtocol = transportProtocol;
170 
171  //Successful processing
172  return NO_ERROR;
173 }
174 
175 
176 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
177 
178 /**
179  * @brief Register TLS initialization callback function
180  * @param[in] context Pointer to the MQTT- client context
181  * @param[in] callback TLS initialization callback function
182  * @return Error code
183  **/
184 
186  MqttClientTlsInitCallback callback)
187 {
188  //Check parameters
189  if(context == NULL || callback == NULL)
191 
192  //Save callback function
193  context->callbacks.tlsInitCallback = callback;
194 
195  //Successful processing
196  return NO_ERROR;
197 }
198 
199 #endif
200 
201 
202 /**
203  * @brief Register publish callback function
204  * @param[in] context Pointer to the MQTT client context
205  * @param[in] callback Callback function to be called when a PUBLISH message
206  * is received
207  * @return Error code
208  **/
209 
211  MqttClientPublishCallback callback)
212 {
213  //Make sure the MQTT client context is valid
214  if(context == NULL)
216 
217  //Save callback function
218  context->callbacks.publishCallback = callback;
219 
220  //Successful processing
221  return NO_ERROR;
222 }
223 
224 
225 /**
226  * @brief Set communication timeout
227  * @param[in] context Pointer to the MQTT client context
228  * @param[in] timeout Timeout value, in seconds
229  * @return Error code
230  **/
231 
233 {
234  //Make sure the MQTT client context is valid
235  if(context == NULL)
237 
238  //Save timeout value
239  context->settings.timeout = timeout;
240 
241  //Successful processing
242  return NO_ERROR;
243 }
244 
245 
246 /**
247  * @brief Set keep-alive value
248  * @param[in] context Pointer to the MQTT client context
249  * @param[in] keepAlive Maximum time interval that is permitted to elapse
250  * between the point at which the client finishes transmitting one control
251  * packet and the point it starts sending the next
252  * @return Error code
253  **/
254 
255 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
256 {
257  //Make sure the MQTT client context is valid
258  if(context == NULL)
260 
261  //Save keep-alive value
262  context->settings.keepAlive = keepAlive;
263 
264  //Successful processing
265  return NO_ERROR;
266 }
267 
268 
269 /**
270  * @brief Set the domain name of the server (for virtual hosting)
271  * @param[in] context Pointer to the MQTT client context
272  * @param[in] host NULL-terminated string containing the hostname
273  * @return Error code
274  **/
275 
277 {
278  //Check parameters
279  if(context == NULL || host == NULL)
281 
282  //Make sure the length of the hostname is acceptable
283  if(strlen(host) > MQTT_CLIENT_MAX_HOST_LEN)
284  return ERROR_INVALID_LENGTH;
285 
286 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
287  //Save hostname (for WebSocket connections only)
288  strcpy(context->settings.host, host);
289 #endif
290 
291  //Successful processing
292  return NO_ERROR;
293 }
294 
295 
296 /**
297  * @brief Set the name of the resource being requested
298  * @param[in] context Pointer to the MQTT client context
299  * @param[in] uri NULL-terminated string that contains the resource name
300  * @return Error code
301  **/
302 
304 {
305  //Check parameters
306  if(context == NULL || uri == NULL)
308 
309  //Make sure the length of the resource name is acceptable
310  if(strlen(uri) > MQTT_CLIENT_MAX_URI_LEN)
311  return ERROR_INVALID_LENGTH;
312 
313 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
314  //Save resource name (for WebSocket connections only)
315  strcpy(context->settings.uri, uri);
316 #endif
317 
318  //Successful processing
319  return NO_ERROR;
320 }
321 
322 
323 /**
324  * @brief Set client identifier
325  * @param[in] context Pointer to the MQTT client context
326  * @param[in] clientId NULL-terminated string containing the client identifier
327  * @return Error code
328  **/
329 
331  const char_t *clientId)
332 {
333  //Check parameters
334  if(context == NULL || clientId == NULL)
336 
337  //Make sure the length of the client identifier is acceptable
338  if(strlen(clientId) > MQTT_CLIENT_MAX_ID_LEN)
339  return ERROR_INVALID_LENGTH;
340 
341  //Save client identifier
342  strcpy(context->settings.clientId, clientId);
343 
344  //Successful processing
345  return NO_ERROR;
346 }
347 
348 
349 /**
350  * @brief Set authentication information
351  * @param[in] context Pointer to the MQTT client context
352  * @param[in] username NULL-terminated string containing the user name to be used
353  * @param[in] password NULL-terminated string containing the password to be used
354  * @return Error code
355  **/
356 
358  const char_t *username, const char_t *password)
359 {
360  //Check parameters
361  if(context == NULL || username == NULL || password == NULL)
363 
364  //Make sure the length of the user name is acceptable
365  if(strlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
366  return ERROR_INVALID_LENGTH;
367 
368  //Save user name
369  strcpy(context->settings.username, username);
370 
371  //Make sure the length of the password is acceptable
372  if(strlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
373  return ERROR_INVALID_LENGTH;
374 
375  //Save password
376  strcpy(context->settings.password, password);
377 
378  //Successful processing
379  return NO_ERROR;
380 }
381 
382 
383 /**
384  * @brief Specify the Will message
385  * @param[in] context Pointer to the MQTT client context
386  * @param[in] topic Will topic name
387  * @param[in] message Will message
388  * @param[in] length Length of the Will message
389  * @param[in] qos QoS level to be used when publishing the Will message
390  * @param[in] retain This flag specifies if the Will message is to be retained
391  * @return Error code
392  **/
393 
395  const void *message, size_t length, MqttQosLevel qos, bool_t retain)
396 {
397  MqttClientWillMessage *willMessage;
398 
399  //Check parameters
400  if(context == NULL || topic == NULL)
402 
403  //Make sure the length of the Will topic is acceptable
404  if(strlen(topic) > MQTT_CLIENT_MAX_WILL_TOPIC_LEN)
405  return ERROR_INVALID_LENGTH;
406 
407  //Point to the Will message
408  willMessage = &context->settings.willMessage;
409 
410  //Save Will topic
411  strcpy(willMessage->topic, topic);
412 
413  //Any message payload
414  if(length > 0)
415  {
416  //Sanity check
417  if(message == NULL)
419 
420  //Make sure the length of the Will message payload is acceptable
422  return ERROR_INVALID_LENGTH;
423 
424  //Save Will message payload
425  memcpy(willMessage->payload, message, length);
426  }
427 
428  //Length of the Will message payload
429  willMessage->length = length;
430  //QoS level to be used when publishing the Will message
431  willMessage->qos = qos;
432  //This flag specifies if the Will message is to be retained
433  willMessage->retain = retain;
434 
435  //Successful processing
436  return NO_ERROR;
437 }
438 
439 
440 /**
441  * @brief Bind the MQTT client to a particular network interface
442  * @param[in] context Pointer to the MQTT client context
443  * @param[in] interface Network interface to be used
444  * @return Error code
445  **/
446 
448  NetInterface *interface)
449 {
450  //Make sure the MQTT client context is valid
451  if(context == NULL)
453 
454  //Explicitly associate the MQTT client with the specified interface
455  context->interface = interface;
456 
457  //Successful processing
458  return NO_ERROR;
459 }
460 
461 
462 /**
463  * @brief Establish connection with the MQTT server
464  * @param[in] context Pointer to the MQTT client context
465  * @param[in] serverIpAddr IP address of the MQTT server to connect to
466  * @param[in] serverPort TCP port number that will be used to establish the
467  * connection
468  * @param[in] cleanSession If this flag is set, then the client and server
469  * must discard any previous session and start a new one
470  * @return Error code
471  **/
472 
474  const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
475 {
476  error_t error;
477 
478  //Check parameters
479  if(context == NULL || serverIpAddr == NULL)
481 
482  //Initialize status code
483  error = NO_ERROR;
484 
485  //Establish network connection
486  while(context->state != MQTT_CLIENT_STATE_IDLE)
487  {
488  //Check current state
489  if(context->state == MQTT_CLIENT_STATE_CLOSED)
490  {
491  //Open network connection
492  error = mqttClientOpenConnection(context);
493 
494  //Check status code
495  if(!error)
496  {
497  //Debug message
498  TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
499  ipAddrToString(serverIpAddr, NULL), serverPort);
500 
501  //The network connection is open
503  }
504  else
505  {
506  //Clean up side effects
507  mqttClientCloseConnection(context);
508  }
509  }
510  else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
511  {
512  //Establish network connection
513  error = mqttClientEstablishConnection(context,
514  serverIpAddr, serverPort);
515 
516  //Check status code
517  if(!error)
518  {
519  //Debug message
520  TRACE_INFO("MQTT: Connected to server\r\n");
521 
522  //The network connection is established
524  }
525  }
526  else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
527  {
528  //Format CONNECT packet
529  error = mqttClientFormatConnect(context, cleanSession);
530 
531  //Check status code
532  if(!error)
533  {
534  //Debug message
535  TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
536  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
537 
538  //Save the type of the MQTT packet to be sent
539  context->packetType = MQTT_PACKET_TYPE_CONNECT;
540  //Point to the beginning of the packet
541  context->packetPos = 0;
542 
543  //Send CONNECT packet
545  }
546  }
547  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
548  {
549  //Send more data
550  error = mqttClientProcessEvents(context, context->settings.timeout);
551  }
552  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
553  {
554  //Wait for CONNACK packet
555  error = mqttClientProcessEvents(context, context->settings.timeout);
556  }
557  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
558  {
559  //Receive more data
560  error = mqttClientProcessEvents(context, context->settings.timeout);
561  }
562  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
563  {
564  //Reset packet type
565  context->packetType = MQTT_PACKET_TYPE_INVALID;
566  //A CONNACK packet has been received
568  }
569  else
570  {
571  //Invalid state
572  error = ERROR_NOT_CONNECTED;
573  }
574 
575  //Any error to report?
576  if(error)
577  {
578 #if (NET_RTOS_SUPPORT == DISABLED)
579  //Timeout error?
580  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
581  break;
582 #endif
583  //Close connection
584  mqttClientCloseConnection(context);
585  //The connection is closed
587  //Exit immediately
588  break;
589  }
590  }
591 
592  //Return status code
593  return error;
594 }
595 
596 
597 /**
598  * @brief Publish message
599  * @param[in] context Pointer to the MQTT client context
600  * @param[in] topic Topic name
601  * @param[in] message Message payload
602  * @param[in] length Length of the message payload
603  * @param[in] qos QoS level to be used when publishing the message
604  * @param[in] retain This flag specifies if the message is to be retained
605  * @param[out] packetId Packet identifier used to send the PUBLISH packet
606  * @return Error code
607  **/
608 
610  const char_t *topic, const void *message, size_t length,
611  MqttQosLevel qos, bool_t retain, uint16_t *packetId)
612 {
613  error_t error;
614 
615  //Check parameters
616  if(context == NULL || topic == NULL)
618  if(message == NULL && length != 0)
620 
621  //Initialize status code
622  error = NO_ERROR;
623 
624  //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
625  while(!error)
626  {
627  //Check current state
628  if(context->state == MQTT_CLIENT_STATE_IDLE)
629  {
630  //Check for transmission completion
631  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
632  {
633  //Format PUBLISH packet
634  error = mqttClientFormatPublish(context, topic, message,
635  length, qos, retain);
636 
637  //Check status code
638  if(!error)
639  {
640  //Save the packet identifier used to send the PUBLISH packet
641  if(packetId != NULL)
642  *packetId = context->packetId;
643 
644  //Debug message
645  TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n",
646  context->packetLen);
647 
648  //Dump the contents of the PUBLISH packet
649  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
650 
651  //Save the type of the MQTT packet to be sent
652  context->packetType = MQTT_PACKET_TYPE_PUBLISH;
653  //Point to the beginning of the packet
654  context->packetPos = 0;
655 
656  //Send PUBLISH packet
658  }
659  }
660  else
661  {
662  //Reset packet type
663  context->packetType = MQTT_PACKET_TYPE_INVALID;
664  //We are done
665  break;
666  }
667  }
668  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
669  {
670  //Send more data
671  error = mqttClientProcessEvents(context, context->settings.timeout);
672  }
673  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
674  {
675  //The last parameter is optional
676  if(packetId != NULL)
677  {
678  //Do not wait for PUBACK/PUBCOMP packet
680  }
681  else
682  {
683  //Check QoS level
684  if(qos == MQTT_QOS_LEVEL_0)
685  {
686  //No response is sent by the receiver and no retry is performed by the sender
688  }
689  else
690  {
691  //Wait for PUBACK/PUBCOMP packet
692  error = mqttClientProcessEvents(context, context->settings.timeout);
693  }
694  }
695  }
696  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
697  {
698  //Receive more data
699  error = mqttClientProcessEvents(context, context->settings.timeout);
700  }
701  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
702  {
703  //A PUBACK/PUBCOMP packet has been received
705  }
706  else
707  {
708  //Invalid state
709  error = ERROR_NOT_CONNECTED;
710  }
711  }
712 
713  //Return status code
714  return error;
715 }
716 
717 
718 /**
719  * @brief Subscribe to topic
720  * @param[in] context Pointer to the MQTT client context
721  * @param[in] topic Topic filter
722  * @param[in] qos Maximum QoS level at which the server can send application
723  * messages to the client
724  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
725  * @return Error code
726  **/
727 
729  const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
730 {
731  error_t error;
732 
733  //Check parameters
734  if(context == NULL || topic == NULL)
736 
737  //Initialize status code
738  error = NO_ERROR;
739 
740  //Send SUBSCRIBE packet and wait for SUBACK packet to be received
741  while(!error)
742  {
743  //Check current state
744  if(context->state == MQTT_CLIENT_STATE_IDLE)
745  {
746  //Check for transmission completion
747  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
748  {
749  //Format SUBSCRIBE packet
750  error = mqttClientFormatSubscribe(context, topic, qos);
751 
752  //Check status code
753  if(!error)
754  {
755  //Save the packet identifier used to send the SUBSCRIBE packet
756  if(packetId != NULL)
757  *packetId = context->packetId;
758 
759  //Debug message
760  TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
761  context->packetLen);
762 
763  //Dump the contents of the SUBSCRIBE packet
764  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
765 
766  //Save the type of the MQTT packet to be sent
767  context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
768  //Point to the beginning of the packet
769  context->packetPos = 0;
770 
771  //Send SUBSCRIBE packet
773  }
774  }
775  else
776  {
777  //Reset packet type
778  context->packetType = MQTT_PACKET_TYPE_INVALID;
779  //We are done
780  break;
781  }
782  }
783  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
784  {
785  //Send more data
786  error = mqttClientProcessEvents(context, context->settings.timeout);
787  }
788  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
789  {
790  //The last parameter is optional
791  if(packetId != NULL)
792  {
793  //Do not wait for SUBACK packet
795  }
796  else
797  {
798  //Wait for SUBACK packet
799  error = mqttClientProcessEvents(context, context->settings.timeout);
800  }
801  }
802  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
803  {
804  //Receive more data
805  error = mqttClientProcessEvents(context, context->settings.timeout);
806  }
807  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
808  {
809  //A SUBACK packet has been received
811  }
812  else
813  {
814  //Invalid state
815  error = ERROR_NOT_CONNECTED;
816  }
817  }
818 
819  //Return status code
820  return error;
821 }
822 
823 
824 /**
825  * @brief Unsubscribe from topic
826  * @param[in] context Pointer to the MQTT client context
827  * @param[in] topic Topic filter
828  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
829  * @return Error code
830  **/
831 
833  const char_t *topic, uint16_t *packetId)
834 {
835  error_t error;
836 
837  //Check parameters
838  if(context == NULL || topic == NULL)
840 
841  //Initialize status code
842  error = NO_ERROR;
843 
844  //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
845  while(!error)
846  {
847  //Check current state
848  if(context->state == MQTT_CLIENT_STATE_IDLE)
849  {
850  //Check for transmission completion
851  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
852  {
853  //Format UNSUBSCRIBE packet
854  error = mqttClientFormatUnsubscribe(context, topic);
855 
856  //Check status code
857  if(!error)
858  {
859  //Save the packet identifier used to send the UNSUBSCRIBE packet
860  if(packetId != NULL)
861  *packetId = context->packetId;
862 
863  //Debug message
864  TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
865  context->packetLen);
866 
867  //Dump the contents of the UNSUBSCRIBE packet
868  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
869 
870  //Save the type of the MQTT packet to be sent
871  context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
872  //Point to the beginning of the packet
873  context->packetPos = 0;
874 
875  //Send UNSUBSCRIBE packet
877  }
878  }
879  else
880  {
881  //Reset packet type
882  context->packetType = MQTT_PACKET_TYPE_INVALID;
883  //We are done
884  break;
885  }
886  }
887  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
888  {
889  //Send more data
890  error = mqttClientProcessEvents(context, context->settings.timeout);
891  }
892  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
893  {
894  //The last parameter is optional
895  if(packetId != NULL)
896  {
897  //Do not wait for UNSUBACK packet
899  }
900  else
901  {
902  //Wait for UNSUBACK packet
903  error = mqttClientProcessEvents(context, context->settings.timeout);
904  }
905  }
906  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
907  {
908  //Receive more data
909  error = mqttClientProcessEvents(context, context->settings.timeout);
910  }
911  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
912  {
913  //An UNSUBACK packet has been received
915  }
916  else
917  {
918  //Invalid state
919  error = ERROR_NOT_CONNECTED;
920  }
921  }
922 
923  //Return status code
924  return error;
925 }
926 
927 
928 /**
929  * @brief Send ping request
930  * @param[in] context Pointer to the MQTT client context
931  * @param[out] rtt Round-trip time (optional parameter)
932  * @return Error code
933  **/
934 
936 {
937  error_t error;
938 
939  //Make sure the MQTT client context is valid
940  if(context == NULL)
942 
943  //Initialize status code
944  error = NO_ERROR;
945 
946  //Send PINGREQ packet and wait for PINGRESP packet to be received
947  while(!error)
948  {
949  //Check current state
950  if(context->state == MQTT_CLIENT_STATE_IDLE)
951  {
952  //Check for transmission completion
953  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
954  {
955  //Format PINGREQ packet
956  error = mqttClientFormatPingReq(context);
957 
958  //Check status code
959  if(!error)
960  {
961  //Debug message
962  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n",
963  context->packetLen);
964 
965  //Dump the contents of the PINGREQ packet
966  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
967 
968  //Save the type of the MQTT packet to be sent
969  context->packetType = MQTT_PACKET_TYPE_PINGREQ;
970  //Point to the beginning of the packet
971  context->packetPos = 0;
972 
973  //Send PINGREQ packet
975 
976  //Save the time at which the request was sent
977  if(rtt != NULL)
978  context->pingTimestamp = osGetSystemTime();
979  }
980  }
981  else
982  {
983  //Reset packet type
984  context->packetType = MQTT_PACKET_TYPE_INVALID;
985  //We are done
986  break;
987  }
988  }
989  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
990  {
991  //Send more data
992  error = mqttClientProcessEvents(context, context->settings.timeout);
993  }
994  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
995  {
996  //The last parameter is optional
997  if(rtt != NULL)
998  {
999  //Wait for PINGRESP packet
1000  error = mqttClientProcessEvents(context, context->settings.timeout);
1001  }
1002  else
1003  {
1004  //Do not wait for PINGRESP packet
1006  }
1007  }
1008  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1009  {
1010  //Receive more data
1011  error = mqttClientProcessEvents(context, context->settings.timeout);
1012  }
1013  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
1014  {
1015  //The last parameter is optional
1016  if(rtt != NULL)
1017  {
1018  //Compute round-trip time
1019  *rtt = osGetSystemTime() - context->pingTimestamp;
1020  }
1021 
1022  //A PINGRESP packet has been received
1024  }
1025  else
1026  {
1027  //Invalid state
1028  error = ERROR_NOT_CONNECTED;
1029  }
1030  }
1031 
1032  //Return status code
1033  return error;
1034 }
1035 
1036 
1037 /**
1038  * @brief Process MQTT client events
1039  * @param[in] context Pointer to the MQTT client context
1040  * @param[in] timeout Maximum time to wait before returning
1041  * @return Error code
1042  **/
1043 
1045 {
1046  error_t error;
1047 
1048  //Make sure the MQTT client context is valid
1049  if(context == NULL)
1050  return ERROR_INVALID_PARAMETER;
1051 
1052  //Process MQTT client events
1053  error = mqttClientProcessEvents(context, timeout);
1054 
1055  //Check status code
1056  if(error == ERROR_TIMEOUT)
1057  {
1058  //Catch exception
1059  error = NO_ERROR;
1060  }
1061 
1062  //Return status code
1063  return error;
1064 }
1065 
1066 
1067 /**
1068  * @brief Gracefully disconnect from the MQTT server
1069  * @param[in] context Pointer to the MQTT client context
1070  * @return Error code
1071  **/
1072 
1074 {
1075  error_t error;
1076 
1077  //Make sure the MQTT client context is valid
1078  if(context == NULL)
1079  return ERROR_INVALID_PARAMETER;
1080 
1081  //Initialize status code
1082  error = NO_ERROR;
1083 
1084  //Send DISCONNECT packet and shutdown network connection
1085  while(context->state != MQTT_CLIENT_STATE_DISCONNECTED)
1086  {
1087  //Check current state
1088  if(context->state == MQTT_CLIENT_STATE_IDLE)
1089  {
1090  //Format DISCONNECT packet
1091  error = mqttClientFormatDisconnect(context);
1092 
1093  //Check status code
1094  if(!error)
1095  {
1096  //Debug message
1097  TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
1098  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1099 
1100  //Save the type of the MQTT packet to be sent
1101  context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
1102  //Point to the beginning of the packet
1103  context->packetPos = 0;
1104 
1105  //Send DISCONNECT packet
1107  }
1108  }
1109  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1110  {
1111  //Send more data
1112  error = mqttClientProcessEvents(context, context->settings.timeout);
1113  }
1114  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1115  {
1116  //Debug message
1117  TRACE_INFO("MQTT: Shutting down connection...\r\n");
1118 
1119  //After sending a DISCONNECT packet the client must not send any
1120  //more control packets on that network connection
1122  }
1123  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
1124  {
1125  //Properly dispose the network connection
1126  error = mqttClientShutdownConnection(context);
1127 
1128  //Check status code
1129  if(!error)
1130  {
1131  //The MQTT client is disconnected
1133  }
1134  }
1135  else
1136  {
1137  //Invalid state
1138  error = ERROR_NOT_CONNECTED;
1139  }
1140 
1141  //Any error to report?
1142  if(error)
1143  break;
1144  }
1145 
1146  //Return status code
1147  return error;
1148 }
1149 
1150 
1151 /**
1152  * @brief Close the connection with the MQTT server
1153  * @param[in] context Pointer to the MQTT client context
1154  * @return Error code
1155  **/
1156 
1158 {
1159  //Make sure the MQTT client context is valid
1160  if(context == NULL)
1161  return ERROR_INVALID_PARAMETER;
1162 
1163  //Close connection
1164  mqttClientCloseConnection(context);
1165  //The connection is closed
1167 
1168  //Network connection successfully closed
1169  return NO_ERROR;
1170 }
1171 
1172 
1173 /**
1174  * @brief Release MQTT client context
1175  * @param[in] context Pointer to the MQTT client context
1176  **/
1177 
1179 {
1180  //Make sure the MQTT client context is valid
1181  if(context != NULL)
1182  {
1183  //Close connection
1184  mqttClientCloseConnection(context);
1185 
1186 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
1187  //Release TLS session state
1188  tlsFreeSessionState(&context->tlsSession);
1189 #endif
1190 
1191  //Clear MQTT client context
1192  memset(context, 0, sizeof(MqttClientContext));
1193  }
1194 }
1195 
1196 #endif
uint8_t length
Definition: dtls_misc.h:149
void mqttClientCloseConnection(MqttClientContext *context)
Close network connection.
error_t mqttClientEstablishConnection(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort)
Establish network connection.
int bool_t
Definition: compiler_port.h:49
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
@ MQTT_PACKET_TYPE_DISCONNECT
Client is disconnecting.
Definition: mqtt_common.h:114
error_t(* MqttClientTlsInitCallback)(MqttClientContext *context, TlsContext *tlsContext)
TLS initialization callback.
Definition: mqtt_client.h:249
@ ERROR_WOULD_BLOCK
Definition: error.h:95
error_t mqttClientSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Subscribe to topic.
Definition: mqtt_client.c:728
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
IP network address.
Definition: ip.h:71
MqttTransportProtocol
Transport protocol.
Definition: mqtt_common.h:73
error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Send ping request.
Definition: mqtt_client.c:935
@ MQTT_PACKET_TYPE_PUBLISH
Publish message.
Definition: mqtt_common.h:103
error_t mqttClientOpenConnection(MqttClientContext *context)
Open network connection.
uint16_t version
Definition: dtls_misc.h:172
char_t * ipAddrToString(const IpAddr *ipAddr, char_t *str)
Convert a binary IP address to a string representation.
Definition: ip.c:726
@ MQTT_CLIENT_STATE_DISCONNECTED
Definition: mqtt_client.h:166
error_t mqttClientConnect(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Establish connection with the MQTT server.
Definition: mqtt_client.c:473
uint8_t qos
Definition: mqtt_common.h:179
uint8_t payload[MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN]
Will message payload.
Definition: mqtt_client.h:262
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:160
void tlsFreeSessionState(TlsSessionState *session)
Properly dispose a session state.
Definition: tls.c:2711
MQTT client callback functions.
Definition: mqtt_client.h:273
error_t mqttClientInit(MqttClientContext *context)
Initialize MQTT client context.
Definition: mqtt_client.c:52
error_t mqttClientFormatConnect(MqttClientContext *context, bool_t cleanSession)
Format CONNECT packet.
@ MQTT_CLIENT_STATE_PACKET_RECEIVED
Definition: mqtt_client.h:164
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
error_t mqttClientPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:609
error_t mqttClientDisconnect(MqttClientContext *context)
Gracefully disconnect from the MQTT server.
Definition: mqtt_client.c:1073
@ ERROR_INVALID_PARAMETER
Invalid parameter.
Definition: error.h:47
#define MQTT_CLIENT_MAX_USERNAME_LEN
Definition: mqtt_client.h:96
Helper functions for MQTT client.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
error_t
Error codes.
Definition: error.h:42
Transport protocol abstraction layer.
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:161
@ MQTT_QOS_LEVEL_0
At most once delivery.
Definition: mqtt_common.h:88
error_t mqttClientFormatPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Format PUBLISH packet.
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:86
uint8_t retain
Definition: mqtt_common.h:178
@ MQTT_PACKET_TYPE_PINGREQ
Ping request.
Definition: mqtt_common.h:112
#define MQTT_CLIENT_MAX_WILL_TOPIC_LEN
Definition: mqtt_client.h:110
@ MQTT_TRANSPORT_PROTOCOL_TCP
Definition: mqtt_common.h:75
error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Set the name of the resource being requested.
Definition: mqtt_client.c:303
#define NetInterface
Definition: net.h:36
error_t mqttClientSetTransportProtocol(MqttClientContext *context, MqttTransportProtocol transportProtocol)
Set the transport protocol to be used.
Definition: mqtt_client.c:161
@ ERROR_INVALID_LENGTH
Definition: error.h:109
error_t mqttClientFormatUnsubscribe(MqttClientContext *context, const char_t *topic)
Format UNSUBSCRIBE packet.
error_t mqttClientClose(MqttClientContext *context)
Close the connection with the MQTT server.
Definition: mqtt_client.c:1157
error_t mqttClientBindToInterface(MqttClientContext *context, NetInterface *interface)
Bind the MQTT client to a particular network interface.
Definition: mqtt_client.c:447
@ MQTT_PACKET_TYPE_CONNECT
Client request to connect to server.
Definition: mqtt_common.h:101
@ MQTT_PACKET_TYPE_SUBSCRIBE
Client subscribe request.
Definition: mqtt_common.h:108
MQTT packet parsing and formatting.
void mqttClientDeinit(MqttClientContext *context)
Release MQTT client context.
Definition: mqtt_client.c:1178
#define MQTT_CLIENT_MAX_URI_LEN
Definition: mqtt_client.h:82
#define TRACE_INFO(...)
Definition: debug.h:94
MqttVersion
MQTT protocol level.
Definition: mqtt_common.h:62
error_t mqttClientRegisterCallbacks(MqttClientContext *context, const MqttClientCallbacks *callbacks)
Register MQTT client callbacks.
Definition: mqtt_client.c:117
bool_t retain
Specifies if the Will message is to be retained.
Definition: mqtt_client.h:265
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:163
#define MQTT_CLIENT_DEFAULT_KEEP_ALIVE
Definition: mqtt_client.h:61
error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Set keep-alive value.
Definition: mqtt_client.c:255
@ MQTT_PACKET_TYPE_UNSUBSCRIBE
Unsubscribe request.
Definition: mqtt_common.h:110
error_t mqttClientFormatDisconnect(MqttClientContext *context)
Format DISCONNECT packet.
error_t mqttClientShutdownConnection(MqttClientContext *context)
Shutdown network connection.
size_t length
Length of the Will message payload.
Definition: mqtt_client.h:263
error_t mqttClientSetVersion(MqttClientContext *context, MqttVersion version)
Set the MQTT protocol version to be used.
Definition: mqtt_client.c:139
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:159
@ ERROR_TIMEOUT
Definition: error.h:94
char char_t
Definition: compiler_port.h:43
@ MQTT_CLIENT_STATE_CLOSED
Definition: mqtt_client.h:156
error_t mqttClientTask(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
Definition: mqtt_client.c:1044
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:107
error_t mqttClientSetTimeout(MqttClientContext *context, systime_t timeout)
Set communication timeout.
Definition: mqtt_client.c:232
@ ERROR_NOT_CONNECTED
Definition: error.h:79
@ MQTT_CLIENT_STATE_CONNECTING
Definition: mqtt_client.h:157
#define MQTT_CLIENT_MAX_ID_LEN
Definition: mqtt_client.h:89
uint8_t cleanSession
void(* MqttClientPublishCallback)(MqttClientContext *context, const char_t *topic, const uint8_t *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain, uint16_t packetId)
PUBLISH message received callback.
Definition: mqtt_client.h:182
uint8_t message[]
Definition: chap.h:152
error_t mqttClientFormatSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos)
Format SUBSCRIBE packet.
@ MQTT_CLIENT_STATE_DISCONNECTING
Definition: mqtt_client.h:165
@ MQTT_CLIENT_STATE_CONNECTED
Definition: mqtt_client.h:158
@ MQTT_VERSION_3_1_1
MQTT version 3.1.1.
Definition: mqtt_common.h:65
#define MQTT_CLIENT_MAX_PASSWORD_LEN
Definition: mqtt_client.h:103
#define MqttClientContext
Definition: mqtt_client.h:142
MqttQosLevel qos
QoS level to be used when publishing the Will message.
Definition: mqtt_client.h:264
char_t clientId[]
void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Initialize callback structure.
Definition: mqtt_client.c:103
error_t mqttClientRegisterPublishCallback(MqttClientContext *context, MqttClientPublishCallback callback)
Register publish callback function.
Definition: mqtt_client.c:210
#define MQTT_CLIENT_DEFAULT_TIMEOUT
Definition: mqtt_client.h:68
#define MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN
Definition: mqtt_client.h:117
error_t mqttClientUnsubscribe(MqttClientContext *context, const char_t *topic, uint16_t *packetId)
Unsubscribe from topic.
Definition: mqtt_client.c:832
#define PRIuSIZE
Definition: compiler_port.h:78
TCP/IP stack core.
#define MQTT_CLIENT_MAX_HOST_LEN
Definition: mqtt_client.h:75
error_t tlsInitSessionState(TlsSessionState *session)
Initialize session state.
Definition: tls.c:2406
error_t mqttClientSetAuthInfo(MqttClientContext *context, const char_t *username, const char_t *password)
Set authentication information.
Definition: mqtt_client.c:357
error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Set the domain name of the server (for virtual hosting)
Definition: mqtt_client.c:276
uint32_t systime_t
Definition: compiler_port.h:46
error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Specify the Will message.
Definition: mqtt_client.c:394
char_t topic[MQTT_CLIENT_MAX_WILL_TOPIC_LEN+1]
Will topic name.
Definition: mqtt_client.h:261
@ NO_ERROR
Success.
Definition: error.h:44
Debugging facilities.
error_t mqttClientRegisterTlsInitCallback(MqttClientContext *context, MqttClientTlsInitCallback callback)
Register TLS initialization callback function.
Definition: mqtt_client.c:185
systime_t osGetSystemTime(void)
Retrieve system time.
MQTT client.
error_t mqttClientSetIdentifier(MqttClientContext *context, const char_t *clientId)
Set client identifier.
Definition: mqtt_client.c:330