-
Notifications
You must be signed in to change notification settings - Fork 16
/
liblp.py
executable file
·340 lines (293 loc) · 12.7 KB
/
liblp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#!/usr/bin/python3
# This file is part of Extractor.
# Copyright (C) 2021 Security Research Labs GmbH
# SPDX-License-Identifier: Apache-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import BinaryIO, List, Dict, Union
import os
import copy
import sys
import mmap
import hashlib
from construct import Struct, Int32ul, Int16ul, Int64ul, Bytes
from construct_typing import TypedContainer
def main():
breakpoint()
super_img = SuperImage(sys.argv[1])
print("PARTITIONS: %r" % super_img.get_partition_names())
for partition_name in super_img.get_partition_names():
if partition_name in ("system", "vendor", "product"):
with open("%s.img" % partition_name, 'wb') as f:
super_img.write_partition(partition_name, f)
def check_magic(f: BinaryIO):
buf = f.read(4096)
if buf != b'\0' * 4096:
return False
buf = f.read(4)
return buf == b'\x67\x44\x6c\x61'
class SuperImage:
"""
Class to parse a liblp super image.
More details at
https://android.googlesource.com/platform/system/core
=> fs_mgr/liblp/include/liblp/metadata_format.h
"""
filename: str
file_size: int
fh: BinaryIO
mmap: mmap.mmap
geometry: "LpMetadataGeometry"
metadata_header: "Union[LpMetadataHeaderV1_0, LpMetadataHeaderV1_2]"
partitions: List["LpMetadataPartition"]
extents: List["LpMetadataExtent"]
block_device: "LpMetadataBlockDevice"
partition_name_to_nr: Dict[str, int]
def __init__(self, filename: Union[str, bytes]):
self.filename = filename
self.file_size = os.stat(self.filename).st_size
self.fh = open(self.filename, 'rb')
self.mmap = mmap.mmap(self.fh.fileno(), 0, access=mmap.ACCESS_READ)
# Read and validate LpMetadataGeometry
lmg = LpMetadataGeometry.parse(self.mmap[0x1000:0x1000 + LpMetadataGeometry.sizeof()])
lmg.validate()
lmg_copy = LpMetadataGeometry.parse(self.mmap[0x2000:0x2000 + LpMetadataGeometry.sizeof()])
lmg_copy.validate()
assert lmg == lmg_copy
# Read and validate
tmp_metadata_header = LpMetadataHeaderV1_0.parse(self.mmap[0x3000:0x3000 + LpMetadataHeaderV1_0.sizeof()])
if tmp_metadata_header.header_size == LpMetadataHeaderV1_0.sizeof():
self.metadata_header = tmp_metadata_header
self.metadata_header.validate()
elif tmp_metadata_header.header_size == LpMetadataHeaderV1_2.sizeof():
self.metadata_header = LpMetadataHeaderV1_2.parse(self.mmap[0x3000:0x3000 + LpMetadataHeaderV1_2.sizeof()])
self.metadata_header.validate()
else:
raise ValueError(f"Invalid LpMetadataHeader.header_size={tmp_metadata_header.header_size}")
table_data = self.mmap[0x3000 + self.metadata_header.header_size:0x3000 + self.metadata_header.header_size + self.metadata_header.tables_size]
self.metadata_header.validate_table_data(table_data)
# Read partitions from table_data
self.partition_name_to_nr = {}
self.partitions = []
for partition_nr in range(self.metadata_header.partitions.num_entries):
pos = self.metadata_header.partitions.offset + partition_nr * LpMetadataPartition.sizeof()
partition = LpMetadataPartition.parse(table_data[pos:pos + LpMetadataPartition.sizeof()])
print("partition %d: %r" % (partition_nr, partition))
self.partitions.append(partition)
assert partition.get_name() not in self.partition_name_to_nr, "Duplicate partition %r" % partition.get_name()
self.partition_name_to_nr[partition.get_name()] = partition_nr
# Read extents from table_data
self.extents = []
for extent_nr in range(self.metadata_header.extents.num_entries):
pos = self.metadata_header.extents.offset + extent_nr * LpMetadataExtent.sizeof()
extent = LpMetadataExtent.parse(table_data[pos:pos + LpMetadataExtent.sizeof()])
print("Extent %d: %r" % (extent_nr, extent))
self.extents.append(extent)
# Read block devices from table_data
assert self.metadata_header.block_devices.num_entries == 1, "Not exactly one block device: self.metadata_header.block_devices.num_entries=%r" % self.metadata_header.block_devices.num_entries
pos = self.metadata_header.block_devices.offset
self.block_device = LpMetadataBlockDevice.parse(table_data[pos:pos + LpMetadataBlockDevice.sizeof()])
assert self.block_device.alignment % 512 == 0
def get_partition_names(self) -> List[str]:
return [partition.get_name() for partition in self.partitions]
def close(self):
self.mmap.close()
self.fh.close()
def write_partition(self, partition_name: str, f: BinaryIO):
partition_nr = self.partition_name_to_nr[partition_name]
partition = self.partitions[partition_nr]
assert partition.num_extents == 1, "Not exactly one extent: %d" % partition.num_extents
extent = self.extents[partition.first_extent_index]
assert extent.target_source == 0
assert extent.target_type == 0 # LP_TARGET_TYPE_LINEAR
start_pos = extent.target_data * 512
assert start_pos % self.block_device.alignment == 0, "Alignment error: start_pos=%d self.block_device.alignment=%d offset=%r" % (start_pos, self.block_device.alignment, start_pos % self.block_device.alignment)
end_pos = start_pos + extent.num_sectors * 512
pos = start_pos
while pos < end_pos:
next_pos = min(end_pos, pos + 1024**2)
f.write(self.mmap[pos:next_pos])
pos = next_pos
class LpMetadataGeometry(TypedContainer):
magic: int
struct_size: int
checksum: bytes
metadata_max_size: int
metadata_slot_count: int
logical_block_size: int
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"magic" / Int32ul,
"struct_size" / Int32ul,
"checksum" / Bytes(32),
"metadata_max_size" / Int32ul,
"metadata_slot_count" / Int32ul,
"logical_block_size" / Int32ul,
)
def validate(self):
assert self.magic == 0x616c4467
assert self.struct_size == LpMetadataGeometry.sizeof()
tmp = copy.copy(self)
tmp.checksum = b'\0' * 32
tmp_encoded = tmp.build()
digest = hashlib.sha256(tmp_encoded).digest()
assert self.checksum == digest
assert LpMetadataGeometry.sizeof() == 52
class LpMetadataTableDescriptor(TypedContainer):
offset: int
num_entries: int
entry_size: int
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"offset" / Int32ul,
"num_entries" / Int32ul,
"entry_size" / Int32ul
)
assert LpMetadataTableDescriptor.sizeof() == 12
class LpMetadataHeaderV1_0(TypedContainer):
magic: int
major_version: int
minor_version: int
header_size: int
header_checksum: bytes
tables_size: int
tables_checksum: bytes
partitions: LpMetadataTableDescriptor
extents: LpMetadataTableDescriptor
groups: LpMetadataTableDescriptor
block_devices: LpMetadataTableDescriptor
# flags: int
# reserved: bytes
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"magic" / Int32ul,
"major_version" / Int16ul,
"minor_version" / Int16ul,
"header_size" / Int32ul,
"header_checksum" / Bytes(32),
"tables_size" / Int32ul,
"tables_checksum" / Bytes(32),
"partitions" / LpMetadataTableDescriptor.as_inner_type(),
"extents" / LpMetadataTableDescriptor.as_inner_type(),
"groups" / LpMetadataTableDescriptor.as_inner_type(),
"block_devices" / LpMetadataTableDescriptor.as_inner_type()
)
def validate(self):
assert self.magic == 0x414C5030
assert self.header_size == LpMetadataHeaderV1_0.sizeof(), "Bad LpMetadataHeaderV1_0.header_size %d, should be %d" % (self.header_size, LpMetadataHeaderV1_0.sizeof())
tmp = copy.copy(self)
tmp.header_checksum = b'\0' * 32
tmp_encoded = tmp.build()
digest = hashlib.sha256(tmp_encoded).digest()
assert self.header_checksum == digest
assert self.partitions.entry_size == LpMetadataPartition.sizeof(), "Bad LpMetadataHeaderV1_0.partitions.entry_size %d, should be %d" % (self.partitions.entry_size, LpMetadataPartition.sizeof())
assert self.extents.entry_size == LpMetadataExtent.sizeof(), "Bad LpMetadataHeaderV1_0.extents.entry_size %d, should be %d" % (self.extents.entry_size, LpMetadataExtent.sizeof())
assert self.tables_size < 1e6
def validate_table_data(self, buf: bytes):
assert len(buf) == self.tables_size
digest = hashlib.sha256(buf).digest()
assert self.tables_checksum == digest
class LpMetadataHeaderV1_2(TypedContainer):
magic: int
major_version: int
minor_version: int
header_size: int
header_checksum: bytes
tables_size: int
tables_checksum: bytes
partitions: LpMetadataTableDescriptor
extents: LpMetadataTableDescriptor
groups: LpMetadataTableDescriptor
block_devices: LpMetadataTableDescriptor
flags: int
# flags: int
# reserved: bytes
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"magic" / Int32ul,
"major_version" / Int16ul,
"minor_version" / Int16ul,
"header_size" / Int32ul,
"header_checksum" / Bytes(32),
"tables_size" / Int32ul,
"tables_checksum" / Bytes(32),
"partitions" / LpMetadataTableDescriptor.as_inner_type(),
"extents" / LpMetadataTableDescriptor.as_inner_type(),
"groups" / LpMetadataTableDescriptor.as_inner_type(),
"block_devices" / LpMetadataTableDescriptor.as_inner_type(),
"flags" / Int32ul,
"_reserved" / Bytes(124)
)
def validate(self):
assert self.magic == 0x414C5030
assert self.header_size == LpMetadataHeaderV1_2.sizeof(), "Bad LpMetadataHeaderV1_2.header_size %d, should be %d" % (self.header_size, LpMetadataHeaderV1_0.sizeof())
tmp = copy.copy(self)
tmp.header_checksum = b'\0' * 32
tmp_encoded = tmp.build()
digest = hashlib.sha256(tmp_encoded).digest()
assert self.header_checksum == digest
assert self.partitions.entry_size == LpMetadataPartition.sizeof(), "Bad LpMetadataHeaderV1_2.partitions.entry_size %d, should be %d" % (self.partitions.entry_size, LpMetadataPartition.sizeof())
assert self.extents.entry_size == LpMetadataExtent.sizeof(), "Bad LpMetadataHeaderV1_2.extents.entry_size %d, should be %d" % (self.extents.entry_size, LpMetadataExtent.sizeof())
assert self.tables_size < 1e6
def validate_table_data(self, buf: bytes):
assert len(buf) == self.tables_size
digest = hashlib.sha256(buf).digest()
assert self.tables_checksum == digest
class LpMetadataPartition(TypedContainer):
name: bytes
attributes: int
first_extent_index: int
num_extents: int
group_index: int
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"name" / Bytes(36),
"attributes" / Int32ul,
"first_extent_index" / Int32ul,
"num_extents" / Int32ul,
"group_index" / Int32ul
)
def get_name(self) -> str:
return self.name.rstrip(b'\0').decode()
assert LpMetadataPartition.sizeof() == 52
class LpMetadataExtent(TypedContainer):
num_sectors: int
target_type: int
target_data: int
target_source: int
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"num_sectors" / Int64ul,
"target_type" / Int32ul,
"target_data" / Int64ul,
"target_source" / Int32ul,
)
assert LpMetadataExtent.sizeof() == 24
class LpMetadataBlockDevice(TypedContainer):
first_logical_sector: int
alignment: int
alignment_offset: int
size: int
partition_name: bytes
flags: int
# noinspection PyUnresolvedReferences
construct_struct = Struct(
"first_logical_sector" / Int64ul,
"alignment" / Int32ul,
"alignment_offset" / Int32ul,
"size" / Int64ul,
"partition_name" / Bytes(36),
"flags" / Int32ul
)
assert LpMetadataBlockDevice.sizeof() == 64
if __name__ == "__main__":
main()