Skip to content

Commit

Permalink
LINTed psws folder
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd committed Jul 10, 2018
1 parent e224cc4 commit 5244297
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 124 deletions.
123 changes: 64 additions & 59 deletions tabpy-server/tabpy_server/psws/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import os
import logging
import sys
import base64
from time import sleep

from tornado import gen
from tornado.httpclient import AsyncHTTPClient
from common.messages import (LoadObject, DeleteObjects, Msg,
ListObjects, ObjectList)

from common.endpoint_file_mgr import cleanup_endpoint_files, \
get_local_endpoint_file_path
from common.messages import LoadObject, DeleteObjects, ListObjects, ObjectList
from common.endpoint_file_mgr import cleanup_endpoint_files
from common.util import format_exception
from management.state import TabPyState, get_query_object_path

from management import util

from common.tabpy_logging import PYLogging, log_error, log_info, log_debug, log_warning
from common.tabpy_logging import PYLogging, log_error, log_info, log_warning


import logging
logger = logging.getLogger(__name__)


PYLogging.initialize(logger)


def wait_for_endpoint_loaded(py_handler, object_uri):
'''
This method waits for the object to be loaded.
Expand All @@ -29,78 +29,80 @@ def wait_for_endpoint_loaded(py_handler, object_uri):
msg = ListObjects()
list_object_msg = py_handler.manage_request(msg)
if not isinstance(list_object_msg, ObjectList):
log_error("Error loading endpoint %s: %s" % (object_uri, list_object_msg))
log_error("Error loading endpoint %s: %s" % (
object_uri, list_object_msg))
return
for (uri, info) in (list_object_msg.objects.items() if sys.version_info > (3,0) else list_object_msg.objects.iteritems()):

for (uri, info) in (
list_object_msg.objects.items() if sys.version_info > (3, 0)
else list_object_msg.objects.iteritems()):
if uri == object_uri:
if info['status'] != 'LoadInProgress':
log_info("Object load status: %s" % info['status'])
return


sleep(0.1)


@gen.coroutine
def init_ps_server(settings):
tabpy = settings['tabpy']
existing_pos = tabpy.get_endpoints()
for (object_name, obj_info) in (existing_pos.items() if sys.version_info > (3,0) else existing_pos.iteritems()):
for (object_name, obj_info) in (
existing_pos.items() if sys.version_info > (3, 0)
else existing_pos.iteritems()):
try:
object_version = obj_info['version']
object_type = obj_info['type']
object_path = get_query_object_path(
settings['state_file_path'],
object_name, object_version)
get_query_object_path(
settings['state_file_path'],
object_name, object_version)
except Exception as e:
log_error('Exception encounted when downloading object: %s, error: %s' % \
(object_name, e))
log_error('Exception encounted when downloading object: %s'
', error: %s' % (object_name, e))


@gen.coroutine
def init_model_evaluator(settings):
'''
This will go through all models that the service currently have and initialize them.
This will go through all models that the service currently have and
initialize them.
'''
try:
tabpy = settings['tabpy']
py_handler = settings['py_handler']
tabpy = settings['tabpy']
py_handler = settings['py_handler']

existing_pos = tabpy.get_endpoints()

for (object_name, obj_info) in (existing_pos.items() if sys.version_info > (3,0) else existing_pos.iteritems()):
object_version = obj_info['version']
object_type = obj_info['type']
object_path = get_query_object_path(
settings['state_file_path'],
object_name, object_version)
existing_pos = tabpy.get_endpoints()

log_info('Load endpoint: %s, version: %s, type: %s' % \
(object_name, object_version, object_type))
if object_type == 'alias':
msg = LoadObject(object_name, obj_info['target'], object_version,
False, 'alias')
else:
local_path = object_path
msg = LoadObject(object_name, local_path, object_version,
False, object_type)
py_handler.manage_request(msg)
for (object_name, obj_info) in (
existing_pos.items() if sys.version_info > (3, 0)
else existing_pos.iteritems()):
object_version = obj_info['version']
object_type = obj_info['type']
object_path = get_query_object_path(
settings['state_file_path'],
object_name, object_version)

log_info('Load endpoint: %s, version: %s, type: %s' %
(object_name, object_version, object_type))
if object_type == 'alias':
msg = LoadObject(object_name, obj_info['target'],
object_version, False, 'alias')
else:
local_path = object_path
msg = LoadObject(object_name, local_path, object_version,
False, object_type)
py_handler.manage_request(msg)

except Exception as e:
err_msg = format_exception(e, "Exception encounted when initializing evaluator host:%s" % host_to_initialize)
log_error(err_msg)

def _get_latest_service_state(settings, new_ps_state):
'''
Update the endpoints from the latest remote state file.
Returns
--------
(has_changes, endpoint_diff):
has_changes: True or False
endpoint_diff: Summary of what has changed, one entry for each changes
'''
tabpy = settings['tabpy']

# Shortcut when nothing is changed
changes = {'endpoints': {}}

Expand All @@ -117,15 +119,14 @@ def _get_latest_service_state(settings, new_ps_state):
settings['state_file_path'],
endpoint_name, endpoint_info['version'])
endpoint_type = endpoint_info.get('type', 'model')
diff[endpoint_name] = \
(endpoint_type,
endpoint_info['version'],\
path_to_new_version)
diff[endpoint_name] = (endpoint_type, endpoint_info['version'],
path_to_new_version)

# add removed models too
for (endpoint_name, endpoint_info) in current_endpoints.items():
if endpoint_name not in new_endpoints.keys():
endpoint_type = current_endpoints[endpoint_name].get('type', 'model')
endpoint_type = current_endpoints[endpoint_name].get(
'type', 'model')
diff[endpoint_name] = (endpoint_type, None, None)

if diff:
Expand All @@ -134,24 +135,26 @@ def _get_latest_service_state(settings, new_ps_state):
settings['tabpy'] = new_ps_state
return (True, changes)


@gen.coroutine
def on_state_change(settings):
try:
tabpy = settings['tabpy']
py_handler = settings['py_handler']

log_info("Loading state from state file")
config = util._get_state_from_file(settings['state_file_path'])
new_ps_state = TabPyState(config=config)

(has_changes, changes) = _get_latest_service_state(settings, new_ps_state)
(has_changes, changes) = _get_latest_service_state(settings,
new_ps_state)
if not has_changes:
log_info("Nothing changed, return.")
return

new_endpoints = new_ps_state.get_endpoints()
for object_name in changes['endpoints']:
(object_type, object_version, object_path) = changes['endpoints'][object_name]
(object_type, object_version, object_path) = changes['endpoints'][
object_name]

if not object_path and not object_version: # removal
log_info("Removing object", uri=object_name)
Expand All @@ -164,19 +167,21 @@ def on_state_change(settings):
endpoint_info = new_endpoints[object_name]
is_update = object_version > 1
if object_type == 'alias':
msg = LoadObject(object_name, endpoint_info['target'], object_version,
is_update, 'alias')
msg = LoadObject(object_name, endpoint_info['target'],
object_version, is_update, 'alias')
else:
local_path = object_path
msg = LoadObject(object_name, local_path, object_version,
is_update, object_type)
is_update, object_type)

py_handler.manage_request(msg)
wait_for_endpoint_loaded(py_handler, object_name)

# cleanup old version of endpoint files
if object_version > 2:
cleanup_endpoint_files(object_name, settings['upload_dir'], [object_version, object_version - 1])
cleanup_endpoint_files(
object_name, settings['upload_dir'], [
object_version, object_version - 1])

except Exception as e:
err_msg = format_exception(e, 'on_state_change')
Expand Down
Loading

0 comments on commit 5244297

Please sign in to comment.