Skip to content

Commit

Permalink
Merge pull request #6 from tarasvaskiv/fa_859233153364691_handlers
Browse files Browse the repository at this point in the history
Move handler and filter
  • Loading branch information
VDigitall authored Oct 18, 2018
2 parents 3aef656 + b941450 commit 08ca080
Show file tree
Hide file tree
Showing 12 changed files with 1,297 additions and 10 deletions.
56 changes: 56 additions & 0 deletions openprocurement/bridge/basic/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
monkey.patch_all()

import logging
import jmespath

from datetime import datetime
from httplib import IncompleteRead
from time import time, sleep
Expand All @@ -12,6 +14,7 @@
from zope.interface import implementer

from openprocurement.bridge.basic.interfaces import IFilter
from openprocurement.bridge.basic.utils import journal_context


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -134,3 +137,56 @@ def _check_bulk(self, bulk, priority_cache):
'Ignored {}: SYNC - {}, ElasticSearch or Queue - {}'.format(doc_id, bulk[doc_id], date_modified),
extra={'MESSAGE_ID': 'skipped'}
)


@implementer(IFilter)
class JMESPathFilter(Greenlet):
    """Greenlet that filters queued resources with JMESPath expressions.

    Resources are read from ``input_queue`` as ``(priority, resource)``
    tuples.  A resource is forwarded to ``filtered_queue`` only when every
    compiled expression in ``self.filters`` evaluates to a truthy value
    against it; otherwise it is logged and dropped.  Resources whose
    ``dateModified`` equals the cached value are skipped without running
    the filters at all.
    """

    def __init__(self, conf, input_queue, filtered_queue, db):
        """
        :param conf: bridge config dict with 'resource' and 'filter_config'
        :param input_queue: source queue of (priority, resource) tuples
        :param filtered_queue: destination queue for accepted resources
        :param db: cache backend exposing get()
        """
        logger.info("Init Close Framework Agreement JMESPath Filter.")
        Greenlet.__init__(self)
        self.config = conf
        self.cache_db = db
        self.input_queue = input_queue
        self.filtered_queue = filtered_queue
        self.resource = self.config['resource']
        # e.g. 'tenders' -> 'TENDER_ID'; used as the journal context key.
        self.resource_id = "{}_ID".format(self.resource[:-1]).upper()
        # Compile all configured JMESPath expressions once, up front.
        self.filters = [jmespath.compile(expression['expression'])
                        for expression in self.config['filter_config'].get('filters', [])]
        self.timeout = self.config['filter_config']['timeout']

    def _run(self):
        while INFINITY:
            if not self.input_queue.empty():
                priority, resource = self.input_queue.get()
            else:
                try:
                    priority, resource = self.input_queue.get(timeout=self.timeout)
                except Empty:
                    sleep(self.timeout)
                    continue

            # Skip resources that have not changed since the last pass.
            cached = self.cache_db.get(resource['id'])
            if cached and cached == resource['dateModified']:
                logger.info(
                    "{} {} not modified from last check. Skipping".format(self.resource[:-1].title(), resource['id']),
                    extra=journal_context({"MESSAGE_ID": "SKIPPED"}, params={self.resource_id: resource['id']})
                )
                continue

            # A resource passes only when every expression matches.  (The
            # loop variable was renamed from 're', which shadowed the stdlib
            # regex module name; the original for/continue/break/else chain
            # is equivalent to this single all() check.)
            if all(expression.search(resource) for expression in self.filters):
                logger.debug(
                    "Put to filtered queue {} {} {}".format(self.resource[:-1], resource['id'], resource['status'])
                )
                self.filtered_queue.put((priority, resource))
                continue

            logger.info(
                "Skip {} {}".format(self.resource[:-1], resource['id']),
                extra=journal_context({"MESSAGE_ID": "SKIPPED"}, params={self.resource_id: resource['id']})
            )
115 changes: 115 additions & 0 deletions openprocurement/bridge/basic/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
import logging

from gevent import sleep
from retrying import retry
from uuid import uuid4

from openprocurement_client.clients import APIResourceClient as APIClient
from openprocurement_client.exceptions import (
RequestFailed,
ResourceNotFound,
ResourceGone
)

from openprocurement.bridge.basic.utils import DataBridgeConfigError, journal_context, generate_req_id

# Fallback mapping: handler-level config key -> key in the main bridge
# config.  HandlerTemplate.validate_and_fix_handler_config() fills every
# empty handler value from self.main_config[CONFIG_MAPPING[key]].  Note
# 'output_resource' is deliberately absent: it has no fallback and is
# required explicitly.
CONFIG_MAPPING = {
    'input_resources_api_token': 'resources_api_token',
    'output_resources_api_token': 'resources_api_token',
    'resources_api_version': 'resources_api_version',
    'input_resources_api_server': 'resources_api_server',
    'input_public_resources_api_server': 'public_resources_api_server',
    'input_resource': 'resource',
    'output_resources_api_server': 'resources_api_server',
    'output_public_resources_api_server': 'public_resources_api_server'
}

logger = logging.getLogger(__name__)


class HandlerTemplate(object):
    """Base class for resource handlers.

    Builds three API clients (input, output, and a keyless read-only
    public output client) from the merged handler/main configuration.
    Subclasses must define a ``handler_name`` attribute naming their
    section inside ``config['worker_config']``.
    """

    def __init__(self, config, cache_db):
        """
        :param config: full bridge configuration dict
        :param cache_db: cache backend exposing get()/put()
        """
        self.cache_db = cache_db
        self.handler_config = config['worker_config'].get(self.handler_name, {})
        self.main_config = config
        self.config_keys = ('input_resources_api_token', 'output_resources_api_token', 'resources_api_version',
                            'input_resources_api_server',
                            'input_public_resources_api_server', 'input_resource', 'output_resources_api_server',
                            'output_public_resources_api_server', 'output_resource')
        self.validate_and_fix_handler_config()
        self.initialize_clients()

    def validate_and_fix_handler_config(self):
        """Fill missing or empty handler keys from the main configuration.

        :raises DataBridgeConfigError: if 'output_resource' cannot be
            resolved (it has no fallback entry in CONFIG_MAPPING).
        """
        # First make sure every expected key exists (possibly empty)...
        for key in self.config_keys:
            if key not in self.handler_config:
                self.handler_config[key] = self.main_config['worker_config'].get(key, '')
        # ...then fall back to the top-level config for empty values.
        for key in CONFIG_MAPPING.keys():
            if not self.handler_config[key]:
                self.handler_config[key] = self.main_config[CONFIG_MAPPING[key]]

        if not self.handler_config['output_resource']:
            raise DataBridgeConfigError("Missing 'output_resource' in handler configuration.")

    def initialize_clients(self):
        """Create the output, public read-only and input API clients."""
        self.output_client = self.create_api_client()
        self.public_output_client = self.create_api_client(read_only=True)
        self.input_client = self.create_api_client(input_resource=True)

    def create_api_client(self, input_resource=False, read_only=False):
        """Create an APIClient, retrying forever with exponential backoff.

        :param input_resource: build a client for the input resource API
        :param read_only: build a keyless client for the public output API
        :return: a connected APIClient instance
        """
        client_user_agent = 'contracting_worker' + '/' + uuid4().hex
        timeout = 0.1
        while True:
            try:
                if input_resource:
                    return APIClient(host_url=self.handler_config['input_resources_api_server'],
                                     user_agent=client_user_agent,
                                     api_version=self.handler_config['resources_api_version'],
                                     key=self.handler_config['input_resources_api_token'],
                                     resource=self.handler_config['input_resource'])
                if read_only:
                    return APIClient(host_url=self.handler_config['output_public_resources_api_server'],
                                     user_agent=client_user_agent,
                                     api_version=self.handler_config['resources_api_version'],
                                     key='',
                                     resource=self.handler_config['output_resource'])
                return APIClient(host_url=self.handler_config['output_resources_api_server'],
                                 user_agent=client_user_agent,
                                 api_version=self.handler_config['resources_api_version'],
                                 key=self.handler_config['output_resources_api_token'],
                                 resource=self.handler_config['output_resource'])
            except RequestFailed as e:
                logger.error('Failed start api_client with status code {}'.format(e.status_code),
                             extra={'MESSAGE_ID': 'exceptions'})
            except Exception as e:
                # BUGFIX: format the exception itself instead of e.message --
                # not every exception defines .message, so the original could
                # raise AttributeError inside this handler.
                logger.error('Failed start api client with error: {}'.format(e),
                             extra={'MESSAGE_ID': 'exceptions'})
            # Exponential backoff, previously duplicated in both except
            # branches; consolidated here with identical behavior.
            timeout = timeout * 2
            logger.info('create_api_client will be sleep {} sec.'.format(timeout))
            sleep(timeout)

    def _put_resource_in_cache(self, resource):
        """Record the resource's dateModified in the cache if it is newer."""
        date_modified = self.cache_db.get(resource['id'])
        if not date_modified or date_modified < resource['dateModified']:
            self.cache_db.put(resource['id'], resource['dateModified'])

    @retry(stop_max_attempt_number=3, wait_exponential_multiplier=1000)
    def get_resource_credentials(self, resource_id):
        """Extract owner credentials for *resource_id*, retrying up to 3 times.

        :param resource_id: id of the tender to get credentials for
        :return: credentials data returned by the input client
        """
        self.input_client.headers.update({'X-Client-Request-ID': generate_req_id()})
        logger.info(
            "Getting credentials for tender {}".format(resource_id),
            extra=journal_context({"MESSAGE_ID": "databridge_get_credentials"}, {"TENDER_ID": resource_id})
        )
        data = self.input_client.extract_credentials(resource_id)
        logger.info(
            "Got tender {} credentials".format(resource_id),
            extra=journal_context({"MESSAGE_ID": "databridge_got_credentials"}, {"TENDER_ID": resource_id})
        )
        return data
22 changes: 21 additions & 1 deletion openprocurement/bridge/basic/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from yaml import safe_load


CONFIG_FILE = "{}/test.yml".format(os.path.dirname(__file__))
with open(CONFIG_FILE, 'r') as f:
TEST_CONFIG = safe_load(f.read())
Expand All @@ -28,3 +27,24 @@ def __nonzero__(self):
self.current_iteration += 1
return bool(1)
return bool(0)


class AdaptiveCache(object):
    """Dict-backed stand-in for the bridge cache used by the tests.

    Exposes the cache interface (``get``/``put``/``has``) plus
    mapping-style item access and membership tests.
    """

    def __init__(self, data):
        # Keep a reference to the supplied mapping (not a copy), so the
        # test can inspect mutations directly.
        self.data = data

    def get(self, key):
        """Return the cached value, or '' when *key* is absent."""
        if key in self.data:
            return self.data[key]
        return ''

    def put(self, key, value):
        """Store *value* under *key*."""
        self.data[key] = value

    def has(self, key):
        """Return True when *key* is cached."""
        return self.__contains__(key)

    def __getitem__(self, item):
        return self.data[item]

    def __contains__(self, item):
        return item in self.data
95 changes: 93 additions & 2 deletions openprocurement/bridge/basic/tests/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,31 @@
from datetime import datetime
from uuid import uuid4

import jmespath
from gevent.queue import PriorityQueue
from mock import MagicMock, patch
from mock import MagicMock, patch, call
from munch import munchify

from openprocurement.bridge.basic.filters import BasicCouchDBFilter, BasicElasticSearchFilter
from openprocurement.bridge.basic.filters import (
BasicCouchDBFilter,
BasicElasticSearchFilter,
JMESPathFilter,
)
from openprocurement.bridge.basic.tests.base import TEST_CONFIG


# Minimal configuration for TestResourceFilters: no JMESPath filters
# (so every document passes until the test injects one) and a zero queue
# timeout so the filter's _run() loop never blocks during the tests.
CONFIG = {
    'filter_config': {
        'statuses': [],
        'procurementMethodTypes': [],
        'lot_status': None,
        'timeout': 0,
        'filters': [],
    },
    'resource': 'tenders'
}


class TestBasicCouchDBFilter(unittest.TestCase):

config = deepcopy(TEST_CONFIG['main'])
Expand Down Expand Up @@ -129,10 +146,84 @@ def test__check_bulk(self):
self.assertEqual(queue.qsize(), 2)


class TestResourceFilters(unittest.TestCase):
    # Plain dict standing in for the cache DB; JMESPathFilter only calls
    # its .get() method.
    db = {}
    conf = CONFIG

    @patch('openprocurement.bridge.basic.filters.INFINITY')
    @patch('openprocurement.bridge.basic.filters.logger')
    def test_JMESPathFilter(self, logger, infinity):
        """Drive JMESPathFilter._run() through its main scenarios.

        INFINITY is patched so each _run() call performs exactly one loop
        iteration (__nonzero__ yields True, then False to exit); the
        module logger is patched so the exact sequence of log calls can be
        asserted via logger.mock_calls.
        """
        self.input_queue = PriorityQueue()
        self.filtered_queue = PriorityQueue()

        # 'tenders' -> 'tender'
        resource = self.conf['resource'][:-1]
        jmes_filter = JMESPathFilter(self.conf, self.input_queue, self.filtered_queue, self.db)
        mock_calls = [call.info('Init Close Framework Agreement JMESPath Filter.')]
        self.assertEqual(logger.mock_calls, mock_calls)
        # Expected journal context of skip messages; presumably
        # journal_context() prefixes param keys with 'JOURNAL_' -- confirm.
        extra = {'MESSAGE_ID': 'SKIPPED', 'JOURNAL_{}_ID'.format(resource.upper()): 'test_id'}

        # Empty input queue: the single iteration times out and logs nothing.
        infinity.__nonzero__.side_effect = [True, False]
        jmes_filter._run()

        doc = {
            'id': 'test_id',
            'dateModified': '1970-01-01',
            'status': 'draft.pending'
        }

        # Cached dateModified equals the doc's -> skipped as unmodified.
        self.input_queue.put((None, doc))
        self.db['test_id'] = '1970-01-01'
        infinity.__nonzero__.side_effect = [True, False]
        jmes_filter._run()
        mock_calls.append(
            call.info('{} test_id not modified from last check. Skipping'.format(resource.title()),
                      extra=extra)
        )
        self.assertEqual(logger.mock_calls, mock_calls)

        # no filters
        # With an empty filter list every modified doc is forwarded.
        doc['dateModified'] = '1970-01-02'
        self.input_queue.put((None, doc))
        infinity.__nonzero__.side_effect = [True, False]
        jmes_filter._run()
        mock_calls.append(
            call.debug('Put to filtered queue {} test_id {}'.format(resource, doc['status']))
        )
        self.assertEqual(logger.mock_calls, mock_calls)
        priority, filtered_doc = self.filtered_queue.get()
        self.assertIsNone(priority)
        self.assertEqual(filtered_doc, doc)

        # not found
        # The expression does not match the doc's status -> doc is skipped.
        # (The skip log call is verified by the final assertEqual below.)
        jmes_filter.filters = [jmespath.compile("contains([`test_status`], status)")]
        doc['status'] = 'spam_status'
        self.input_queue.put((None, doc))
        infinity.__nonzero__.side_effect = [True, False]
        jmes_filter._run()
        mock_calls.append(
            call.info('Skip {} test_id'.format(resource),
                      extra=extra)
        )

        # has found
        # The expression matches -> doc is forwarded to the filtered queue.
        doc['status'] = 'test_status'
        self.input_queue.put((None, doc))
        infinity.__nonzero__.side_effect = [True, False]
        jmes_filter._run()
        mock_calls.append(
            call.debug('Put to filtered queue {} test_id {}'.format(resource, doc['status']))
        )
        self.assertEqual(logger.mock_calls, mock_calls)
        priority, filtered_doc = self.filtered_queue.get()
        self.assertIsNone(priority)
        self.assertEqual(filtered_doc, doc)


def suite():
    """Collect the filter test cases into a single test suite."""
    tests = unittest.TestSuite()
    for case in (TestBasicCouchDBFilter, TestBasicElasticSearchFilter, TestResourceFilters):
        tests.addTest(unittest.makeSuite(case))
    return tests


Expand Down
Loading

0 comments on commit 08ca080

Please sign in to comment.