/*
 * File: mqtt_client_api_cayenne.c
 * Repository is a fork of MaJerle/lwesp.
 */
/*
* MQTT client API example for Cayenne MQTT API
*
* Simple example for testing purposes only.
*
*
*/
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include "lwesp/apps/lwesp_mqtt_client_api.h"
#include "mqtt_client_api.h"
/*
 * Override safeprintf function: every safeprintf(...) call in this file
 * expands to a plain printf(...) call.
 */
#define safeprintf printf
/**
 * \brief           Client information passed to the broker when connecting
 *
 * The `user` and `id` strings are also used by the MQTT thread to build
 * the topic names it subscribes and publishes to.
 */
static const lwesp_mqtt_client_info_t mqtt_client_info = {
    .keep_alive = 60,

    /* User credentials */
    .user = "8a215f70-a644-11e8-ac49-e932ed599553",
    .pass = "26aa943f702e5e780f015cd048a91e8fb54cca28",

    /* Device ID */
    .id = "869f5a20-af9c-11e9-b01f-db5cf74e7fb7",
};
/* Scratch buffer in which the MQTT thread formats topic names */
static char mqtt_client_str[256];

/* Scratch buffer in which the MQTT thread formats payloads to publish */
static char mqtt_client_data[256];
/**
 * \brief           MQTT thread of the Cayenne example
 *
 * Runs in an endless loop:
 *  - waits until the station has an IP address,
 *  - creates the MQTT client API object on first use and reuses it afterwards,
 *  - connects to the broker and subscribes to `v1/<user>/things/<id>/cmd/#`,
 *  - while connected, answers commands received on a topic containing `cmd/2`
 *    by publishing the first character after the comma in the payload to
 *    `v1/<user>/things/<id>/data/2`, and publishes an incrementing counter
 *    to `v1/<user>/things/<id>/data/1` each time the receive call times out.
 *
 * A closed connection is followed by an immediate reconnect attempt, while a
 * failed client creation or a rejected connect is retried after a delay.
 *
 * \param[in]       arg: Thread argument, not used
 */
void
mqtt_client_api_cayenne_thread(void const* arg) {
    lwesp_mqtt_client_api_p client = NULL;
    lwesp_mqtt_conn_status_t status;
    lwesp_mqtt_client_api_buf_p buf = NULL;
    lwespr_t res;

    (void)arg;

    while (1) {
        /* Wait IP and connected to network */
        while (!lwesp_sta_has_ip()) {
            lwesp_delay(1000);
        }

        /* Create the client object only once, it is reused for every reconnect */
        if (client == NULL) {
            client = lwesp_mqtt_client_api_new(256, 256);
        }
        if (client == NULL) {
            /* Client could not be created, retry after a delay */
            lwesp_delay(1000);
            continue;
        }

        safeprintf("[MQTT] Connecting to MQTT broker...\r\n");
        status = lwesp_mqtt_client_api_connect(client, "mqtt.mydevices.com", 1883, &mqtt_client_info);
        if (status != LWESP_MQTT_CONN_STATUS_ACCEPTED) {
            safeprintf("[MQTT] Connect error: %d\r\n", (int)status);
            lwesp_delay(1000);
            continue;
        }
        safeprintf("[MQTT] Connected to MQTT broker and ready to publish/subscribe to topics...\r\n");

        /* Subscribe to topic; snprintf keeps the write inside the static buffer */
        snprintf(mqtt_client_str, sizeof(mqtt_client_str), "v1/%s/things/%s/cmd/#",
                 mqtt_client_info.user, mqtt_client_info.id);
        if (lwesp_mqtt_client_api_subscribe(client, mqtt_client_str, LWESP_MQTT_QOS_AT_LEAST_ONCE) == lwespOK) {
            safeprintf("[MQTT] Subscribed to topic: %s\r\n", mqtt_client_str);
        } else {
            safeprintf("[MQTT] Problems subscribing to topic!\r\n");
        }

        /* Start accepting and publishing data, until the connection gets closed */
        do {
            res = lwesp_mqtt_client_api_receive(client, &buf, 1000);
            if (res == lwespOK) {
                safeprintf("[MQTT] Receive OK\r\n");
                if (buf != NULL) {
                    const char* value;

                    /* NOTE(review): topic and payload are used as NUL-terminated strings here - confirm against the API */
                    safeprintf("[MQTT] Publish received. Topic: %s, payload: %s\r\n",
                               (const char*)buf->topic, (const char*)buf->payload);
                    safeprintf("[MQTT] Topic_Len: %d, Payload_len: %d\r\n", (int)buf->topic_len, (int)buf->payload_len);

                    /* Find out reason */
                    if (strstr((const char*)buf->topic, "cmd/2") != NULL) {
                        value = strstr((const char*)buf->payload, ",");

                        /* Reply only when a value character really follows the comma */
                        if (value != NULL && value[1] != '\0') {
                            size_t len;

                            ++value;
                            snprintf(mqtt_client_str, sizeof(mqtt_client_str), "v1/%s/things/%s/data/2",
                                     mqtt_client_info.user, mqtt_client_info.id);
                            snprintf(mqtt_client_data, sizeof(mqtt_client_data), "%c", *value);
                            len = strlen(mqtt_client_data);
                            lwesp_mqtt_client_api_publish(client, mqtt_client_str, mqtt_client_data, len,
                                                          LWESP_MQTT_QOS_AT_LEAST_ONCE, 0);
                        }
                    }
                    lwesp_mqtt_client_api_buf_free(buf);
                    buf = NULL;
                }
            } else if (res == lwespTIMEOUT) {
                static uint32_t temp;           /* Counter published as the value, keeps its state between calls */
                size_t len;

                safeprintf("[MQTT] Receive timeout!\r\n");
                snprintf(mqtt_client_str, sizeof(mqtt_client_str), "v1/%s/things/%s/data/1",
                         mqtt_client_info.user, mqtt_client_info.id);
                snprintf(mqtt_client_data, sizeof(mqtt_client_data), "temp,c=%u", (unsigned)temp++);
                len = strlen(mqtt_client_data);
                safeprintf("[MQTT] CLIENT DATA: %s, length: %d\r\n", mqtt_client_data, (int)len);
                lwesp_mqtt_client_api_publish(client, mqtt_client_str, mqtt_client_data, len,
                                              LWESP_MQTT_QOS_AT_LEAST_ONCE, 0);
            } else if (res == lwespCLOSED) {
                safeprintf("[MQTT] Connection closed!\r\n");
            }
        } while (res != lwespCLOSED);

        /* Connection closed: start over immediately, without the retry delay */
    }

    /*
     * Not reached, the loop above never terminates.
     * Kept to show how the client is released and the thread terminated.
     */
    if (client != NULL) {
        lwesp_mqtt_client_api_delete(client);
        client = NULL;
    }
    lwesp_sys_thread_terminate(NULL);
}