Skip to content

Commit

Permalink
支持权重和provider disable
Browse files Browse the repository at this point in the history
  • Loading branch information
jingpeicomp committed Sep 26, 2017
1 parent 925531b commit 8115ae7
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 47 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
*.pyc
build
*.egg-info
.idea
*.log
Binary file removed dist/dubbo-client-1.0.0b2.tar.gz
Binary file not shown.
Binary file removed dist/dubbo-client-1.0.0b3.tar.gz
Binary file not shown.
Binary file removed dist/dubbo-client-1.0.0b4.tar.gz
Binary file not shown.
Binary file removed dist/dubbo-client-1.0.0b5.tar.gz
Binary file not shown.
Binary file removed dist/dubbo_client-1.0.0b2-py2.7.egg
Binary file not shown.
Binary file removed dist/dubbo_client-1.0.0b3-py2.7.egg
Binary file not shown.
Binary file removed dist/dubbo_client-1.0.0b4-py2.7.egg
Binary file not shown.
Binary file removed dist/dubbo_client-1.0.0b5-py2.7.egg
Binary file not shown.
50 changes: 50 additions & 0 deletions dubbo_client/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class ServiceURL(object):
port = '9090'
version = ''
group = ''
disabled = False
weight = 100
has_disable_value = False
has_weight_value = False

def __init__(self, url):
result = urlparse(url)
Expand All @@ -28,4 +32,50 @@ def __init__(self, url):
if pos > -1:
key = key[pos + 1:]
# print key
if key == 'disabled':
value = value.lower() == 'true' if value else False
self.has_disable_value = True
elif key == 'weight':
value = int(value) if value else 100
self.has_weight_value = True
self.__dict__[key] = value

def __repr__(self):
return str(self.__dict__)

def init_default_config(self):
"""
恢复默认设置,dubbo配置是覆盖形式,如果恢复默认值,那么configurators下的配置会被清空
:return:
"""
self.disabled = False
self.weight = 100

def set_config(self, url_list):
"""
设置自定义dubbo配置
:param url_list:
:return:
"""
if not url_list:
return

param_list = []
for configuration_url in url_list:
result = urlparse(configuration_url)
params = parse_qsl(result[4])
param_list.extend(params)
has_disable_value = False
has_weight_value = False
for key, value in param_list:
if key == 'disabled':
self.disabled = value.lower() == 'true' if value else False
has_disable_value = True
if key == 'weight':
self.weight = int(value) if value else 100
has_weight_value = True

if not has_disable_value:
self.disabled = False
if not has_weight_value:
self.weight = 100
180 changes: 145 additions & 35 deletions dubbo_client/registry.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
# coding=utf-8
import time
import logging.config
import os
import os.path
import random
import socket
import struct
from threading import Thread
import threading
import time
import urllib
import logging
import logging.config
import os.path

from kazoo.protocol.states import KazooState
from threading import Thread

from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState

from dubbo_client.config import ApplicationConfig
from dubbo_client.common import ServiceURL
from dubbo_client.config import ApplicationConfig
from dubbo_client.rpcerror import NoProvider

__author__ = 'caozupeng'

Expand All @@ -35,7 +36,10 @@ class Registry(object):
dict 格式为{interface:{providername:{ip+port:service_url}}}
"""
_service_provides = {}

def __init__(self):
self._service_providers = {}
self._mutex = threading.Lock()

def _do_event(self, event):
"""
Expand Down Expand Up @@ -71,7 +75,7 @@ def subscribe(self, interface, **kwargs):
"""
pass

def get_provides(self, interface, **kwargs):
def get_providers(self, interface, **kwargs):
"""
获取已经注册的服务URL对象
:param interface: com.ofpay.demo.api.UserProvider
Expand All @@ -84,6 +88,40 @@ def get_provides(self, interface, **kwargs):
second = self._service_provides.get(interface, {})
return second.get(key, {})

def get_random_provider(self, interface, **kwargs):
"""
根据权重和是否禁用获取一个provider
:param interface:
:param kwargs:
:return:
"""
group = kwargs.get('group', '')
version = kwargs.get('version', '')
key = self._to_key(interface, version, group)
second_dict = self._service_providers.get(interface, {})
service_url_list = [service_url for service_url in second_dict.get(key, {}).itervalues() if
not service_url.disabled and service_url.weight > 0]
if not service_url_list:
raise NoProvider('can not find provider', interface)

total_weight = 0
same_weight = True
last_service_url = None
for service_url in service_url_list:
total_weight += service_url.weight
if same_weight and last_service_url and last_service_url.weight != service_url.weight:
same_weight = False
last_service_url = service_url

if total_weight > 0 and not same_weight:
offset = random.randint(0, total_weight - 1)
for service_url in service_url_list:
offset -= service_url.weight
if offset < 0:
return service_url

return random.choice(service_url_list)

def event_listener(self, event):
"""
node provides上下线的监听回调函数
Expand All @@ -100,19 +138,19 @@ def configuration_listener(self, event):
"""
self._do_config_event(event)

def _to_key(self, interface, versioin, group):
def _to_key(self, interface, version, group):
"""
计算存放在内存中的服务的key,以接口、版本、分组计算
:param interface: 接口 类似com.ofpay.demo.DemoProvider
:param versioin: 版本 1.0
:param version: 版本 1.0
:param group: 分组 product
:return: key 字符串
"""
return '{0}|{1}|{2}'.format(interface, versioin, group)
return '{0}|{1}|{2}'.format(interface, version, group)

def _add_node(self, interface, service_url):
key = self._to_key(service_url.interface, service_url.version, service_url.group)
second_dict = self._service_provides.get(interface)
second_dict = self._service_providers.get(interface)
if second_dict:
# 获取最内层的nest的dict
inner_dict = second_dict.get(key)
Expand All @@ -122,11 +160,11 @@ def _add_node(self, interface, service_url):
second_dict[key] = {service_url.location: service_url}
else:
# create the second dict
self._service_provides[interface] = {key: {service_url.location: service_url}}
self._service_providers[interface] = {key: {service_url.location: service_url}}

def _remove_node(self, interface, service_url):
key = self._to_key(service_url.interface, service_url.version, service_url.group)
second_dict = self._service_provides.get(interface)
second_dict = self._service_providers.get(interface)
if second_dict:
inner_dict = second_dict.get(key)
if inner_dict:
Expand All @@ -144,23 +182,67 @@ def _compare_swap_nodes(self, interface, nodes):
:param nodes: 节点列表
:return: 不需要返回
"""
# 如果已经存在,首先删除原有的服务的集合
if interface in self._service_provides:
del self._service_provides[interface]
logger.debug("delete node {0}".format(interface))
for child_node in nodes:
node = urllib.unquote(child_node).decode('utf8')
logger.debug('child of node is {0}'.format(node))
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
self._add_node(interface, service_url)
if self._mutex.acquire():
# 存在并发问题,需要线程锁
try:
# 如果已经存在,首先删除原有的服务的集合
if interface in self._service_providers:
del self._service_providers[interface]
logger.debug("delete node {0}".format(interface))
for child_node in nodes:
node = urllib.unquote(child_node).decode('utf8')
logger.debug('child of node is {0}'.format(node))
if node.startswith('jsonrpc'):
service_url = ServiceURL(node)
self._add_node(interface, service_url)
except Exception as e:
logger.warn('swap json-rpc provider error %s', str(e))
finally:
self._mutex.release()

def _set_provider_configuration(self, interface, nodes):
"""
设置provider配置
:param interface:
:param nodes:
:return:
"""
if not nodes:
return
try:
configuration_dict = {}
for _child_node in nodes:
_node = urllib.unquote(_child_node).decode('utf8')
if _node.startswith('override'):
service_url = ServiceURL(_node)
key = self._to_key(interface, service_url.version, service_url.group)

if key not in configuration_dict:
configuration_dict[key] = {}
if service_url.location not in configuration_dict[key]:
configuration_dict[key][service_url.location] = []
configuration_dict[key][service_url.location].append(_node)

if interface in self._service_providers:
provider_dict = self._service_providers.get(interface)
for provider_key, second_dict in provider_dict.iteritems():
for service_location, service_url in second_dict.iteritems():
configuration_service_urls = configuration_dict.get(provider_key, {}).get(service_location)
if not configuration_service_urls:
service_url.init_default_config()
else:
service_url.set_config(configuration_service_urls)

except Exception as e:
logger.warn('set provider configuration error %s', str(e))


class ZookeeperRegistry(Registry):
_app_config = ApplicationConfig('default_app')
_connect_state = 'UNCONNECT'

def __init__(self, zk_hosts, application_config=None):
Registry.__init__(self)
if application_config:
self._app_config = application_config
self.__zk = KazooClient(hosts=zk_hosts)
Expand Down Expand Up @@ -188,17 +270,27 @@ def _do_event(self, event):
# 如果要删除,必须先把/dubbo/和最后的/providers去掉
# 将zookeeper中查询到的服务节点列表加入到一个dict中
# zookeeper中保持的节点url类似如下
logger.debug("receive event is {0}, event state is {1}".format(event, event.state))
logger.info("receive event is {0}, event state is {1}".format(event, event.state))
provide_name = event.path[7:event.path.rfind('/')]
if event.state == 'CONNECTED':
children = self.__zk.get_children(event.path, watch=self.event_listener)
self._compare_swap_nodes(provide_name, self.__unquote(children))
if event.state == 'DELETED':
if event.state in ['CONNECTED', 'DELETED']:
children = self.__zk.get_children(event.path, watch=self.event_listener)
self._compare_swap_nodes(provide_name, self.__unquote(children))
configurators_nodes = self._get_provider_configuration(provide_name)
self._set_provider_configuration(provide_name, configurators_nodes)
print self._service_providers

def _do_config_event(self, event):
print event
"""
zk的目录路径为 /dubbo/com.qianmi.pc.api.es.item.EsGoodsQueryProvider/configurators
:param event:
:return:
"""
logger.info("receive config event is {0}, event state is {1}".format(event, event.state))
provide_name = event.path[7:event.path.rfind('/')]
configurators_nodes = self._get_provider_configuration(provide_name)
self._set_provider_configuration(provide_name, configurators_nodes)

print self._service_providers

def register(self, interface, **kwargs):
ip = self.__zk._connection._socket.getsockname()[0]
Expand Down Expand Up @@ -233,11 +325,28 @@ def subscribe(self, interface, **kwargs):
providers_children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'providers'),
watch=self.event_listener)
logger.debug("watch node is {0}".format(providers_children))
configurators_children = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
watch=self.configuration_listener)
self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
watch=self.configuration_listener)
# 全部重新添加
self._compare_swap_nodes(interface, self.__unquote(providers_children))

configurators_nodes = self._get_provider_configuration(interface)
self._set_provider_configuration(interface, configurators_nodes)

def _get_provider_configuration(self, interface):
"""
获取dubbo自定义配置数据,从"/dubbo/{interface}/configurators" 路径下获取配置
:param interface:
:return:
"""
try:
configurators_nodes = self.__zk.get_children('{0}/{1}/{2}'.format('dubbo', interface, 'configurators'),
watch=self.configuration_listener)
logger.debug("configurators node is {0}".format(configurators_nodes))
return self.__unquote(configurators_nodes)
except Exception as e:
logger.warn("get provider %s configuration error %s", interface, str(e))


class MulticastRegistry(Registry):
class _Loop(Thread):
Expand All @@ -262,6 +371,7 @@ def set_mssage(self, msg):
self.sock.sendto(msg, (self.multicast_group, int(self.multicast_port)))

def __init__(self, address, application_config=None):
Registry.__init__(self)
if application_config:
self._app_config = application_config
self.event_loop = self._Loop(address, self.event_listener)
Expand Down Expand Up @@ -303,6 +413,6 @@ def _do_event(self, event):
# registry = MulticastRegistry('224.5.6.7:1234')
registry = ZookeeperRegistry('zookeeper:2181')
registry.subscribe('com.ofpay.demo.api.UserProvider')
print registry.get_provides('com.ofpay.demo.api.UserProvider')
print registry.get_providers('com.ofpay.demo.api.UserProvider')

time.sleep(500)
10 changes: 3 additions & 7 deletions dubbo_client/rpclib.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# coding=utf-8
import random
from urllib2 import HTTPError

from pyjsonrpc import HttpClient, JsonRpcError

from dubbo_client.registry import Registry
from dubbo_client.rpcerror import NoProvider, ConnectionFail, dubbo_client_errors, InternalError, DubboClientError
from dubbo_client.rpcerror import ConnectionFail, dubbo_client_errors, InternalError, DubboClientError

__author__ = 'caozupeng'

Expand Down Expand Up @@ -34,12 +33,9 @@ def __init__(self, interface, registry, **kwargs):
self.registry.register(interface)

def call(self, method, *args, **kwargs):
provides = self.registry.get_provides(self.interface, version=self.version, group=self.group)
if len(provides) == 0:
raise NoProvider('can not find provide', self.interface)
ip_port, service_url = random.choice(provides.items())
provider = self.registry.get_random_provider(self.interface, version=self.version, group=self.group)
# print service_url.location
client = HttpClient(url="http://{0}{1}".format(ip_port, service_url.path))
client = HttpClient(url="http://{0}{1}".format(provider.location, provider.path))
try:
return client.call(method, *args, **kwargs)
except HTTPError, e:
Expand Down
Loading

0 comments on commit 8115ae7

Please sign in to comment.