Skip to content

Commit

Permalink
Begin re-implementation of proxy and heartbeat 'bypass'
Browse files Browse the repository at this point in the history
  • Loading branch information
dvingerh committed Jun 17, 2022
1 parent 5c3856e commit 279d417
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 171 deletions.
4 changes: 3 additions & 1 deletion pyinstalive/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ class Config:
ffmpeg_path = None
log_to_file = True
no_assemble = False
use_locks = True
use_locks = True
send_heartbeat = True
proxy = None
19 changes: 10 additions & 9 deletions pyinstalive/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from . import live
from .constants import Constants

import json
import threading
import os
import time
Expand Down Expand Up @@ -141,8 +142,8 @@ def download_livestream(self):
max_connection_error_retry=3,
duplicate_etag_retry=30,
mpd_download_timeout=3,
callback_check=self.update_stream_data,
download_timeout=3,
callback_check=api.do_heartbeat,
ffmpeg_binary=globals.config.ffmpeg_path)

self.livestream_owner = self.livestream_object_init.get('broadcast_owner').get("username")
Expand Down Expand Up @@ -171,7 +172,6 @@ def download_livestream(self):
logger.info('Downloading livestream, press [CTRL+C] to abort.')
logger.separator()
self.update_stream_data()
helpers.print_durations()
self.tasks_worker = threading.Thread(target=helpers.handle_tasks_worker)
self.tasks_worker.daemon = True
self.tasks_worker.start()
Expand All @@ -180,7 +180,7 @@ def download_livestream(self):
logger.separator()
logger.binfo("The livestream has been ended.")
logger.separator()
helpers.print_durations(download_ended=True)
helpers.print_durations()
logger.separator()
self.finish_download()
except Exception as e:
Expand All @@ -190,7 +190,7 @@ def download_livestream(self):
logger.separator()
logger.binfo('The process was aborted by the user.')
logger.separator()
helpers.print_durations(download_ended=True)
helpers.print_durations()
logger.separator()
if not self.downloader_object.is_aborted:
self.downloader_object.stop()
Expand Down Expand Up @@ -270,12 +270,13 @@ def get_guest_status(self):
def update_stream_data(self, from_thread=False):
if not self.download_stop:
if not self.livestream_object:
self.livestream_object = api.get_stream_data()
self.livestream_object['initial_buffered_duration'] = self.downloader_object.initial_buffered_duration
self.livestream_object["delay"] = int(globals.download.timestamp) - int(self.livestream_object.get("published_time"))
self.livestream_object = api.get_stream_data()
if not self.livestream_object.get("initial_buffered_duration", None):
self.livestream_object['initial_buffered_duration'] = self.downloader_object.initial_buffered_duration
self.livestream_object["delay"] = int(globals.download.timestamp) - int(self.livestream_object.get("published_time"))

last_stream_status = self.livestream_object.get("broadcast_status")
stream_heartbeat = api.do_heartbeat()
stream_heartbeat = api.do_heartbeat() if globals.config.send_heartbeat else api.get_stream_data()
stream_status = stream_heartbeat.get("broadcast_status")
if stream_heartbeat.get("status") == "fail":
logger.separator()
Expand All @@ -294,7 +295,7 @@ def update_stream_data(self, from_thread=False):
if not from_thread or (last_stream_status != stream_status):
if from_thread:
logger.separator()
helpers.print_durations()
logger.info('Airing time : {}'.format(helpers.get_stream_duration("airtime")))
logger.info('Status : {}'.format(self.livestream_object.get("broadcast_status", "?").capitalize()))
logger.info('Viewers : {}'.format( int(self.livestream_object.get("viewer_count", "?"))))
return self.livestream_object.get('broadcast_status') not in ['active', 'interrupted']
28 changes: 20 additions & 8 deletions pyinstalive/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
import subprocess
import shlex
import shutil
import requests

from urllib.parse import urlparse

from requests import request

from . import globals
from . import logger
Expand Down Expand Up @@ -143,19 +148,16 @@ def get_stream_duration(duration_type="airtime"):
if stream_started_secs < 0:
stream_started_secs = 0

stream_duration_str = '{} minute{}'.format(stream_started_mins, 's' if stream_started_mins > 1 else '')
if stream_started_secs:
stream_duration_str += ' and {} seconds'.format(stream_started_secs)
stream_duration_str = '{} minute{}'.format(stream_started_mins, 's' if stream_started_mins != 0 else '') + ' and {} second{}'.format(stream_started_secs, 's' if stream_started_secs != 0 else '')
return stream_duration_str
except Exception as e:
print(str(e))
return "?"

def print_durations(download_ended=False):
def print_durations():
logger.info('Airing time : {}'.format(get_stream_duration("airtime")))
if download_ended:
logger.info('Downloaded : {}'.format(get_stream_duration("download")))
logger.info('Missing : {}'.format(get_stream_duration("missing")))
logger.info('Downloaded : {}'.format(get_stream_duration("download")))
logger.info('Missing : {}'.format(get_stream_duration("missing")))


def command_exists(command):
Expand Down Expand Up @@ -272,7 +274,7 @@ def show_info():

if os.path.exists(globals.config.config_path):
logger.whiteline()
logger.info("Configuration file contents:")
logger.info("Configuration file:")
logger.whiteline()
with open(globals.config.config_path) as f:
for line in f:
Expand All @@ -281,3 +283,13 @@ def show_info():
logger.error("Configuration file: Not found")
logger.whiteline()
logger.info("End of PyInstaLive information screen.")

def test_proxy():
parsed_url = urlparse(globals.config.proxy)
if parsed_url.netloc and parsed_url.scheme:
proxy = {"https": '{0!s}'.format(parsed_url.netloc)}
try:
response = requests.get(Constants.BASE_WEB, headers=Constants.BASE_HEADERS, proxies=proxy, timeout=10)
return proxy if response.status_code == 200 else None
except Exception as e:
return None
157 changes: 5 additions & 152 deletions pyinstalive/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import shutil
import subprocess

from . import globals

import requests
import urllib.parse as compat_urlparse

Expand All @@ -27,18 +29,13 @@

class Downloader(object):
"""Downloads and assembles a given IG live stream"""

USER_AGENT = 'Instagram 10.26.0 (iPhone8,1; iOS 10_2; en_US; en-US; ' \
'scale=2.00; gamut=normal; 750x1334) AppleWebKit/420+'
MPD_DOWNLOAD_TIMEOUT = 2
DOWNLOAD_TIMEOUT = 15
DUPLICATE_ETAG_RETRY = 30
MAX_CONNECTION_ERROR_RETRY = 10
SLEEP_INTERVAL_BEFORE_RETRY = 5

def __init__(self, mpd, output_dir, callback_check=None, singlethreaded=False, user_agent=None, **kwargs):
def __init__(self, mpd, output_dir, callback_check=None, singlethreaded=False, **kwargs):
"""
:param mpd: URL to mpd
:param output_dir: folder to store the downloaded files
:param callback_check: callback function that can be used to check
Expand All @@ -62,7 +59,6 @@ def __init__(self, mpd, output_dir, callback_check=None, singlethreaded=False, u
self.singlethreaded = singlethreaded
self.stream_id = ''
self.segment_meta = {}
self.user_agent = user_agent or self.USER_AGENT
self.mpd_download_timeout = kwargs.pop('mpd_download_timeout', None) or self.MPD_DOWNLOAD_TIMEOUT
self.download_timeout = kwargs.pop('download_timeout', None) or self.DOWNLOAD_TIMEOUT
self.duplicate_etag_retry = kwargs.pop('duplicate_etag_retry', None) or self.DUPLICATE_ETAG_RETRY
Expand All @@ -71,7 +67,8 @@ def __init__(self, mpd, output_dir, callback_check=None, singlethreaded=False, u
self.sleep_interval_before_retry = (kwargs.pop('sleep_interval_before_retry', None)
or self.SLEEP_INTERVAL_BEFORE_RETRY)

session = requests.Session()
session = globals.session.session

adapter = requests.adapters.HTTPAdapter(max_retries=2, pool_maxsize=25)
session.mount('http://', adapter)
session.mount('https://', adapter)
Expand All @@ -80,9 +77,6 @@ def __init__(self, mpd, output_dir, callback_check=None, singlethreaded=False, u
# to store the duration of the initial buffered sgements available
self.initial_buffered_duration = 0.0

# custom ffmpeg binary path, fallback to ffmpeg_binary path in env if available
self.ffmpeg_binary = kwargs.pop('ffmpeg_binary', None) or os.getenv('FFMPEG_BINARY', 'ffmpeg')

def _store_segment_meta(self, segment, representation):
if segment not in self.segment_meta:
self.segment_meta[segment] = representation
Expand Down Expand Up @@ -153,7 +147,6 @@ def _download_mpd(self):
"""Downloads the mpd stream info and returns the xml object."""
logger.debug('Requesting {0!s}'.format(self.mpd))
res = self.session.get(self.mpd, headers={
'User-Agent': self.user_agent,
'Accept': '*/*',
}, timeout=self.mpd_download_timeout)
res.raise_for_status()
Expand Down Expand Up @@ -321,7 +314,6 @@ def _download(self, target, output, timeout=None, init_chunk=None):
for i in range(1, retry_attempts + 1):
try:
res = self.session.get(target, headers={
'User-Agent': self.user_agent,
'Accept': '*/*',
}, timeout=timeout or self.download_timeout)
res.raise_for_status()
Expand Down Expand Up @@ -354,142 +346,3 @@ def _get_file_index(filename):
if mobj:
return int(mobj.group('idx'))
return -1

def stitch(self, output_filename,
skipffmpeg=False,
cleartempfiles=True):
"""
Combines all the dowloaded stream segments into the final mp4 file.
:param output_filename: Output file path
:param skipffmpeg: bool flag to not use ffmpeg to join audio and video file into final mp4
:param cleartempfiles: bool flag to remove downloaded and temp files
"""
if not self.stream_id:
raise ValueError('No stream ID found.')

has_ffmpeg_error = False
files_generated = []

all_segments = sorted(
self.segment_meta.keys(),
key=lambda x: self._get_file_index(x)) # pylint: disable=unnecessary-lambda
prev_res = ''
sources = []
audio_stream_format = 'source_{0}_{1}_m4a.tmp'
video_stream_format = 'source_{0}_{1}_m4v.tmp'
video_stream = ''
audio_stream = ''

# Iterate through all the segments and generate a pair of source files
# for each time a resolution change is detected
for segment in all_segments:

video_stream = os.path.join(
self.output_dir, video_stream_format.format(self.stream_id, len(sources)))
audio_stream = os.path.join(
self.output_dir, audio_stream_format.format(self.stream_id, len(sources)))

if not os.path.isfile(os.path.join(self.output_dir, segment)):
logger.warning('Segment not found: {0!s}'.format(segment))
continue

if not os.path.isfile(os.path.join(self.output_dir, segment.replace('.m4v', '.m4a'))):
logger.warning('Segment not found: {0!s}'.format(segment.replace('.m4v', '.m4a')))
continue

if prev_res and prev_res != self.segment_meta[segment]:
# resolution change detected
# push current generated file pair into sources
sources.append({'video': video_stream, 'audio': audio_stream})
video_stream = os.path.join(
self.output_dir, video_stream_format.format(self.stream_id, len(sources)))
audio_stream = os.path.join(
self.output_dir, audio_stream_format.format(self.stream_id, len(sources)))

prev_res = self.segment_meta[segment]
file_mode = 'ab' if os.path.exists(video_stream) else 'wb'
seg_file = os.path.join(self.output_dir, segment)

with open(video_stream, file_mode) as outfile,\
open(seg_file, 'rb') as readfile:
shutil.copyfileobj(readfile, outfile)
logger.debug(
'Assembling video stream {0!s} => {1!s}'.format(segment, video_stream))

with open(audio_stream, file_mode) as outfile,\
open(seg_file.replace('.m4v', '.m4a'), 'rb') as readfile:
shutil.copyfileobj(readfile, outfile)
logger.debug(
'Assembling audio stream {0!s} => {1!s}'.format(segment, audio_stream))

if audio_stream and video_stream:
# push last pair into source
sources.append({'video': video_stream, 'audio': audio_stream})

if len(sources) > 1:
logger.warning(
'Stream has sections with different resolutions.\n'
'{0:d} mp4 files will be generated in total.'.format(len(sources)))

if not skipffmpeg:
for n, source in enumerate(sources):

if len(sources) == 1:
# use supplied output filename as-is if it's the only one
generated_filename = output_filename
else:
# Generate a new filename by appending n+1
# to the original specified output filename
# so that it looks like output-1.mp4, output-2.mp4, etc
dir_name = os.path.dirname(output_filename)
file_name = os.path.basename(output_filename)
dot_pos = file_name.rfind('.')
if dot_pos >= 0:
filename_no_ext = file_name[0:dot_pos]
ext = file_name[dot_pos:]
else:
filename_no_ext = file_name
ext = ''
generated_filename = os.path.join(
dir_name, '{0!s}-{1:d}{2!s}'.format(filename_no_ext, n + 1, ext))

ffmpeg_loglevel = 'error'
if logger.level == logging.DEBUG:
ffmpeg_loglevel = 'warning'
cmd = [
self.ffmpeg_binary, '-y',
'-loglevel', ffmpeg_loglevel,
'-i', source['audio'],
'-i', source['video'],
'-c:v', 'copy',
'-c:a', 'copy',
generated_filename]
exit_code = subprocess.call(cmd)

if exit_code:
logger.error('ffmpeg exited with the code: {0!s}'.format(exit_code))
logger.error('Command: {0!s}'.format(' '.join(cmd)))
has_ffmpeg_error = True
else:
files_generated.append(generated_filename)
if cleartempfiles and not skipffmpeg:
# Don't del source*.tmp files if not using ffmpeg
# so that user can still use the source* files with another
# tool such as avconv
for f in (source['audio'], source['video']):
try:
os.remove(f)
except (IOError, OSError) as ioe:
logger.warning('Error removing {0!s}: {1!s}'.format(f, str(ioe)))

if cleartempfiles and not has_ffmpeg_error:
# Specifically only remove this stream's segment files
for seg in all_segments:
for f in (seg, seg.replace('.m4v', '.m4a')):
try:
os.remove(os.path.join(self.output_dir, f))
except (IOError, OSError) as ioe:
logger.warning('Error removing {0!s}: {1!s}'.format(f, str(ioe)))

return files_generated
Loading

0 comments on commit 279d417

Please sign in to comment.