Skip to content

Commit

Permalink
Add redis as cache storage plugin and minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
VDigitall committed Aug 2, 2018
1 parent c050f23 commit 4b2e5dc
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 6 deletions.
3 changes: 3 additions & 0 deletions openprocurement/bridge/basic/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
"mode": "_all_",
"limit": 1000
},
'bridge_mode': 'basic',
'resources_api_server': 'http://localhost:1234',
'resources_api_version': "0",
'resources_api_token': '',
'public_resources_api_server': 'http://localhost:1234',
'resource': 'tenders',
'workers_inc_threshold': 75,
Expand All @@ -55,3 +57,4 @@
'queues_controller_timeout': 60,
'perfomance_window': 300
}
PROCUREMENT_METHOD_TYPE_HANDLERS = {}
14 changes: 12 additions & 2 deletions openprocurement/bridge/basic/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pkg_resources import iter_entry_points
from yaml import load

from openprocurement.bridge.basic.constants import DEFAULTS
from openprocurement.bridge.basic.constants import DEFAULTS, PROCUREMENT_METHOD_TYPE_HANDLERS
from openprocurement.bridge.basic.utils import DataBridgeConfigError


Expand Down Expand Up @@ -52,6 +52,7 @@ def __init__(self, config):
self.retrievers_params = self.config.get('retrievers_params')
self.storage_type = self.config['storage_config'].get('storage_type', 'couchdb')
self.worker_type = self.config['worker_config'].get('worker_type', 'basic_couchdb')
self.filter_type = self.config['filter_config'].get('filter_type', 'basic_couchdb')

# Check up_wait_sleep
up_wait_sleep = self.retrievers_params.get('up_wait_sleep')
Expand Down Expand Up @@ -90,11 +91,20 @@ def __init__(self, config):
else:
raise DataBridgeConfigError('In config dictionary empty or missing \'resources_api_server\'')

worker_type = self.config['worker_config']['worker_type']

# Connecting storage plugin
self.db = None
for entry_point in iter_entry_points('openprocurement.bridge.basic.storage_plugins', self.storage_type):
plugin = entry_point.load()
self.db = plugin(self.config)

# Register contracting procurementMethodType handlers
if worker_type == 'contracting':
for entry_point in iter_entry_points('openprocurement.bridge.contracting.handlers'):
plugin = entry_point.load()
PROCUREMENT_METHOD_TYPE_HANDLERS[entry_point.name] = plugin(self.config, self.db)

if hasattr(self, 'filter_type'):
for entry_point in iter_entry_points('openprocurement.bridge.basic.filter_plugins', self.filter_type):
self.filter_greenlet = entry_point.load()
Expand Down Expand Up @@ -241,14 +251,14 @@ def gevent_watcher(self):

if len(self.workers_pool) < self.workers_min:
for i in xrange(0, (self.workers_min - len(self.workers_pool))):
self.create_api_client()
w = self.worker_greenlet.spawn(self.api_clients_queue,
self.resource_items_queue,
self.db, self.config,
self.retry_resource_items_queue,
self.api_clients_info)
self.workers_pool.add(w)
logger.info('Watcher: Create main queue worker.')
self.create_api_client()
retry_threads = self.retry_workers_max - self.retry_workers_pool.free_count()
logger.info('Retry threads {}'.format(retry_threads), extra={'RETRY_THREADS': retry_threads})
if len(self.retry_workers_pool) < self.retry_workers_min:
Expand Down
2 changes: 1 addition & 1 deletion openprocurement/bridge/basic/storages/couchdb_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,5 @@ def save_bulk(self, bulk):
return results


def includme(config):
def includeme(config):
return CouchDBStorage(config)
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ def save_doc(doc):
pass


def includme(config):
def includeme(config):
return ElasticsearchStorage(config)
41 changes: 41 additions & 0 deletions openprocurement/bridge/basic/storages/redis_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
class Db(object):
""" Database proxy """

def __init__(self, config):
self.config = config

self._backend = None
self._db_name = None
self._port = None
self._host = None

if 'cache_host' in self.config['storage_config']:
import redis
self._backend = "redis"
self._host = self.config['storage_config'].get('cache_host')
self._port = self.config['storage_config'].get('cache_port') or 6379
self._db_name = self.config['storage_config'].get('cache_db_name') or 0
self.db = redis.StrictRedis(host=self._host, port=self._port, db=self._db_name)
self.set_value = self.db.set
self.has_value = self.db.exists
else:
from lazydb import Db
self._backend = "lazydb"
self._db_name = self.config['storage_config'].get('cache_db_name') or 'databridge_cache_db'
self.db = Db(self._db_name)
self.set_value = self.db.put
self.has_value = self.db.has

def get(self, key):
return self.db.get(key)

def put(self, key, value):
self.set_value(key, value)

def has(self, key):
return self.has_value(key)


def includeme(config):
return Db(config)
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
'databridge = openprocurement.bridge.basic.databridge:main'
],
'openprocurement.bridge.basic.storage_plugins': [
'couchdb = openprocurement.bridge.basic.storages.couchdb_plugin:includme',
'elasticsearch = openprocurement.bridge.basic.storages.elasticsearch_plugin:includme'
'couchdb = openprocurement.bridge.basic.storages.couchdb_plugin:includeme',
'elasticsearch = openprocurement.bridge.basic.storages.elasticsearch_plugin:includeme',
'redis = openprocurement.bridge.basic.storages.redis_plugin:includeme'
],
'openprocurement.bridge.basic.filter_plugins': [
'basic_couchdb = openprocurement.bridge.basic.filters:BasicCouchDBFilter',
Expand Down

0 comments on commit 4b2e5dc

Please sign in to comment.