Skip to content

Commit

Permalink
[KAFKA-13369] Follower fetch protocol changes for tiered storage. (ap…
Browse files Browse the repository at this point in the history
…ache#11390)

This PR implements the follower fetch protocol as mentioned in KIP-405.

Added a new version for ListOffsets protocol to receive local log start offset on the leader replica. This is used by follower replicas to find the local log start offset on the leader.

Added a new version for FetchRequest protocol to receive OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.

We introduced a new field localLogStartOffset to maintain the log start offset in the local logs. Existing logStartOffset will continue to be the log start offset of the effective log that includes the segments in remote storage.

When a follower receives OffsetMovedToTieredStorage, then it tries to build the required state from the leader and remote storage so that it can be ready to move to fetch state.

Introduced RemoteLogManager which is responsible for

initializing RemoteStorageManager and RemoteLogMetadataManager instances.
receiving any leader and follower replica events and partition stop events, and acting on them.
also provides APIs to fetch indexes, metadata about remote log segments.
Followup PRs will add more functionality like copying segments to tiered storage, retention checks to clean local and remote log segments. This will change the local log start offset and make sure the follower fetch protocol works fine for several cases.

You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication

Co-authors: [email protected], [email protected], [email protected]

Reviewers: Kowshik Prakasam <[email protected]>, Cong Ding <[email protected]>, Tirtha Chatterjee <[email protected]>, Yaodong Yang <[email protected]>, Divij Vaidya <[email protected]>, Luke Chen <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
satishd authored Dec 17, 2022
1 parent f247aac commit 7146ac5
Show file tree
Hide file tree
Showing 65 changed files with 3,564 additions and 234 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ jmh-benchmarks/src/main/generated
**/.jqwik-database
**/src/generated
**/src/generated-test

storage/kafka-tiered-storage/
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ project(':core') {
implementation project(':server-common')
implementation project(':group-coordinator')
implementation project(':metadata')
implementation project(':storage:api')
implementation project(':raft')
implementation project(':storage')

Expand Down Expand Up @@ -910,6 +911,7 @@ project(':core') {
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:api').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
 * An {@link ApiException} signalling that a requested offset has been moved to tiered storage.
 */
public class OffsetMovedToTieredStorageException extends ApiException {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with a detail message and the underlying cause.
     *
     * @param message the detail message
     * @param cause   the throwable that led to this exception
     */
    public OffsetMovedToTieredStorageException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates the exception with a detail message only.
     *
     * @param message the detail message
     */
    public OffsetMovedToTieredStorageException(String message) {
        super(message);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OffsetMovedToTieredStorageException;
import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
Expand Down Expand Up @@ -370,7 +371,8 @@ public enum Errors {
TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new);
NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),
OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public interface Records extends TransferableRecords {
int SIZE_LENGTH = 4;
int LOG_OVERHEAD = SIZE_OFFSET + SIZE_LENGTH;

// the magic offset is at the same offset for all current message formats, but the 4 bytes
// The magic offset is at the same offset for all current message formats, but the 4 bytes
// between the size and the magic is dependent on the version.
int MAGIC_OFFSET = 16;
int MAGIC_OFFSET = LOG_OVERHEAD + 4;
int MAGIC_LENGTH = 1;
int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.Utils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
import static org.apache.kafka.common.record.Records.SIZE_OFFSET;

/**
 * A {@link LogInputStream} that decodes record batches one at a time from a plain {@link InputStream}.
 *
 * <p>Each call to {@link #nextBatch()} first reads the log header up to and including the magic byte,
 * uses the size field found there to allocate a buffer for the whole batch, reads the rest of the batch
 * into it, and finally wraps the buffer in the batch implementation that matches the magic value.
 */
public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
    private final InputStream inputStream;
    // Reused across calls: holds the log header up to and including the magic byte.
    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);

    /**
     * @param inputStream the stream the batches are read from
     */
    public RemoteLogInputStream(InputStream inputStream) {
        this.inputStream = inputStream;
    }

    /**
     * Reads the next record batch from the underlying stream.
     *
     * @return the next batch, or {@code null} when fewer bytes than a complete header or a complete
     *         batch could be read from the stream
     * @throws IOException if reading from the underlying stream fails
     * @throws CorruptRecordException if the size field in the header is smaller than the minimum record
     *         overhead, or so large that the total batch size would not fit in an {@code int}
     */
    @Override
    public RecordBatch nextBatch() throws IOException {
        logHeaderBuffer.clear();
        Utils.readFully(inputStream, logHeaderBuffer);

        // Not enough bytes for a header: treat as end of data.
        if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
            return null;

        logHeaderBuffer.rewind();
        int size = logHeaderBuffer.getInt(SIZE_OFFSET);

        // V0 has the smallest overhead, stricter checking is done later
        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
                    "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0));

        // Guard against int overflow of LOG_OVERHEAD + size. Without it a corrupt size field would wrap
        // to a negative capacity and surface as an IllegalArgumentException from ByteBuffer.allocate
        // instead of being reported as a corrupt record.
        if (size > Integer.MAX_VALUE - LOG_OVERHEAD)
            throw new CorruptRecordException(String.format("Found record size %d which exceeds the maximum " +
                    "supported size once the log overhead (%d) is added.", size, LOG_OVERHEAD));

        // Total size is: "LOG_OVERHEAD + the size of the rest of the content"
        int bufferSize = LOG_OVERHEAD + size;
        // buffer contains the complete payload including header and records.
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);

        // write log header into buffer
        buffer.put(logHeaderBuffer);

        // write the records payload into the buffer
        Utils.readFully(inputStream, buffer);
        // Buffer not completely filled: the batch is incomplete, so report no batch.
        if (buffer.position() != bufferSize)
            return null;
        buffer.rewind();

        // Absolute read, so it is unaffected by the header buffer having been drained by put() above.
        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
        MutableRecordBatch batch;
        if (magic > RecordBatch.MAGIC_VALUE_V1)
            batch = new DefaultRecordBatch(buffer);
        else
            batch = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(buffer);

        return batch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class ListOffsetsRequest extends AbstractRequest {
public static final long LATEST_TIMESTAMP = -1L;
public static final long MAX_TIMESTAMP = -3L;

/**
* It is used to represent the earliest message stored in the local log which is also called the local-log-start-offset
*/
public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;

import org.apache.kafka.common.KafkaException;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Locale;
import java.util.NoSuchElementException;

/**
* A class loader that looks for classes and resources in a specified class path first, before delegating to its parent
* class loader.
*/
public class ChildFirstClassLoader extends URLClassLoader {
    static {
        ClassLoader.registerAsParallelCapable();
    }

    /**
     * Builds a loader whose own search path is derived from {@code classPath}.
     *
     * @param classPath Class path string; entries are separated by {@link File#pathSeparator}
     * @param parent    The parent classloader, used as the fallback when a class / resource cannot be found
     *                  in the given classPath.
     */
    public ChildFirstClassLoader(String classPath, ClassLoader parent) {
        super(classpathToURLs(classPath), parent);
    }

    /**
     * Converts a class path string into the URLs this loader searches. Blank entries are skipped and any
     * {@link IOException} raised while resolving an entry is rethrown wrapped in a {@link KafkaException}.
     */
    private static URL[] classpathToURLs(String classPath) {
        ArrayList<URL> collected = new ArrayList<>();
        for (String entry : classPath.split(File.pathSeparator)) {
            boolean blank = entry == null || entry.trim().isEmpty();
            if (!blank) {
                try {
                    appendEntry(entry, collected);
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            }
        }
        return collected.toArray(new URL[0]);
    }

    /**
     * Resolves a single class path entry. An entry ending in {@code "/*"} contributes every {@code .jar} /
     * {@code .zip} file directly inside the named directory; any other entry contributes itself, provided
     * it exists.
     */
    private static void appendEntry(String entry, ArrayList<URL> sink) throws IOException {
        File file = new File(entry);

        if (!entry.endsWith("/*")) {
            if (file.exists())
                sink.add(file.getCanonicalFile().toURI().toURL());
            return;
        }

        // Wildcard entry: the directory is the parent of the canonicalised "<dir>/*" path.
        File directory = new File(new File(file.getCanonicalPath()).getParent());
        if (!directory.isDirectory())
            return;

        File[] archives = directory.listFiles((dir, name) -> isArchiveName(name));
        if (archives == null)
            return;

        for (File archive : archives)
            sink.add(archive.getCanonicalFile().toURI().toURL());
    }

    /** Case-insensitive check for a {@code .jar} or {@code .zip} file name. */
    private static boolean isArchiveName(String name) {
        String lowered = name.toLowerCase(Locale.ROOT);
        return lowered.endsWith(".jar") || lowered.endsWith(".zip");
    }

    /**
     * Loads a class, looking at already-loaded classes first, then this loader's own class path, and only
     * then delegating to the superclass implementation.
     */
    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null)
                loaded = findLocallyOrDelegate(name);

            if (resolve)
                resolveClass(loaded);

            return loaded;
        }
    }

    /** Tries this loader's own class path and falls back to the superclass lookup when that fails. */
    private Class<?> findLocallyOrDelegate(String name) throws ClassNotFoundException {
        try {
            return findClass(name);
        } catch (ClassNotFoundException e) {
            // Not on our own class path: try parent
            return super.loadClass(name, false);
        }
    }

    /**
     * Returns the resource from this loader's own class path if present, otherwise whatever the superclass
     * lookup yields.
     */
    @Override
    public URL getResource(String name) {
        URL own = findResource(name);
        return own != null ? own : super.getResource(name);
    }

    /**
     * Returns this loader's own matches for {@code name} followed by the parent's matches (when a parent
     * exists).
     */
    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        Enumeration<URL> local = findResources(name);
        Enumeration<URL> inherited = getParent() != null ? getParent().getResources(name) : null;

        return new Enumeration<URL>() {
            private boolean pending(Enumeration<URL> source) {
                return source != null && source.hasMoreElements();
            }

            @Override
            public boolean hasMoreElements() {
                return pending(local) || pending(inherited);
            }

            @Override
            public URL nextElement() {
                // Drain our own results before handing out the parent's.
                if (pending(local))
                    return local.nextElement();
                if (pending(inherited))
                    return inherited.nextElement();
                throw new NoSuchElementException();
            }
        };
    }
}
4 changes: 3 additions & 1 deletion clients/src/main/resources/common/message/FetchRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
// the `LastFetchedEpoch` field
//
// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "0-13",
//
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
"validVersions": "0-14",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
Expand Down
4 changes: 3 additions & 1 deletion clients/src/main/resources/common/message/FetchResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
// and leader discovery through the `CurrentLeader` field
//
// Version 13 replaces the topic name field with topic ID (KIP-516).
"validVersions": "0-13",
//
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
"validVersions": "0-14",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
// Version 6 enables flexible versions.
//
// Version 7 enables listing offsets by max timestamp (KIP-734).
"validVersions": "0-7",
//
// Version 8 enables listing offsets by local log start offset (KIP-405).
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
// Version 6 enables flexible versions.
//
// Version 7 is the same as version 6 (KIP-734).
"validVersions": "0-7",
//
// Version 8 enables listing offsets by local log start offset.
// This is the earliest log start offset in the local log. (KIP-405).
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
Expand Down
Loading

0 comments on commit 7146ac5

Please sign in to comment.