Merge branch 'timeoutv4'
u8sand committed Jul 2, 2024
2 parents 1b74115 + 06856d9 commit 9f11866
Showing 4 changed files with 41 additions and 19 deletions.
1 change: 1 addition & 0 deletions appyter/execspec/implementations/docker.py
@@ -71,6 +71,7 @@ async def _run(self, **job):
     async for msg, done in self._submit(**job):
       if not done:
         try: yield await async_json_loads(msg)
+        except asyncio.CancelledError: raise
         except: logger.warning(traceback.format_exc())
     if msg != 0:
       yield dict(type='error', data=f"Container exited with error code")
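Why this one-line addition matters: a bare `except:` catches BaseException, and asyncio.CancelledError has derived from BaseException since Python 3.8, so without the explicit re-raise the executor would swallow a cancellation (such as the dispatcher timeout introduced by this merge) and merely log it as a warning. A minimal, self-contained sketch of the difference, using hypothetical names rather than appyter code:

import asyncio

async def worker(swallow: bool):
  try:
    await asyncio.sleep(10)
  except asyncio.CancelledError:
    if not swallow:
      raise  # propagate, so the task actually ends up cancelled
    # swallowing here (as a bare `except:` would) defeats cancellation

async def main():
  for swallow in (True, False):
    task = asyncio.create_task(worker(swallow))
    await asyncio.sleep(0)  # let the worker start before cancelling
    task.cancel()
    try:
      await task
      print('cancellation swallowed; task "completed" normally')
    except asyncio.CancelledError:
      print('cancellation propagated; task reports cancelled')

asyncio.run(main())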
36 changes: 25 additions & 11 deletions appyter/ext/asyncio/subprocess.py
@@ -24,6 +24,15 @@ async def stream_readline_to_queue(stream, queue, chunk_size=65536):
   else:
     await queue.put((None, True))
 
+async def ensure_terminated(proc: asyncio.subprocess.Process):
+  try:
+    if proc.returncode is None:
+      proc.terminate()
+      await asyncio.sleep(5000)
+  finally:
+    if proc.returncode is None:
+      proc.kill()
+
 async def sh(*args, chunk_size=65536, **kwargs):
   ''' An opinionated asyncio shell that logs stderr & yields stdout, done
   Unlike standard create_subprocess_exec: LimitOverrunError should not be possible.
@@ -36,14 +45,19 @@ async def sh(*args, chunk_size=65536, **kwargs):
     limit=chunk_size * 2,
     **kwargs,
   )
-  reader = asyncio.create_task(stream_readline_to_queue(proc.stdout, stdout_queue, chunk_size=chunk_size))
-  while True:
-    msg, done = await stdout_queue.get()
-    if isinstance(msg, bytes):
-      yield msg, False
-    elif isinstance(msg, Exception):
-      raise msg
-    stdout_queue.task_done()
-    if done: break
-  await reader
-  yield await proc.wait(), True
+  try:
+    reader = asyncio.create_task(stream_readline_to_queue(proc.stdout, stdout_queue, chunk_size=chunk_size))
+    while True:
+      msg, done = await stdout_queue.get()
+      if isinstance(msg, bytes):
+        yield msg, False
+      elif isinstance(msg, Exception):
+        raise msg
+      stdout_queue.task_done()
+      if done: break
+    await reader
+    yield await proc.wait(), True
+  except:
+    raise
+  finally:
+    asyncio.create_task(ensure_terminated(proc))
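`ensure_terminated` applies the usual terminate-then-kill escalation: SIGTERM first, a grace period, then SIGKILL from a `finally:` so the kill still happens if the waiting coroutine is itself cancelled. Scheduling it via `create_task` in `sh`'s `finally:` lets cleanup proceed even when the generator is torn down early by a timeout. A generic sketch of the same pattern under assumed names (`terminate_gracefully` is hypothetical, not part of appyter):

import asyncio

async def terminate_gracefully(proc: asyncio.subprocess.Process, grace: float = 5.0):
  # SIGTERM, wait up to `grace` seconds, then SIGKILL if still running
  if proc.returncode is None:
    proc.terminate()
    try:
      await asyncio.wait_for(proc.wait(), timeout=grace)
    except asyncio.TimeoutError:
      proc.kill()
      await proc.wait()

async def main():
  proc = await asyncio.create_subprocess_exec('sleep', '60')  # any long-running command
  await terminate_gracefully(proc, grace=1.0)
  print('exit code:', proc.returncode)  # negative on POSIX: killed by signal

asyncio.run(main())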
2 changes: 2 additions & 0 deletions appyter/orchestration/dispatcher/__init__.py
@@ -19,6 +19,7 @@ def create_app(**kwargs):
     PROXY=kwargs.get('proxy'),
     JOBS=kwargs.get('jobs'),
     JOBS_PER_IMAGE=kwargs.get('jobs_per_image'),
+    TIMEOUT=kwargs.get('timeout'),
     DEBUG=kwargs.get('debug'),
     PREFIX=kwargs.get('prefix', '').rstrip('/'),
   )
@@ -48,6 +49,7 @@ async def redirect_to_prefix(request):
 @click_option_setenv('--proxy', envvar='APPYTER_PROXY', type=bool, default=False, help='Whether this is running behind a proxy and the IP should be fixed for CORS')
 @click_option_setenv('--jobs', envvar='APPYTER_JOBS', type=int, default=2, help='Number of concurrent jobs to dispatch')
 @click_option_setenv('--jobs-per-image', envvar='APPYTER_JOBS_PER_IMAGE', type=int, default=1, help='Number of concurrent jobs to dispatch for any individual appyter image')
+@click_option_setenv('--timeout', envvar='APPYTER_TIMEOUT', type=float, default=None, help='How long in seconds a job can run before it is cancelled')
 @click_option_setenv('--debug', envvar='APPYTER_DEBUG', type=bool, default=True, help='Whether or not we should be in debugging mode, not for use in multi-tenant situations')
 def dispatcher(*args, **kwargs):
   from appyter.ext.aiohttp import run_app
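The new `--timeout` option (or the `APPYTER_TIMEOUT` environment variable) flows from the CLI into the app config and on to the dispatcher. Its default of `None` means no limit, because `asyncio.wait_for` with `timeout=None` simply awaits to completion. A small sketch of those semantics (standard asyncio behavior, not appyter-specific):

import asyncio

async def job(seconds):
  await asyncio.sleep(seconds)
  return 'done'

async def main():
  # timeout=None waits indefinitely, matching the option's default
  print(await asyncio.wait_for(job(0.1), timeout=None))
  # a finite timeout cancels the awaitable and raises TimeoutError
  try:
    await asyncio.wait_for(job(10), timeout=0.5)
  except asyncio.TimeoutError:
    print('job cancelled after 0.5s')

asyncio.run(main())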
21 changes: 13 additions & 8 deletions appyter/orchestration/dispatcher/core.py
@@ -40,6 +40,7 @@ async def dispatcher_ctx(app):
   #
   app['dispatch_queue'] = asyncio.Queue()
   app['tasks'] = LockedOrderedDict()
+  timeout = app['config']['TIMEOUT']
   #
   logger.info('Starting background tasks...')
   tasks = [
@@ -48,6 +49,7 @@ async def dispatcher_ctx(app):
         queued=app['dispatch_queue'],
         tasks=app['tasks'],
         jobs_per_image=app['config']['JOBS_PER_IMAGE'],
+        timeout=timeout,
       )
     )
     for _ in range(app['config']['JOBS'])
@@ -69,7 +71,16 @@ async def slow_put(queue, item):
   await asyncio.sleep(0.5 + random.random())
   await queue.put(item)
 
-async def dispatcher(queued=None, tasks=None, jobs_per_image=1):
+async def dispatch(job_id, job):
+  from appyter.ext.emitter import url_to_emitter
+  from appyter.execspec.core import url_to_executor
+  async with url_to_executor(job['executor']) as executor:
+    async with url_to_emitter(job['url']) as emitter:
+      logger.info(f"Dispatching job {job_id}")
+      async for msg in executor._run(**job):
+        await emitter(msg)
+
+async def dispatcher(queued=None, tasks=None, jobs_per_image=1, timeout=None):
   while True:
     job_id = await queued.get()
     async with tasks.lock:
@@ -87,13 +98,7 @@ async def dispatcher(queued=None, tasks=None, jobs_per_image=1):
       ).isoformat()
     #
     try:
-      from appyter.ext.emitter import url_to_emitter
-      from appyter.execspec.core import url_to_executor
-      async with url_to_executor(job['executor']) as executor:
-        async with url_to_emitter(job['url']) as emitter:
-          logger.info(f"Dispatching job {job_id}")
-          async for msg in executor._run(**job):
-            await emitter(msg)
+      await asyncio.wait_for(dispatch(job_id, job), timeout=timeout)
     except asyncio.CancelledError:
       raise
     except:
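Factoring the executor/emitter body out into `dispatch` turns each job into a single awaitable that `asyncio.wait_for` can cancel wholesale. On timeout, CancelledError is thrown into the `async for` over `executor._run(...)`, where the docker.py change above re-raises it instead of logging it, and the dispatcher's generic `except:` then records the failure. A standalone sketch of cancellation propagating through an async-generator consumer (all names hypothetical):

import asyncio

async def produce():
  # stand-in for executor._run: an async generator the consumer iterates
  try:
    for i in range(100):
      await asyncio.sleep(0.2)
      yield i
  except asyncio.CancelledError:
    print('producer saw cancellation')  # cleanup would go here
    raise  # re-raise, as docker.py now does

async def consume():
  # stand-in for dispatch: drains the generator
  async for msg in produce():
    print('got', msg)

async def main():
  try:
    await asyncio.wait_for(consume(), timeout=0.5)
  except asyncio.TimeoutError:
    print('dispatch timed out')

asyncio.run(main())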
