Skip to content

Commit

Permalink
KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API (
Browse files Browse the repository at this point in the history
apache#14800)

This patch adds integration tests for JoinGroup API and SyncGroup API.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
dongnuo123 authored Nov 23, 2023
1 parent 891dd2a commit d5a8b89
Show file tree
Hide file tree
Showing 8 changed files with 724 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public JoinGroupResponse(JoinGroupResponseData data, short version) {
if (version < 7 && data.protocolName() == null) {
data.setProtocolName("");
}

// If nullable string for the protocol name is supported,
// we set empty string to be null to ensure compliance.
if (version >= 7 && data.protocolName() != null && data.protocolName().isEmpty()) {
data.setProtocolName(null);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,19 @@ public void testProtocolNameBackwardCompatibility(short version) {
assertNull(joinGroupResponse.data().protocolName());
}
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
public void testProtocolNameComplianceWithVersion7AndAbove(short version) {
JoinGroupResponseData data = new JoinGroupResponseData()
.setProtocolName("");

JoinGroupResponse joinGroupResponse = new JoinGroupResponse(data, version);

if (version < 7) {
assertEquals("", joinGroupResponse.data().protocolName());
} else {
assertNull(joinGroupResponse.data().protocolName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
.setGroupId("grp")
.setGroupState(GenericGroupState.DEAD.toString)
),
describeGroups(
groupIds = List("grp"),
version = ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)
)
describeGroups(List("grp"))
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,34 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
groupId: String,
memberId: String,
generationId: Int,
protocolType: String = "consumer",
protocolName: String = "consumer-range",
assignments: List[SyncGroupRequestData.SyncGroupRequestAssignment] = List.empty,
expectedError: Errors = Errors.NONE
expectedProtocolType: String = "consumer",
expectedProtocolName: String = "consumer-range",
expectedAssignment: Array[Byte] = Array.empty,
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.SYNC_GROUP.latestVersion(isUnstableApiEnabled)
): SyncGroupResponseData = {
val syncGroupRequestData = new SyncGroupRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setGenerationId(generationId)
.setProtocolType("consumer")
.setProtocolName("consumer-range")
.setProtocolType(protocolType)
.setProtocolName(protocolName)
.setAssignments(assignments.asJava)

val syncGroupRequest = new SyncGroupRequest.Builder(syncGroupRequestData).build()
val syncGroupRequest = new SyncGroupRequest.Builder(syncGroupRequestData).build(version)
val syncGroupResponse = connectAndReceive[SyncGroupResponse](syncGroupRequest)
assertEquals(expectedError.code, syncGroupResponse.data.errorCode)

assertEquals(
new SyncGroupResponseData()
.setErrorCode(expectedError.code)
.setProtocolType(if (version >= 5) expectedProtocolType else null)
.setProtocolName(if (version >= 5) expectedProtocolName else null)
.setAssignment(expectedAssignment),
syncGroupResponse.data
)

syncGroupResponse.data
}
Expand Down Expand Up @@ -335,7 +349,8 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
generationId = rejoinGroupResponseData.generationId,
assignments = List(new SyncGroupRequestAssignment()
.setMemberId(rejoinGroupResponseData.memberId)
.setAssignment(assignment))
.setAssignment(assignment)),
expectedAssignment = assignment
)
}

Expand Down Expand Up @@ -384,7 +399,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
} else {
// Note that we don't heartbeat and assume that the test will
// complete within the session timeout.
joinDynamicConsumerGroupWithOldProtocol(groupId)
joinDynamicConsumerGroupWithOldProtocol(groupId = groupId)
}
}

Expand All @@ -404,11 +419,10 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {

protected def describeGroups(
groupIds: List[String],
version: Short
version: Short = ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)
): List[DescribeGroupsResponseData.DescribedGroup] = {
val describeGroupsRequest = new DescribeGroupsRequest.Builder(
new DescribeGroupsRequestData()
.setGroups(groupIds.asJava)
new DescribeGroupsRequestData().setGroups(groupIds.asJava)
).build(version)

val describeGroupsResponse = connectAndReceive[DescribeGroupsResponse](describeGroupsRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
assignments = List(new SyncGroupRequestData.SyncGroupRequestAssignment()
.setMemberId(leaderMemberId)
.setAssignment(Array[Byte](1))
)
),
expectedAssignment = Array[Byte](1)
)

// Heartbeat STABLE group.
Expand All @@ -154,7 +155,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
}

TestUtils.waitUntilTrue(() => {
val described = describeGroups(groupIds = List("grp"), version = ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled))
val described = describeGroups(groupIds = List("grp"))
GenericGroupState.PREPARING_REBALANCE.toString == described.head.groupState
}, msg = s"The group is not in PREPARING_REBALANCE state.")

Expand Down
Loading

0 comments on commit d5a8b89

Please sign in to comment.