forked from ArduPilot/ardupilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mavlink_parse.py
executable file
·502 lines (437 loc) · 21.3 KB
/
mavlink_parse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
#!/usr/bin/env python
import re
from enum import StrEnum # requires Python >= 3.11
from pathlib import Path
from itertools import chain
from dataclasses import dataclass, astuple
from pymavlink.dialects.v20 import (
common, icarous, cubepilot, uAvionix, ardupilotmega
)
class MAVLinkDialect(StrEnum):
# in subset, superset, unknown order, for correct links
# supported values must match imported dialect names
COMMON = 'common'
ICAROUS = 'icarous'
CUBEPILOT = 'cubepilot'
UAVIONIX = 'uAvionix'
ARDUPILOTMEGA = 'ardupilotmega'
UNKNOWN = 'UNKNOWN'
@dataclass(slots=True, order=True)
class MAVLinkMessage:
name: str
source: str
dialect: MAVLinkDialect = MAVLinkDialect.UNKNOWN
PREFIX = 'MAVLINK_MSG_ID_'
# Get message sets, for quick containment checks.
# Function is required because of Python's class scoping rules.
# See https://stackoverflow.com/questions/13905741.
def _get_known_messages(prefix):
''' Returns a dictionary of {dialect: {messages}} given 'prefix'. '''
return {
dialect: set(m for m in dir(globals()[dialect])
if m.startswith(prefix))
for dialect in MAVLinkDialect
if dialect != MAVLinkDialect.UNKNOWN
}
KNOWN_DIALECTS = _get_known_messages(PREFIX)
# Try to determine dialect if initialised without one specified.
def __post_init__(self):
if self.dialect == MAVLinkDialect.UNKNOWN:
self.determine_dialect()
@property
def id_name(self):
return self.PREFIX + self.name
def determine_dialect(self):
for dialect, message_set in self.KNOWN_DIALECTS.items():
if self.id_name in message_set:
self.dialect = dialect
break # dialect found, no need to continue searching
else:
self.dialect = MAVLinkDialect.UNKNOWN
def as_tuple(self):
return astuple(self)
def __str__(self):
return f'{self.name:<45}{self.source:<55}{self.dialect}'
@classmethod
def get_unsupported(cls, supported: set, remove_prefix=True):
''' Yields known messages that are not included in 'supported'. '''
offset = len(cls.PREFIX) if remove_prefix else 0
known_missing = set() # don't double-count for supersets
for dialect, message_set in cls.KNOWN_DIALECTS.items():
missing_names = message_set - supported - known_missing
for name in missing_names:
yield cls(name[offset:], 'UNSUPPORTED', dialect)
known_missing |= missing_names
class MAVLinkCommand(MAVLinkMessage):
PREFIX = 'MAV_CMD_'
KNOWN_DIALECTS = MAVLinkMessage._get_known_messages(PREFIX)
@property
def id_name(self):
return self.name # commands are registered with their prefix
@classmethod
def get_unsupported(cls, supported: set, remove_prefix=False):
''' Yields known commands that are not included in 'supported'. '''
# avoid accidentally treating enum values as commands
enums = [f'{e}_' for e in ardupilotmega.enums
if e.startswith(cls.PREFIX)]
for command in super().get_unsupported(supported, remove_prefix):
if not any(command.name.startswith(e) for e in enums):
yield command
class MAVLinkDetector:
# file paths
BASE_DIR = Path(__file__).parent / '../..'
COMMON_FILE = BASE_DIR / 'libraries/GCS_MAVLink/GCS_Common.cpp'
STREAM_GROUP_FILE = 'GCS_MAVLink.cpp'
# regex for messages handled by the autopilot
INCOMING_MESSAGES = re.compile(r'case MAVLINK_MSG_ID_([A-Z0-9_]*)')
# regex for commands handled by the autopilot
INCOMING_COMMANDS = re.compile(r'case (MAV_CMD_[A-Z0-9_]*)')
# regex for messages that can be requested from the autopilot
REQUESTABLE_REGION = re.compile(' map\[\]([^;]*);')
REQUESTABLE_MAP = re.compile(r'MAVLINK_MSG_ID_([A-Z0-9_]*),\s*MSG_([A-Z0-9_]*)')
# regex for messages the autopilot might send, but cannot be requested
OUTGOING_MESSAGES = re.compile(r'mavlink_msg_([a-z0-9_]*)_send\(')
# regex for extracting messages in stream groups
STREAM_GROUPS = re.compile(r'ap_message STREAM_([A-Z0-9_]*)_msgs\[\] = \{([^\}]*)')
AP_MESSAGE = re.compile(r'MSG_([A-Z0-9_]*)')
# regex for named values
NAMED_FLOAT = re.compile(r'send_named_float\("([\w]*)"')
NAMED_INT = re.compile(r'send_named_int\("([\w]*)"')
TYPE_DESCRIPTIONS = {
'incoming_messages':
'Messages the autopilot handles when received.',
'requestable_messages':
'Messages that can be requested/streamed from the autopilot.',
'outgoing_messages':
'Messages the autopilot will send automatically (unrequested).',
'named_floats':
'Breakout of named floating-point (numerical) values sent by the autopilot.',
'named_ints':
'Breakout of named integer values sent by the autopilot.',
}
EXTRA_DESCRIPTIONS = {
'stream_groups':
'Message groups with stream rates requestable by `SRn_*` parameters.'
' Messages in a group are only sent if the corresponding feature'
' is active.',
'missing_messages':
'Unsupported / unhandled messages.',
'incoming_commands':
TYPE_DESCRIPTIONS['incoming_messages'].replace('Messages','Commands'),
}
EXTRA_DESCRIPTIONS['missing_commands'] = \
EXTRA_DESCRIPTIONS['missing_messages'].replace('messages', 'commands')
TYPE_OPTIONS = {
'messages': MAVLinkMessage,
'commands': MAVLinkCommand,
}
MAVLINK_URL = 'https://mavlink.io/en/messages/{dialect}.html#{message_name}'
ARDUPILOT_URL = 'https://github.com/ArduPilot/ardupilot/tree/{branch}/{source}'
EXPORT_FILETYPES = {
'csv': 'csv',
'markdown': 'md'
}
MARKDOWN_INTRO = (
'The [MAVLink](https://mavlink.io/en/) protocol supports a variety'
' of features and functionalities, but not all'
' [messages](https://mavlink.io/en/messages/) or'
' [commands](https://mavlink.io/en/services/command.html)'
' are implemented by the ArduPilot ecosystem, or relevant to a'
' particular autopilot firmware.\n\n'
'This page is auto-generated from analysing the {vehicle} source'
' code, and provides an indication of which messages{commands} are'
' handled by, requestable from, and sent from the firmware. '
'A message being handled does not guarantee full support, but at'
' least shows that the autopilot is aware it exists, and will try'
' to do something meaningful with it.{unsupported}{stream_groups}'
)
VEHICLES = ('AntennaTracker', 'ArduCopter', 'ArduPlane', 'ArduSub', 'Rover')
def __init__(self, common_files, vehicle='ALL',
exclude_libraries=['SITL', 'AP_Scripting']):
self.vehicle = vehicle
vehicles = [vehicle] if vehicle != 'ALL' else self.VEHICLES
files = chain(*((self.BASE_DIR / vehicle).glob('**/*.cpp')
for vehicle in vehicles),
common_files)
self.incoming_messages = {}
self.incoming_commands = {}
self.outgoing_messages = {}
self.requestable_messages = {}
self._ap_to_mavlink = {
'NAMED_FLOAT': 'NAMED_VALUE_FLOAT', # manual inclusion
}
self.named_floats = {}
self.named_ints = {}
for file in files:
folder = file.parent.stem
if folder in exclude_libraries:
continue
text = file.read_text()
source = f'{folder}/{file.name}'
if file == self.COMMON_FILE:
for mavlink, ap_message in self.find_requestable_messages(text):
self.requestable_messages[mavlink] = \
MAVLinkMessage(mavlink, source)
if ap_message != mavlink:
self._ap_to_mavlink[ap_message] = mavlink
named_types = ('float', 'int') if folder in vehicles else ()
for type_ in named_types:
substring = f'named_{type_}s'
method = getattr(self, f'find_{substring}')
names = getattr(self, substring)
new_names = set(method(text)) - names.keys()
for name in new_names:
names[name] = MAVLinkMessage(f'NAMED_VALUE_{type_.upper()}:{name}',
source, MAVLinkDialect.COMMON)
for method, data, type_ in (
(self.find_incoming_messages, self.incoming_messages, 'messages'),
(self.find_incoming_commands, self.incoming_commands, 'commands'),
(self.find_outgoing_messages, self.outgoing_messages, 'messages'),
):
new_data = set(method(text)) - data.keys()
cls = self.TYPE_OPTIONS[type_]
for datum in new_data:
data[datum] = cls(datum, source)
self._supported_names = {'messages': None, 'commands': None}
self._unsupported = self._supported_names.copy()
self._stream_groups = self.get_stream_groups(vehicle) if len(vehicles) == 1 else []
@classmethod
def get_description(cls, query):
return cls.TYPE_DESCRIPTIONS.get(query,
cls.EXTRA_DESCRIPTIONS.get(query, '')
)
@classmethod
def find_incoming_messages(cls, text: str):
return cls.INCOMING_MESSAGES.findall(text)
@classmethod
def find_incoming_commands(cls, text: str):
return cls.INCOMING_COMMANDS.findall(text)
@classmethod
def find_outgoing_messages(cls, text: str):
return (msg.upper() for msg in
cls.OUTGOING_MESSAGES.findall(text))
@classmethod
def find_requestable_messages(cls, text: str):
region = cls.REQUESTABLE_REGION.search(text).group()
return cls.REQUESTABLE_MAP.findall(region)
@classmethod
def find_named_floats(cls, text: str):
return cls.NAMED_FLOAT.findall(text)
@classmethod
def find_named_ints(cls, text: str):
return cls.NAMED_INT.findall(text)
def get_stream_groups(self, vehicle):
stream_groups = ['stream_groups']
text = (self.BASE_DIR / vehicle / self.STREAM_GROUP_FILE).read_text()
for group_name, message_data in self.STREAM_GROUPS.findall(text):
stream_groups.extend(sorted(
MAVLinkMessage(self._ap_to_mavlink.get(ap_message, ap_message),
f'SRn_{group_name}')
for ap_message in self.AP_MESSAGE.findall(message_data)
))
return stream_groups
def get_supported(self, type: str, inject_commands=False):
if type == 'messages':
for message_type in self.TYPE_DESCRIPTIONS:
values = getattr(self, message_type).values()
if not values:
continue
yield message_type
yield from sorted(values)
# add in incoming_commands right after incoming_messages
if inject_commands and message_type == 'incoming_messages':
yield from self.get_supported('commands')
elif type == 'commands':
yield 'incoming_commands'
yield from sorted(self.incoming_commands.values())
def get_supported_names(self, type: str):
if self._supported_names[type] is None:
self._supported_names[type] = set(
m.id_name for m in self.get_supported(type)
if isinstance(m, MAVLinkMessage)
)
return self._supported_names[type]
def get_unsupported(self, type='messages'):
if self._unsupported[type] is None:
supported_messages = self.get_supported_names(type)
cls = self.TYPE_OPTIONS[type]
self._unsupported[type] = sorted(
cls.get_unsupported(supported_messages)
)
if self._unsupported[type]:
yield f'missing_{type}'
yield from self._unsupported[type]
def get_iterable(self, include_commands=False, include_stream_groups=False,
include_unsupported=False):
iterables = [self.get_supported('messages', include_commands)]
if include_stream_groups:
iterables.append(self._stream_groups)
if include_unsupported:
iterables.append(self.get_unsupported('messages'))
if include_commands:
iterables.append(self.get_unsupported('commands'))
return chain(*iterables)
def printout(self, **iter_options):
for data in self.get_iterable(**iter_options):
match data: # requires Python >= 3.10
case str() as type_:
print(f'\n{type_}:',
self.get_description(type_),
sep='\n')
case MAVLinkMessage() as message:
print(message)
def export(self, filename: Path, type='csv', include_commands=False,
include_stream_groups=False, include_unsupported=False,
**export_options):
export_method = getattr(self, f'export_{type}')
# ensure export_method and get_iterable have the same options specified
iter_options = dict(
include_commands = include_commands,
include_stream_groups = include_stream_groups,
include_unsupported = include_unsupported,
)
with open(filename, 'w') as file:
export_method(file, self.get_iterable(**iter_options),
**export_options, **iter_options)
def export_csv(self, file, iterable, **ignore):
file.write('MAVLinkMessage,CodeSource,MAVLinkDialect,MessageType\n')
for data in iterable:
match data:
case str():
current_type = data
case MAVLinkMessage() as message:
print(*message.as_tuple(), current_type, sep=',', file=file)
def export_markdown(self, file, iterable, branch='master', header=None,
use_intro=True, **extra_kwargs):
if header == 'ArduSub':
import time
now = time.strftime('%Y-%m-%dT%H:%M:%S%z')
date = f'{now[:-2]}:{now[-2:]}' # add colon to the timezone
header = '\n'.join((
'+++',
'title = "MAVLink Support"',
'description = "MAVLink message support details."',
f'{date = }', 'template = "docs/page.html"',
'sort_by = "weight"', 'weight = 20', 'draft = false',
'[extra]', 'toc = true', 'top = false',
'+++'
))
if header:
print(header, file=file)
if use_intro:
commands = stream_groups = unsupported = ''
if extra_kwargs['include_commands']:
commands = ' (and commands)'
if extra_kwargs['include_unsupported']:
unsupported = (
'\n\nKnown [unsupported messages](#missing-messages)'
f'{commands} are shown at the end.'
)
if extra_kwargs['include_stream_groups']:
stream_groups = (
'\n\nThe autopilot includes a set of [stream groups]'
'(#stream-groups) for convenience, which allow'
' configuring the stream rates of groups of'
' requestable messages by setting parameter values. '
'It is also possible to manually request messages,'
' and request individual messages be streamed at a'
' specified rate.'
)
vehicle = self.vehicle.replace('ALL', 'ArduPilot')
print(self.MARKDOWN_INTRO.format(
vehicle=vehicle, commands=commands,
stream_groups=stream_groups, unsupported=unsupported
), file=file)
for data in iterable:
match data:
case str() as type_:
heading = type_.title().replace('_', ' ')
source_header = (
'Code Source' if type_ != 'stream_groups' else
'Stream Group Parameter'
)
print(f'## {heading}',
self.get_description(type_),
f'\nMAVLink Message | {source_header} | MAVLink Dialect',
'--- | --- | ---', sep='\n', file=file)
case MAVLinkMessage() as message:
name, source, dialect = message.as_tuple()
if dialect != MAVLinkDialect.UNKNOWN:
msg_url = self.MAVLINK_URL.format(dialect=dialect,
message_name=name.split(':')[0])
name = f'[{name}]({msg_url})'
if source != 'UNSUPPORTED' and not source.startswith('SRn'):
folder = source.split('/')[0]
base = 'libraries/' if folder not in self.VEHICLES else ''
code_url = self.ARDUPILOT_URL.format(branch=branch,
source=base+source)
source = f'[{source}]({code_url})'
print(name, source, dialect, sep=' | ', file=file)
if __name__ == '__main__':
from inspect import signature
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
detector_init_params = signature(MAVLinkDetector.__init__).parameters
default_vehicle = detector_init_params['vehicle'].default
vehicle_options = [default_vehicle, *MAVLinkDetector.VEHICLES]
default_exclusions = detector_init_params['exclude_libraries'].default
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parse_opts = parser.add_argument_group('parsing options')
parse_opts.add_argument('-v', '--vehicle', default=default_vehicle,
choices=vehicle_options, help='Vehicle folder, or ALL.')
parse_opts.add_argument('-e', '--exclude-library', action='append',
default=default_exclusions,
help='Libraries to exclude from the search.')
parse_opts.add_argument('-c', '--include-commands', action='store_true',
help='Include MAVLink commands as well as messages.')
parse_opts.add_argument('-g', '--include-stream-groups', action='store_true',
help='Include stream group message sets in the output.')
parse_opts.add_argument('-u', '--include-unsupported', action='store_true',
help='Include unsupported messages in the output.')
export_opts = parser.add_argument_group('export options')
export_opts.add_argument('-q', '--quiet', action='store_true',
help='Disable printout, only export a file.')
export_opts.add_argument('-f', '--format', default='markdown',
choices=['csv', 'markdown', 'none'],
help='Desired format for the exported file.')
export_opts.add_argument('-b', '--branch',
help=('The branch to link to in markdown mode.'
' Defaults to the branch in the working directory.'))
export_opts.add_argument('--filename', help='Override default filename.')
export_opts.add_argument('--header', help='Header for the markdown file.')
export_opts.add_argument('--no-intro', action='store_true',
help="Flag to not use the automatic markdown intro.")
args = parser.parse_args()
assert (args.vehicle in MAVLinkDetector.VEHICLES
or not args.include_stream_groups), \
'Determining stream groups requires a single vehicle to be specified.'
common_files = (MAVLinkDetector.BASE_DIR / 'libraries').glob('**/*.cpp')
messages = MAVLinkDetector(common_files, args.vehicle, args.exclude_library)
include_options = dict(
include_commands = args.include_commands,
include_stream_groups = args.include_stream_groups,
include_unsupported = args.include_unsupported,
)
if not args.quiet:
messages.printout(**include_options)
if args.format != 'none':
ext = messages.EXPORT_FILETYPES[args.format]
branch = args.branch
if not branch:
import subprocess
pattern = re.compile(r'On branch ([\S]*)')
result = subprocess.run(['git', 'status'], capture_output=True).stdout
try:
branch, = pattern.search(result.decode()).groups()
except AttributeError as e:
raise Exception(
'No --branch specified, and "git status" failed to find one.'
'Please manually specify an ardupilot firmware branch for '
'code source hyperlinks (e.g. Sub-4.1) or ensure this '
'repository copy is managed by git.'
)
filename = (
args.filename or
f'{args.vehicle}_{branch}_MAVLink_Messages.{ext}'
)
messages.export(filename, type=args.format, branch=branch, header=args.header,
use_intro=not args.no_intro, **include_options)