mqtt_client.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client.c
3  * @brief MQTT client
4  *
5  * @section License
6  *
7  * Copyright (C) 2010-2018 Oryx Embedded SARL. All rights reserved.
8  *
9  * This file is part of CycloneTCP Open.
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License
13  * as published by the Free Software Foundation; either version 2
14  * of the License, or (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, write to the Free Software Foundation,
23  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24  *
25  * @author Oryx Embedded SARL (www.oryx-embedded.com)
26  * @version 1.9.0
27  **/
28 
29 //Switch to the appropriate trace level
30 #define TRACE_LEVEL MQTT_TRACE_LEVEL
31 
32 //Dependencies
33 #include "core/net.h"
34 #include "mqtt/mqtt_client.h"
37 #include "mqtt/mqtt_client_misc.h"
38 #include "debug.h"
39 
40 //Check TCP/IP stack configuration
41 #if (MQTT_CLIENT_SUPPORT == ENABLED)
42 
43 
44 /**
45  * @brief Initialize MQTT client context
46  * @param[in] context Pointer to the MQTT client context
47  * @return Error code
48  **/
49 
51 {
52 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
53  error_t error;
54 #endif
55 
56  //Make sure the MQTT client context is valid
57  if(context == NULL)
59 
60  //Clear MQTT client context
61  memset(context, 0, sizeof(MqttClientContext));
62 
63 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
64  //Initialize TLS session state
65  error = tlsInitSessionState(&context->tlsSession);
66  //Any error to report?
67  if(error)
68  return error;
69 #endif
70 
71  //Default protocol version
72  context->settings.protocolLevel = MQTT_PROTOCOL_LEVEL_3_1_1;
73  //Default transport protocol
74  context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
75  //Default keep-alive time interval
76  context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
77  //Default communication timeout
78  context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
79 
80 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
81  //Default resource name (for WebSocket connections only)
82  strcpy(context->settings.uri, "/");
83 #endif
84 
85  //Initialize state machine
86  context->state = MQTT_CLIENT_STATE_CLOSED;
87  //Initialize packet identifier
88  context->packetId = 0;
89 
90  //Successful initialization
91  return NO_ERROR;
92 }
93 
94 
95 /**
96  * @brief Initialize callback structure
97  * @param[in] callbacks Pointer to a structure that contains callback functions
98  * @return Error code
99  **/
100 
102 {
103  //Initialize callback structure
104  memset(callbacks, 0, sizeof(MqttClientCallbacks));
105 }
106 
107 
108 /**
109  * @brief Register MQTT client callbacks
110  * @param[in] context Pointer to the MQTT client context
111  * @param[in] callbacks Pointer to a structure that contains callback functions
112  * @return Error code
113  **/
114 
116  const MqttClientCallbacks *callbacks)
117 {
118  //Make sure the MQTT client context is valid
119  if(context == NULL)
121 
122  //Attach callback functions
123  context->callbacks = *callbacks;
124 
125  //Successful processing
126  return NO_ERROR;
127 }
128 
129 
130 /**
131  * @brief Set the MQTT protocol version to be used
132  * @param[in] context Pointer to the MQTT client context
133  * @param[in] protocolLevel MQTT protocol level (3.1 or 3.1.1)
134  * @return Error code
135  **/
136 
138  MqttProtocolLevel protocolLevel)
139 {
140  //Make sure the MQTT client context is valid
141  if(context == NULL)
143 
144  //Save the MQTT protocol version to be used
145  context->settings.protocolLevel = protocolLevel;
146 
147  //Successful processing
148  return NO_ERROR;
149 }
150 
151 
152 /**
153  * @brief Set the transport protocol to be used
154  * @param[in] context Pointer to the MQTT client context
155  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
156  * WebSocket, or secure WebSocket)
157  * @return Error code
158  **/
159 
161  MqttTransportProtocol transportProtocol)
162 {
163  //Make sure the MQTT client context is valid
164  if(context == NULL)
166 
167  //Save the transport protocol to be used
168  context->settings.transportProtocol = transportProtocol;
169 
170  //Successful processing
171  return NO_ERROR;
172 }
173 
174 
175 /**
176  * @brief Set communication timeout
177  * @param[in] context Pointer to the MQTT client context
178  * @param[in] timeout Timeout value, in seconds
179  * @return Error code
180  **/
181 
182 error_t mqttClientSetTimeout(MqttClientContext *context, uint16_t timeout)
183 {
184  //Make sure the MQTT client context is valid
185  if(context == NULL)
187 
188  //Save timeout value
189  context->settings.timeout = timeout;
190 
191  //Successful processing
192  return NO_ERROR;
193 }
194 
195 
196 /**
197  * @brief Set keep-alive value
198  * @param[in] context Pointer to the MQTT client context
199  * @param[in] keepAlive Maximum time interval that is permitted to elapse
200  * between the point at which the client finishes transmitting one control
201  * packet and the point it starts sending the next
202  * @return Error code
203  **/
204 
205 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
206 {
207  //Make sure the MQTT client context is valid
208  if(context == NULL)
210 
211  //Save keep-alive value
212  context->settings.keepAlive = keepAlive;
213 
214  //Successful processing
215  return NO_ERROR;
216 }
217 
218 
219 /**
220  * @brief Set the hostname of the resource being requested
221  * @param[in] context Pointer to the MQTT client context
222  * @param[in] host NULL-terminated string containing the hostname
223  * @return Error code
224  **/
225 
227 {
228  //Check parameters
229  if(context == NULL || host == NULL)
231 
232  //Make sure the length of the hostname is acceptable
233  if(strlen(host) > MQTT_CLIENT_MAX_HOST_LEN)
234  return ERROR_INVALID_LENGTH;
235 
236 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
237  //Save hostname (for WebSocket connections only)
238  strcpy(context->settings.host, host);
239 #endif
240 
241  //Successful processing
242  return NO_ERROR;
243 }
244 
245 
246 /**
247  * @brief Set the name of the resource being requested
248  * @param[in] context Pointer to the MQTT client context
249  * @param[in] uri NULL-terminated string containing the URI
250  * @return Error code
251  **/
252 
254 {
255  //Check parameters
256  if(context == NULL || uri == NULL)
258 
259  //Make sure the length of the resource name is acceptable
260  if(strlen(uri) > MQTT_CLIENT_MAX_URI_LEN)
261  return ERROR_INVALID_LENGTH;
262 
263 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
264  //Save resource name (for WebSocket connections only)
265  strcpy(context->settings.uri, uri);
266 #endif
267 
268  //Successful processing
269  return NO_ERROR;
270 }
271 
272 
273 /**
274  * @brief Set client identifier
275  * @param[in] context Pointer to the MQTT client context
276  * @param[in] clientId NULL-terminated string containing the client identifier
277  * @return Error code
278  **/
279 
281  const char_t *clientId)
282 {
283  //Check parameters
284  if(context == NULL || clientId == NULL)
286 
287  //Make sure the length of the client identifier is acceptable
288  if(strlen(clientId) > MQTT_CLIENT_MAX_ID_LEN)
289  return ERROR_INVALID_LENGTH;
290 
291  //Save client identifier
292  strcpy(context->settings.clientId, clientId);
293 
294  //Successful processing
295  return NO_ERROR;
296 }
297 
298 
299 /**
300  * @brief Set authentication information
301  * @param[in] context Pointer to the MQTT client context
302  * @param[in] username NULL-terminated string containing the user name to be used
303  * @param[in] password NULL-terminated string containing the password to be used
304  * @return Error code
305  **/
306 
308  const char_t *username, const char_t *password)
309 {
310  //Check parameters
311  if(context == NULL || username == NULL || password == NULL)
313 
314  //Make sure the length of the user name is acceptable
315  if(strlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
316  return ERROR_INVALID_LENGTH;
317 
318  //Save user name
319  strcpy(context->settings.username, username);
320 
321  //Make sure the length of the password is acceptable
322  if(strlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
323  return ERROR_INVALID_LENGTH;
324 
325  //Save password
326  strcpy(context->settings.password, password);
327 
328  //Successful processing
329  return NO_ERROR;
330 }
331 
332 
333 /**
334  * @brief Specify the Will message
335  * @param[in] context Pointer to the MQTT client context
336  * @param[in] topic Will topic name
337  * @param[in] message Will message
338  * @param[in] length Length of the Will message
339  * @param[in] qos QoS level to be used when publishing the Will message
340  * @param[in] retain This flag specifies if the Will message is to be retained
341  * @return Error code
342  **/
343 
345  const void *message, size_t length, MqttQosLevel qos, bool_t retain)
346 {
347  MqttClientWillMessage *willMessage;
348 
349  //Check parameters
350  if(context == NULL || topic == NULL)
352 
353  //Make sure the length of the Will topic is acceptable
354  if(strlen(topic) > MQTT_CLIENT_MAX_WILL_TOPIC_LEN)
355  return ERROR_INVALID_LENGTH;
356 
357  //Point to the Will message
358  willMessage = &context->settings.willMessage;
359 
360  //Save Will topic
361  strcpy(willMessage->topic, topic);
362 
363  //Any message payload
364  if(length > 0)
365  {
366  //Sanity check
367  if(message == NULL)
369 
370  //Make sure the length of the Will message payload is acceptable
372  return ERROR_INVALID_LENGTH;
373 
374  //Save Will message payload
375  memcpy(willMessage->payload, message, length);
376  }
377 
378  //Length of the Will message payload
379  willMessage->length = length;
380  //QoS level to be used when publishing the Will message
381  willMessage->qos = qos;
382  //This flag specifies if the Will message is to be retained
383  willMessage->retain = retain;
384 
385  //Successful processing
386  return NO_ERROR;
387 }
388 
389 
390 /**
391  * @brief Bind the MQTT client to a particular network interface
392  * @param[in] context Pointer to the MQTT client context
393  * @param[in] interface Network interface to be used
394  * @return Error code
395  **/
396 
398  NetInterface *interface)
399 {
400  //Make sure the MQTT client context is valid
401  if(context == NULL)
403 
404  //Explicitly associate the MQTT client with the specified interface
405  context->interface = interface;
406 
407  //Successful processing
408  return NO_ERROR;
409 }
410 
411 
412 /**
413  * @brief Establish connection with the MQTT server
414  * @param[in] context Pointer to the MQTT client context
415  * @param[in] serverIpAddr IP address of the MQTT server to connect to
416  * @param[in] serverPort TCP port number that will be used to establish the
417  * connection
418  * @param[in] cleanSession If this flag is set, then the client and server
419  * must discard any previous session and start a new one
420  * @return Error code
421  **/
422 
424  const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
425 {
426  error_t error;
427 
428  //Check parameters
429  if(context == NULL || serverIpAddr == NULL)
431 
432  //Initialize status code
433  error = NO_ERROR;
434 
435  //Establish network connection
436  while(context->state != MQTT_CLIENT_STATE_IDLE)
437  {
438  //Check current state
439  if(context->state == MQTT_CLIENT_STATE_CLOSED)
440  {
441  //Open network connection
442  error = mqttClientOpenConnection(context);
443 
444  //Check status code
445  if(!error)
446  {
447  //Debug message
448  TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
449  ipAddrToString(serverIpAddr, NULL), serverPort);
450 
451  //The network connection is open
453  }
454  else
455  {
456  //Clean up side effects
457  mqttClientCloseConnection(context);
458  }
459  }
460  else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
461  {
462  //Establish network connection
463  error = mqttClientEstablishConnection(context,
464  serverIpAddr, serverPort);
465 
466  //Check status code
467  if(!error)
468  {
469  //Debug message
470  TRACE_INFO("MQTT: Connected to server\r\n");
471 
472  //The network connection is established
474  }
475  }
476  else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
477  {
478  //Format CONNECT packet
479  error = mqttClientFormatConnect(context, cleanSession);
480 
481  //Check status code
482  if(!error)
483  {
484  //Debug message
485  TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
486  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
487 
488  //Save the type of the MQTT packet to be sent
489  context->packetType = MQTT_PACKET_TYPE_CONNECT;
490  //Point to the beginning of the packet
491  context->packetPos = 0;
492 
493  //Send CONNECT packet
495  }
496  }
497  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
498  {
499  //Send more data
500  error = mqttClientProcessEvents(context, context->settings.timeout);
501  }
502  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
503  {
504  //Wait for CONNACK packet
505  error = mqttClientProcessEvents(context, context->settings.timeout);
506  }
507  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
508  {
509  //Receive more data
510  error = mqttClientProcessEvents(context, context->settings.timeout);
511  }
512  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
513  {
514  //Reset packet type
515  context->packetType = MQTT_PACKET_TYPE_INVALID;
516  //A CONNACK packet has been received
518  }
519  else
520  {
521  //Invalid state
522  error = ERROR_NOT_CONNECTED;
523  }
524 
525  //Any error to report?
526  if(error)
527  {
528 #if (NET_RTOS_SUPPORT == DISABLED)
529  //Timeout error?
530  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
531  break;
532 #endif
533  //Close connection
534  mqttClientCloseConnection(context);
535  //The connection is closed
537  //Exit immediately
538  break;
539  }
540  }
541 
542  //Return status code
543  return error;
544 }
545 
546 
547 /**
548  * @brief Publish message
549  * @param[in] context Pointer to the MQTT client context
550  * @param[in] topic Topic name
551  * @param[in] message Message payload
552  * @param[in] length Length of the message payload
553  * @param[in] qos QoS level to be used when publishing the message
554  * @param[in] retain This flag specifies if the message is to be retained
555  * @param[out] packetId Packet identifier used to send the PUBLISH packet
556  * @return Error code
557  **/
558 
560  const char_t *topic, const void *message, size_t length,
561  MqttQosLevel qos, bool_t retain, uint16_t *packetId)
562 {
563  error_t error;
564 
565  //Check parameters
566  if(context == NULL || topic == NULL)
568  if(message == NULL && length != 0)
570 
571  //Initialize status code
572  error = NO_ERROR;
573 
574  //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
575  while(!error)
576  {
577  //Check current state
578  if(context->state == MQTT_CLIENT_STATE_IDLE)
579  {
580  //Check for transmission completion
581  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
582  {
583  //Format PUBLISH packet
584  error = mqttClientFormatPublish(context, topic, message,
585  length, qos, retain);
586 
587  //Check status code
588  if(!error)
589  {
590  //Save the packet identifier used to send the PUBLISH packet
591  if(packetId != NULL)
592  *packetId = context->packetId;
593 
594  //Debug message
595  TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n",
596  context->packetLen);
597 
598  //Dump the contents of the PUBLISH packet
599  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
600 
601  //Save the type of the MQTT packet to be sent
602  context->packetType = MQTT_PACKET_TYPE_PUBLISH;
603  //Point to the beginning of the packet
604  context->packetPos = 0;
605 
606  //Send PUBLISH packet
608  }
609  }
610  else
611  {
612  //Reset packet type
613  context->packetType = MQTT_PACKET_TYPE_INVALID;
614  //We are done
615  break;
616  }
617  }
618  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
619  {
620  //Send more data
621  error = mqttClientProcessEvents(context, context->settings.timeout);
622  }
623  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
624  {
625  //The last parameter is optional
626  if(packetId != NULL)
627  {
628  //Do not wait for PUBACK/PUBCOMP packet
630  }
631  else
632  {
633  //Check QoS level
634  if(qos == MQTT_QOS_LEVEL_0)
635  {
636  //No response is sent by the receiver and no retry is performed by the sender
638  }
639  else
640  {
641  //Wait for PUBACK/PUBCOMP packet
642  error = mqttClientProcessEvents(context, context->settings.timeout);
643  }
644  }
645  }
646  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
647  {
648  //Receive more data
649  error = mqttClientProcessEvents(context, context->settings.timeout);
650  }
651  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
652  {
653  //A PUBACK/PUBCOMP packet has been received
655  }
656  else
657  {
658  //Invalid state
659  error = ERROR_NOT_CONNECTED;
660  }
661  }
662 
663  //Return status code
664  return error;
665 }
666 
667 
668 /**
669  * @brief Subscribe to topic
670  * @param[in] context Pointer to the MQTT client context
671  * @param[in] topic Topic filter
672  * @param[in] qos Maximum QoS level at which the server can send application
673  * messages to the client
674  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
675  * @return Error code
676  **/
677 
679  const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
680 {
681  error_t error;
682 
683  //Check parameters
684  if(context == NULL || topic == NULL)
686 
687  //Initialize status code
688  error = NO_ERROR;
689 
690  //Send SUBSCRIBE packet and wait for SUBACK packet to be received
691  while(!error)
692  {
693  //Check current state
694  if(context->state == MQTT_CLIENT_STATE_IDLE)
695  {
696  //Check for transmission completion
697  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
698  {
699  //Format SUBSCRIBE packet
700  error = mqttClientFormatSubscribe(context, topic, qos);
701 
702  //Check status code
703  if(!error)
704  {
705  //Save the packet identifier used to send the SUBSCRIBE packet
706  if(packetId != NULL)
707  *packetId = context->packetId;
708 
709  //Debug message
710  TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
711  context->packetLen);
712 
713  //Dump the contents of the SUBSCRIBE packet
714  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
715 
716  //Save the type of the MQTT packet to be sent
717  context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
718  //Point to the beginning of the packet
719  context->packetPos = 0;
720 
721  //Send SUBSCRIBE packet
723  }
724  }
725  else
726  {
727  //Reset packet type
728  context->packetType = MQTT_PACKET_TYPE_INVALID;
729  //We are done
730  break;
731  }
732  }
733  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
734  {
735  //Send more data
736  error = mqttClientProcessEvents(context, context->settings.timeout);
737  }
738  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
739  {
740  //The last parameter is optional
741  if(packetId != NULL)
742  {
743  //Do not wait for SUBACK packet
745  }
746  else
747  {
748  //Wait for SUBACK packet
749  error = mqttClientProcessEvents(context, context->settings.timeout);
750  }
751  }
752  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
753  {
754  //Receive more data
755  error = mqttClientProcessEvents(context, context->settings.timeout);
756  }
757  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
758  {
759  //A SUBACK packet has been received
761  }
762  else
763  {
764  //Invalid state
765  error = ERROR_NOT_CONNECTED;
766  }
767  }
768 
769  //Return status code
770  return error;
771 }
772 
773 
774 /**
775  * @brief Unsubscribe from topic
776  * @param[in] context Pointer to the MQTT client context
777  * @param[in] topic Topic filter
778  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
779  * @return Error code
780  **/
781 
783  const char_t *topic, uint16_t *packetId)
784 {
785  error_t error;
786 
787  //Check parameters
788  if(context == NULL || topic == NULL)
790 
791  //Initialize status code
792  error = NO_ERROR;
793 
794  //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
795  while(!error)
796  {
797  //Check current state
798  if(context->state == MQTT_CLIENT_STATE_IDLE)
799  {
800  //Check for transmission completion
801  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
802  {
803  //Format UNSUBSCRIBE packet
804  error = mqttClientFormatUnsubscribe(context, topic);
805 
806  //Check status code
807  if(!error)
808  {
809  //Save the packet identifier used to send the UNSUBSCRIBE packet
810  if(packetId != NULL)
811  *packetId = context->packetId;
812 
813  //Debug message
814  TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
815  context->packetLen);
816 
817  //Dump the contents of the UNSUBSCRIBE packet
818  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
819 
820  //Save the type of the MQTT packet to be sent
821  context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
822  //Point to the beginning of the packet
823  context->packetPos = 0;
824 
825  //Send UNSUBSCRIBE packet
827  }
828  }
829  else
830  {
831  //Reset packet type
832  context->packetType = MQTT_PACKET_TYPE_INVALID;
833  //We are done
834  break;
835  }
836  }
837  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
838  {
839  //Send more data
840  error = mqttClientProcessEvents(context, context->settings.timeout);
841  }
842  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
843  {
844  //The last parameter is optional
845  if(packetId != NULL)
846  {
847  //Do not wait for UNSUBACK packet
849  }
850  else
851  {
852  //Wait for UNSUBACK packet
853  error = mqttClientProcessEvents(context, context->settings.timeout);
854  }
855  }
856  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
857  {
858  //Receive more data
859  error = mqttClientProcessEvents(context, context->settings.timeout);
860  }
861  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
862  {
863  //An UNSUBACK packet has been received
865  }
866  else
867  {
868  //Invalid state
869  error = ERROR_NOT_CONNECTED;
870  }
871  }
872 
873  //Return status code
874  return error;
875 }
876 
877 
878 /**
879  * @brief Send ping request
880  * @param[in] context Pointer to the MQTT client context
881  * @param[out] rtt Round-trip time (optional parameter)
882  * @return Error code
883  **/
884 
886 {
887  error_t error;
888 
889  //Make sure the MQTT client context is valid
890  if(context == NULL)
892 
893  //Initialize status code
894  error = NO_ERROR;
895 
896  //Send PINGREQ packet and wait for PINGRESP packet to be received
897  while(!error)
898  {
899  //Check current state
900  if(context->state == MQTT_CLIENT_STATE_IDLE)
901  {
902  //Check for transmission completion
903  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
904  {
905  //Format PINGREQ packet
906  error = mqttClientFormatPingReq(context);
907 
908  //Check status code
909  if(!error)
910  {
911  //Debug message
912  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n",
913  context->packetLen);
914 
915  //Dump the contents of the PINGREQ packet
916  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
917 
918  //Save the type of the MQTT packet to be sent
919  context->packetType = MQTT_PACKET_TYPE_PINGREQ;
920  //Point to the beginning of the packet
921  context->packetPos = 0;
922 
923  //Send PINGREQ packet
925 
926  //Save the time at which the request was sent
927  if(rtt != NULL)
928  context->pingTimestamp = osGetSystemTime();
929  }
930  }
931  else
932  {
933  //Reset packet type
934  context->packetType = MQTT_PACKET_TYPE_INVALID;
935  //We are done
936  break;
937  }
938  }
939  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
940  {
941  //Send more data
942  error = mqttClientProcessEvents(context, context->settings.timeout);
943  }
944  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
945  {
946  //The last parameter is optional
947  if(rtt != NULL)
948  {
949  //Wait for PINGRESP packet
950  error = mqttClientProcessEvents(context, context->settings.timeout);
951  }
952  else
953  {
954  //Do not wait for PINGRESP packet
956  }
957  }
958  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
959  {
960  //Receive more data
961  error = mqttClientProcessEvents(context, context->settings.timeout);
962  }
963  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
964  {
965  //The last parameter is optional
966  if(rtt != NULL)
967  {
968  //Compute round-trip time
969  *rtt = osGetSystemTime() - context->pingTimestamp;
970  }
971 
972  //A PINGRESP packet has been received
974  }
975  else
976  {
977  //Invalid state
978  error = ERROR_NOT_CONNECTED;
979  }
980  }
981 
982  //Return status code
983  return error;
984 }
985 
986 
987 /**
988  * @brief Gracefully disconnect from the MQTT server
989  * @param[in] context Pointer to the MQTT client context
990  * @return Error code
991  **/
992 
994 {
995  error_t error;
996 
997  //Make sure the MQTT client context is valid
998  if(context == NULL)
1000 
1001  //Initialize status code
1002  error = NO_ERROR;
1003 
1004  //Send DISCONNECT packet and shutdown network connection
1005  while(context->state != MQTT_CLIENT_STATE_DISCONNECTED)
1006  {
1007  //Check current state
1008  if(context->state == MQTT_CLIENT_STATE_IDLE)
1009  {
1010  //Format DISCONNECT packet
1011  error = mqttClientFormatDisconnect(context);
1012 
1013  //Check status code
1014  if(!error)
1015  {
1016  //Debug message
1017  TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
1018  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1019 
1020  //Save the type of the MQTT packet to be sent
1021  context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
1022  //Point to the beginning of the packet
1023  context->packetPos = 0;
1024 
1025  //Send DISCONNECT packet
1027  }
1028  }
1029  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1030  {
1031  //Send more data
1032  error = mqttClientProcessEvents(context, context->settings.timeout);
1033  }
1034  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1035  {
1036  //Debug message
1037  TRACE_INFO("MQTT: Shutting down connection...\r\n");
1038 
1039  //After sending a DISCONNECT packet the client must not send any
1040  //more control packets on that network connection
1042  }
1043  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
1044  {
1045  //Properly dispose the network connection
1046  error = mqttClientShutdownConnection(context);
1047 
1048  //Check status code
1049  if(!error)
1050  {
1051  //The MQTT client is disconnected
1053  }
1054  }
1055  else
1056  {
1057  //Invalid state
1058  error = ERROR_NOT_CONNECTED;
1059  }
1060 
1061  //Any error to report?
1062  if(error)
1063  break;
1064  }
1065 
1066  //Return status code
1067  return error;
1068 }
1069 
1070 
1071 /**
1072  * @brief Close the connection with the MQTT server
1073  * @param[in] context Pointer to the MQTT client context
1074  * @return Error code
1075  **/
1076 
1078 {
1079  //Make sure the MQTT client context is valid
1080  if(context == NULL)
1081  return ERROR_INVALID_PARAMETER;
1082 
1083  //Close connection
1084  mqttClientCloseConnection(context);
1085 
1086 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
1087  //Release TLS session state
1088  tlsFreeSessionState(&context->tlsSession);
1089 #endif
1090 
1091  //The connection is closed
1093 
1094  //Network connection successfully closed
1095  return NO_ERROR;
1096 }
1097 
1098 
1099 /**
1100  * @brief Process MQTT client events
1101  * @param[in] context Pointer to the MQTT client context
1102  * @param[in] timeout Maximum time to wait before returning
1103  * @return Error code
1104  **/
1105 
1107 {
1108  error_t error;
1109  size_t n;
1110 
1111  //It is the responsibility of the client to ensure that the interval
1112  //between control packets being sent does not exceed the keep-alive value
1113  error = mqttClientCheckKeepAlive(context);
1114 
1115  //Check status code
1116  if(!error)
1117  {
1118  //Check current state
1119  if(context->state == MQTT_CLIENT_STATE_IDLE ||
1120  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1121  {
1122  //Wait for incoming data
1123  error = mqttClientWaitForData(context, timeout);
1124 
1125  //Check status code
1126  if(!error)
1127  {
1128  //Initialize context
1129  context->packet = context->buffer;
1130  context->packetPos = 0;
1131  context->packetLen = 0;
1132  context->remainingLen = 0;
1133 
1134  //Start receiving the packet
1136  }
1137  }
1138  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1139  {
1140  //Receive the incoming packet
1141  error = mqttClientReceivePacket(context);
1142 
1143  //Check status code
1144  if(!error)
1145  {
1146  //Process MQTT control packet
1147  error = mqttClientProcessPacket(context);
1148 
1149  //Update MQTT client state
1150  if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1151  {
1152  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
1154  else
1156  }
1157  }
1158  }
1159  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1160  {
1161  //Any remaining data to be sent?
1162  if(context->packetPos < context->packetLen)
1163  {
1164  //Send more data
1165  error = mqttClientSendData(context, context->packet + context->packetPos,
1166  context->packetLen - context->packetPos, &n, 0);
1167 
1168  //Advance data pointer
1169  context->packetPos += n;
1170  }
1171  else
1172  {
1173  //Save the time at which the message was sent
1174  context->keepAliveTimestamp = osGetSystemTime();
1175 
1176  //Update MQTT client state
1177  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
1179  else
1181  }
1182  }
1183  }
1184 
1185  //Return status code
1186  return error;
1187 }
1188 
1189 #endif
error_t mqttClientSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Subscribe to topic.
Definition: mqtt_client.c:678
uint32_t systime_t
Definition: compiler_port.h:44
error_t mqttClientEstablishConnection(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort)
Establish network connection.
error_t mqttClientUnsubscribe(MqttClientContext *context, const char_t *topic, uint16_t *packetId)
Unsubscribe from topic.
Definition: mqtt_client.c:782
Will message.
Definition: mqtt_client.h:257
#define MQTT_CLIENT_MAX_HOST_LEN
Definition: mqtt_client.h:73
char char_t
Definition: compiler_port.h:41
MqttTransportProtocol
Transport protocol.
Definition: mqtt_common.h:71
uint8_t payload[MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN]
Will message payload.
Definition: mqtt_client.h:260
systime_t osGetSystemTime(void)
Retrieve system time.
TCP/IP stack core.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
Helper functions for MQTT client.
Invalid packet.
Definition: mqtt_common.h:98
error_t mqttClientInit(MqttClientContext *context)
Initialize MQTT client context.
Definition: mqtt_client.c:50
Debugging facilities.
uint8_t message[]
Definition: chap.h:150
Client subscribe request.
Definition: mqtt_common.h:106
error_t mqttClientSendData(MqttClientContext *context, const void *data, size_t length, size_t *written, uint_t flags)
Send data using the relevant transport protocol.
Invalid parameter.
Definition: error.h:45
#define MQTT_CLIENT_MAX_USERNAME_LEN
Definition: mqtt_client.h:94
IP network address.
Definition: ip.h:57
MQTT client.
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
char_t * ipAddrToString(const IpAddr *ipAddr, char_t *str)
Convert a binary IP address to a string representation.
Definition: ip.c:685
error_t tlsInitSessionState(TlsSessionState *session)
Initialize session state.
Definition: tls.c:2312
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:99
error_t mqttClientFormatUnsubscribe(MqttClientContext *context, const char_t *topic)
Format UNSUBSCRIBE packet.
MQTT packet parsing and formatting.
error_t mqttClientProcessPacket(MqttClientContext *context)
Process incoming MQTT packet.
error_t mqttClientBindToInterface(MqttClientContext *context, NetInterface *interface)
Bind the MQTT client to a particular network interface.
Definition: mqtt_client.c:397
error_t mqttClientOpenConnection(MqttClientContext *context)
Open network connection.
uint8_t qos
Definition: mqtt_common.h:177
error_t mqttClientClose(MqttClientContext *context)
Close the connection with the MQTT server.
Definition: mqtt_client.c:1077
error_t mqttClientCheckKeepAlive(MqttClientContext *context)
Check keep-alive time interval.
error_t mqttClientConnect(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Establish connection with the MQTT server.
Definition: mqtt_client.c:423
error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Set the name of the resource being requested.
Definition: mqtt_client.c:253
MQTT client callback functions.
Definition: mqtt_client.h:271
size_t length
Length of the Will message payload.
Definition: mqtt_client.h:261
error_t mqttClientSetTimeout(MqttClientContext *context, uint16_t timeout)
Set communication timeout.
Definition: mqtt_client.c:182
void mqttClientCloseConnection(MqttClientContext *context)
Close network connection.
error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Set keep-alive value.
Definition: mqtt_client.c:205
Transport protocol abstraction layer.
Client request to connect to server.
Definition: mqtt_common.h:99
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:84
uint8_t cleanSession
MqttQosLevel qos
QoS level to be used when publishing the Will message.
Definition: mqtt_client.h:262
void tlsFreeSessionState(TlsSessionState *session)
Properly dispose a session state.
Definition: tls.c:2594
char_t clientId[]
error_t mqttClientPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:559
error_t mqttClientFormatSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos)
Format SUBSCRIBE packet.
error_t mqttClientDisconnect(MqttClientContext *context)
Gracefully disconnect from the MQTT server.
Definition: mqtt_client.c:993
error_t mqttClientSetProtocolLevel(MqttClientContext *context, MqttProtocolLevel protocolLevel)
Set the MQTT protocol version to be used.
Definition: mqtt_client.c:137
error_t mqttClientSetTransportProtocol(MqttClientContext *context, MqttTransportProtocol transportProtocol)
Set the transport protocol to be used.
Definition: mqtt_client.c:160
char_t topic[MQTT_CLIENT_MAX_WILL_TOPIC_LEN+1]
Will topic name.
Definition: mqtt_client.h:259
Client is disconnecting.
Definition: mqtt_common.h:112
#define MQTT_CLIENT_MAX_WILL_TOPIC_LEN
Definition: mqtt_client.h:108
#define MQTT_CLIENT_MAX_URI_LEN
Definition: mqtt_client.h:80
#define TRACE_INFO(...)
Definition: debug.h:86
#define MQTT_CLIENT_MAX_ID_LEN
Definition: mqtt_client.h:87
Success.
Definition: error.h:42
#define MQTT_CLIENT_DEFAULT_KEEP_ALIVE
Definition: mqtt_client.h:59
Unsubscribe request.
Definition: mqtt_common.h:108
error_t
Error codes.
Definition: error.h:40
uint8_t retain
Definition: mqtt_common.h:176
error_t mqttClientWaitForData(MqttClientContext *context, systime_t timeout)
Wait for incoming data.
#define MQTT_CLIENT_MAX_PASSWORD_LEN
Definition: mqtt_client.h:101
void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Initialize callback structure.
Definition: mqtt_client.c:101
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
Definition: mqtt_client.c:1106
#define MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN
Definition: mqtt_client.h:115
#define PRIuSIZE
Definition: compiler_port.h:72
error_t mqttClientRegisterCallbacks(MqttClientContext *context, const MqttClientCallbacks *callbacks)
Register MQTT client callbacks.
Definition: mqtt_client.c:115
#define NetInterface
Definition: net.h:34
error_t mqttClientShutdownConnection(MqttClientContext *context)
Shutdown network connection.
error_t mqttClientSetAuthInfo(MqttClientContext *context, const char_t *username, const char_t *password)
Set authentication information.
Definition: mqtt_client.c:307
error_t mqttClientFormatDisconnect(MqttClientContext *context)
Format DISCONNECT packet.
#define MQTT_CLIENT_DEFAULT_TIMEOUT
Definition: mqtt_client.h:66
Publish message.
Definition: mqtt_common.h:101
At most once delivery.
Definition: mqtt_common.h:86
error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Set the hostname of the resource being requested.
Definition: mqtt_client.c:226
error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Specify the Will message.
Definition: mqtt_client.c:344
uint8_t length
Definition: dtls_misc.h:140
uint8_t n
error_t mqttClientSetIdentifier(MqttClientContext *context, const char_t *clientId)
Set client identifier.
Definition: mqtt_client.c:280
#define MqttClientContext
Definition: mqtt_client.h:140
int bool_t
Definition: compiler_port.h:47
error_t mqttClientFormatPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Format PUBLISH packet.
error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Send ping request.
Definition: mqtt_client.c:885
MqttProtocolLevel
MQTT protocol level.
Definition: mqtt_common.h:60
bool_t retain
Specifies if the Will message is to be retained.
Definition: mqtt_client.h:263
error_t mqttClientReceivePacket(MqttClientContext *context)
Receive MQTT packet.
error_t mqttClientFormatConnect(MqttClientContext *context, bool_t cleanSession)
Format CONNECT packet.
MQTT version 3.1.1.
Definition: mqtt_common.h:63