Skip to content

Commit

Permalink
MINOR: Avoid some unnecessary collection copies in KafkaApis
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma <[email protected]>

Closes apache#4035 from hachikuji/KAFKA-5547-followup and squashes the following commits:

f6b04ce [Jason Gustafson] Add a couple missed common fields
d3473b1 [Jason Gustafson] Fix compilation errors and a few warnings
58a0ae6 [Jason Gustafson] MINOR: Avoid some unnecessary collection copies in KafkaApis
  • Loading branch information
hachikuji committed Oct 10, 2017
1 parent 90b5ce3 commit 1027ff3
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class CommonFields {

// Transactional APIs
public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction.");
public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id",
"The transactional id or null if the producer is not transactional");
public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id.");
public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@

import java.nio.ByteBuffer;

import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.STRING;

public class FindCoordinatorRequest extends AbstractRequest {
private static final String GROUP_ID_KEY_NAME = "group_id";
private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";

private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
new Field("group_id", STRING, "The unique group id."));
private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);

private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
Expand Down Expand Up @@ -102,8 +101,8 @@ public FindCoordinatorRequest(Struct struct, short version) {
this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
else
this.coordinatorType = CoordinatorType.GROUP;
if (struct.hasField(GROUP_ID_KEY_NAME))
this.coordinatorKey = struct.getString(GROUP_ID_KEY_NAME);
if (struct.hasField(GROUP_ID))
this.coordinatorKey = struct.get(GROUP_ID);
else
this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
}
Expand Down Expand Up @@ -138,8 +137,8 @@ public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
if (struct.hasField(GROUP_ID_KEY_NAME))
struct.set(GROUP_ID_KEY_NAME, coordinatorKey);
if (struct.hasField(GROUP_ID))
struct.set(GROUP_ID, coordinatorKey);
else
struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@

import java.nio.ByteBuffer;

import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;

public class InitProducerIdRequest extends AbstractRequest {
public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;

private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";

private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional id whose producer id we want to retrieve or generate."),
NULLABLE_TRANSACTIONAL_ID,
new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));

public static Schema[] schemaVersions() {
Expand Down Expand Up @@ -79,7 +78,7 @@ public String toString() {

public InitProducerIdRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID);
this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
}

Expand Down Expand Up @@ -109,7 +108,7 @@ public int transactionTimeoutMs() {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId);
struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
return struct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
Expand All @@ -39,15 +40,14 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;

public class ProduceRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String ACKS_KEY_NAME = "acks";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String TOPIC_DATA_KEY_NAME = "topic_data";
Expand Down Expand Up @@ -87,8 +87,7 @@ public class ProduceRequest extends AbstractRequest {
// Produce request V3 adds the transactional id which is used for authorization when attempting to write
// transactional data. This version also adds support for message format V2.
private static final Schema PRODUCE_REQUEST_V3 = new Schema(
new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional ID of the producer. This is used to " +
"authorize transaction produce requests. This can be null for non-transactional producers."),
CommonFields.NULLABLE_TRANSACTIONAL_ID,
new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " +
"received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " +
"for only the leader and -1 for the full ISR."),
Expand Down Expand Up @@ -229,7 +228,7 @@ public ProduceRequest(Struct struct, short version) {
partitionSizes = createPartitionSizes(partitionRecords);
acks = struct.getShort(ACKS_KEY_NAME);
timeout = struct.getInt(TIMEOUT_KEY_NAME);
transactionalId = struct.hasField(TRANSACTIONAL_ID_KEY_NAME) ? struct.getString(TRANSACTIONAL_ID_KEY_NAME) : null;
transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
}

private void validateRecords(short version, MemoryRecords records) {
Expand Down Expand Up @@ -268,9 +267,7 @@ public Struct toStruct() {
Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
struct.set(TIMEOUT_KEY_NAME, timeout);

if (struct.hasField(TRANSACTIONAL_ID_KEY_NAME))
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId);

List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException}
import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, DescribeReplicaLogDirsResult, AdminClient => JAdminClient}
import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
import LogConfig._
import joptsimple.OptionParser
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
Expand Down
Loading

0 comments on commit 1027ff3

Please sign in to comment.