Skip to content

Commit

Permalink
KAFKA-1760: New consumer.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Jan 29, 2015
1 parent 11ec9bf commit 0699ff2
Show file tree
Hide file tree
Showing 89 changed files with 3,984 additions and 1,947 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ project(':clients') {
}

javadoc {
include "**/org/apache/kafka/clients/consumer/*"
include "**/org/apache/kafka/clients/producer/*"
include "**/org/apache/kafka/common/*"
include "**/org/apache/kafka/common/errors/*"
Expand Down
19 changes: 12 additions & 7 deletions clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,25 @@ public final class ClientRequest {
private final long createdMs;
private final boolean expectResponse;
private final RequestSend request;
private final Object attachment;
private final RequestCompletionHandler callback;

/**
* @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
* @param expectResponse Should we expect a response message or is this request complete once it is sent?
* @param request The request
* @param attachment Associated data with the request
* @param callback A callback to execute when the response has been received (or null if no callback is necessary)
*/
public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, Object attachment) {
public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) {
this.createdMs = createdMs;
this.attachment = attachment;
this.callback = callback;
this.request = request;
this.expectResponse = expectResponse;
}

@Override
public String toString() {
return "ClientRequest(expectResponse=" + expectResponse + ", payload=" + attachment + ", request=" + request + ")";
return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request
+ ")";
}

public boolean expectResponse() {
Expand All @@ -50,8 +51,12 @@ public RequestSend request() {
return request;
}

public Object attachment() {
return attachment;
public boolean hasCallback() {
return callback != null;
}

public RequestCompletionHandler callback() {
return callback;
}

public long createdTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,42 @@ public void connected(int node) {
public void disconnected(int node) {
nodeState(node).state = ConnectionState.DISCONNECTED;
}

/**
* Get the state of our connection to the given state
* Get the state of our connection to the given node
* @param node The id of the node
* @return The state of our connection
*/
public ConnectionState connectionState(int node) {
return nodeState(node).state;
}

/**
* Get the state of a given node
* @param node The node to fetch the state for
*/
private NodeConnectionState nodeState(int node) {
NodeConnectionState state = this.nodeState.get(node);
if (state == null)
throw new IllegalStateException("No entry found for node " + node);
return state;
}

/**
* The state of our connection to a node
*/
private static class NodeConnectionState {

ConnectionState state;
long lastConnectAttemptMs;

public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
this.state = state;
this.lastConnectAttemptMs = lastConnectAttempt;
}

public String toString() {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package org.apache.kafka.clients;

/**
* Some configurations shared by both producer and consumer
*/
public class CommonClientConfigs {

/*
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+ "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
+ "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+ "servers (you may want more than one, though, in case a server is down).";

public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";

public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";

public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.";

public static final String CLIENT_ID_CONFIG = "client.id";
public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";

public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.";

public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.";

public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics.";

public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";

public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
/**
* The states of a node connection
*/
enum ConnectionState {
public enum ConnectionState {
DISCONNECTED, CONNECTING, CONNECTED
}
44 changes: 40 additions & 4 deletions clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface KafkaClient {
/**
* Check if we are currently ready to send another request to the given node but don't attempt to connect if we
* aren't.
*
* @param node The node to check
* @param now The current timestamp
*/
Expand All @@ -34,6 +35,7 @@ public interface KafkaClient {
/**
* Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a
* node will change only when poll is invoked.
*
* @param node The node to connect to.
* @param now The current time
* @return true iff we are ready to immediately initiate the sending of another request to the given node.
Expand All @@ -44,26 +46,52 @@ public interface KafkaClient {
* Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
* disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
* connections.
*
* @param node The node to check
* @param now The current timestamp
* @return The number of milliseconds to wait.
*/
public long connectionDelay(Node node, long now);

/**
* Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready
* connections.
* @param requests The requests to send
* Queue up the given request for sending. Requests can only be sent on ready connections.
*
* @param request The request
* @param now The current time
*/
public void send(ClientRequest request);

/**
* Do actual reads and writes from sockets.
*
* @param timeout The maximum amount of time to wait for responses in ms
* @param now The current time in ms
* @throws IllegalStateException If a request is sent to an unready node
*/
public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now);
public List<ClientResponse> poll(long timeout, long now);

/**
* Complete all in-flight requests for a given node
*
* @param node The node to complete requests for
* @param now The current time in ms
* @return All requests that complete during this time period.
*/
public List<ClientResponse> completeAll(int node, long now);

/**
* Complete all in-flight requests
*
* @param now The current time in ms
* @return All requests that complete during this time period.
*/
public List<ClientResponse> completeAll(long now);

/**
* Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
* but will potentially choose a node for which we don't yet have a connection if all existing connections are in
* use.
*
* @param now The current time in ms
* @return The node with the fewest in-flight requests.
*/
Expand All @@ -74,8 +102,16 @@ public interface KafkaClient {
*/
public int inFlightRequestCount();

/**
* Get the total in-flight requests for a particular node
*
* @param nodeId The id of the node
*/
public int inFlightRequestCount(int nodeId);

/**
* Generate a request header for the next request
*
* @param key The API key of the request
*/
public RequestHeader nextRequestHeader(ApiKeys key);
Expand Down
Loading

0 comments on commit 0699ff2

Please sign in to comment.