Skip to content

Commit

Permalink
if handshake fail,try again pair
Browse files Browse the repository at this point in the history
  • Loading branch information
YueChen-C committed Jul 7, 2021
1 parent 1f0db97 commit fee4bfb
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions ios_device/util/lockdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ def __init__(
self.session_id = None
self.host_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, platform.node())).upper()
self.svc = PlistService(62078, udid, device, network=network)
self.udid = self.svc.device.serial
self.device = device
self.network = network
self._verify_query_type()
self.device_info = self.get_value()
self.device_info['UniqueDeviceID'] = self.svc.device.serial
self.device_info['DeviceID'] = self.svc.device.device_id
self.paired = self._pair()
self.paired = None
log.info(f"Connecting Device {self.svc.device.serial} ")

def _verify_query_type(self):
Expand All @@ -83,7 +86,7 @@ def identifier(self):
def _pair(self):
if self._validate_pairing():
return True
self._pair_full()
self.pair_full()
self.svc.close()
self.svc = PlistService(62078, self.udid, self.svc.device, network=self.network)
if self._validate_pairing():
Expand All @@ -102,14 +105,12 @@ def _get_pair_record(self) -> Optional[Dict[str, Any]]:
log.debug(f'{E}')
log.debug(f'No iTunes pairing record found for device {self.identifier}')
log.debug('Getting pair record from usbmuxd')
finally:
with UsbmuxdClient() as usb:
return usb.get_pair_record(self.udid)
return None

def _validate_pairing(self):
pair_record = self._get_pair_record()
if not pair_record:
return False
pair_record = self._get_pair_record() or {}
self.record = pair_record
if self.ios_version < LooseVersion('11.0'): # 11 以下需要双向认证
resp = self._plist_request('ValidatePair', PairRecord=pair_record)
Expand All @@ -126,20 +127,28 @@ def _validate_pairing(self):
if resp['Error'] == 'InvalidHostID':
with UsbmuxdClient() as usb:
usb.delete_pair_record(self.udid)
pair_record = self._get_pair_record() or self._pair_full()
pair_record = self._get_pair_record() or self.pair_full()
if not pair_record:
return False

if resp.get('EnableSessionSSL'):
self.sslfile = write_home_file(
self.cache_dir,
f'{self.identifier}_ssl.pem',
pair_record['HostCertificate'] + b'\n' + pair_record['HostPrivateKey']
)
self.svc.ssl_start(self.sslfile, self.sslfile)
if not pair_record:
self.sslfile = get_home_path(self.cache_dir,f'{self.identifier}.pem')

else:
self.sslfile = write_home_file(
self.cache_dir,
f'{self.identifier}.pem',
pair_record['HostCertificate'] + b'\n' + pair_record['HostPrivateKey']
)
try:
self.svc.ssl_start(self.sslfile, self.sslfile)
except OSError:
self.svc = PlistService(62078, self.udid, self.device, network=self.network)
return False
return True

def _pair_full(self):
def pair_full(self):
device_public_key = self.get_value(key='DevicePublicKey')
wifi_address = self.get_value(key="WiFiAddress")
with UsbmuxdClient() as usb:
Expand Down Expand Up @@ -170,9 +179,13 @@ def _pair_full(self):
pair_record['HostPrivateKey'] = priv_key_pem
pair_record['EscrowBag'] = pair.get('EscrowBag')
pair_record['WiFiMACAddress'] = wifi_address
write_home_file(self.cache_dir, '%s.plist' % self.identifier, plistlib.dumps(pair_record))
with UsbmuxdClient() as usb:
usb.save_pair_record(self.udid, pair_record, self.device_id)
write_home_file(
self.cache_dir,
f'{self.identifier}.pem',
pair_record['HostCertificate'] + b'\n' + pair_record['HostPrivateKey']
)
return pair_record
elif pair and pair.get('Error') == 'PasswordProtected':
self.svc.close()
Expand Down Expand Up @@ -230,7 +243,7 @@ def enable_wireless(self, enable, wireless_id=None, buddy_id=None):

def _start_service(self, name: str, escrow_bag=None) -> PlistService:
if not self.paired:
raise NotPairedError(f'Unable to start service={name!r} - not paired')
self._pair()
elif not name:
raise ValueError('Name must be a valid string')

Expand Down Expand Up @@ -368,6 +381,7 @@ def read_home_file(foldername: str, filename: str) -> Optional[bytes]:

def write_home_file(foldername: str, filename: str, data: bytes) -> str:
path = get_home_path(foldername, filename)
log.info(f'save path :{path}')
with path.open('wb') as f:
f.write(data)
return path.as_posix()
Expand Down

0 comments on commit fee4bfb

Please sign in to comment.