forked from eclipse-mosquitto/mosquitto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontrol.c
138 lines (117 loc) · 4.18 KB
/
control.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
Copyright (c) 2020 Roger Light <[email protected]>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include <stdio.h>
#include "mqtt_protocol.h"
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "send_mosq.h"
#ifdef WITH_CONTROL
/* Process messages coming in on $CONTROL/<feature>. These messages aren't
* passed on to other clients. */
int control__process(struct mosquitto *context, struct mosquitto_msg_store *stored)
{
struct mosquitto__callback *cb_found;
struct mosquitto_evt_control event_data;
struct mosquitto__security_options *opts;
mosquitto_property *properties = NULL;
int rc = MOSQ_ERR_SUCCESS;
if(db.config->per_listener_settings){
opts = &context->listener->security_options;
}else{
opts = &db.config->security_options;
}
HASH_FIND(hh, opts->plugin_callbacks.control, stored->topic, strlen(stored->topic), cb_found);
if(cb_found){
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = stored->topic;
event_data.payload = stored->payload;
event_data.payloadlen = stored->payloadlen;
event_data.qos = stored->qos;
event_data.retain = stored->retain;
event_data.properties = stored->properties;
event_data.reason_code = MQTT_RC_SUCCESS;
event_data.reason_string = NULL;
rc = cb_found->cb(MOSQ_EVT_CONTROL, &event_data, cb_found->userdata);
if(rc){
if(context->protocol == mosq_p_mqtt5 && event_data.reason_string){
mosquitto_property_add_string(&properties, MQTT_PROP_REASON_STRING, event_data.reason_string);
}
}
free(event_data.reason_string);
event_data.reason_string = NULL;
}
if(stored->qos == 1){
rc = send__puback(context, stored->source_mid, MQTT_RC_SUCCESS, properties);
}else if(stored->qos == 2){
rc = send__pubrec(context, stored->source_mid, MQTT_RC_SUCCESS, properties);
}
mosquitto_property_free_all(&properties);
return rc;
}
#endif
int control__register_callback(struct mosquitto__security_options *opts, MOSQ_FUNC_generic_callback cb_func, const char *topic, void *userdata)
{
#ifdef WITH_CONTROL
struct mosquitto__callback *cb_found, *cb_new;
size_t topic_len;
if(topic == NULL || cb_func == NULL) return MOSQ_ERR_INVAL;
topic_len = strlen(topic);
if(topic_len == 0 || topic_len > 65535) return MOSQ_ERR_INVAL;
if(strncmp(topic, "$CONTROL/", strlen("$CONTROL/")) || strlen(topic) < strlen("$CONTROL/A/v1")){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh, opts->plugin_callbacks.control, topic, topic_len, cb_found);
if(cb_found){
return MOSQ_ERR_ALREADY_EXISTS;
}
cb_new = mosquitto__calloc(1, sizeof(struct mosquitto__callback));
if(cb_new == NULL){
return MOSQ_ERR_NOMEM;
}
cb_new->data = mosquitto__strdup(topic);
if(cb_new->data == NULL){
mosquitto__free(cb_new);
return MOSQ_ERR_NOMEM;
}
cb_new->cb = cb_func;
cb_new->userdata = userdata;
HASH_ADD_KEYPTR(hh, opts->plugin_callbacks.control, cb_new->data, strlen(cb_new->data), cb_new);
return MOSQ_ERR_SUCCESS;
#else
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}
int control__unregister_callback(struct mosquitto__security_options *opts, MOSQ_FUNC_generic_callback cb_func, const char *topic)
{
#ifdef WITH_CONTROL
struct mosquitto__callback *cb_found;
size_t topic_len;
if(topic == NULL) return MOSQ_ERR_INVAL;
topic_len = strlen(topic);
if(topic_len == 0 || topic_len > 65535) return MOSQ_ERR_INVAL;
if(strncmp(topic, "$CONTROL/", strlen("$CONTROL/"))) return MOSQ_ERR_INVAL;
HASH_FIND(hh, opts->plugin_callbacks.control, topic, topic_len, cb_found);
if(cb_found && cb_found->cb == cb_func){
HASH_DELETE(hh, opts->plugin_callbacks.control, cb_found);
mosquitto__free(cb_found->data);
mosquitto__free(cb_found);
return MOSQ_ERR_SUCCESS;;
}
return MOSQ_ERR_NOT_FOUND;
#else
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}