Skip to content

Commit

Permalink
[3.0] allocate invokers space at initial time of AbstractDirectory, f…
Browse files Browse the repository at this point in the history
…ix ConnectivityValidationTest (apache#9211)

* allocate space for invokers and validInvokers to avoid NPE

* extract operations of invokers and validInvokers

* remove useless import

* reset validInvokersInitialized at destroy()

* add invokersInitialized for refreshInvoker

* optimize codes

* use local reference to avoid NPE

* optimize codes

* set empty instead of clearing at destroyInvokers()

* remove useless lock

* add lock for validInvoker

* return unmodifiableList for getInvokers and getValidInvokers

* return clone object to avoid being modified

* fix ut

* optimize code

* refresh validInvokers at setInvokers

* wait new task finished at checkConnectivity()

* retrieve checkConnectivity and fix ConnectivityValidationTest

* retrieve checkConnectivity

* optimize setInvokers()
  • Loading branch information
zrlw authored Nov 8, 2021
1 parent c1bb96a commit 8a14fb3
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
Expand Down Expand Up @@ -80,17 +78,20 @@ public abstract class AbstractDirectory<T> implements Directory<T> {

protected final Map<String, String> queryMap;

protected final Lock invokerLock = new ReentrantLock();
/**
* Invokers initialized flag.
*/
private volatile boolean invokersInitialized = false;

/**
* All invokers from registry
*/
protected volatile BitList<Invoker<T>> invokers;
private volatile BitList<Invoker<T>> invokers = BitList.emptyList();

/**
* Valid Invoker. All invokers from registry exclude unavailable and disabled invokers.
*/
protected volatile BitList<Invoker<T>> validInvokers;
private volatile BitList<Invoker<T>> validInvokers = BitList.emptyList();

/**
* Waiting to reconnect invokers.
Expand Down Expand Up @@ -177,18 +178,19 @@ public List<Invoker<T>> list(Invocation invocation) throws RpcException {
}

BitList<Invoker<T>> availableInvokers;
if (validInvokers != null) {
availableInvokers = validInvokers;
// use clone to avoid being modified at doList().
if (invokersInitialized) {
availableInvokers = validInvokers.clone();
} else {
availableInvokers = invokers;
availableInvokers = invokers.clone();
}

List<Invoker<T>> routedResult = doList(availableInvokers, invocation);
if (routedResult.isEmpty()) {
logger.warn("No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
+ " all validInvokers' size: " + (validInvokers == null ? 0 : validInvokers.size())
+ " all validInvokers' size: " + validInvokers.size()
+ "/ all routed invokers' size: " + routedResult.size()
+ "/ all invokers' size: " + (invokers == null ? 0 : invokers.size())
+ "/ all invokers' size: " + invokers.size()
+ " from registry " + getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ".");
Expand Down Expand Up @@ -230,12 +232,7 @@ public boolean isDestroyed() {
@Override
public void destroy() {
destroyed = true;
if (invokers != null) {
invokers.clear();
}
if (validInvokers != null) {
validInvokers.clear();
}
destroyInvokers();
invokersToReconnect.clear();
disabledInvokers.clear();
}
Expand All @@ -247,17 +244,12 @@ public void discordAddresses() {

@Override
public void addInvalidateInvoker(Invoker<T> invoker) {
invokerLock.lock();
try {
// 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time
if (validInvokers.remove(invoker)) {
// 2. add this invoker to reconnect list
invokersToReconnect.add(invoker);
// 3. try start check connectivity task
checkConnectivity();
}
} finally {
invokerLock.unlock();
// 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time
if (removeValidInvoker(invoker)) {
// 2. add this invoker to reconnect list
invokersToReconnect.add(invoker);
// 3. try start check connectivity task
checkConnectivity();
}
}

Expand Down Expand Up @@ -299,17 +291,12 @@ public void checkConnectivity() {
}

// 3. recover valid invoker
invokerLock.lock();
try {
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
validInvokers.add(tInvoker);
logger.info("Recover service address: " + tInvoker.getUrl() + " from invalid list.");
}
invokersToReconnect.remove(tInvoker);
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
addValidInvoker(tInvoker);
logger.info("Recover service address: " + tInvoker.getUrl() + " from invalid list.");
}
} finally {
invokerLock.unlock();
invokersToReconnect.remove(tInvoker);
}
} finally {
checkConnectivityPermit.release();
Expand All @@ -330,20 +317,19 @@ public void checkConnectivity() {
* 3. all the invokers disappeared from total invokers should be removed in the need to reconnect list
* 4. all the invokers disappeared from total invokers should be removed in the disabled invokers list
*/
public synchronized void refreshInvoker() {
invokerLock.lock();
try {
if (invokers != null) {
BitList<Invoker<T>> copiedInvokers = invokers.clone();
refreshInvokers(copiedInvokers, invokersToReconnect);
refreshInvokers(copiedInvokers, disabledInvokers);
validInvokers = copiedInvokers;
}
} finally {
invokerLock.unlock();
public void refreshInvoker() {
if (invokersInitialized) {
refreshInvokerInternal();
}
}

private synchronized void refreshInvokerInternal() {
BitList<Invoker<T>> copiedInvokers = invokers.clone();
refreshInvokers(copiedInvokers, invokersToReconnect);
refreshInvokers(copiedInvokers, disabledInvokers);
validInvokers = copiedInvokers;
}

private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invoker<T>> invokersToRemove) {
List<Invoker<T>> needToRemove = new LinkedList<>();
for (Invoker<T> tInvoker : invokersToRemove) {
Expand All @@ -358,32 +344,22 @@ private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invo

@Override
public void addDisabledInvoker(Invoker<T> invoker) {
invokerLock.lock();
try {
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
validInvokers.remove(invoker);
logger.info("Disable service address: " + invoker.getUrl() + ".");
}
} finally {
invokerLock.unlock();
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
logger.info("Disable service address: " + invoker.getUrl() + ".");
}
}

@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
invokerLock.lock();
try {
if (disabledInvokers.remove(invoker)) {
try {
validInvokers.add(invoker);
logger.info("Recover service address: " + invoker.getUrl() + " from disabled list.");
} catch (Throwable ignore) {
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
logger.info("Recover service address: " + invoker.getUrl() + " from disabled list.");
} catch (Throwable ignore) {

}
}
} finally {
invokerLock.unlock();
}
}

Expand All @@ -404,11 +380,13 @@ public ScheduledFuture<?> getConnectivityCheckFuture() {
}

public BitList<Invoker<T>> getInvokers() {
return invokers;
// return clone to avoid being modified.
return invokers.clone();
}

public BitList<Invoker<T>> getValidInvokers() {
return validInvokers;
// return clone to avoid being modified.
return validInvokers.clone();
}

public List<Invoker<T>> getInvokersToReconnect() {
Expand All @@ -419,6 +397,31 @@ public Set<Invoker<T>> getDisabledInvokers() {
return disabledInvokers;
}

protected void setInvokers(BitList<Invoker<T>> invokers) {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
}

protected void destroyInvokers() {
// set empty instead of clearing to support concurrent access.
this.invokers = BitList.emptyList();
this.validInvokers = BitList.emptyList();
this.invokersInitialized = false;
}

private boolean addValidInvoker(Invoker<T> invoker) {
synchronized (this.validInvokers) {
return this.validInvokers.add(invoker);
}
}

private boolean removeValidInvoker(Invoker<T> invoker) {
synchronized (this.validInvokers) {
return this.validInvokers.remove(invoker);
}
}

protected abstract List<Invoker<T>> doList(BitList<Invoker<T>> invokers, Invocation invocation) throws RpcException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,25 @@ public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> router
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
this.invokers = new BitList<>(invokers);
this.validInvokers = this.invokers.clone();
this.setInvokers(new BitList<>(invokers));
}

@Override
public Class<T> getInterface() {
return invokers.get(0).getInterface();
return getInvokers().get(0).getInterface();
}

@Override
public List<Invoker<T>> getAllInvokers() {
return invokers;
return getInvokers();
}

@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
for (Invoker<T> invoker : validInvokers) {
for (Invoker<T> invoker : getValidInvokers()) {
if (invoker.isAvailable()) {
return true;
}
Expand All @@ -83,24 +82,23 @@ public void destroy() {
if (isDestroyed()) {
return;
}
for (Invoker<T> invoker : invokers) {
for (Invoker<T> invoker : getInvokers()) {
invoker.destroy();
}
super.destroy();
}

public void buildRouterChain() {
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
routerChain.setInvokers(invokers);
routerChain.setInvokers(getInvokers());
routerChain.loop(true);
this.setRouterChain(routerChain);
}

public void notify(List<Invoker<T>> invokers) {
this.invokers = new BitList<>(invokers);
refreshInvoker();
this.setInvokers(new BitList<>(invokers));
if (routerChain != null) {
routerChain.setInvokers(this.invokers);
routerChain.setInvokers(this.getInvokers());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testStaticDirectory() {
List<Invoker<String>> newInvokers = staticDirectory.list(new MockDirInvocation());
Assertions.assertTrue(newInvokers.size() > 0);
staticDirectory.destroy();
Assertions.assertEquals(0, staticDirectory.invokers.size());
Assertions.assertEquals(0, staticDirectory.validInvokers.size());
Assertions.assertEquals(0, staticDirectory.getInvokers().size());
Assertions.assertEquals(0, staticDirectory.getValidInvokers().size());
}
}
Loading

0 comments on commit 8a14fb3

Please sign in to comment.