Skip to content

Commit

Permalink
Merge pull request Azure#263 from linkernetworks/feat/add-mqtt-export
Browse files Browse the repository at this point in the history
add backend mqtt export
  • Loading branch information
Haishi2016 authored May 21, 2023
2 parents d594f1f + 41876c5 commit 2ddc01d
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 16 deletions.
71 changes: 57 additions & 14 deletions src/edge/EdgeSolution/modules/kanai/app/exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,32 +132,38 @@ def process(self, frame):


# MQTT setup
mqtt_client = None

def on_connect(client: mqtt.Client, userdata, flags, rc, properties=None):
print("connected", flush=True)


def send_message(message):
# payload = json.dumps(message)
payload = message
mqtt_client.reconnect()
mqtt_client.publish("iothubmessage", payload=payload)
print("Sent message: " + payload)


if not is_iotedge():
def mqtt_init(broker_address):
global mqtt_client
print("MQTT init")
try:
mqtt_client = mqtt.Client(INSTANCE, protocol=4)
addr, port = broker_address.split(":")
print(f"Connecting MQTT broker: {addr}:{port} for instance {INSTANCE}")
mqtt_client = mqtt.Client(INSTANCE, protocol=5)
mqtt_client.on_connect = on_connect

mqtt_client.connect("mqtt.default.svc.cluster.local", 1883, 60)
mqtt_client.connect(addr, int(port), 60)
except Exception as e:
mqtt_client = None
print(f"MQTT client error: {e}", flush=True)
else:
mqtt_client = None


def send_message(message):
global mqtt_client
if mqtt_client:
# payload = json.dumps(message)
payload = message
mqtt_client.reconnect()
mqtt_client.publish("mqttmessage", payload=payload)
print("Sent message: " + payload)
else:
print("MQTT client not set.")

try:
if IOTEDGE_CONNECTION_STRING:
iot = IoTHubModuleClient.create_from_connection_string(
Expand Down Expand Up @@ -185,7 +191,9 @@ def _exporter():
print('IotHubExport: send a message to metrics')
if iot:
iot.send_message_to_output(j, 'metrics')
elif mqtt_client:
else:
global mqtt_client
mqtt_init("mqtt.default.svc.cluster.local:1883")
send_message(j)
except Exception as e:
print(
Expand All @@ -206,6 +214,41 @@ def process(self, frame):
print(f'IotHubExport: drop result since queue is full', flush=True)
self.last_timestamp = cur_timestamp

class MqttExport(Export):
def __init__(self, delay_buffer=6, broker_address="mqtt.default.svc.cluster.local:1883", **kwargs):
super().__init__()
mqtt_init(broker_address)
self.delay_buffer = float(delay_buffer)

self.last_timestamp = -1

self._export_q = queue.Queue(30)

def _exporter():
while True:
j = self._export_q.get()
try:
print('MqttExport: send a message to metrics')
send_message(j)
except Exception as e:
print(
f"MqttExport: An error occurred while sending message to metrics: {e}.")

self._export_thread = threading.Thread(target=_exporter)
self._export_thread.start()

def process(self, frame):

cur_timestamp = time.time()
if cur_timestamp > self.last_timestamp + self.delay_buffer:

if len(frame.insights_meta.objects_meta) > 0:
if not self._export_q.full():
self._export_q.put(frame.json())
else:
print(f'MqttExport: drop result since queue is full', flush=True)
self.last_timestamp = cur_timestamp


class IotedgeExport(Export):
def __init__(self, module_name, delay_buffer=6, **kwargs):
Expand Down
3 changes: 2 additions & 1 deletion src/edge/EdgeSolution/modules/kanai/app/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from sources import RtspSource
from transforms import FilterTransform, GrpcTransform
from exports import VideoSnippetExport, IothubExport, IotedgeExport, Cv2ImshowExport, HttpExport
from exports import VideoSnippetExport, IothubExport, MqttExport, IotedgeExport, Cv2ImshowExport, HttpExport
from models import FakeModel, ObjectDetectionModel, ClassificationModel


Expand All @@ -22,6 +22,7 @@
supported_exports = {
'video_snippet_export': VideoSnippetExport,
'iothub_export': IothubExport,
'mqtt_export': MqttExport,
'iotedge_export': IotedgeExport,
'cv2_imshow_export': Cv2ImshowExport,
'http_export': HttpExport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,20 @@ def create_default_nodes():
},
)

Project.objects.update_or_create(
name="mqtt_export",
defaults={
"is_cascade": True,
"node_type": "export",
"inputs": json.dumps([
{
"route": "f",
"type": "frame"
}
])
},
)

Project.objects.update_or_create(
name="iotedge_export",
defaults={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def on_event_batch(partition_context, events):
# mqtt receiver

def on_connect(client: mqtt.Client, userdata, flags, rc, properties=None):
client.subscribe("iothubmessage")
client.subscribe("mqttmessage")
logger.warning("connected")


Expand Down

0 comments on commit 2ddc01d

Please sign in to comment.