forked from nrfconnect/sdk-zephyr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnrfutil.py
128 lines (102 loc) · 4.1 KB
/
nrfutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright (c) 2023 Nordic Semiconductor ASA.
#
# SPDX-License-Identifier: Apache-2.0
'''Runner for flashing with nrfutil.'''
import json
import os
from pathlib import Path
import sys
import subprocess
from runners.core import _DRY_RUN
from runners.nrf_common import NrfBinaryRunner
class NrfUtilBinaryRunner(NrfBinaryRunner):
    '''Runner front-end for nrfutil.

    Operations are collected in ``self._ops``, written out as a JSON
    batch file next to the hex file and executed with
    ``nrfutil --json device x-execute-batch``.
    '''

    def __init__(self, cfg, family, softreset, dev_id, erase=False,
                 reset=True, tool_opt=None, force=False, recover=False):
        '''Initialize the runner with an empty batch queue.

        All arguments are forwarded positionally to NrfBinaryRunner. An
        omitted (or None) ``tool_opt`` is forwarded as a fresh empty list.
        '''
        # A literal ``[]`` default is a single list object shared by every
        # call; create a new list per instance instead.
        if tool_opt is None:
            tool_opt = []

        super().__init__(cfg, family, softreset, dev_id, erase, reset,
                         tool_opt, force, recover)

        # Operations queued for the next batch execution.
        self._ops = []
        # Identifier given to the next queued operation.
        self._op_id = 1

    @classmethod
    def name(cls):
        '''Return the name of this runner: 'nrfutil'.'''
        return 'nrfutil'

    @classmethod
    def tool_opt_help(cls) -> str:
        '''Return the help text for the tool options of this runner.'''
        return 'Additional options for nrfutil, e.g. "--log-level"'

    @classmethod
    def do_create(cls, cfg, args):
        '''Build a NrfUtilBinaryRunner from the attributes of ``args``.'''
        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
                                   args.dev_id, erase=args.erase,
                                   reset=args.reset,
                                   tool_opt=args.tool_opt, force=args.force,
                                   recover=args.recover)

    def _exec(self, args):
        '''Run ``nrfutil --json device <args>`` and collect its JSON output.

        Every non-blank line of the tool's stdout is decoded as one JSON
        document (newline-delimited JSON).

        When 'x-execute-batch' is among ``args``:

        - for a 'batch_update' message wrapping a 'task_progress' payload
          whose progress percentage is 0, the progress description is
          logged at info level;
        - a 'batch_end' message with a truthy error raises
          subprocess.CalledProcessError carrying that error's code.

        Returns the list of decoded JSON objects. When _DRY_RUN is set the
        command is handed to ``_log_cmd`` but not run, and an empty dict is
        returned.
        '''
        cmd = ['nrfutil', '--json', 'device'] + args
        self._log_cmd(cmd)

        if _DRY_RUN:
            return {}

        # Loop-invariant: decide once whether batch messages are expected.
        is_batch = 'x-execute-batch' in args
        jout_all = []

        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
            for line in iter(p.stdout.readline, b''):
                # https://github.com/ndjson/ndjson-spec
                # The spec lets parsers ignore empty lines; json.loads()
                # would raise on them.
                if not line.strip():
                    continue

                jout = json.loads(line.decode(sys.getdefaultencoding()))
                jout_all.append(jout)

                if not is_batch:
                    continue

                msg_type = jout['type']
                if msg_type == 'batch_update':
                    pld = jout['data']['data']
                    if pld['type'] == 'task_progress':
                        progress = pld['data']['progress']
                        # Log each task once, when it is reported at 0%.
                        if progress['progressPercentage'] == 0:
                            self.logger.info(progress['description'])
                elif msg_type == 'batch_end' and jout['data']['error']:
                    raise subprocess.CalledProcessError(
                        jout['data']['error']['code'], cmd
                    )

        return jout_all

    def do_get_boards(self):
        '''Return the serial numbers reported by ``nrfutil device list``.

        Only devices whose 'jlink' trait is truthy are included.
        '''
        devs = []
        for msg in self._exec(['list']):
            if msg['type'] == 'task_end':
                # Should several 'task_end' messages arrive, the last wins.
                devs = msg['data']['data']['devices']

        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]

        self.logger.debug(f'Found boards: {snrs}')
        return snrs

    def do_require(self):
        '''Require the 'nrfutil' program via ``self.require``.'''
        self.require('nrfutil')

    def _insert_op(self, op):
        '''Tag ``op`` with the next operation id and queue it.

        ``op`` is modified in place: its 'operationId' key is set to the
        current id rendered as a string.
        '''
        op['operationId'] = f'{self._op_id}'
        self._op_id += 1
        self._ops.append(op)

    def _exec_batch(self):
        '''Write the queued operations to a batch file and execute it.

        The file 'generated_nrfutil_batch.json' is created in the directory
        containing ``self.hex_``. The queue and the id counter are reset
        before nrfutil is invoked.
        '''
        # prepare the dictionary and convert to JSON
        batch = json.dumps({'family': f'{self.family}',
                            'operations': self._ops},
                           indent=4) + '\n'

        hex_dir = Path(self.hex_).parent
        json_file = os.fspath(hex_dir / 'generated_nrfutil_batch.json')

        with open(json_file, 'w', encoding='utf-8') as f:
            f.write(batch)

        # reset first in case an exception is thrown
        self._ops = []
        self._op_id = 1

        self.logger.debug(f'Executing batch in: {json_file}')
        self._exec(['x-execute-batch', '--batch-path', json_file,
                    '--serial-number', f'{self.dev_id}'])

    def do_exec_op(self, op, force=False):
        '''Execute ``op`` immediately if ``force`` is set, else defer it.

        Returns True when the operation was executed here, False when it
        was left for a later flush.

        Raises RuntimeError when a forced execution is requested while
        other operations are still queued in the batch.
        '''
        self.logger.debug(f'Executing op: {op}')
        if force:
            if self._ops:
                raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
            self._insert_op(op)
            self._exec_batch()
            return True
        # Defer by default
        return False

    def flush_ops(self, force=True):
        '''Queue everything pending in ``self.ops`` and execute the batch.

        Does nothing when ``force`` is false. ``self.ops`` is not defined
        in this class; it is drained from the left with ``popleft()``.
        '''
        if not force:
            return
        while self.ops:
            self._insert_op(self.ops.popleft())
        self._exec_batch()