Skip to content

Commit

Permalink
GH zatosource#282 - Working on SFTP subprocesses.
Browse files Browse the repository at this point in the history
  • Loading branch information
dsuch committed Feb 27, 2019
1 parent a33664e commit ac4d500
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 20 deletions.
57 changes: 51 additions & 6 deletions code/zato-server/src/zato/server/base/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
# Zato
from zato.broker import BrokerMessageReceiver
from zato.bunch import Bunch
from zato.common import broker_message, CHANNEL, GENERIC, HTTP_SOAP_SERIALIZATION_TYPE, IPC, KVDB, NOTIF, PUBSUB, SEC_DEF_TYPE, \
simple_types, URL_TYPE, TRACE1, ZATO_NONE, ZATO_ODB_POOL_NAME, ZMQ
from zato.common.broker_message import code_to_name, SERVICE
from zato.common import broker_message, CHANNEL, GENERIC as COMMON_GENERIC, HTTP_SOAP_SERIALIZATION_TYPE, IPC, KVDB, NOTIF, \
PUBSUB, SEC_DEF_TYPE, simple_types, URL_TYPE, TRACE1, ZATO_NONE, ZATO_ODB_POOL_NAME, ZMQ
from zato.common.broker_message import code_to_name, GENERIC as BROKER_MSG_GENERIC, SERVICE
from zato.common.dispatch import dispatcher
from zato.common.match import Matcher
from zato.common.odb.api import PoolStore, SessionWrapper
Expand Down Expand Up @@ -89,12 +89,22 @@

# ################################################################################################################################

class _generic_msg:
create = BROKER_MSG_GENERIC.CONNECTION_CREATE.value
edit = BROKER_MSG_GENERIC.CONNECTION_EDIT.value
delete = BROKER_MSG_GENERIC.CONNECTION_DELETE.value
change_password = BROKER_MSG_GENERIC.CONNECTION_CHANGE_PASSWORD.value

# ################################################################################################################################
# ################################################################################################################################

class GeventWorker(GunicornGeventWorker):
def __init__(self, *args, **kwargs):
self.deployment_key = '{}.{}'.format(datetime.utcnow().isoformat(), uuid4().hex)
super(GunicornGeventWorker, self).__init__(*args, **kwargs)

# ################################################################################################################################
# ################################################################################################################################

class SyncWorker(GunicornSyncWorker):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -124,6 +134,7 @@ def _get_base_classes():
return tuple(out)

# ################################################################################################################################
# ################################################################################################################################

_base_type = '_WorkerStoreBase'
_base_type = _base_type if PY3 else _base_type.encode('utf8')
Expand Down Expand Up @@ -213,13 +224,16 @@ def init(self):

# Maps generic connection types to their API handler objects
self.generic_conn_api = {
GENERIC.CONNECTION.TYPE.OUTCONN_WSX: self.outconn_wsx,
COMMON_GENERIC.CONNECTION.TYPE.OUTCONN_WSX: self.outconn_wsx,
}

self._generic_conn_handler = {
GENERIC.CONNECTION.TYPE.OUTCONN_WSX: OutconnWSXWrapper
COMMON_GENERIC.CONNECTION.TYPE.OUTCONN_WSX: OutconnWSXWrapper
}

# Maps message actions against generic connection types and their message handlers
self.generic_impl_func_map = {}

# Message-related config - init_msg_ns_store must come before init_xpath_store
# so the latter has access to the former's namespace map.

Expand Down Expand Up @@ -300,6 +314,7 @@ def init(self):
self.init_amqp()

# Generic connections
self.init_generic_connections_config()
self.init_generic_connections()

# All set, whoever is waiting for us, if anyone at all, can now proceed
Expand Down Expand Up @@ -896,11 +911,41 @@ def init_generic_connections(self):
for config_dict in self.worker_config.generic_connection.values():

# Not all generic connections are created here
if config_dict['config']['type_'] != GENERIC.CONNECTION.TYPE.OUTCONN_WSX:
if config_dict['config']['type_'] != COMMON_GENERIC.CONNECTION.TYPE.OUTCONN_WSX:
continue

self._create_generic_connection(bunchify(config_dict['config']))

# ################################################################################################################################

def init_generic_connections_config(self):

# Local aliases
outconn_wsx_map = self.generic_impl_func_map.setdefault(COMMON_GENERIC.CONNECTION.TYPE.OUTCONN_WSX, {})
outconn_sftp_map = self.generic_impl_func_map.setdefault(COMMON_GENERIC.CONNECTION.TYPE.OUTCONN_SFTP, {})

# Outgoing WSX connections are pure generic objects that we can handle ourselves
outconn_wsx_map[_generic_msg.create] = self._create_generic_connection
outconn_wsx_map[_generic_msg.edit] = self._edit_generic_connection
outconn_wsx_map[_generic_msg.delete] = self._delete_generic_connection

# Outgoing SFTP connections require for a different API to be called (provided by ParallelServer)
outconn_sftp_map[_generic_msg.create] = self.server.connector_sftp.invoke_connector
outconn_sftp_map[_generic_msg.edit] = self.server.connector_sftp.invoke_connector
outconn_sftp_map[_generic_msg.delete] = self.server.connector_sftp.invoke_connector
outconn_sftp_map[_generic_msg.change_password] = self.server.connector_sftp.invoke_connector

# ################################################################################################################################

def _get_generic_impl_func(self, msg, *args, **kwargs):
""" Returns a function/method to invoke depending on which generic connection type is given on input.
Required because some connection types (e.g. SFTP) are not managed via GenericConnection objects,
for instance, in the case of SFTP, it uses subprocesses and a different management API.
"""
func_map = self.generic_impl_func_map[msg['type_']]
func = func_map[msg['action']]
return func(msg, *args, **kwargs)

# ################################################################################################################################

def apikey_get(self, name):
Expand Down
24 changes: 12 additions & 12 deletions code/zato-server/src/zato/server/base/worker/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from __future__ import absolute_import, division, print_function, unicode_literals

# Zato
from zato.common import GENERIC as COMMON_GENERIC
from zato.common.broker_message import GENERIC as BROKER_MSG_GENERIC
from zato.server.base.worker.common import WorkerImpl
from zato.server.generic.connection import GenericConnection

Expand Down Expand Up @@ -68,6 +70,12 @@ def _create_generic_connection(self, msg, needs_roundtrip=False, skip=None):
config_attr[msg.name].conn = wrapper(item_dict, self.server)
config_attr[msg.name].conn.build_queue()

# ################################################################################################################################

def _edit_generic_connection(self, msg, skip):
self._delete_generic_connection(msg)
self._create_generic_connection(msg, True, skip)

# ################################################################################################################################

def ping_generic_connection(self, conn_id):
Expand All @@ -85,18 +93,10 @@ def reconnect_generic(self, conn_id):

# ################################################################################################################################

def on_broker_msg_GENERIC_CONNECTION_CREATE(self, msg):
self._create_generic_connection(msg, True)

# ################################################################################################################################
def on_broker_msg_GENERIC_CONNECTION_CREATE(self, msg, *args, **kwargs):
func = self._get_generic_impl_func(msg)
func(msg)

def on_broker_msg_GENERIC_CONNECTION_DELETE(self, msg):
self._delete_generic_connection(msg)

# ################################################################################################################################

def on_broker_msg_GENERIC_CONNECTION_EDIT(self, msg, skip=None):
self._delete_generic_connection(msg)
self._create_generic_connection(msg, True, skip)
on_broker_msg_GENERIC_CONNECTION_EDIT = on_broker_msg_GENERIC_CONNECTION_DELETE = on_broker_msg_GENERIC_CONNECTION_CREATE

# ################################################################################################################################
2 changes: 1 addition & 1 deletion code/zato-server/src/zato/server/base/worker/wmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class WebSphereMQ(WorkerImpl):

def _on_broker_msg_invoke_wmq_connector(self, msg):
if self.server.is_first_worker:
self.server.invoke_wmq_connector(msg)
self.server.connector_ibm_mq.invoke_wmq_connector(msg)

# Everything is delegated to connectors ..
on_broker_msg_DEFINITION_WMQ_CREATE = _on_broker_msg_invoke_wmq_connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def _on_OUTGOING_SFTP_PING(self, msg):
def _on_OUTGOING_SFTP_DELETE(self, msg):
return super(SFTPConnectionContainer, self).on_definition_delete(msg)

_on_GENERIC_CONNECTION_EDIT = _on_OUTGOING_SFTP_DELETE

# ################################################################################################################################

def _on_OUTGOING_SFTP_CREATE(self, msg):
Expand All @@ -96,6 +98,15 @@ def _on_OUTGOING_SFTP_CREATE(self, msg):
def _on_OUTGOING_SFTP_EDIT(self, msg):
return super(SFTPConnectionContainer, self).on_definition_edit(msg)

_on_GENERIC_CONNECTION_EDIT = _on_OUTGOING_SFTP_EDIT

# ################################################################################################################################

def _on_OUTGOING_SFTP_CHANGE_PASSWORD(self, msg):
return super(SFTPConnectionContainer, self).on_definition_change_password(msg)

_on_GENERIC_CONNECTION_CHANGE_PASSWORD = _on_OUTGOING_SFTP_CHANGE_PASSWORD

# ################################################################################################################################

def _on_OUTGOING_SFTP_EXECUTE(self, msg, is_reconnect=False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,6 @@ class SimpleIO(AdminSIO):
input_optional = _optional + ('reply_to', AsIs('correl_id'), AsIs('msg_id'))

def handle(self):
self.server.send_wmq_message(self.request.input)
self.server.connector_ibm_mq.send_wmq_message(self.request.input)

# ################################################################################################################################

0 comments on commit ac4d500

Please sign in to comment.