Skip to content

Commit

Permalink
enhance AbstractCluster interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj committed Nov 8, 2019
1 parent dfa8499 commit 1c1101d
Show file tree
Hide file tree
Showing 61 changed files with 602 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,8 @@ private static List<Exporter<?>> doExportUrls(ServiceConfig<?> sc) {
String pathKey = URL.buildKey(sc.getContextPath(protocolConfig)
.map(p -> p + "/" + sc.getPath())
.orElse(sc.getPath()), sc.getGroup(), sc.getVersion());
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, sc.getInterfaceClass());
// TODO, uncomment this line once service key is unified
sc.getServiceMetadata().setServiceKey(pathKey);
exporters.addAll(doExportUrlsFor1Protocol(sc, protocolConfig, registryURLs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
*/
@Deprecated
public class ReferenceConfig<T> extends org.apache.dubbo.config.service.ReferenceConfig<T> {
DubboBootstrap bootstrap = DubboBootstrap.getInstance();

private DubboBootstrap bootstrap = DubboBootstrap.getInstance();

@Deprecated
public synchronized T get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc;
package org.apache.dubbo.rpc.cluster.interceptor;

import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

/**
* Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked.
*/
@SPI
public interface ClusterInterceptor {

void before(Invoker<?> invoker, Invocation invocation);
void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

void after(Invoker<?> invoker, Invocation invocation);
void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

/**
* Does not need to override this method, override {@link #before(Invoker, Invocation)} and {@link #after(Invoker, Invocation)}
* methods to add your own logic expected to be executed before and after invoke.
* Does not need to override this method, override {@link #before(AbstractClusterInvoker, Invocation)}
* and {@link #after(AbstractClusterInvoker, Invocation)}, methods to add your own logic expected to be
* executed before and after invoke.
*
* @param invoker
* @param clusterInvoker
* @param invocation
* @return
* @throws RpcException
*/
default Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
return clusterInvoker.invoke(invocation);
}

interface Listener {

void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onResponse(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.interceptor;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {

@Override
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
RpcContext.getContext()
.setInvocation(invocation)
.setLocalAddress(NetUtils.getHostAddress(), 0);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
RpcContext.removeServerContext();
}

@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
RpcContext.removeContext();
}

@Override
public void onResponse(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
RpcContext.getServerContext().setAttachments(appResponse.getAttachments());
}

@Override
public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.interceptors;
package org.apache.dubbo.rpc.cluster.interceptor;

import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.ClusterInterceptor;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.ZoneDetector;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;

/**
* Determines the zone information of current request.
*
* active only when url has key 'cluster=zone-aware'
*/
@Activate(value = "cluster:zone-aware")
public class ZoneAwareClusterInterceptor implements ClusterInterceptor {

@Override
public void before(Invoker<?> invoker, Invocation invocation) {
public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
RpcContext rpcContext = RpcContext.getContext();
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
Expand All @@ -55,7 +56,7 @@ public void before(Invoker<?> invoker, Invocation invocation) {
}

@Override
public void after(Invoker<?> invoker, Invocation invocation) {
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,17 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class);

protected final Directory<T> directory;
protected Directory<T> directory;

protected final boolean availablecheck;
protected boolean availablecheck;

private AtomicBoolean destroyed = new AtomicBoolean(false);

private volatile Invoker<T> stickyInvoker = null;

public AbstractClusterInvoker() {
}

public AbstractClusterInvoker(Directory<T> directory) {
this(directory, directory.getUrl());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
* BroadcastCluster
*
*/
public class BroadcastCluster implements Cluster {
public class BroadcastCluster extends AbstractCluster {

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new BroadcastClusterInvoker<T>(directory);
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new BroadcastClusterInvoker<>(directory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
* {@link FailbackClusterInvoker}
*
*/
public class FailbackCluster implements Cluster {
public class FailbackCluster extends AbstractCluster {

public final static String NAME = "failback";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailbackClusterInvoker<T>(directory);
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailbackClusterInvoker<>(directory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
* {@link FailfastClusterInvoker}
*
*/
public class FailfastCluster implements Cluster {
public class FailfastCluster extends AbstractCluster {

public final static String NAME = "failfast";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailfastClusterInvoker<T>(directory);
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailfastClusterInvoker<>(directory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
* {@link FailoverClusterInvoker}
*
*/
public class FailoverCluster implements Cluster {
public class FailoverCluster extends AbstractCluster {

public final static String NAME = "failover";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
* {@link FailsafeClusterInvoker}
*
*/
public class FailsafeCluster implements Cluster {
public class FailsafeCluster extends AbstractCluster {

public final static String NAME = "failsafe";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailsafeClusterInvoker<T>(directory);
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailsafeClusterInvoker<>(directory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
* {@link ForkingClusterInvoker}
*
*/
public class ForkingCluster implements Cluster {
public class ForkingCluster extends AbstractCluster {

public final static String NAME = "forking";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new ForkingClusterInvoker<T>(directory);
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new ForkingClusterInvoker<>(directory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

public class MergeableCluster implements Cluster {
public class MergeableCluster extends AbstractCluster {

public static final String NAME = "mergeable";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new MergeableClusterInvoker<T>(directory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.dubbo.rpc.cluster.support.registry;

import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;

/**
Expand All @@ -29,7 +29,7 @@ public class ZoneAwareCluster extends AbstractCluster {
public final static String NAME = "zone-aware";

@Override
protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
protected <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new ZoneAwareClusterInvoker<T>(directory);
}

Expand Down
Loading

0 comments on commit 1c1101d

Please sign in to comment.