Skip to content

Commit

Permalink
KAFKA-6718: Update SubscriptionInfoData with clientTags (apache#10802)
Browse files Browse the repository at this point in the history
adds ClientTags to SubscriptionInfoData

Reviewer: Luke Chen <[email protected]>, Bruno Cadonna <[email protected]>
  • Loading branch information
lkokhreidze authored Mar 11, 2022
1 parent f025a93 commit 87eb0cf
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
userEndPoint,
taskOffsetSums,
uniqueField,
assignmentErrorCode.get()
assignmentErrorCode.get(),
Collections.emptyMap()
).encode();
}

Expand Down Expand Up @@ -1307,6 +1308,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
case 8:
case 9:
case 10:
case 11:
validateActiveTaskEncoding(partitions, info, logPrefix);

activeTasks = getActiveTasks(partitions, info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public ByteBuffer encode() {
case 8:
case 9:
case 10:
case 11:
out.writeInt(usedVersion);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
Expand Down Expand Up @@ -361,6 +362,7 @@ public static AssignmentInfo decode(final ByteBuffer data) {
case 8:
case 9:
case 10:
case 11:
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeActiveTasks(assignmentInfo, in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
public static final int MIN_NAMED_TOPOLOGY_VERSION = 10;
public static final int LATEST_SUPPORTED_VERSION = 10;
public static final int LATEST_SUPPORTED_VERSION = 11;
/*
* Any time you modify the subscription or assignment info, you need to bump the latest supported version, unless
* the version has already been bumped within the current release cycle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.ClientTag;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.PartitionToOffsetSum;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.TaskOffsetSum;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -87,7 +88,8 @@ public SubscriptionInfo(final int version,
final String userEndPoint,
final Map<TaskId, Long> taskOffsetSums,
final byte uniqueField,
final int errorCode) {
final int errorCode,
final Map<String, String> clientTags) {
validateVersions(version, latestSupportedVersion);
final SubscriptionInfoData data = new SubscriptionInfoData();
data.setVersion(version);
Expand All @@ -108,6 +110,9 @@ public SubscriptionInfo(final int version,
if (version >= 9) {
data.setErrorCode(errorCode);
}
if (version >= 11) {
data.setClientTags(buildClientTagsFromMap(clientTags));
}

this.data = data;

Expand All @@ -125,10 +130,31 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) {
this.data = subscriptionInfoData;
}

public Map<String, String> clientTags() {
return data.clientTags().stream()
.collect(
Collectors.toMap(
clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8),
clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8)
)
);
}

public int errorCode() {
return data.errorCode();
}

private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) {
return clientTags.entrySet().stream()
.map(clientTagEntry -> {
final ClientTag clientTag = new ClientTag();
clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
return clientTag;
})
.collect(Collectors.toList());
}

// For version > MIN_NAMED_TOPOLOGY_VERSION
private void setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{
"name": "SubscriptionInfoData",
"validVersions": "1-10",
"validVersions": "1-11",
"flexibleVersions": "none",
"fields": [
{
Expand Down Expand Up @@ -65,6 +65,11 @@
"name": "errorCode",
"versions": "9+",
"type": "int32"
},
{
"name": "clientTags",
"versions": "11+",
"type": "[]ClientTag"
}
],
"commonStructs": [
Expand Down Expand Up @@ -136,6 +141,22 @@
"type": "int64"
}
]
},
{
"name": "ClientTag",
"versions": "11+",
"fields": [
{
"name": "key",
"versions": "11+",
"type": "bytes"
},
{
"name": "value",
"versions": "11+",
"type": "bytes"
}
]
}
],
"type": "data"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(final List<
private static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks) {
return new SubscriptionInfo(
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0, 0);
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0, 0, Collections.emptyMap());
}

// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
Expand Down Expand Up @@ -2332,7 +2333,7 @@ private static SubscriptionInfo getInfoForOlderVersion(final int version,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks) {
return new SubscriptionInfo(
version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
}

// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,23 @@ public static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks) {
return new SubscriptionInfo(
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
}

public static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
return new SubscriptionInfo(
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
}

public static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final byte uniqueField) {
return new SubscriptionInfo(
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0);
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, EMPTY_CLIENT_TAGS);
}

// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
Expand Down
Loading

0 comments on commit 87eb0cf

Please sign in to comment.