# mojo_perception_api.py
"""
@license
Copyright 2022 Hoomano SAS. All Rights Reserved.
Licensed under the MIT License, (the "License");
you may not use this file except in compliance with the License.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
=============================================================================
"""
import logging
import socketio
from datetime import datetime
import mediapipe as mp
import cv2
import requests


class MojoPerceptionAPI:
    """
    This class is a MojoPerception API client.
    See README for a procedure to generate api_key.
    """

    def __init__(self, api_key, expiration=360):
        """
        Initializes the MojoPerceptionAPI client. Places default callbacks on
        calculation reception for each emotion and loads the anonymization model.
        :param api_key: String - Valid API key
        :param expiration: Int - Time in seconds for the created user to expire.
        """
        logging.basicConfig(level=logging.DEBUG)
        assert api_key is not None, "Please provide an API key"
        self.initialized = False
        self.api_key = api_key
        self.expiration = expiration
        self.mojo_perception_uri = "https://api.mojo.ai/mojo_perception_api"
        self.auth_token, self.host, self.port, self.user_namespace = self.create_user()
        self.socketIo_uri = "https://{}:{}".format(self.host, self.port)
        self.emotions = ["attention", "confusion", "surprise", "amusement", "engagement", "interaction"]
        self.subscribe_realtime_output = False
        self.api_socket = socketio.Client()
        self.sending = False
        self.attention_callback = self.default_callback
        self.amusement_callback = self.default_callback
        self.confusion_callback = self.default_callback
        self.surprise_callback = self.default_callback
        self.engagement_callback = self.default_callback
        self.interaction_callback = self.default_callback
        self.warmup_done_callback = self.default_callback
        self.warmup_callback_done = False
        self.first_emit_done = False
        self.first_emit_done_callback = self.default_callback
        self.on_error_callback = self.default_callback
        self.video_stream = None
        self.image_width = None
        self.image_height = None

    def set_options(self, options):
        """
        Set options for MojoPerceptionAPI, to change the list of emotions calculated and
        manage subscription to realtime output.
        :param options: dict - Options to set
                        - options["emotions"]: list of emotions to be calculated by the API
                        - options["subscribe_realtime_output"]: boolean, True to activate the callbacks @see attention_callback
        """
        try:
            if "emotions" in options:
                self.emotions = options["emotions"]
            if "subscribe_realtime_output" in options:
                self.subscribe_realtime_output = options["subscribe_realtime_output"]
        except Exception as e:
            logging.error("Could not set options: {} : {}".format(options, e))

    def create_user(self):
        """
        Creates a temporary user on the MojoPerceptionAPI and retrieves its streaming credentials.
        :return: tuple - (auth_token, host_name, port, user_namespace)
        """
        try:
            internal_request = requests.put(self.mojo_perception_uri + '/user',
                                            json={"datetime": str(datetime.now()), "expiration": self.expiration},
                                            headers={"Authorization": self.api_key})
            if internal_request.status_code != 200:
                logging.error("error : " + internal_request.text)
            user = internal_request.json()
            return user["auth_token"], user["host_name"], user["port"], user["user_namespace"].replace("-", "")
        except Exception as e:
            logging.error("Could not create user: {}".format(e))

    def __str__(self):
        """
        Returns a string representing the MojoPerceptionAPI object
        :return: String - emotions, socketIo_uri, subscribe_realtime_output, auth_token
        """
        return "emotions={}\nsocketIoURI={}\nsubscribeRealtimeOutput={}\nkey={}".format(
            self.emotions, self.socketIo_uri, self.subscribe_realtime_output, self.auth_token)

    def default_callback(self, message=None):
        """
        Used by default for all callbacks. Does nothing.
        :param message: String - not used
        """
        return

    def _connect_callback(self):
        """
        Called when the socketIO client connects to the Stream SocketIO server.
        Sets initialized value to True
        """
        self.initialized = True

    def _error_handler(self, msg):
        """
        Called when the socketIO client encounters an error.
        Calls on_error_callback with the error message.
        Stops facial expression recognition api.
        :param msg: String - error message returned by socketio
        """
        logging.error("Error: {}".format(msg))
        self.on_error_callback(msg)
        self.stop_facial_expression_recognition_api()

    def message_handler(self, msg):
        """
        Called when a "calculation" event is received from the API through socketio.
        Calls the appropriate callbacks depending on the emotions received.
        :param msg: dict - Message received from the API
        """
        if "attention" in msg:
            self.attention_callback(msg["attention"])
        if "amusement" in msg:
            self.amusement_callback(msg["amusement"])
        if "confusion" in msg:
            self.confusion_callback(msg["confusion"])
        if "surprise" in msg:
            self.surprise_callback(msg["surprise"])
        if "engagement" in msg:
            self.engagement_callback(msg["engagement"])
        if "interaction" in msg:
            self.interaction_callback(msg["interaction"])

    def get_image_dimensions(self):
        """
        Reads the image dimensions from the video stream and stores them in
        self.image_width and self.image_height.
        """
        try:
            self.image_width = int(self.video_stream.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.image_height = int(self.video_stream.get(cv2.CAP_PROP_FRAME_HEIGHT))
        except Exception as e:
            logging.error("Could not get image dimensions: {}".format(e))

    def read_video_and_connect_api(self, video_path):
        """
        Opens the video file and connects to the MojoPerceptionAPI through socketio.
        Defines socketio callbacks.
        :param video_path: String - path to the video file to be evaluated by MojoPerceptionAPI.
        """
        try:
            self.video_stream = cv2.VideoCapture(video_path)
            self.get_image_dimensions()
            self._api_connect()
        except Exception as e:
            logging.error("Error during initialization: {}".format(e))

    def start_camera_and_connect_api(self):
        """
        Starts the default camera and connects to the MojoPerceptionAPI through socketio.
        Defines socketio callbacks.
        """
        try:
            self.video_stream = cv2.VideoCapture(0)
            self.get_image_dimensions()
            self._api_connect()
        except Exception as e:
            logging.error("Error during initialization: {}".format(e))

    def set_image_dimensions_and_connect_api(self, image_width, image_height):
        """
        Sets image dimensions and connects to the MojoPerceptionAPI through socketio.
        Defines socketio callbacks.
        :param image_width: Width of image
        :param image_height: Height of image
        """
        try:
            self.image_width = image_width
            self.image_height = image_height
            self._api_connect()
        except Exception as e:
            logging.error("Error during initialization: {}".format(e))

    def _api_connect(self):
        """
        Registers the socketio callbacks and connects to the MojoPerceptionAPI
        stream server through socketio.
        """
        try:
            # Register handlers before connecting so the initial 'connect' event is not missed.
            if self.subscribe_realtime_output:
                self.api_socket.on('calculation', self.message_handler, namespace=f'/{self.user_namespace}')
            self.api_socket.on('error', self._error_handler, namespace=f'/{self.user_namespace}')
            self.api_socket.on('connect', self._connect_callback, namespace=f'/{self.user_namespace}')
            self.api_socket.connect(self.socketIo_uri,
                                    namespaces=[f'/{self.user_namespace}'],
                                    transports=['websocket', 'polling'],
                                    auth={"token": self.auth_token})
        except Exception as e:
            logging.error("Error during connection: {}".format(e))

    def compute_anonymized_facemesh(self, image):
        """
        Computes the anonymized facemesh of the image and calls the emit function.
        :param image: numpy array - image to be processed (frame from video)
        """
        try:
            if self.first_emit_done and not self.warmup_callback_done:
                self.warmup_done_callback()
                self.warmup_callback_done = True
            with mp.solutions.face_mesh.FaceMesh(
                    max_num_faces=1,
                    refine_landmarks=True,  # with iris
                    min_detection_confidence=0.9,
                    min_tracking_confidence=0.9) as face_mesh:
                image.flags.writeable = False
                frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(frame)
                if results.multi_face_landmarks:
                    face_landmarks = results.multi_face_landmarks[0]
                    facemesh_points = self.face_landmarks_to_json(face_landmarks.landmark, self.image_width, self.image_height)
                    self.emit_facemesh(facemesh_points)
                else:
                    self.emit_facemesh([])
        except Exception as e:
            self.on_error_callback(e)
            logging.error("Error during anonymized facemesh computation: {}".format(e))

    def emit_facemesh(self, face_mesh):
        """
        Sends the facemesh to the streaming SocketIO server.
        :param face_mesh: List of lists - Facemesh of the image computed from image input
        """
        try:
            if face_mesh is None:
                return
            if self.auth_token is None:
                return
            self.api_socket.emit('facemesh',
                                 {'facemesh': face_mesh,
                                  'timestamp': datetime.now().isoformat(),
                                  'output': self.emotions},
                                 namespace=f'/{self.user_namespace}')
            if not self.first_emit_done:
                self.first_emit_done = True
                self.sending = True
                self.first_emit_done_callback()
        except Exception as e:
            logging.error("Error during emitting facemesh: {}".format(e))

    def face_landmarks_to_json(self, face_landmarks, image_width, image_height):
        """
        Converts the face landmarks to a format compatible with the MojoPerceptionAPI
        (coordinates multiplied by the image dimensions).
        :param face_landmarks: List - Face landmarks given by Mediapipe's FaceMesh model (normalized coordinates)
        :param image_width: Int - Width of the image
        :param image_height: Int - Height of the image
        :return: List of lists - Facemesh of the image computed from image input
        """
        face_mesh = []
        for landmark in face_landmarks:
            # z denormalized from: https://pyup.io/changelogs/mediapipe/ 0.7.6
            face_mesh.append([landmark.x * image_width, landmark.y * image_height, landmark.z * image_width])
        return face_mesh

    def release_camera(self):
        """
        Releases the camera.
        """
        self.video_stream.release()

    def stop_facial_expression_recognition_api(self):
        """
        Stops sending to the API, disconnects from the stream SocketIO server and releases the camera.
        """
        try:
            if not self.sending:
                return
            self.sending = False
            self.api_socket.disconnect()
            if self.video_stream is not None:
                self.release_camera()
        except Exception as e:
            self.on_error_callback(e)
            logging.error("Error while stopping the facial expression recognition API: {}".format(e))