Skip to content

Commit

Permalink
Address some of the issues in review comments. Rename NettyHttpClient…
Browse files Browse the repository at this point in the history
… to LoadBalancingHttpClient to be consistent with other namings.
  • Loading branch information
Allen Wang committed Sep 13, 2014
1 parent 4b8909b commit fac90f5
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import com.google.common.collect.Lists;
import com.netflix.ribbon.transport.netty.RibbonTransport;
import com.netflix.ribbon.transport.netty.http.NettyHttpClient;
import com.netflix.ribbon.transport.netty.http.LoadBalancingHttpClient;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.LoadBalancerBuilder;
import com.netflix.loadbalancer.Server;
Expand All @@ -23,7 +23,7 @@ public static void main(String[] args) throws Exception {
BaseLoadBalancer lb = LoadBalancerBuilder.newBuilder()
.buildFixedServerListLoadBalancer(servers);

NettyHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(lb);
LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(lb);
final CountDownLatch latch = new CountDownLatch(servers.size());
Observer<HttpClientResponse<ByteBuf>> observer = new Observer<HttpClientResponse<ByteBuf>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.netflix.ribbon.examples.netty.http;

import com.netflix.ribbon.transport.netty.RibbonTransport;
import com.netflix.ribbon.transport.netty.http.NettyHttpClient;
import com.netflix.ribbon.transport.netty.http.LoadBalancingHttpClient;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
Expand All @@ -14,7 +14,7 @@
public class SimpleGet {
@edu.umd.cs.findbugs.annotations.SuppressWarnings
public static void main(String[] args) throws Exception {
NettyHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("http://www.google.com/");
final CountDownLatch latch = new CountDownLatch(1);
client.submit(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
import com.netflix.ribbon.transport.netty.RibbonTransport;
import com.netflix.ribbon.transport.netty.http.NettyHttpClient;
import com.netflix.ribbon.transport.netty.http.LoadBalancingHttpClient;
import com.netflix.ribbon.examples.rx.AbstractRxMovieClient;
import com.netflix.ribbon.examples.rx.RxMovieServer;
import com.netflix.ribbon.examples.rx.common.Movie;
Expand All @@ -40,7 +40,7 @@
*/
public class RxMovieTransportExample extends AbstractRxMovieClient {

private final NettyHttpClient<ByteBuf, ByteBuf> client;
private final LoadBalancingHttpClient<ByteBuf, ByteBuf> client;

public RxMovieTransportExample(int port) {
IClientConfig clientConfig = IClientConfig.Builder.newBuilder("movieServiceClient").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.netflix.client.config.ClientConfigFactory.DefaultClientConfigFactory;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.ribbon.transport.netty.http.NettyHttpClient;
import com.netflix.ribbon.transport.netty.http.LoadBalancingHttpClient;
import com.netflix.config.ConfigurationManager;
import com.netflix.ribbon.DefaultResourceFactory;
import com.netflix.ribbon.RibbonResourceFactory;
Expand Down Expand Up @@ -115,7 +115,7 @@ protected void configure() {

RibbonTransportFactory transportFactory = injector.getInstance(RibbonTransportFactory.class);
HttpClient<ByteBuf, ByteBuf> client = transportFactory.newHttpClient("myClient");
IClientConfig config = ((NettyHttpClient) client).getClientConfig();
IClientConfig config = ((LoadBalancingHttpClient) client).getClientConfig();
assertEquals("MyConfig", config.getNameSpace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import rx.Observable;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/**
Expand Down Expand Up @@ -56,7 +59,11 @@ public CommandBuilder<T> withLoadBalancer(ILoadBalancer loadBalancer) {
}

public CommandBuilder<T> withListeners(List<? extends ExecutionListener<?, T>> listeners) {
this.listeners = listeners;
if (this.listeners == null) {
this.listeners = new LinkedList<ExecutionListener<?, T>>(listeners);
} else {
this.listeners.addAll((Collection) listeners);
}
return this;
}

Expand Down Expand Up @@ -104,7 +111,7 @@ public LoadBalancerRetrySameServerCommand<T> build(ExecutionContext<?> execution
ExecutionContextListenerInvoker invoker = null;

if (listeners != null && listeners.size() > 0 && executionContext != null) {
invoker = new ExecutionContextListenerInvoker(executionContext, listeners);
invoker = new ExecutionContextListenerInvoker(executionContext, Collections.unmodifiableList(listeners));
}
LoadBalancerContext loadBalancerContext1 = loadBalancerContext == null ? new LoadBalancerContext(loadBalancer, config) : loadBalancerContext;
return new LoadBalancerRetrySameServerCommand<T>(loadBalancerContext1, retryHandler, invoker);
Expand All @@ -121,7 +128,7 @@ public LoadBalancerObservableCommand<T> build(final LoadBalancerObservable<T> ex
ExecutionContextListenerInvoker invoker = null;

if (listeners != null && listeners.size() > 0) {
invoker = new ExecutionContextListenerInvoker(executionContext, listeners);
invoker = new ExecutionContextListenerInvoker(executionContext, Collections.unmodifiableList(listeners));
}
LoadBalancerContext loadBalancerContext1 = loadBalancerContext == null ? new LoadBalancerContext(loadBalancer, config) : loadBalancerContext;
return new LoadBalancerObservableCommand<T>(loadBalancerContext1, retryHandler, serviceLocator, loadBalancerKey, invoker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ ExecutionContext<T> getChildContext(Object obj) {
ChildContext<T> subContext = subContexts.get(obj);
if (subContext == null) {
subContext = new ChildContext<T>(this);
ChildContext<T> old = subContexts.putIfAbsent(obj, subContext);
if (old != null) {
subContext = old;
}
}
ChildContext<T> old = subContexts.putIfAbsent(obj, subContext);
if (old != null) {
return old;
} else {
return subContext;
}
return subContext;
}

public T getRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Utility class to invoke the list of {@link ExecutionListener} with {@link ExecutionContext}
Expand All @@ -34,24 +33,21 @@ public class ExecutionContextListenerInvoker<I, O> {
private final static Logger logger = LoggerFactory.getLogger(ExecutionContextListenerInvoker.class);
private final ExecutionContext<I> context;
private final List<ExecutionListener<I, O>> listeners;
private final AtomicBoolean onStartInvoked = new AtomicBoolean();

public ExecutionContextListenerInvoker(ExecutionContext<I> context, List<ExecutionListener<I, O>> listeners) {
this.listeners = listeners;
this.context = context;
}

public void onExecutionStart() {
if (onStartInvoked.compareAndSet(false, true)) {
for (ExecutionListener<I, O> listener : listeners) {
try {
listener.onExecutionStart(context.getChildContext(listener));
} catch (Throwable e) {
if (e instanceof AbortExecutionException) {
throw (AbortExecutionException) e;
}
logger.error("Error invoking listener " + listener, e);
for (ExecutionListener<I, O> listener : listeners) {
try {
listener.onExecutionStart(context.getChildContext(listener));
} catch (Throwable e) {
if (e instanceof AbortExecutionException) {
throw (AbortExecutionException) e;
}
logger.error("Error invoking listener " + listener, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private class RetryNextServerOperator implements Operator<T, T> {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> t1) {
if (listenerInvoker != null) {
if (listenerInvoker != null && counter.get() == 0) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> t1) {
if (listenerInvoker != null) {
executionInfo = ExecutionInfo.create(server, counter.get(), numberServersAttempted);
try {
if (invokeOnStartAndEnd) {
if (invokeOnStartAndEnd && counter.get() == 0) {
listenerInvoker.onExecutionStart();
}
listenerInvoker.onStartWithServer(executionInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.loadbalancer.reactive;

import com.netflix.client.RetryHandler;
import com.netflix.client.config.DefaultClientConfigImpl;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;

/**
* @author Allen Wang
*/
public class ExecutionContextTest {
@Test
public void testSubContext() {
ExecutionContext<String> context = new ExecutionContext<String>("hello", DefaultClientConfigImpl.getEmptyConfig(),
DefaultClientConfigImpl.getClientConfigWithDefaultValues(), RetryHandler.DEFAULT);
ExecutionContext<String> subContext1 = context.getChildContext("foo");
ExecutionContext<String> subContext2 = context.getChildContext("bar");
assertSame(context, context.getGlobalContext());
context.put("dummy", "globalValue");
context.put("dummy2", "globalValue");
subContext1.put("dummy", "context1Value");
subContext2.put("dummy", "context2Value");
assertEquals("context1Value", subContext1.get("dummy"));
assertEquals("context2Value", subContext2.get("dummy"));
assertEquals("globalValue", subContext1.getGlobalContext().get("dummy"));
assertNull(subContext1.get("dummy2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.netflix.client.RetryHandler;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.ribbon.transport.netty.http.NettyHttpClient;
import com.netflix.ribbon.transport.netty.http.LoadBalancingHttpClient;
import com.netflix.ribbon.transport.netty.http.NettyHttpLoadBalancerErrorHandler;
import com.netflix.ribbon.transport.netty.http.SSEClient;
import com.netflix.ribbon.transport.netty.tcp.LoadBalancingTcpClient;
Expand Down Expand Up @@ -109,75 +109,75 @@ public static <I, O> RxClient<I, O> newUdpClient(PipelineConfigurator<O, I> pipe
return new LoadBalancingUdpClient<I, O>(config, getDefaultRetryHandlerWithConfig(config), pipelineConfigurator);
}

public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient() {
public static LoadBalancingHttpClient<ByteBuf, ByteBuf> newHttpClient() {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
return newHttpClient(config);
}

public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config) {
return new NettyHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
public static LoadBalancingHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config) {
return new LoadBalancingHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
}

public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler) {
return new NettyHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, retryHandler, DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
public static LoadBalancingHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler) {
return new LoadBalancingHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, retryHandler, DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
}

public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler,
public static LoadBalancingHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler,
List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> listeners) {
return new NettyHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, retryHandler, DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler, listeners);
return new LoadBalancingHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, retryHandler, DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler, listeners);
}


public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
return new NettyHttpClient<ByteBuf, ByteBuf>(config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
public static LoadBalancingHttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
return new LoadBalancingHttpClient<ByteBuf, ByteBuf>(config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
}

public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer) {
public static LoadBalancingHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer) {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
return newHttpClient(loadBalancer, config);
}


public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config) {
return new NettyHttpClient<I, O>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
return new LoadBalancingHttpClient<I, O>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
}

public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return new NettyHttpClient<I, O>(config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
return new LoadBalancingHttpClient<I, O>(config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
}

public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config, RetryHandler retryHandler) {
return new NettyHttpClient<I, O>(config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
return new LoadBalancingHttpClient<I, O>(config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}

public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler,
List<ExecutionListener<HttpClientRequest<I>, HttpClientResponse<O>>> listeners) {
return new NettyHttpClient<I, O>(loadBalancer, config, retryHandler, pipelineConfigurator, poolCleanerScheduler, listeners);
return new LoadBalancingHttpClient<I, O>(loadBalancer, config, retryHandler, pipelineConfigurator, poolCleanerScheduler, listeners);
}

public static NettyHttpClient<ByteBuf, ServerSentEvent> newSSEClient(ILoadBalancer loadBalancer, IClientConfig config) {
public static LoadBalancingHttpClient<ByteBuf, ServerSentEvent> newSSEClient(ILoadBalancer loadBalancer, IClientConfig config) {
return new SSEClient<ByteBuf>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_SSE_PIPELINE_CONFIGURATOR);
}

public static NettyHttpClient<ByteBuf, ServerSentEvent> newSSEClient(IClientConfig config) {
public static LoadBalancingHttpClient<ByteBuf, ServerSentEvent> newSSEClient(IClientConfig config) {
return new SSEClient<ByteBuf>(config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_SSE_PIPELINE_CONFIGURATOR);
}

public static <I> NettyHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
public static <I> LoadBalancingHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config) {
return new SSEClient<I>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator);
}

public static <I> NettyHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
public static <I> LoadBalancingHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return new SSEClient<I>(config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator);
}

public static NettyHttpClient<ByteBuf, ServerSentEvent> newSSEClient() {
public static LoadBalancingHttpClient<ByteBuf, ServerSentEvent> newSSEClient() {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
return newSSEClient(config);
}
Expand Down
Loading

0 comments on commit fac90f5

Please sign in to comment.