mqtt_client_misc.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client_misc.c
3  * @brief Helper functions for MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2019 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 1.9.6
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Update MQTT client state
48  * @param[in] context Pointer to the MQTT client context
49  * @param[in] newState New state to switch to
50  **/
51 
53 {
54  //Switch to the new state
55  context->state = newState;
56 }
57 
58 
59 /**
60  * @brief Process MQTT client events
61  * @param[in] context Pointer to the MQTT client context
62  * @param[in] timeout Maximum time to wait before returning
63  * @return Error code
64  **/
65 
67 {
68  error_t error;
69  size_t n;
70 
71  //It is the responsibility of the client to ensure that the interval
72  //between control packets being sent does not exceed the keep-alive value
73  error = mqttClientCheckKeepAlive(context);
74 
75  //Check status code
76  if(!error)
77  {
78  //Check current state
79  if(context->state == MQTT_CLIENT_STATE_IDLE ||
80  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
81  {
82  //Wait for incoming data
83  error = mqttClientWaitForData(context, timeout);
84 
85  //Check status code
86  if(!error)
87  {
88  //Initialize context
89  context->packet = context->buffer;
90  context->packetPos = 0;
91  context->packetLen = 0;
92  context->remainingLen = 0;
93 
94  //Start receiving the packet
96  }
97  }
98  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
99  {
100  //Receive the incoming packet
101  error = mqttClientReceivePacket(context);
102 
103  //Check status code
104  if(!error)
105  {
106  //Process MQTT control packet
107  error = mqttClientProcessPacket(context);
108 
109  //Update MQTT client state
110  if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
111  {
112  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
114  else
116  }
117  }
118  }
119  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
120  {
121  //Any remaining data to be sent?
122  if(context->packetPos < context->packetLen)
123  {
124  //Send more data
125  error = mqttClientSendData(context, context->packet + context->packetPos,
126  context->packetLen - context->packetPos, &n, 0);
127 
128  //Advance data pointer
129  context->packetPos += n;
130  }
131  else
132  {
133  //Save the time at which the message was sent
134  context->keepAliveTimestamp = osGetSystemTime();
135 
136  //Update MQTT client state
137  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
139  else
141  }
142  }
143  }
144 
145  //Return status code
146  return error;
147 }
148 
149 
150 /**
151  * @brief Check keep-alive time interval
152  * @param[in] context Pointer to the MQTT client context
153  * @return Error code
154  **/
155 
157 {
158  error_t error;
159  systime_t time;
160  systime_t keepAlive;
161 
162  //Initialize status code
163  error = NO_ERROR;
164 
165  //In the absence of sending any other control packets, the client must
166  //send a PINGREQ packet
167  if(context->state == MQTT_CLIENT_STATE_IDLE ||
168  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
169  {
170  //A keep-alive value of zero has the effect of turning off the keep
171  //alive mechanism
172  if(context->settings.keepAlive != 0)
173  {
174  //Get current time
175  time = osGetSystemTime();
176 
177  //Convert the keep-alive value to milliseconds
178  keepAlive = context->settings.keepAlive * 1000;
179 
180  //It is the responsibility of the client to ensure that the interval
181  //between control packets being sent does not exceed the keep-alive value
182  if(timeCompare(time, context->keepAliveTimestamp + keepAlive) >= 0)
183  {
184  //Format PINGREQ packet
185  error = mqttClientFormatPingReq(context);
186 
187  //Check status code
188  if(!error)
189  {
190  //Debug message
191  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
192  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
193 
194  //Point to the beginning of the packet
195  context->packetPos = 0;
196 
197  //Send PINGREQ packet
199  }
200  }
201  }
202  }
203 
204  //Return status code
205  return error;
206 }
207 
208 
209 /**
210  * @brief Serialize fixed header
211  * @param[in] buffer Pointer to the output buffer
212  * @param[in,out] pos Current position
213  * @param[in] type MQTT control packet type
214  * @param[in] dup DUP flag
215  * @param[in] qos QoS field
216  * @param[in] retain RETAIN flag
217  * @param[in] remainingLen Length of the variable header and the payload
218  * @return Error code
219  **/
220 
221 error_t mqttSerializeHeader(uint8_t *buffer, size_t *pos, MqttPacketType type,
222  bool_t dup, MqttQosLevel qos, bool_t retain, size_t remainingLen)
223 {
224  uint_t i;
225  uint_t k;
226  size_t n;
227  MqttPacketHeader *header;
228 
229  //Point to the current position
230  n = *pos;
231 
232  //The Remaining Length is encoded using a variable length encoding scheme
233  if(remainingLen < 128)
234  k = 1;
235  else if(remainingLen < 16384)
236  k = 2;
237  else if(remainingLen < 2097152)
238  k = 3;
239  else if(remainingLen < 268435456)
240  k = 4;
241  else
242  return ERROR_INVALID_LENGTH;
243 
244  //Sanity check
245  if(n < (sizeof(MqttPacketHeader) + k))
246  return ERROR_BUFFER_OVERFLOW;
247 
248  //Position where to format the header
249  n -= sizeof(MqttPacketHeader) + k;
250 
251  //Point to the MQTT packet header
252  header = (MqttPacketHeader *) (buffer + n);
253 
254  //Encode the first byte of the header
255  header->type = type;
256  header->dup = dup;
257  header->qos = qos;
258  header->retain = retain;
259 
260  //Encode the Remaining Length field
261  for(i = 0; i < k; i++)
262  {
263  //The least significant seven bits of each byte encode the data
264  header->length[i] = remainingLen & 0xFF;
265  remainingLen >>= 7;
266 
267  //The most significant bit is used to indicate that there are
268  //following bytes in the representation
269  if(remainingLen > 0)
270  header->length[i] |= 0x80;
271  }
272 
273  //Update current position
274  *pos = n;
275 
276  //Successful processing
277  return NO_ERROR;
278 }
279 
280 
281 /**
282  * @brief Write a 8-bit integer to the output buffer
283  * @param[in] buffer Pointer to the output buffer
284  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
285  * @param[in,out] pos Current position
286  * @param[in] value 8-bit integer to be serialized
287  * @return Error code
288  **/
289 
290 error_t mqttSerializeByte(uint8_t *buffer, size_t bufferLen,
291  size_t *pos, uint8_t value)
292 {
293  size_t n;
294 
295  //Point to the current position
296  n = *pos;
297 
298  //Make sure the output buffer is large enough
299  if((n + sizeof(uint8_t)) > bufferLen)
300  return ERROR_BUFFER_OVERFLOW;
301 
302  //Write the byte to the output buffer
303  buffer[n++] = value;
304 
305  //Advance current position
306  *pos = n;
307 
308  //Successful processing
309  return NO_ERROR;
310 }
311 
312 
313 /**
314  * @brief Write a 16-bit integer to the output buffer
315  * @param[in] buffer Pointer to the output buffer
316  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
317  * @param[in,out] pos Current position
318  * @param[in] value 16-bit integer to be serialized
319  * @return Error code
320  **/
321 
322 error_t mqttSerializeShort(uint8_t *buffer, size_t bufferLen,
323  size_t *pos, uint16_t value)
324 {
325  size_t n;
326 
327  //Point to the current position
328  n = *pos;
329 
330  //Make sure the output buffer is large enough
331  if((n + sizeof(uint16_t)) > bufferLen)
332  return ERROR_BUFFER_OVERFLOW;
333 
334  //Write the short integer to the output buffer
335  buffer[n++] = MSB(value);
336  buffer[n++] = LSB(value);
337 
338  //Advance current position
339  *pos = n;
340 
341  //Successful processing
342  return NO_ERROR;
343 }
344 
345 
346 /**
347  * @brief Serialize string
348  * @param[in] buffer Pointer to the output buffer
349  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
350  * @param[in,out] pos Current position
351  * @param[in] string Pointer to the string to be serialized
352  * @param[in] stringLen Length of the string, in bytes
353  * @return Error code
354  **/
355 
356 error_t mqttSerializeString(uint8_t *buffer, size_t bufferLen,
357  size_t *pos, const void *string, size_t stringLen)
358 {
359  size_t n;
360 
361  //Point to the current position
362  n = *pos;
363 
364  //Make sure the output buffer is large enough to hold the string
365  if((n + sizeof(uint16_t) + stringLen) > bufferLen)
366  return ERROR_BUFFER_OVERFLOW;
367 
368  //Encode the length field
369  buffer[n++] = MSB(stringLen);
370  buffer[n++] = LSB(stringLen);
371 
372  //Write the string to the output buffer
373  memcpy(buffer + n, string, stringLen);
374 
375  //Advance current position
376  *pos = n + stringLen;
377 
378  //Successful processing
379  return NO_ERROR;
380 }
381 
382 
383 /**
384  * @brief Serialize raw data
385  * @param[in] buffer Pointer to the output buffer
386  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
387  * @param[in,out] pos Current position
388  * @param[in] data Pointer to the raw data to be serialized
389  * @param[in] dataLen Length of the raw data, in bytes
390  * @return Error code
391  **/
392 
393 error_t mqttSerializeData(uint8_t *buffer, size_t bufferLen,
394  size_t *pos, const void *data, size_t dataLen)
395 {
396  size_t n;
397 
398  //Point to the current position
399  n = *pos;
400 
401  //Make sure the output buffer is large enough to hold the data
402  if((n + dataLen) > bufferLen)
403  return ERROR_BUFFER_OVERFLOW;
404 
405  //Write the data to the output buffer
406  memcpy(buffer + n, data, dataLen);
407 
408  //Advance current position
409  *pos = n + dataLen;
410 
411  //Successful processing
412  return NO_ERROR;
413 }
414 
415 
416 /**
417  * @brief Deserialize fixed header
418  * @param[in] buffer Pointer to the input buffer
419  * @param[in] bufferLen Length of the input buffer
420  * @param[in,out] pos Current position
421  * @param[out] type MQTT control packet type
422  * @param[out] dup DUP flag from the fixed header
423  * @param[out] qos QoS field from the fixed header
424  * @param[out] retain RETAIN flag from the fixed header
425  * @param[out] remainingLen Length of the variable header and the payload
426  * @return Error code
427  **/
428 
429 error_t mqttDeserializeHeader(uint8_t *buffer, size_t bufferLen, size_t *pos,
430  MqttPacketType *type, bool_t *dup, MqttQosLevel *qos, bool_t *retain, size_t *remainingLen)
431 {
432  uint_t i;
433  size_t n;
434  MqttPacketHeader *header;
435 
436  //Point to the current position
437  n = *pos;
438 
439  //Make sure the input buffer is large enough
440  if((n + sizeof(MqttPacketHeader)) > bufferLen)
441  return ERROR_INVALID_LENGTH;
442 
443  //Point to the MQTT packet header
444  header = (MqttPacketHeader *) (buffer + n);
445 
446  //Save MQTT control packet type
447  *type = (MqttPacketType) header->type;
448 
449  //Save flags
450  *dup = header->dup;
451  *qos = (MqttQosLevel) header->qos;
452  *retain = header->retain;
453 
454  //Advance current position
455  n += sizeof(MqttPacketHeader);
456 
457  //Prepare to decode the Remaining Length field
458  *remainingLen = 0;
459 
460  //The Remaining Length is encoded using a variable length encoding scheme
461  for(i = 0; i < 4; i++)
462  {
463  //Sanity check
464  if((n + sizeof(uint8_t)) > bufferLen)
465  return ERROR_INVALID_LENGTH;
466 
467  //Advance current position
468  n += sizeof(uint8_t);
469 
470  //The most significant bit is used to indicate that there are
471  //following bytes in the representation
472  if(header->length[i] & 0x80)
473  {
474  //Applications can send control packets of size up to 256 MB
475  if(i == 3)
476  return ERROR_INVALID_SYNTAX;
477 
478  //The least significant seven bits of each byte encode the data
479  *remainingLen |= (header->length[i] & 0x7F) << (7 * i);
480  }
481  else
482  {
483  //The least significant seven bits of each byte encode the data
484  *remainingLen |= header->length[i] << (7 * i);
485  //This is the last byte
486  break;
487  }
488  }
489 
490  //Return the current position
491  *pos = n;
492 
493  //Successful processing
494  return NO_ERROR;
495 }
496 
497 
498 /**
499  * @brief Read a 8-bit integer from the input buffer
500  * @param[in] buffer Pointer to the input buffer
501  * @param[in] bufferLen Length of the input buffer
502  * @param[in,out] pos Current position
503  * @param[out] value Value of the 8-bit integer
504  * @return Error code
505  **/
506 
507 error_t mqttDeserializeByte(uint8_t *buffer, size_t bufferLen,
508  size_t *pos, uint8_t *value)
509 {
510  size_t n;
511 
512  //Point to the current position
513  n = *pos;
514 
515  //Make sure the input buffer is large enough
516  if((n + sizeof(uint8_t)) > bufferLen)
517  return ERROR_BUFFER_OVERFLOW;
518 
519  //Read the short integer from the input buffer
520  *value = buffer[n];
521 
522  //Advance current position
523  *pos = n + sizeof(uint8_t);
524 
525  //Successful processing
526  return NO_ERROR;
527 }
528 
529 
530 /**
531  * @brief Read a 16-bit integer from the input buffer
532  * @param[in] buffer Pointer to the input buffer
533  * @param[in] bufferLen Length of the input buffer
534  * @param[in,out] pos Current position
535  * @param[out] value Value of the 16-bit integer
536  * @return Error code
537  **/
538 
539 error_t mqttDeserializeShort(uint8_t *buffer, size_t bufferLen,
540  size_t *pos, uint16_t *value)
541 {
542  size_t n;
543 
544  //Point to the current position
545  n = *pos;
546 
547  //Make sure the input buffer is large enough
548  if((n + sizeof(uint16_t)) > bufferLen)
549  return ERROR_BUFFER_OVERFLOW;
550 
551  //Read the short integer from the input buffer
552  *value = (buffer[n] << 8) | buffer[n + 1];
553 
554  //Advance current position
555  *pos = n + sizeof(uint16_t);
556 
557  //Successful processing
558  return NO_ERROR;
559 }
560 
561 
562 /**
563  * @brief Deserialize string
564  * @param[in] buffer Pointer to the input buffer
565  * @param[in] bufferLen Length of the input buffer
566  * @param[in,out] pos Current position
567  * @param[out] string Pointer to the string
568  * @param[out] stringLen Length of the string, in bytes
569  * @return Error code
570  **/
571 
572 error_t mqttDeserializeString(uint8_t *buffer, size_t bufferLen,
573  size_t *pos, char_t **string, size_t *stringLen)
574 {
575  size_t n;
576 
577  //Point to the current position
578  n = *pos;
579 
580  //Make sure the input buffer is large enough
581  if((n + sizeof(uint16_t)) > bufferLen)
582  return ERROR_BUFFER_OVERFLOW;
583 
584  //Decode the length field
585  *stringLen = (buffer[n] << 8) | buffer[n + 1];
586 
587  //Make sure the input buffer is large enough
588  if((n + sizeof(uint16_t) + *stringLen) > bufferLen)
589  return ERROR_BUFFER_OVERFLOW;
590 
591  //Read the string from the input buffer
592  *string = (char_t *) buffer + n + 2;
593 
594  //Advance current position
595  *pos = n + 2 + *stringLen;
596 
597  //Successful processing
598  return NO_ERROR;
599 }
600 
601 #endif
error_t mqttSerializeData(uint8_t *buffer, size_t bufferLen, size_t *pos, const void *data, size_t dataLen)
Serialize raw data.
int bool_t
Definition: compiler_port.h:49
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
@ ERROR_BUFFER_OVERFLOW
Definition: error.h:140
error_t mqttDeserializeHeader(uint8_t *buffer, size_t bufferLen, size_t *pos, MqttPacketType *type, bool_t *dup, MqttQosLevel *qos, bool_t *retain, size_t *remainingLen)
Deserialize fixed header.
error_t mqttClientReceivePacket(MqttClientContext *context)
Receive MQTT packet.
error_t mqttClientSendData(MqttClientContext *context, const void *data, size_t length, size_t *written, uint_t flags)
Send data using the relevant transport protocol.
error_t mqttDeserializeString(uint8_t *buffer, size_t bufferLen, size_t *pos, char_t **string, size_t *stringLen)
Deserialize string.
error_t mqttClientWaitForData(MqttClientContext *context, systime_t timeout)
Wait for incoming data.
uint8_t qos
Definition: mqtt_common.h:179
error_t mqttDeserializeByte(uint8_t *buffer, size_t bufferLen, size_t *pos, uint8_t *value)
Read a 8-bit integer from the input buffer.
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:160
#define timeCompare(t1, t2)
Definition: os_port.h:42
error_t mqttSerializeHeader(uint8_t *buffer, size_t *pos, MqttPacketType type, bool_t dup, MqttQosLevel qos, bool_t retain, size_t remainingLen)
Serialize fixed header.
Helper functions for MQTT client.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
char_t type
error_t
Error codes.
Definition: error.h:42
Transport protocol abstraction layer.
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:161
error_t mqttSerializeByte(uint8_t *buffer, size_t bufferLen, size_t *pos, uint8_t value)
Write a 8-bit integer to the output buffer.
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:86
uint8_t retain
Definition: mqtt_common.h:178
@ ERROR_INVALID_LENGTH
Definition: error.h:109
MQTT packet parsing and formatting.
#define MSB(x)
Definition: os_port.h:58
#define TRACE_INFO(...)
Definition: debug.h:94
#define LSB(x)
Definition: os_port.h:54
error_t mqttClientProcessPacket(MqttClientContext *context)
Process incoming MQTT packet.
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:163
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:159
char char_t
Definition: compiler_port.h:43
uint32_t time
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:107
uint8_t n
error_t mqttClientCheckKeepAlive(MqttClientContext *context)
Check keep-alive time interval.
MqttClientState
MQTT client states.
Definition: mqtt_client.h:154
@ ERROR_INVALID_SYNTAX
Definition: error.h:68
uint8_t dup
Definition: mqtt_common.h:180
#define MqttClientContext
Definition: mqtt_client.h:142
error_t mqttDeserializeShort(uint8_t *buffer, size_t bufferLen, size_t *pos, uint16_t *value)
Read a 16-bit integer from the input buffer.
error_t mqttSerializeShort(uint8_t *buffer, size_t bufferLen, size_t *pos, uint16_t value)
Write a 16-bit integer to the output buffer.
uint8_t value[]
Definition: dtls_misc.h:150
#define PRIuSIZE
Definition: compiler_port.h:78
unsigned int uint_t
Definition: compiler_port.h:45
TCP/IP stack core.
uint8_t data[]
Definition: dtls_misc.h:176
uint32_t systime_t
Definition: compiler_port.h:46
error_t mqttSerializeString(uint8_t *buffer, size_t bufferLen, size_t *pos, const void *string, size_t stringLen)
Serialize string.
__start_packed struct @230 MqttPacketHeader
Fixed header.
@ NO_ERROR
Success.
Definition: error.h:44
MqttPacketType
MQTT control packet type.
Definition: mqtt_common.h:98
Debugging facilities.
systime_t osGetSystemTime(void)
Retrieve system time.
MQTT client.