Skip to content

Commit 208c5c4

Browse files
authored
Rebalance subscription's sessions on DLQ event type attachment to topology (zalando#1581)
Refactoring: move subscription's sessions rebalance part of the `ZkSubscriptionClient` --------- Co-authored-by: Aleksey Pak <[email protected]>
1 parent 5dd37c6 commit 208c5c4

File tree

8 files changed

+42
-47
lines changed

8 files changed

+42
-47
lines changed

api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java

+1-22
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.zalando.nakadi.domain.UnprocessableEventPolicy;
1717
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
1818
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
19-
import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException;
2019
import org.zalando.nakadi.repository.kafka.KafkaRecordDeserializer;
2120
import org.zalando.nakadi.service.AuthorizationValidator;
2221
import org.zalando.nakadi.service.ConsumptionKpiCollector;
@@ -29,7 +28,6 @@
2928
import org.zalando.nakadi.service.FeatureToggleService;
3029
import org.zalando.nakadi.service.publishing.EventPublisher;
3130
import org.zalando.nakadi.service.subscription.autocommit.AutocommitSupport;
32-
import org.zalando.nakadi.service.subscription.model.Partition;
3331
import org.zalando.nakadi.service.subscription.model.Session;
3432
import org.zalando.nakadi.service.subscription.state.CleanupState;
3533
import org.zalando.nakadi.service.subscription.state.DummyState;
@@ -42,15 +40,13 @@
4240

4341
import java.io.Closeable;
4442
import java.io.IOException;
45-
import java.util.Collection;
4643
import java.util.Comparator;
4744
import java.util.List;
4845
import java.util.Optional;
4946
import java.util.concurrent.BlockingQueue;
5047
import java.util.concurrent.LinkedBlockingQueue;
5148
import java.util.concurrent.ScheduledExecutorService;
5249
import java.util.concurrent.TimeUnit;
53-
import java.util.function.BiFunction;
5450

5551
public class StreamingContext implements SubscriptionStreamer {
5652

@@ -67,7 +63,6 @@ public class StreamingContext implements SubscriptionStreamer {
6763
private final EventStreamChecks eventStreamChecks;
6864
private final ScheduledExecutorService timer;
6965
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
70-
private final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
7166
private final CursorConverter cursorConverter;
7267
private final Subscription subscription;
7368
private final MetricRegistry metricRegistry;
@@ -102,7 +97,6 @@ private StreamingContext(final Builder builder) {
10297
this.out = builder.out;
10398
this.parameters = builder.parameters;
10499
this.session = builder.session;
105-
this.rebalancer = builder.rebalancer;
106100
this.timer = builder.timer;
107101
this.zkClient = builder.zkClient;
108102
this.kafkaPollTimeout = builder.kafkaPollTimeout;
@@ -363,16 +357,7 @@ private void rebalance() {
363357
if (null != sessionListSubscription) {
364358
// This call is needed to renew subscription for session list changes.
365359
sessionListSubscription.getData();
366-
zkClient.updateTopology(topology -> {
367-
try {
368-
return rebalancer.apply(
369-
zkClient.listSessions(),
370-
topology.getPartitions());
371-
} catch (final RebalanceConflictException e) {
372-
LOG.warn("failed to rebalance partitions: {}", e.getMessage(), e);
373-
return new Partition[0];
374-
}
375-
});
360+
zkClient.rebalanceSessions();
376361
}
377362
}
378363

@@ -437,7 +422,6 @@ public static final class Builder {
437422
private Session session;
438423
private ScheduledExecutorService timer;
439424
private ZkSubscriptionClient zkClient;
440-
private BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer;
441425
private long kafkaPollTimeout;
442426
private CursorTokenService cursorTokenService;
443427
private ObjectMapper objectMapper;
@@ -505,11 +489,6 @@ public Builder setZkClient(final ZkSubscriptionClient zkClient) {
505489
return this;
506490
}
507491

508-
public Builder setRebalancer(final BiFunction<Collection<Session>, Partition[], Partition[]> rebalancer) {
509-
this.rebalancer = rebalancer;
510-
return this;
511-
}
512-
513492
public Builder setKafkaPollTimeout(final long kafkaPollTimeout) {
514493
this.kafkaPollTimeout = kafkaPollTimeout;
515494
return this;

api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java

-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ public SubscriptionStreamer build(
115115
.setSession(session)
116116
.setTimer(executorService)
117117
.setZkClient(zkClient)
118-
.setRebalancer(new SubscriptionRebalancer())
119118
.setKafkaPollTimeout(kafkaPollTimeout)
120119
.setCursorTokenService(cursorTokenService)
121120
.setObjectMapper(objectMapper)

api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public OutputStream getOutputStream() {
8787
.setSubscription(new Subscription())
8888
.setTimer(null)
8989
.setZkClient(zkClient)
90-
.setRebalancer(null)
9190
.setKafkaPollTimeout(0)
9291
.setCursorTokenService(null)
9392
.setObjectMapper(null)

app/src/main/java/org/zalando/nakadi/service/job/DlqRedriveEventTypeAttachmentJob.java

+18-21
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Set;
33-
import java.util.concurrent.TimeUnit;
3433
import java.util.stream.Collectors;
3534

3635
@Service
@@ -147,33 +146,31 @@ private void addDlqPartitionsToSubscription(
147146
final Subscription subscription,
148147
final List<Partition> unassignedDlqPartitions) throws Exception {
149148

150-
final ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription);
149+
try (ZkSubscriptionClient client = subscriptionClientFactory.createClient(subscription)) {
151150

152-
// idempotent, does not overwrite existing offsets
153-
client.createOffsetZNodes(
154-
unassignedDlqPartitions.stream()
155-
.map(p -> p.getPartition())
156-
.map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET))
157-
.map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c))
158-
.map(nc -> cursorConverter.convertToNoToken(nc))
159-
.collect(Collectors.toList()));
151+
// idempotent, does not overwrite existing offsets
152+
client.createOffsetZNodes(
153+
unassignedDlqPartitions.stream()
154+
.map(p -> p.getPartition())
155+
.map(p -> new Cursor(p, Cursor.BEFORE_OLDEST_OFFSET))
156+
.map(c -> cursorConverter.convert(dlqRedriveEventTypeName, c))
157+
.map(nc -> cursorConverter.convertToNoToken(nc))
158+
.collect(Collectors.toList()));
160159

161-
final boolean[] hasNewPartitions = {false};
162-
client.updateTopology(topology -> {
160+
final boolean[] hasNewPartitions = {false};
161+
client.updateTopology(topology -> {
163162
final var newPartitions = selectMissingPartitions(topology.getPartitions(), unassignedDlqPartitions);
164163
if (!hasNewPartitions[0] && newPartitions.length != 0) {
165164
hasNewPartitions[0] = true;
166165
}
167166
return newPartitions;
168-
});
169-
170-
// shortcut to enable new partitions for streaming, otherwise they will stay unassigned.
171-
// the better way is to rebalance them.
172-
if (hasNewPartitions[0]) {
173-
client.closeSubscriptionStreams(
174-
() -> LOG.info("Subscription `{}` streams were closed due to addition of " +
175-
"Nakadi DLQ Event Type", subscription.getId()),
176-
TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()));
167+
});
168+
169+
if (hasNewPartitions[0]) {
170+
LOG.info("Rebalancing `{}` subscription's sessions due to addition of " +
171+
"Nakadi DLQ Event Type", subscription.getId());
172+
client.rebalanceSessions();
173+
}
177174
}
178175
}
179176

core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java

+16
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.slf4j.LoggerFactory;
1414
import org.zalando.nakadi.domain.EventTypePartition;
1515
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
16+
import org.zalando.nakadi.exceptions.runtime.RebalanceConflictException;
1617
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
1718
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
1819
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
@@ -142,6 +143,21 @@ public void updateTopology(final Function<Topology, Partition[]> partitioner)
142143
.withExceptionsThatForceRetry(KeeperException.BadVersionException.class));
143144
}
144145

146+
@Override
147+
public void rebalanceSessions() {
148+
updateTopology(topology -> {
149+
final var rebalancer = new SubscriptionRebalancer();
150+
try {
151+
return rebalancer.apply(
152+
listSessions(),
153+
topology.getPartitions());
154+
} catch (final RebalanceConflictException e) {
155+
LOG.warn("failed to rebalance partitions: {}", e.getMessage(), e);
156+
return new Partition[0];
157+
}
158+
});
159+
}
160+
145161
@Override
146162
public Topology getTopology() throws NakadiRuntimeException,
147163
SubscriptionNotInitializedException {

api-consumption/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancer.java core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.zalando.nakadi.service.subscription;
1+
package org.zalando.nakadi.service.subscription.zk;
22

33
import com.google.common.collect.Lists;
44
import org.zalando.nakadi.domain.EventTypePartition;

core-common/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java

+5
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ Collection<Session> listSessions()
7171

7272
boolean isActiveSession(String streamId) throws ServiceTemporarilyUnavailableException;
7373

74+
/**
75+
* Re-assigns topology's partitions among sessions and updates the topology with new assignment.
76+
*/
77+
void rebalanceSessions();
78+
7479
/**
7580
* Returns subscription {@link Topology} object from Zookeeper
7681
*

api-consumption/src/test/java/org/zalando/nakadi/service/subscription/SubscriptionRebalancerTest.java core-common/src/test/java/org/zalando/nakadi/service/subscription/zk/SubscriptionRebalancerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.zalando.nakadi.service.subscription;
1+
package org.zalando.nakadi.service.subscription.zk;
22

33
import com.google.common.collect.ImmutableList;
44
import org.junit.Test;

0 commit comments

Comments
 (0)