diff --git a/appyter/execspec/implementations/docker.py b/appyter/execspec/implementations/docker.py index ac8727e8..91ee74ad 100644 --- a/appyter/execspec/implementations/docker.py +++ b/appyter/execspec/implementations/docker.py @@ -71,6 +71,7 @@ async def _run(self, **job): async for msg, done in self._submit(**job): if not done: try: yield await async_json_loads(msg) + except asyncio.CancelledError: raise except: logger.warning(traceback.format_exc()) if msg != 0: yield dict(type='error', data=f"Container exited with error code") diff --git a/appyter/ext/asyncio/subprocess.py b/appyter/ext/asyncio/subprocess.py index aeecb80a..d854f2e2 100644 --- a/appyter/ext/asyncio/subprocess.py +++ b/appyter/ext/asyncio/subprocess.py @@ -24,6 +24,15 @@ async def stream_readline_to_queue(stream, queue, chunk_size=65536): else: await queue.put((None, True)) +async def ensure_terminated(proc: asyncio.subprocess.Process): + try: + if proc.returncode is None: + proc.terminate() + await asyncio.sleep(5) + finally: + if proc.returncode is None: + proc.kill() + async def sh(*args, chunk_size=65536, **kwargs): ''' An opinionated asyncio shell that logs stderr & yields stdout, done Unlike standard create_subprocess_exec: LimitOverrunError should not be possible. 
@@ -36,14 +45,19 @@ async def sh(*args, chunk_size=65536, **kwargs): limit=chunk_size * 2, **kwargs, ) - reader = asyncio.create_task(stream_readline_to_queue(proc.stdout, stdout_queue, chunk_size=chunk_size)) - while True: - msg, done = await stdout_queue.get() - if isinstance(msg, bytes): - yield msg, False - elif isinstance(msg, Exception): - raise msg - stdout_queue.task_done() - if done: break - await reader - yield await proc.wait(), True + try: + reader = asyncio.create_task(stream_readline_to_queue(proc.stdout, stdout_queue, chunk_size=chunk_size)) + while True: + msg, done = await stdout_queue.get() + if isinstance(msg, bytes): + yield msg, False + elif isinstance(msg, Exception): + raise msg + stdout_queue.task_done() + if done: break + await reader + yield await proc.wait(), True + except: + raise + finally: + asyncio.create_task(ensure_terminated(proc)) diff --git a/appyter/orchestration/dispatcher/__init__.py b/appyter/orchestration/dispatcher/__init__.py index 6a2ff61c..e401abf8 100644 --- a/appyter/orchestration/dispatcher/__init__.py +++ b/appyter/orchestration/dispatcher/__init__.py @@ -19,6 +19,7 @@ def create_app(**kwargs): PROXY=kwargs.get('proxy'), JOBS=kwargs.get('jobs'), JOBS_PER_IMAGE=kwargs.get('jobs_per_image'), + TIMEOUT=kwargs.get('timeout'), DEBUG=kwargs.get('debug'), PREFIX=kwargs.get('prefix', '').rstrip('/'), ) @@ -48,6 +49,7 @@ async def redirect_to_prefix(request): @click_option_setenv('--proxy', envvar='APPYTER_PROXY', type=bool, default=False, help='Whether this is running behind a proxy and the IP should be fixed for CORS') @click_option_setenv('--jobs', envvar='APPYTER_JOBS', type=int, default=2, help='Number of concurrent jobs to dispatch') @click_option_setenv('--jobs-per-image', envvar='APPYTER_JOBS_PER_IMAGE', type=int, default=1, help='Number of concurrent jobs to dispatch for any individual appyter image') +@click_option_setenv('--timeout', envvar='APPYTER_TIMEOUT', type=float, default=None, help='How long in seconds a 
job can run before it is cancelled') @click_option_setenv('--debug', envvar='APPYTER_DEBUG', type=bool, default=True, help='Whether or not we should be in debugging mode, not for use in multi-tenant situations') def dispatcher(*args, **kwargs): from appyter.ext.aiohttp import run_app diff --git a/appyter/orchestration/dispatcher/core.py b/appyter/orchestration/dispatcher/core.py index bfc8ad0a..d323dbc6 100644 --- a/appyter/orchestration/dispatcher/core.py +++ b/appyter/orchestration/dispatcher/core.py @@ -40,6 +40,7 @@ async def dispatcher_ctx(app): # app['dispatch_queue'] = asyncio.Queue() app['tasks'] = LockedOrderedDict() + timeout = app['config']['TIMEOUT'] # logger.info('Starting background tasks...') tasks = [ @@ -48,6 +49,7 @@ async def dispatcher_ctx(app): queued=app['dispatch_queue'], tasks=app['tasks'], jobs_per_image=app['config']['JOBS_PER_IMAGE'], + timeout=timeout, ) ) for _ in range(app['config']['JOBS']) @@ -69,7 +71,16 @@ async def slow_put(queue, item): await asyncio.sleep(0.5 + random.random()) await queue.put(item) -async def dispatcher(queued=None, tasks=None, jobs_per_image=1): +async def dispatch(job_id, job): + from appyter.ext.emitter import url_to_emitter + from appyter.execspec.core import url_to_executor + async with url_to_executor(job['executor']) as executor: + async with url_to_emitter(job['url']) as emitter: + logger.info(f"Dispatching job {job_id}") + async for msg in executor._run(**job): + await emitter(msg) + +async def dispatcher(queued=None, tasks=None, jobs_per_image=1, timeout=None): while True: job_id = await queued.get() async with tasks.lock: @@ -87,13 +98,7 @@ async def dispatcher(queued=None, tasks=None, jobs_per_image=1): ).isoformat() # try: - from appyter.ext.emitter import url_to_emitter - from appyter.execspec.core import url_to_executor - async with url_to_executor(job['executor']) as executor: - async with url_to_emitter(job['url']) as emitter: - logger.info(f"Dispatching job {job_id}") - async for msg in 
executor._run(**job): - await emitter(msg) + await asyncio.wait_for(dispatch(job_id, job), timeout=timeout) except asyncio.CancelledError: raise except: