Skip to content

Commit 1621f88

Browse files
authored
KAFKA-17367: Share coordinator infra classes [1/N] (apache#16921)
Introduce ShareCoordinator interface and related classes. Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, David Arthur <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 31f408d commit 1621f88

File tree

15 files changed

+1114
-154
lines changed

15 files changed

+1114
-154
lines changed

build.gradle

+11
Original file line numberDiff line numberDiff line change
@@ -1596,6 +1596,17 @@ project(':share-coordinator') {
15961596
implementation project(':clients')
15971597
implementation project(':coordinator-common')
15981598
implementation project(':metadata')
1599+
implementation project(':server-common')
1600+
1601+
testImplementation project(':clients').sourceSets.test.output
1602+
testImplementation project(':server-common').sourceSets.test.output
1603+
testImplementation project(':coordinator-common').sourceSets.test.output
1604+
testImplementation libs.junitJupiter
1605+
testImplementation libs.mockitoCore
1606+
1607+
testRuntimeOnly libs.slf4jReload4j
1608+
testRuntimeOnly libs.junitPlatformLanucher
1609+
15991610
generator project(':generator')
16001611
}
16011612

checkstyle/import-control-share-coordinator.xml

+14-5
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,33 @@
2424

2525
<!-- common library dependencies -->
2626
<allow pkg="java" />
27+
<allow pkg="org.junit.jupiter.api" />
28+
2729
<!-- no one depends on the server -->
2830
<disallow pkg="kafka" />
2931

3032
<!-- anyone can use public classes -->
33+
<allow pkg="org.apache.kafka.common" exact-match="true" />
34+
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
35+
<allow pkg="org.apache.kafka.common.memory" />
36+
<allow pkg="org.apache.kafka.common.protocol" />
37+
<allow pkg="org.apache.kafka.common.security" />
38+
<allow pkg="org.apache.kafka.common.serialization" />
39+
<allow pkg="org.apache.kafka.common.utils" />
40+
3141
<subpackage name="coordinator">
3242
<subpackage name="share">
43+
<allow pkg="org.apache.kafka.common.annotation" />
3344
<allow pkg="org.apache.kafka.common.config" />
3445
<allow pkg="org.apache.kafka.common.message" />
3546
<allow pkg="org.apache.kafka.common.requests" />
3647
<allow pkg="org.apache.kafka.coordinator.common" />
48+
<allow pkg="org.apache.kafka.coordinator.share.generated" />
3749
<allow pkg="org.apache.kafka.image" />
50+
<allow pkg="org.apache.kafka.server.common" />
51+
<allow pkg="org.apache.kafka.server.group.share" />
3852
<subpackage name="generated">
39-
<allow class="org.apache.kafka.common.Uuid" />
4053
<allow pkg="com.fasterxml.jackson" />
41-
<allow pkg="org.apache.kafka.common.errors" />
42-
<allow pkg="org.apache.kafka.common.protocol" />
43-
<allow pkg="org.apache.kafka.common.utils" />
44-
<allow pkg="org.apache.kafka.coordinator.share.generated" />
4554
</subpackage>
4655
</subpackage>
4756
</subpackage>

checkstyle/suppressions.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,15 @@
323323

324324
<!-- group coordinator -->
325325
<suppress checks="CyclomaticComplexity"
326-
files="(ConsumerGroupMember|GroupMetadataManager|GeneralUniformAssignmentBuilder).java"/>
326+
files="(ConsumerGroupMember|GroupMetadataManager|GeneralUniformAssignmentBuilder|GroupCoordinatorRecordSerde).java"/>
327327
<suppress checks="(NPathComplexity|MethodLength)"
328328
files="(GroupMetadataManager|ConsumerGroupTest|ShareGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/>
329329
<suppress checks="ClassFanOutComplexity"
330330
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
331331
<suppress checks="ParameterNumber"
332332
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
333333
<suppress checks="ClassDataAbstractionCouplingCheck"
334-
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
334+
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde).java"/>
335335
<suppress checks="JavaNCSS"
336336
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
337337

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.coordinator.common.runtime;

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

/**
 * Serializer/Deserializer for {@link CoordinatorRecord}. The format is defined below:
 * <pre>
 * record_key = [record_type key_message]
 * record_value = [value_version value_message]
 *
 * record_type : The record type is currently defined as the version of the key
 *               {@link ApiMessageAndVersion} object.
 * key_message : The serialized message of the key {@link ApiMessageAndVersion} object.
 * value_version : The value version is currently defined as the version of the value
 *                 {@link ApiMessageAndVersion} object.
 * value_message : The serialized message of the value {@link ApiMessageAndVersion} object.
 * </pre>
 */
public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRecord>, Deserializer<CoordinatorRecord> {
    @Override
    public byte[] serializeKey(CoordinatorRecord record) {
        // Record does not accept a null key.
        return MessageUtil.toVersionPrefixedBytes(
            record.key().version(),
            record.key().message()
        );
    }

    @Override
    public byte[] serializeValue(CoordinatorRecord record) {
        // Tombstone is represented with a null value.
        if (record.value() == null) {
            return null;
        } else {
            return MessageUtil.toVersionPrefixedBytes(
                record.value().version(),
                record.value().message()
            );
        }
    }

    @Override
    public CoordinatorRecord deserialize(
        ByteBuffer keyBuffer,
        ByteBuffer valueBuffer
    ) throws RuntimeException {
        // The key's version prefix doubles as the record type; it selects which
        // generated key/value message classes are used for the rest of the record.
        final short recordType = readVersion(keyBuffer, "key");
        final ApiMessage keyMessage = apiMessageKeyFor(recordType);
        readMessage(keyMessage, keyBuffer, recordType, "key");

        // A null value buffer is a tombstone.
        if (valueBuffer == null) {
            return new CoordinatorRecord(new ApiMessageAndVersion(keyMessage, recordType), null);
        }

        final ApiMessage valueMessage = apiMessageValueFor(recordType);
        final short valueVersion = readVersion(valueBuffer, "value");
        readMessage(valueMessage, valueBuffer, valueVersion, "value");

        return new CoordinatorRecord(
            new ApiMessageAndVersion(keyMessage, recordType),
            new ApiMessageAndVersion(valueMessage, valueVersion)
        );
    }

    // Reads the leading version short from the buffer; wraps an underflow in a
    // RuntimeException that names the offending buffer. The underflow is kept as
    // the cause so the original stack trace is not lost.
    private short readVersion(ByteBuffer buffer, String name) throws RuntimeException {
        try {
            return buffer.getShort();
        } catch (BufferUnderflowException ex) {
            throw new RuntimeException(String.format("Could not read version from %s's buffer.", name), ex);
        }
    }

    // Deserializes the remainder of the buffer into the given message, rethrowing
    // any failure with context (version and buffer name) while preserving the cause.
    private void readMessage(ApiMessage message, ByteBuffer buffer, short version, String name) throws RuntimeException {
        try {
            message.read(new ByteBufferAccessor(buffer), version);
        } catch (RuntimeException ex) {
            throw new RuntimeException(String.format("Could not read record with version %d from %s's buffer due to: %s.",
                version, name, ex.getMessage()), ex);
        }
    }

    /**
     * Concrete child class must provide implementation which returns appropriate
     * type of {@link ApiMessage} objects representing the key.
     *
     * @param recordVersion - short representing version
     * @return ApiMessage object
     */
    protected abstract ApiMessage apiMessageKeyFor(short recordVersion);

    /**
     * Concrete child class must provide implementation which returns appropriate
     * type of {@link ApiMessage} objects representing the value.
     *
     * @param recordVersion - short representing version
     * @return ApiMessage object
     */
    protected abstract ApiMessage apiMessageValueFor(short recordVersion);
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java

+9-92
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@
1717
package org.apache.kafka.coordinator.group;
1818

1919
import org.apache.kafka.common.protocol.ApiMessage;
20-
import org.apache.kafka.common.protocol.ByteBufferAccessor;
21-
import org.apache.kafka.common.protocol.MessageUtil;
2220
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
23-
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
24-
import org.apache.kafka.coordinator.common.runtime.Deserializer;
25-
import org.apache.kafka.coordinator.common.runtime.Serializer;
21+
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
2622
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
2723
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
2824
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@@ -53,91 +49,11 @@
5349
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
5450
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
5551
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
56-
import org.apache.kafka.server.common.ApiMessageAndVersion;
57-
58-
import java.nio.BufferUnderflowException;
59-
import java.nio.ByteBuffer;
60-
61-
/**
62-
* Serializer/Deserializer for {@link CoordinatorRecord}. The format is defined below:
63-
* <pre>
64-
* record_key = [record_type key_message]
65-
* record_value = [value_version value_message]
66-
*
67-
* record_type : The record type is currently define as the version of the key
68-
* {@link ApiMessageAndVersion} object.
69-
* key_message : The serialized message of the key {@link ApiMessageAndVersion} object.
70-
* value_version : The value version is currently define as the version of the value
71-
* {@link ApiMessageAndVersion} object.
72-
* value_message : The serialized message of the value {@link ApiMessageAndVersion} object.
73-
* </pre>
74-
*/
75-
@SuppressWarnings({ "ClassDataAbstractionCoupling", "CyclomaticComplexity" })
76-
public class GroupCoordinatorRecordSerde implements Serializer<CoordinatorRecord>, Deserializer<CoordinatorRecord> {
77-
@Override
78-
public byte[] serializeKey(CoordinatorRecord record) {
79-
// Record does not accept a null key.
80-
return MessageUtil.toVersionPrefixedBytes(
81-
record.key().version(),
82-
record.key().message()
83-
);
84-
}
8552

53+
public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde {
8654
@Override
87-
public byte[] serializeValue(CoordinatorRecord record) {
88-
// Tombstone is represented with a null value.
89-
if (record.value() == null) {
90-
return null;
91-
} else {
92-
return MessageUtil.toVersionPrefixedBytes(
93-
record.value().version(),
94-
record.value().message()
95-
);
96-
}
97-
}
98-
99-
@Override
100-
public CoordinatorRecord deserialize(
101-
ByteBuffer keyBuffer,
102-
ByteBuffer valueBuffer
103-
) throws RuntimeException {
104-
final short recordType = readVersion(keyBuffer, "key");
105-
final ApiMessage keyMessage = apiMessageKeyFor(recordType);
106-
readMessage(keyMessage, keyBuffer, recordType, "key");
107-
108-
if (valueBuffer == null) {
109-
return new CoordinatorRecord(new ApiMessageAndVersion(keyMessage, recordType), null);
110-
}
111-
112-
final ApiMessage valueMessage = apiMessageValueFor(recordType);
113-
final short valueVersion = readVersion(valueBuffer, "value");
114-
readMessage(valueMessage, valueBuffer, valueVersion, "value");
115-
116-
return new CoordinatorRecord(
117-
new ApiMessageAndVersion(keyMessage, recordType),
118-
new ApiMessageAndVersion(valueMessage, valueVersion)
119-
);
120-
}
121-
122-
private short readVersion(ByteBuffer buffer, String name) throws RuntimeException {
123-
try {
124-
return buffer.getShort();
125-
} catch (BufferUnderflowException ex) {
126-
throw new RuntimeException(String.format("Could not read version from %s's buffer.", name));
127-
}
128-
}
129-
130-
private void readMessage(ApiMessage message, ByteBuffer buffer, short version, String name) throws RuntimeException {
131-
try {
132-
message.read(new ByteBufferAccessor(buffer), version);
133-
} catch (RuntimeException ex) {
134-
throw new RuntimeException(String.format("Could not read record with version %d from %s's buffer due to: %s.",
135-
version, name, ex.getMessage()), ex);
136-
}
137-
}
138-
139-
private ApiMessage apiMessageKeyFor(short recordType) {
140-
switch (recordType) {
55+
protected ApiMessage apiMessageKeyFor(short recordVersion) {
56+
switch (recordVersion) {
14157
case 0:
14258
case 1:
14359
return new OffsetCommitKey();
@@ -170,12 +86,13 @@ private ApiMessage apiMessageKeyFor(short recordType) {
17086
case 15:
17187
return new ShareGroupStatePartitionMetadataKey();
17288
default:
173-
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
89+
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
17490
}
17591
}
17692

177-
private ApiMessage apiMessageValueFor(short recordType) {
178-
switch (recordType) {
93+
@Override
94+
protected ApiMessage apiMessageValueFor(short recordVersion) {
95+
switch (recordVersion) {
17996
case 0:
18097
case 1:
18198
return new OffsetCommitValue();
@@ -208,7 +125,7 @@ private ApiMessage apiMessageValueFor(short recordType) {
208125
case 15:
209126
return new ShareGroupStatePartitionMetadataValue();
210127
default:
211-
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
128+
throw new CoordinatorLoader.UnknownRecordTypeException(recordVersion);
212129
}
213130
}
214131
}

server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateBatch.java

+17-14
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,29 @@ public short deliveryCount() {
5656

5757
public static PersisterStateBatch from(ReadShareGroupStateResponseData.StateBatch batch) {
5858
return new PersisterStateBatch(
59-
batch.firstOffset(),
60-
batch.lastOffset(),
61-
batch.deliveryState(),
62-
batch.deliveryCount());
59+
batch.firstOffset(),
60+
batch.lastOffset(),
61+
batch.deliveryState(),
62+
batch.deliveryCount());
6363
}
6464

6565
public static PersisterStateBatch from(WriteShareGroupStateRequestData.StateBatch batch) {
6666
return new PersisterStateBatch(
67-
batch.firstOffset(),
68-
batch.lastOffset(),
69-
batch.deliveryState(),
70-
batch.deliveryCount());
67+
batch.firstOffset(),
68+
batch.lastOffset(),
69+
batch.deliveryState(),
70+
batch.deliveryCount());
7171
}
7272

7373
@Override
7474
public boolean equals(Object o) {
7575
if (this == o) return true;
7676
if (o == null || getClass() != o.getClass()) return false;
7777
PersisterStateBatch that = (PersisterStateBatch) o;
78-
return firstOffset == that.firstOffset && lastOffset == that.lastOffset && deliveryState == that.deliveryState && deliveryCount == that.deliveryCount;
78+
return firstOffset == that.firstOffset &&
79+
lastOffset == that.lastOffset &&
80+
deliveryState == that.deliveryState &&
81+
deliveryCount == that.deliveryCount;
7982
}
8083

8184
@Override
@@ -86,10 +89,10 @@ public int hashCode() {
8689
@Override
8790
public String toString() {
8891
return "PersisterStateBatch(" +
89-
"firstOffset=" + firstOffset + "," +
90-
"lastOffset=" + lastOffset + "," +
91-
"deliveryState=" + deliveryState + "," +
92-
"deliveryCount=" + deliveryCount +
93-
")";
92+
"firstOffset=" + firstOffset + "," +
93+
"lastOffset=" + lastOffset + "," +
94+
"deliveryState=" + deliveryState + "," +
95+
"deliveryCount=" + deliveryCount +
96+
")";
9497
}
9598
}

0 commit comments

Comments
 (0)