Skip to content

Commit

Permalink
Merge pull request Netflix#276 from qiangdavidliu/eureka-notification…
Browse files Browse the repository at this point in the history
…-lb-take2

(take 2 of) Adding eureka aware Loadbalancers that make use of new notification mechanism from eurekClient
  • Loading branch information
qiangdavidliu committed Apr 12, 2016
2 parents 759bc7c + abe999f commit 1a4fbcc
Show file tree
Hide file tree
Showing 16 changed files with 887 additions and 163 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id 'nebula.netflixoss' version '2.2.9'
id 'nebula.netflixoss' version '2.2.10'
}

// Establish version and status
Expand Down
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
rx_java_version=1.0.9
rx_netty_version=0.4.9
servo_version=0.9.2
servo_version=0.10.1
hystrix_version=1.4.3
guava_version=14.0.1
archaius_version=0.6.6
eureka_version=1.1.153
archaius_version=0.7.4
eureka_version=1.4.6
jersey_version=1.19.1

junit_version=4.12
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> {
public static final IClientConfigKey<Integer> NFLoadBalancerMaxTotalPingTime = new CommonClientConfigKey<Integer>("NFLoadBalancerMaxTotalPingTime"){};

public static final IClientConfigKey<String> NIWSServerListClassName = new CommonClientConfigKey<String>("NIWSServerListClassName"){};

public static final IClientConfigKey<String> ServerListUpdaterClassName = new CommonClientConfigKey<String>("ServerListUpdaterClassName"){};

public static final IClientConfigKey<String> NIWSServerListFilterClassName = new CommonClientConfigKey<String>("NIWSServerListFilterClassName"){};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public class DefaultClientConfigImpl implements IClientConfig {

public static final String DEFAULT_SEVER_LIST_CLASS = "com.netflix.loadbalancer.ConfigurationBasedServerList";

public static final String DEFAULT_SERVER_LIST_UPDATER_CLASS = "com.netflix.loadbalancer.PollingServerListUpdater";

public static final int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30000; // every half minute (30 secs)

public static final int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30000; // all connections idle for 30 secs
Expand Down
2 changes: 2 additions & 0 deletions ribbon-eureka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ dependencies {

testCompile "junit:junit:${junit_version}"
testCompile "org.powermock:powermock-easymock-release-full:${powermock_version}"
testCompile "org.powermock:powermock-mockito-release-full:${powermock_version}"
testCompile "org.easymock:easymock:${easymock_version}"
testCompile "com.netflix.eureka:eureka-test-utils:${eureka_version}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
import com.netflix.client.config.IClientConfigKey.Keys;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.discovery.EurekaClient;
import com.netflix.loadbalancer.AbstractServerList;
import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Provider;

/**
* The server list class that fetches the server information from Eureka client. ServerList is used by
* {@link DynamicServerListLoadBalancer} to get server list dynamically.
* {@link DynamicServerListLoadBalancer} to get server list dynamically.
*
* @author stonse
*
Expand All @@ -58,21 +60,43 @@ public class DiscoveryEnabledNIWSServerList extends AbstractServerList<Discovery
boolean shouldUseOverridePort = false;
boolean shouldUseIpAddr = false;

private final Provider<EurekaClient> eurekaClientProvider;

/**
* @deprecated use {@link #DiscoveryEnabledNIWSServerList(String)}
* or {@link #DiscoveryEnabledNIWSServerList(IClientConfig)}
*/
@Deprecated
public DiscoveryEnabledNIWSServerList() {
this.eurekaClientProvider = new LegacyEurekaClientProvider();
}

/**
* @deprecated
* use {@link #DiscoveryEnabledNIWSServerList(String, javax.inject.Provider)}
* @param vipAddresses
*/
@Deprecated
public DiscoveryEnabledNIWSServerList(String vipAddresses) {
IClientConfig clientConfig = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
clientConfig.set(Keys.DeploymentContextBasedVipAddresses, vipAddresses);
initWithNiwsConfig(clientConfig);
this(vipAddresses, new LegacyEurekaClientProvider());
}

/**
* @deprecated
* use {@link #DiscoveryEnabledNIWSServerList(com.netflix.client.config.IClientConfig, javax.inject.Provider)}
* @param clientConfig
*/
@Deprecated
public DiscoveryEnabledNIWSServerList(IClientConfig clientConfig) {
this(clientConfig, new LegacyEurekaClientProvider());
}

public DiscoveryEnabledNIWSServerList(String vipAddresses, Provider<EurekaClient> eurekaClientProvider) {
this(createClientConfig(vipAddresses), eurekaClientProvider);
}

public DiscoveryEnabledNIWSServerList(IClientConfig clientConfig, Provider<EurekaClient> eurekaClientProvider) {
this.eurekaClientProvider = eurekaClientProvider;
initWithNiwsConfig(clientConfig);
}

Expand Down Expand Up @@ -116,11 +140,8 @@ public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
}


}


@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
Expand All @@ -134,16 +155,17 @@ public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

DiscoveryClient discoveryClient = DiscoveryManager.getInstance()
.getDiscoveryClient();
if (discoveryClient == null) {
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}

EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfinstanceInfo = discoveryClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfinstanceInfo) {
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {

if(shouldUseOverridePort){
Expand Down Expand Up @@ -194,4 +216,9 @@ public String toString(){
}


}
private static IClientConfig createClientConfig(String vipAddresses) {
IClientConfig clientConfig = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
clientConfig.set(Keys.DeploymentContextBasedVipAddresses, vipAddresses);
return clientConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.netflix.niws.loadbalancer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.discovery.CacheRefreshedEvent;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEvent;
import com.netflix.discovery.EurekaEventListener;
import com.netflix.loadbalancer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Provider;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* A server list updater for the {@link com.netflix.loadbalancer.DynamicServerListLoadBalancer} that
* utilizes eureka's event listener to trigger LB cache updates.
*
* Note that when a cache refreshed notification is received, the actual update on the serverList is
* done on a separate scheduler as the notification is delivered on an eurekaClient thread.
*
* @author David Liu
*/
public class EurekaNotificationServerListUpdater implements ServerListUpdater {

private static final Logger logger = LoggerFactory.getLogger(EurekaNotificationServerListUpdater.class);

private static class LazyHolder {
private static final ExecutorService DEFAULT_SERVER_LIST_UPDATE_EXECUTOR = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("EurekaNotificationServerListUpdater-%d")
.setDaemon(true)
.build()
);

private static final Thread SHUTDOWN_THREAD = new Thread(new Runnable() {
@Override
public void run() {
logger.info("Shutting down the Executor for EurekaNotificationServerListUpdater");
try {
DEFAULT_SERVER_LIST_UPDATE_EXECUTOR.shutdown();
Runtime.getRuntime().removeShutdownHook(SHUTDOWN_THREAD);
} catch (Exception e) {
// this can happen in the middle of a real shutdown, and that's ok.
}
}
});

static {
Runtime.getRuntime().addShutdownHook(SHUTDOWN_THREAD);
}
}

public static ExecutorService getDefaultRefreshExecutor() {
return LazyHolder.DEFAULT_SERVER_LIST_UPDATE_EXECUTOR;
}

private final AtomicBoolean isActive = new AtomicBoolean(false);
private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
private final Provider<EurekaClient> eurekaClientProvider;
private final ExecutorService refreshExecutor;

private volatile EurekaEventListener updateListener;
private volatile EurekaClient eurekaClient;

public EurekaNotificationServerListUpdater() {
this(new LegacyEurekaClientProvider());
}

public EurekaNotificationServerListUpdater(final Provider<EurekaClient> eurekaClientProvider) {
this(eurekaClientProvider, getDefaultRefreshExecutor());
}

public EurekaNotificationServerListUpdater(final Provider<EurekaClient> eurekaClientProvider, ExecutorService refreshExecutor) {
this.eurekaClientProvider = eurekaClientProvider;
this.refreshExecutor = refreshExecutor;
}

@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
this.updateListener = new EurekaEventListener() {
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
}
}
}); // fire and forget
}
}
};
if (eurekaClient == null) {
eurekaClient = eurekaClientProvider.get();
}
if (eurekaClient != null) {
eurekaClient.registerEventListener(updateListener);
}
} else {
logger.info("Update listener already registered, no-op");
}
}

@Override
public synchronized void stop() {
if (isActive.compareAndSet(true, false)) {
if (eurekaClient != null) {
eurekaClient.unregisterEventListener(updateListener);
}
} else {
logger.info("Not currently active, no-op");
}
}

@Override
public String getLastUpdate() {
return new Date(lastUpdated.get()).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
return System.currentTimeMillis() - lastUpdated.get();
}

@Override
public int getNumberMissedCycles() {
return 0;
}

@Override
public int getCoreThreads() {
if (isActive.get()) {
if (refreshExecutor != null && refreshExecutor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) refreshExecutor).getCorePoolSize();
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.netflix.niws.loadbalancer;

import com.netflix.discovery.DiscoveryManager;
import com.netflix.discovery.EurekaClient;

import javax.inject.Provider;

/**
* A legacy class to provide eurekaclient via static singletons
*/
class LegacyEurekaClientProvider implements Provider<EurekaClient> {

private volatile EurekaClient eurekaClient;

@Override
public synchronized EurekaClient get() {
if (eurekaClient == null) {
eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient();
}

return eurekaClient;
}
}
Loading

0 comments on commit 1a4fbcc

Please sign in to comment.