KAFKA-5059: Implement Transactional Coordinator

Author: Damian Guy <[email protected]>
Author: Guozhang Wang <[email protected]>
Author: Apurva Mehta <[email protected]>

Reviewers: Guozhang Wang, Jason Gustafson, Apurva Mehta, Jun Rao

Closes apache#2849 from dguy/exactly-once-tc
dguy authored and guozhangwang committed Apr 26, 2017
1 parent 8d8ab2e commit f69d941
Showing 58 changed files with 4,452 additions and 442 deletions.
4 changes: 4 additions & 0 deletions checkstyle/import-control-core.xml
@@ -61,6 +61,10 @@
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>

<subpackage name="coordinator">
<allow class="kafka.server.MetadataCache" />
</subpackage>

<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="kafka.api" />
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -868,7 +868,13 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
return partitionRecords;
}

private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {

/**
* Parse the record entry, deserializing the key / value fields if necessary
*/
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
RecordBatch batch,
Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class ConcurrentTransactionsException extends ApiException {
private static final long serialVersionUID = 1L;

public ConcurrentTransactionsException(final String message) {
super(message);
}
}
clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -31,3 +31,4 @@ public InvalidTxnTimeoutException(String message) {
super(message);
}
}

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
import org.apache.kafka.common.errors.ControllerMovedException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
@@ -38,11 +39,12 @@
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
@@ -52,7 +54,6 @@
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -186,7 +187,10 @@ public enum Errors {
new InvalidPidMappingException("The PID mapping is invalid")),
INVALID_TRANSACTION_TIMEOUT(50,
new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
"(as configured by max.transaction.timeout.ms)."));
"(as configured by max.transaction.timeout.ms).")),
CONCURRENT_TRANSACTIONS(51,
new ConcurrentTransactionsException("The producer attempted to update a transaction " +
"while another concurrent operation on the same transaction was ongoing"));

private static final Logger log = LoggerFactory.getLogger(Errors.class);

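For context, the new exception and the new wire code map to each other through the pre-existing Errors helpers. A minimal sketch of that round trip (illustrative only, not part of this diff; Errors.forException and Errors.exception are existing methods of org.apache.kafka.common.protocol.Errors):

import org.apache.kafka.common.errors.ConcurrentTransactionsException
import org.apache.kafka.common.protocol.Errors

// forException maps an exception class to its enum entry, so the new
// exception resolves to CONCURRENT_TRANSACTIONS (wire code 51 above).
val error = Errors.forException(new ConcurrentTransactionsException("concurrent txn update"))
assert(error == Errors.CONCURRENT_TRANSACTIONS)
assert(error.exception().isInstanceOf[ConcurrentTransactionsException])
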
clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -28,7 +28,6 @@ public class InitPidRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";


private final String transactionalId;
private final int transactionTimeoutMs;

@@ -63,7 +62,6 @@ public String toString() {
return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
transactionTimeoutMs + ")";
}

}

public InitPidRequest(Struct struct, short version) {
clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class WriteTxnMarkersRequest extends AbstractRequest {
private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
@@ -67,6 +68,33 @@ public TransactionResult transactionResult() {
public List<TopicPartition> partitions() {
return partitions;
}


@Override
public String toString() {
return "TxnMarkerEntry{" +
"pid=" + producerId +
", epoch=" + producerEpoch +
", result=" + result +
", partitions=" + partitions +
'}';
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final TxnMarkerEntry that = (TxnMarkerEntry) o;
return producerId == that.producerId &&
producerEpoch == that.producerEpoch &&
result == that.result &&
Objects.equals(partitions, that.partitions);
}

@Override
public int hashCode() {
return Objects.hash(producerId, producerEpoch, result, partitions);
}
}

public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
@@ -183,4 +211,17 @@ public static WriteTxnMarkersRequest parse(ByteBuffer buffer, short version) {
return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
return coordinatorEpoch == that.coordinatorEpoch &&
Objects.equals(markers, that.markers);
}

@Override
public int hashCode() {
return Objects.hash(coordinatorEpoch, markers);
}
}
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -568,7 +568,7 @@ private ListGroupsRequest createListGroupsRequest() {
}

private ListGroupsResponse createListGroupsResponse() {
List<ListGroupsResponse.Group> groups = asList(new ListGroupsResponse.Group("test-group", "consumer"));
List<ListGroupsResponse.Group> groups = Collections.singletonList(new ListGroupsResponse.Group("test-group", "consumer"));
return new ListGroupsResponse(Errors.NONE, groups);
}

@@ -844,6 +844,7 @@ private InitPidResponse createInitPidResponse() {
return new InitPidResponse(Errors.NONE, 3332, (short) 3);
}


private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
Map<TopicPartition, Integer> epochs = new HashMap<>();

6 changes: 4 additions & 2 deletions config/server.properties
@@ -72,9 +72,11 @@ num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topic "__consumer_offsets".
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state".
# For anything other than development testing, a value greater than 1, such as 3, is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
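
The new transaction state topic follows the same availability guidance as the offsets topic, so on a multi-broker cluster one would typically raise these values together. A hedged sketch (values illustrative, not part of this commit):

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2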

############################# Log Flush Policy #############################

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AdminClient.scala
@@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.KafkaException
import kafka.coordinator.GroupOverview
import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging

import org.apache.kafka.clients._
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -241,9 +241,9 @@ class Partition(val topic: String,
getReplica(replicaId) match {
case Some(replica) =>
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
replica.updateLogReadResult(logReadResult)
val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
val leaderLWIncremented = newLeaderLW > oldLeaderLW
69 changes: 69 additions & 0 deletions core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common

import kafka.utils.ShutdownableThread
import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Time


/**
 * An inter-broker send thread that uses a non-blocking network client.
*/
class InterBrokerSendThread(name: String,
networkClient: NetworkClient,
requestGenerator: () => Iterable[RequestAndCompletionHandler],
time: Time)
extends ShutdownableThread(name, isInterruptible = false) {

override def doWork() {
val now = time.milliseconds()
var pollTimeout = Long.MaxValue

val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator()

for (request: RequestAndCompletionHandler <- requestsToSend) {
val destination = Integer.toString(request.destination.id())
val completionHandler = request.handler
// TODO: Need to check inter broker protocol and error if new request is not supported
val clientRequest = networkClient.newClientRequest(destination,
request.request,
now,
true,
completionHandler)

if (networkClient.ready(request.destination, now)) {
networkClient.send(clientRequest, now)
} else {
val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()),
completionHandler, destination,
now /* createdTimeMs */, now /* receivedTimeMs */, true /* disconnected */, null /* versionMismatch */, null /* responseBody */)

        // the poll timeout is the minimum of the connection delays for destinations that are not yet ready;
        // otherwise it is effectively infinite
pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))

completionHandler.onComplete(disConnectedResponse)
}
}
networkClient.poll(pollTimeout, now)
}
}

case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], handler: RequestCompletionHandler)
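
A minimal usage sketch of the new thread, assuming a caller-supplied source of pending requests (startSender and pendingRequests are invented names for illustration, not part of this commit):

import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Time

// The generator is invoked once per doWork() pass: ready brokers are sent the
// request, and brokers that are not ready get a synthetic disconnected response.
def startSender(client: NetworkClient,
                pendingRequests: () => Iterable[(Node, AbstractRequest.Builder[_ <: AbstractRequest])]): InterBrokerSendThread = {
  val generator = () => pendingRequests().map { case (node, builder) =>
    RequestAndCompletionHandler(node, builder, new RequestCompletionHandler {
      override def onComplete(response: ClientResponse): Unit = {
        if (response.wasDisconnected()) {
          // e.g. re-enqueue (node, builder) for a later retry
        }
      }
    })
  }
  val thread = new InterBrokerSendThread("inter-broker-sender", client, generator, Time.SYSTEM)
  thread.start()
  thread
}

The thread owns the readiness checks, the synthetic-disconnect path, and the poll-timeout computation shown in doWork above, so callers only describe what to send and how to handle completions.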
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/common/Topic.scala
@@ -24,7 +24,8 @@ import scala.collection.immutable
object Topic {

val GroupMetadataTopicName = "__consumer_offsets"
val InternalTopics = immutable.Set(GroupMetadataTopicName)
val TransactionStateTopicName = "__transaction_state"
val InternalTopics = immutable.Set(GroupMetadataTopicName, TransactionStateTopicName)

val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 249
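
The effect of the Topic.scala change, illustrated (not part of this diff): anything consulting Topic.InternalTopics now treats the transaction log as internal.

import kafka.common.Topic

// __transaction_state is excluded from regular topic handling, like __consumer_offsets.
assert(Topic.InternalTopics.contains(Topic.TransactionStateTopicName))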