forked from chenxiaolong/avbroot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
avbroot.py
308 lines (240 loc) · 9.88 KB
/
avbroot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/env python3
from avbroot import external
import argparse
import os
import shutil
import tempfile
import zipfile
import avbtool
from avbroot import boot
from avbroot import openssl
from avbroot import ota
from avbroot import util
from avbroot import vbmeta
# Names of the OTA zip entries that need special handling.
# Existing metadata; read so that a new signed zip can be generated
PATH_METADATA_PB = 'META-INF/com/android/metadata.pb'
# The payload entry, which is patched rather than copied
PATH_PAYLOAD = 'payload.bin'
# Rewritten with the value returned by the payload patching step
PATH_PROPERTIES = 'payload_properties.txt'
# Entries that are created during zip signing, so they are never copied over
SKIP_PATHS = (
PATH_METADATA_PB,
'META-INF/com/android/metadata',
'META-INF/com/android/otacert',
)
def print_status(*args, **kwargs):
    """Print a status line in bold, framed by asterisks.

    Positional arguments are the message parts and are printed between the
    opening and closing markers. Keyword arguments are forwarded to
    `print()` unchanged.
    """
    # ANSI "bold on" + opening marker, and closing marker + ANSI "reset"
    opening = '\x1b[1m*****'
    closing = '*****\x1b[0m'

    print(opening, *args, closing, **kwargs)
def get_images(manifest):
    """Pick the partition images that need to be extracted and patched.

    Args:
        manifest: Object whose `partitions` attribute is an iterable of
            entries that each carry a `partition_name`.

    Returns:
        A tuple `(images, boot_image)`. `images` always starts with
        `'vbmeta'` followed by the boot image name, plus `'vendor_boot'`
        when the manifest lists that partition. `boot_image` is
        `'init_boot'` when the manifest lists it, otherwise `'boot'`.
    """
    names = [p.partition_name for p in manifest.partitions]

    # Devices launching with Android 13 use a GKI init_boot ramdisk
    boot_image = 'init_boot' if 'init_boot' in names else 'boot'

    images = ['vbmeta', boot_image]

    # Older devices may not have vendor_boot
    if 'vendor_boot' in names:
        images.append('vendor_boot')

    return images, boot_image
def patch_ota_payload(f_in, f_out, file_size, magisk, privkey_avb, privkey_ota,
                      cert_ota):
    """Patch the boot-related images inside an OTA payload.

    The relevant images are extracted from the payload read from `f_in`,
    patched, re-signed with the AVB key, and a new payload referencing the
    patched images is written to `f_out`. All intermediate files live in a
    temporary directory that is removed when the function returns.

    Args:
        f_in: File object the original payload is read from.
        f_out: File object the patched payload is written to.
        file_size: Passed through to `ota.patch_payload()`.
        magisk: Passed to the Magisk root patch and the OTA cert patch.
        privkey_avb: Key used when patching the boot images and when
            building the root vbmeta image.
        privkey_ota: Key passed to `ota.patch_payload()`.
        cert_ota: OTA certificate passed to the OTA cert patch.

    Returns:
        The result of `ota.patch_payload()`; the caller writes it out as the
        new payload properties entry.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        extract_dir = os.path.join(temp_dir, 'extract')
        patch_dir = os.path.join(temp_dir, 'patch')
        payload_dir = os.path.join(temp_dir, 'payload')
        for work_dir in (extract_dir, patch_dir, payload_dir):
            os.mkdir(work_dir)

        def original(name):
            # Path of the image as extracted from the payload
            return os.path.join(extract_dir, f'{name}.img')

        def patched(name):
            # Path of the image after patching
            return os.path.join(patch_dir, f'{name}.img')

        version, manifest, blob_offset = ota.parse_payload(f_in)
        images, boot_image = get_images(manifest)

        print_status('Extracting', ', '.join(images), 'from the payload')
        ota.extract_images(f_in, manifest, blob_offset, extract_dir, images)

        root_patch = boot.MagiskRootPatch(magisk)
        otacert_patch = boot.OtaCertPatch(magisk, cert_ota)

        if 'vendor_boot' in images:
            boot_patches = [root_patch]
            vendor_boot_patches = [otacert_patch]
        else:
            # Older devices don't have a vendor_boot, so the OTA cert patch
            # is applied to the boot image instead
            boot_patches = [root_patch, otacert_patch]
            vendor_boot_patches = []

        avb = avbtool.Avb()

        print_status('Patching boot image')
        boot.patch_boot(
            avb,
            original(boot_image),
            patched(boot_image),
            privkey_avb,
            True,
            boot_patches,
        )

        if vendor_boot_patches:
            print_status('Patching vendor_boot image')
            boot.patch_boot(
                avb,
                original('vendor_boot'),
                patched('vendor_boot'),
                privkey_avb,
                True,
                vendor_boot_patches,
            )

        print_status('Building new root vbmeta image')
        vbmeta.patch_vbmeta_root(
            avb,
            # Every patched image except vbmeta itself
            [patched(name) for name in images if name != 'vbmeta'],
            original('vbmeta'),
            patched('vbmeta'),
            privkey_avb,
            manifest.block_size,
        )

        print_status('Updating OTA payload to reference patched images')
        return ota.patch_payload(
            f_in,
            f_out,
            version,
            manifest,
            blob_offset,
            payload_dir,
            {name: patched(name) for name in images},
            file_size,
            privkey_ota,
        )
def patch_ota_zip(f_zip_in, f_zip_out, magisk, privkey_avb, privkey_ota,
                  cert_ota):
    """Copy an OTA zip into a new zip, patching the payload on the way.

    Entries listed in `SKIP_PATHS` are dropped because they are created
    again during zip signing. The payload entry is patched via
    `patch_ota_payload()` and the payload properties entry is replaced with
    the value that call returns. Every other entry is copied verbatim.

    Args:
        f_zip_in: Path or file object of the original OTA zip.
        f_zip_out: Path or file object the unsigned patched zip is written
            to.
        magisk: Forwarded to `patch_ota_payload()`.
        privkey_avb: Forwarded to `patch_ota_payload()`.
        privkey_ota: Forwarded to `patch_ota_payload()`.
        cert_ota: Forwarded to `patch_ota_payload()`.

    Returns:
        The raw contents of the original `PATH_METADATA_PB` entry, which is
        needed to generate the new signed zip.

    Raises:
        Exception: If the metadata, payload or properties entry is missing,
            or if the payload or properties entry is not stored
            uncompressed.
    """
    with (
        zipfile.ZipFile(f_zip_in, 'r') as z_in,
        zipfile.ZipFile(f_zip_out, 'w') as z_out,
    ):
        # Take a private copy of the entry list. The reordering below has to
        # apply to the list that is actually iterated, and must neither rely
        # on nor disturb whatever list object ZipFile.infolist() hands back.
        infolist = list(z_in.infolist())

        missing = {PATH_METADATA_PB, PATH_PAYLOAD, PATH_PROPERTIES}
        i_payload = -1
        i_properties = -1

        for i, info in enumerate(infolist):
            missing.discard(info.filename)

            if info.filename == PATH_PAYLOAD:
                i_payload = i
            elif info.filename == PATH_PROPERTIES:
                i_properties = i

            # Once nothing is missing, both indices have been recorded
            if not missing:
                break

        if missing:
            raise Exception(f'Missing files in zip: {missing}')

        # Ensure payload is processed before properties, because the new
        # properties are only known after the payload has been patched
        if i_payload > i_properties:
            infolist[i_payload], infolist[i_properties] = \
                infolist[i_properties], infolist[i_payload]

        properties = None
        metadata = None

        for info in infolist:
            # The existing metadata is needed to generate a new signed zip
            if info.filename == PATH_METADATA_PB:
                with z_in.open(info, 'r') as f_in:
                    metadata = f_in.read()

            # Skip files that are created during zip signing
            if info.filename in SKIP_PATHS:
                print_status('Skipping', info.filename)
                continue

            # Copy other files, patching if needed.
            # NOTE(review): the same ZipInfo object is handed to both
            # archives; keep the input entry opened first - confirm against
            # zipfile's handling of ZipInfo when opening for writing.
            with (
                z_in.open(info, 'r') as f_in,
                z_out.open(info, 'w') as f_out,
            ):
                if info.filename in (PATH_PAYLOAD, PATH_PROPERTIES):
                    print_status('Patching', info.filename)

                    if info.compress_type != zipfile.ZIP_STORED:
                        raise Exception(
                            f'{info.filename} is not stored uncompressed')

                    if info.filename == PATH_PAYLOAD:
                        properties = patch_ota_payload(
                            f_in,
                            f_out,
                            info.file_size,
                            magisk,
                            privkey_avb,
                            privkey_ota,
                            cert_ota,
                        )
                    else:
                        # Payload was handled earlier, so this is set
                        f_out.write(properties)
                else:
                    print_status('Copying', info.filename)
                    shutil.copyfileobj(f_in, f_out)

        return metadata
def patch_subcommand(args):
    """Run the `patch` subcommand: patch an OTA zip and sign the result.

    The private keys are decrypted into a temporary directory under
    `/dev/shm`, the OTA zip is patched into a temporary unsigned zip, and
    that zip is then signed into the output file.

    Args:
        args: Parsed command line arguments. Reads `input`, `output`,
            `magisk`, `privkey_avb`, `privkey_ota` and `cert_ota`. When
            `output` is `None`, `<input>.patched` is used.

    Raises:
        Exception: If the OTA certificate does not match the OTA private
            key.
    """
    if args.output is not None:
        output = args.output
    else:
        output = args.input + '.patched'

    # Decrypt keys to temp directory in RAM
    with tempfile.TemporaryDirectory(dir='/dev/shm') as key_dir:
        print_status('Decrypting keys to RAM-based temporary directory')

        # avbtool requires a PEM-encoded private key
        dec_privkey_avb = os.path.join(key_dir, 'avb.key')
        openssl.decrypt_key(args.privkey_avb, dec_privkey_avb)

        # AOSP's OTA utils require a DER-encoded private key with the `.pk8`
        # extension and a PEM-encoded certificate with the `.x509.pem`
        # extension, both sharing the same path prefix
        key_prefix_ota = os.path.join(key_dir, 'ota')
        dec_privkey_ota = key_prefix_ota + '.pk8'
        cert_ota = key_prefix_ota + '.x509.pem'

        openssl.decrypt_key(args.privkey_ota, dec_privkey_ota, out_form='DER')
        shutil.copyfile(args.cert_ota, cert_ota)

        # Ensure that the certificate matches the private key
        if not openssl.cert_matches_key(cert_ota, dec_privkey_ota):
            raise Exception('OTA certificate does not match private key')

        with tempfile.NamedTemporaryFile() as temp_unsigned:
            metadata = patch_ota_zip(
                args.input,
                temp_unsigned,
                args.magisk,
                dec_privkey_avb,
                dec_privkey_ota,
                cert_ota,
            )

            print_status('Signing OTA zip')

            with util.open_output_file(output) as temp_signed:
                ota.sign_zip(
                    temp_unsigned.name,
                    temp_signed.name,
                    key_prefix_ota,
                    metadata,
                )
def extract_subcommand(args):
    """Run the `extract` subcommand: pull the patched images out of a zip.

    Args:
        args: Parsed command line arguments. Reads `input` (the OTA zip) and
            `directory` (where the extracted images are written).
    """
    with (
        zipfile.ZipFile(args.input, 'r') as ota_zip,
        ota_zip.open(PATH_PAYLOAD, 'r') as payload,
    ):
        # Only the manifest and blob offset are needed; the version is not
        parsed = ota.parse_payload(payload)
        manifest, blob_offset = parsed[1], parsed[2]

        # Same image selection that is used when patching
        images = get_images(manifest)[0]

        print_status('Extracting', ', '.join(images), 'from the payload')
        ota.extract_images(payload, manifest, blob_offset, args.directory,
                           images)
def parse_args(argv=None):
    """Parse the command line.

    Two subcommands are available: `patch`, which patches a full OTA zip,
    and `extract`, which extracts the patched images from a patched OTA zip.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. When `None` (the default), argparse reads
            `sys.argv[1:]`.

    Returns:
        The `argparse.Namespace` holding the parsed arguments. The selected
        subcommand name is stored in its `subcommand` attribute.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand', required=True,
                                       help='Subcommands')

    patch = subparsers.add_parser('patch', help='Patch a full OTA zip')

    # Only OTA zips are accepted: the input is always opened as a zip file
    patch.add_argument('--input', required=True,
                       help='Path to original OTA zip')
    patch.add_argument('--output',
                       help='Path to new OTA zip '
                            '(default: <input>.patched)')
    patch.add_argument('--magisk', required=True,
                       help='Path to Magisk APK')
    patch.add_argument('--privkey-avb', required=True,
                       help='Private key for signing root vbmeta image')
    patch.add_argument('--privkey-ota', required=True,
                       help='Private key for signing OTA payload')
    patch.add_argument('--cert-ota', required=True,
                       help='Certificate for OTA payload signing key')

    extract = subparsers.add_parser(
        'extract', help='Extract patched images from a patched OTA zip')

    extract.add_argument('--input', required=True,
                         help='Path to patched OTA zip')
    extract.add_argument('--directory', default='.',
                         help='Output directory for extracted images')

    return parser.parse_args(argv)
def main():
    """Parse the command line and run the selected subcommand.

    Raises:
        NotImplementedError: If the parsed subcommand has no handler.
    """
    args = parse_args()

    # Maps each subcommand name to the function that implements it
    handlers = {
        'patch': patch_subcommand,
        'extract': extract_subcommand,
    }

    handler = handlers.get(args.subcommand)
    if handler is None:
        raise NotImplementedError()

    handler(args)
# Only run the command line interface when executed as a script
if __name__ == '__main__':
    main()