Skip to content

Commit

Permalink
Fixed - Invocation timeout isn't applied for RTopic.removeListenerAsy…
Browse files Browse the repository at this point in the history
…nc() methods.
  • Loading branch information
Nikita Koksharov committed Aug 9, 2022
1 parent 7d62d78 commit 7cfa063
Showing 1 changed file with 35 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -637,68 +638,55 @@ private void psubscribe(ChannelName channelName, Collection<RedisPubSubListener<
}

public CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, EventListener listener) {
CompletableFuture<Void> promise = new CompletableFuture<>();
AsyncSemaphore semaphore = getSemaphore(channelName);
semaphore.acquire(() -> {
Collection<MasterSlaveEntry> entries = Collections.singletonList(getEntry(channelName));
if (isMultiEntity(channelName)) {
entries = connectionManager.getEntrySet();
}

AtomicInteger counter = new AtomicInteger(entries.size());
for (MasterSlaveEntry e : entries) {
PubSubConnectionEntry entry = name2PubSubConnection.get(new PubSubKey(channelName, e));
if (entry == null) {
if (counter.decrementAndGet() == 0) {
semaphore.release();
promise.complete(null);
}
continue;
}
return removeListenerAsync(type, channelName, entry -> {
entry.removeListener(channelName, listener);
});
}

entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
unsubscribe(type, channelName)
.whenComplete((r, ex) -> {
if (counter.decrementAndGet() == 0) {
semaphore.release();
promise.complete(null);
}
});
} else {
if (counter.decrementAndGet() == 0) {
semaphore.release();
promise.complete(null);
}
}
public CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Integer... listenerIds) {
return removeListenerAsync(type, channelName, entry -> {
for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
});
return promise;
}

public CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Integer... listenerIds) {
CompletableFuture<Void> promise = new CompletableFuture<>();
private CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName channelName, Consumer<PubSubConnectionEntry> consumer) {
AsyncSemaphore semaphore = getSemaphore(channelName);
semaphore.acquire(() -> {
Collection<MasterSlaveEntry> entries = Collections.singletonList(getEntry(channelName));
CompletableFuture<Void> sf = semaphore.acquire();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
connectionManager.newTimeout(t -> {
sf.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic"));
}, timeout, TimeUnit.MILLISECONDS);

return sf.thenCompose(res -> {
Collection<MasterSlaveEntry> entries;
if (isMultiEntity(channelName)) {
entries = connectionManager.getEntrySet();
} else {
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
semaphore.release();
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(new IllegalStateException("Unable to find entry for channel: " + channelName));
return f;
}
entries = Collections.singletonList(entry);
}

AtomicInteger counter = new AtomicInteger(entries.size());
CompletableFuture<Void> promise = new CompletableFuture<>();
for (MasterSlaveEntry e : entries) {
PubSubConnectionEntry entry = name2PubSubConnection.get(new PubSubKey(channelName, e));
if (entry == null) {
if (counter.decrementAndGet() == 0) {
semaphore.release();
promise.complete(null);
return CompletableFuture.completedFuture(null);
}
continue;
}

for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
consumer.accept(entry);
if (!entry.hasListeners(channelName)) {
unsubscribe(type, channelName)
.whenComplete((r, ex) -> {
Expand All @@ -714,23 +702,20 @@ public CompletableFuture<Void> removeListenerAsync(PubSubType type, ChannelName
}
}
}
return promise;
});
return promise;
}

public CompletableFuture<Void> removeAllListenersAsync(PubSubType type, ChannelName channelName) {
AsyncSemaphore semaphore = getSemaphore(channelName);
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();

CompletableFuture<Void> res = new CompletableFuture<>();
CompletableFuture<Void> sf = semaphore.acquire();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
connectionManager.newTimeout(t -> {
res.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic"));
sf.completeExceptionally(new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + channelName + " topic"));
}, timeout, TimeUnit.MILLISECONDS);
semaphore.acquire(() -> {
res.complete(null);
});

CompletableFuture<Void> f = res.thenCompose(r -> {
CompletableFuture<Void> f = sf.thenCompose(r -> {
PubSubConnectionEntry entry = getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
Expand Down

0 comments on commit 7cfa063

Please sign in to comment.