Skip to content

Commit 389f96a

Browse files
authored
MINOR: Various cleanups in coordinator modules (apache#17828)
Reviewers: David Jacot <[email protected]>, Ken Huang <[email protected]>
1 parent 624cd4f commit 389f96a

File tree

20 files changed

+89
-131
lines changed

20 files changed

+89
-131
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2333,7 +2333,7 @@ public void scheduleUnloadOperation(
23332333
if (context != null) {
23342334
context.lock.lock();
23352335
try {
2336-
if (!partitionEpoch.isPresent() || context.epoch < partitionEpoch.getAsInt()) {
2336+
if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) {
23372337
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
23382338
context.transitionTo(CoordinatorState.CLOSED);
23392339
coordinators.remove(tp, context);

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/EventAccumulatorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
public class EventAccumulatorTest {
3939

40-
private class MockEvent implements EventAccumulator.Event<Integer> {
40+
private static class MockEvent implements EventAccumulator.Event<Integer> {
4141
int key;
4242
int value;
4343

@@ -153,7 +153,7 @@ public void testKeyConcurrentAndOrderingGuarantees() {
153153
accumulator.addLast(event2);
154154
assertEquals(3, accumulator.size());
155155

156-
MockEvent event = null;
156+
MockEvent event;
157157

158158
// Poll event0.
159159
event = accumulator.poll();

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java

+8-10
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@
3737
*/
3838
public class InMemoryPartitionWriter implements PartitionWriter {
3939

40-
private class PartitionState {
41-
private ReentrantLock lock = new ReentrantLock();
42-
private List<Listener> listeners = new ArrayList<>();
43-
private List<MemoryRecords> entries = new ArrayList<>();
40+
private static class PartitionState {
41+
private final ReentrantLock lock = new ReentrantLock();
42+
private final List<Listener> listeners = new ArrayList<>();
43+
private final List<MemoryRecords> entries = new ArrayList<>();
4444
private long endOffset = 0L;
4545
private long committedOffset = 0L;
4646
}
@@ -134,9 +134,8 @@ public void commit(
134134
state.lock.lock();
135135
try {
136136
state.committedOffset = offset;
137-
state.listeners.forEach(listener -> {
138-
listener.onHighWatermarkUpdated(tp, state.committedOffset);
139-
});
137+
state.listeners.forEach(listener ->
138+
listener.onHighWatermarkUpdated(tp, state.committedOffset));
140139
} finally {
141140
state.lock.unlock();
142141
}
@@ -149,9 +148,8 @@ public void commit(
149148
state.lock.lock();
150149
try {
151150
state.committedOffset = state.endOffset;
152-
state.listeners.forEach(listener -> {
153-
listener.onHighWatermarkUpdated(tp, state.committedOffset);
154-
});
151+
state.listeners.forEach(listener ->
152+
listener.onHighWatermarkUpdated(tp, state.committedOffset));
155153
} finally {
156154
state.lock.unlock();
157155
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java

-1
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,6 @@ void onNewMetadataImage(
420420
*
421421
* @param groupId The group id.
422422
* @param newGroupConfig The new group config
423-
* @return void
424423
*/
425424
void updateGroupConfig(String groupId, Properties newGroupConfig);
426425

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ public class GroupCoordinatorConfig {
7878
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " +
7979
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
8080
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
81-
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
82-
Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString()));
81+
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
82+
Group.GroupType.CLASSIC.toString(),
83+
Group.GroupType.CONSUMER.toString()
84+
);
8385
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
8486
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
8587
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
@@ -120,10 +122,10 @@ public class GroupCoordinatorConfig {
120122

121123
public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors";
122124
public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor.";
123-
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = Collections.unmodifiableList(Arrays.asList(
124-
UniformAssignor.class.getName(),
125-
RangeAssignor.class.getName()
126-
));
125+
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = List.of(
126+
UniformAssignor.class.getName(),
127+
RangeAssignor.class.getName()
128+
);
127129

128130
public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy";
129131
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString();
@@ -380,10 +382,9 @@ public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig shareGroupCon
380382
* Copy the subset of properties that are relevant to consumer group.
381383
*/
382384
public Map<String, Integer> extractConsumerGroupConfigMap() {
383-
Map<String, Integer> groupProps = new HashMap<>();
384-
groupProps.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs());
385-
groupProps.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
386-
return Collections.unmodifiableMap(groupProps);
385+
return Map.of(
386+
GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
387+
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
387388
}
388389

389390
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ public static CoordinatorRecord newConsumerGroupSubscriptionMetadataRecord(
141141
Map<String, TopicMetadata> newSubscriptionMetadata
142142
) {
143143
ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
144-
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
144+
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
145145
value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
146146
.setTopicId(topicMetadata.id())
147147
.setTopicName(topicMetadata.name())
148148
.setNumPartitions(topicMetadata.numPartitions())
149-
);
150-
});
149+
)
150+
);
151151

152152
return new CoordinatorRecord(
153153
new ApiMessageAndVersion(
@@ -674,13 +674,13 @@ public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord(
674674
Map<String, TopicMetadata> newSubscriptionMetadata
675675
) {
676676
ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue();
677-
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
677+
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
678678
value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata()
679679
.setTopicId(topicMetadata.id())
680680
.setTopicName(topicMetadata.name())
681681
.setNumPartitions(topicMetadata.numPartitions())
682-
);
683-
});
682+
)
683+
);
684684

685685
return new CoordinatorRecord(
686686
new ApiMessageAndVersion(

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
116116

117117
public static class Builder implements CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> {
118118
private final GroupCoordinatorConfig config;
119+
private final GroupConfigManager groupConfigManager;
119120
private LogContext logContext;
120121
private SnapshotRegistry snapshotRegistry;
121122
private Time time;
122123
private CoordinatorTimer<Void, CoordinatorRecord> timer;
123-
private GroupConfigManager groupConfigManager;
124124
private CoordinatorMetrics coordinatorMetrics;
125125
private TopicPartition topicPartition;
126126

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
731731
ClassicGroup group = classicGroup(groupId, committedOffset);
732732

733733
if (group.isInState(STABLE)) {
734-
if (!group.protocolName().isPresent()) {
734+
if (group.protocolName().isEmpty()) {
735735
throw new IllegalStateException("Invalid null group protocol for stable group");
736736
}
737737

@@ -751,7 +751,7 @@ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
751751
.setGroupState(group.stateAsString())
752752
.setProtocolType(group.protocolType().orElse(""))
753753
.setMembers(group.allMembers().stream()
754-
.map(member -> member.describeNoMetadata())
754+
.map(ClassicGroupMember::describeNoMetadata)
755755
.collect(Collectors.toList())
756756
)
757757
);
@@ -3737,9 +3737,9 @@ public void replay(
37373737

37383738
if (value != null) {
37393739
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
3740-
value.topics().forEach(topicMetadata -> {
3741-
subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata));
3742-
});
3740+
value.topics().forEach(topicMetadata ->
3741+
subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata))
3742+
);
37433743
group.setSubscriptionMetadata(subscriptionMetadata);
37443744
} else {
37453745
group.setSubscriptionMetadata(Collections.emptyMap());
@@ -3947,19 +3947,19 @@ public void onUnloaded() {
39473947
case DEAD:
39483948
break;
39493949
case PREPARING_REBALANCE:
3950-
classicGroup.allMembers().forEach(member -> {
3950+
classicGroup.allMembers().forEach(member ->
39513951
classicGroup.completeJoinFuture(member, new JoinGroupResponseData()
39523952
.setMemberId(member.memberId())
3953-
.setErrorCode(NOT_COORDINATOR.code()));
3954-
});
3953+
.setErrorCode(NOT_COORDINATOR.code()))
3954+
);
39553955

39563956
break;
39573957
case COMPLETING_REBALANCE:
39583958
case STABLE:
3959-
classicGroup.allMembers().forEach(member -> {
3959+
classicGroup.allMembers().forEach(member ->
39603960
classicGroup.completeSyncFuture(member, new SyncGroupResponseData()
3961-
.setErrorCode(NOT_COORDINATOR.code()));
3962-
});
3961+
.setErrorCode(NOT_COORDINATOR.code()))
3962+
);
39633963
}
39643964
break;
39653965
case SHARE:
@@ -6086,7 +6086,7 @@ private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord
60866086
if (isEmptyClassicGroup(group)) {
60876087
// Delete the classic group by adding tombstones.
60886088
// There's no need to remove the group as the replay of tombstones removes it.
6089-
if (group != null) createGroupTombstoneRecords(group, records);
6089+
createGroupTombstoneRecords(group, records);
60906090
return true;
60916091
}
60926092
return false;

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
872872
long currentTimestampMs = time.milliseconds();
873873
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
874874

875-
if (!offsetExpirationCondition.isPresent()) {
875+
if (offsetExpirationCondition.isEmpty()) {
876876
return false;
877877
}
878878

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public boolean contains(Object o) {
6363

6464
@Override
6565
public Iterator<Integer> iterator() {
66-
return new Iterator<Integer>() {
66+
return new Iterator<>() {
6767
private int current = from;
6868

6969
@Override

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java

+32-50
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
2525
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
2626

27-
import org.slf4j.Logger;
28-
import org.slf4j.LoggerFactory;
29-
3027
import java.util.ArrayList;
3128
import java.util.Arrays;
3229
import java.util.Collection;
@@ -55,7 +52,6 @@
5552
* Balance > Stickiness.
5653
*/
5754
public class UniformHeterogeneousAssignmentBuilder {
58-
private static final Logger LOG = LoggerFactory.getLogger(UniformHeterogeneousAssignmentBuilder.class);
5955

6056
/**
6157
* The maximum number of iterations to perform in the final iterative balancing phase.
@@ -181,50 +177,44 @@ public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopi
181177
}
182178
}
183179

184-
this.topicComparator = new Comparator<Uuid>() {
185-
@Override
186-
public int compare(final Uuid topic1Id, final Uuid topic2Id) {
187-
int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
188-
int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
189-
int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
190-
int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();
191-
192-
// Order by partitions per subscriber, descending.
193-
int order = Double.compare(
194-
(double) topic2PartitionCount / topic2SubscriberCount,
195-
(double) topic1PartitionCount / topic1SubscriberCount
196-
);
197-
198-
// Then order by subscriber count, ascending.
199-
if (order == 0) {
200-
order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
201-
}
202-
203-
// Then order by topic id, ascending.
204-
if (order == 0) {
205-
order = topic1Id.compareTo(topic2Id);
206-
}
180+
this.topicComparator = (topic1Id, topic2Id) -> {
181+
int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
182+
int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
183+
int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
184+
int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();
185+
186+
// Order by partitions per subscriber, descending.
187+
int order = Double.compare(
188+
(double) topic2PartitionCount / topic2SubscriberCount,
189+
(double) topic1PartitionCount / topic1SubscriberCount
190+
);
191+
192+
// Then order by subscriber count, ascending.
193+
if (order == 0) {
194+
order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
195+
}
207196

208-
return order;
197+
// Then order by topic id, ascending.
198+
if (order == 0) {
199+
order = topic1Id.compareTo(topic2Id);
209200
}
210-
};
211201

212-
this.memberComparator = new Comparator<Integer>() {
213-
@Override
214-
public int compare(final Integer memberIndex1, final Integer memberIndex2) {
215-
// Order by number of assigned partitions, ascending.
216-
int order = Integer.compare(
217-
memberTargetAssignmentSizes[memberIndex1],
218-
memberTargetAssignmentSizes[memberIndex2]
219-
);
202+
return order;
203+
};
220204

221-
// Then order by member index, ascending.
222-
if (order == 0) {
223-
order = memberIndex1.compareTo(memberIndex2);
224-
}
205+
this.memberComparator = (memberIndex1, memberIndex2) -> {
206+
// Order by number of assigned partitions, ascending.
207+
int order = Integer.compare(
208+
memberTargetAssignmentSizes[memberIndex1],
209+
memberTargetAssignmentSizes[memberIndex2]
210+
);
225211

226-
return order;
212+
// Then order by member index, ascending.
213+
if (order == 0) {
214+
order = memberIndex1.compareTo(memberIndex2);
227215
}
216+
217+
return order;
228218
};
229219

230220
// Initialize partition owners for the target assignments.
@@ -851,14 +841,6 @@ private void assignPartition(Uuid topicId, int partition, int memberIndex) {
851841
addPartitionToTargetAssignment(topicId, partition, memberIndex);
852842
}
853843

854-
/**
855-
* @param memberIndex The member index.
856-
* @return The current assignment size for the given member.
857-
*/
858-
private int targetAssignmentSize(int memberIndex) {
859-
return memberTargetAssignmentSizes[memberIndex];
860-
}
861-
862844
/**
863845
* Assigns a partition to a member and updates the current assignment size.
864846
*

0 commit comments

Comments
 (0)