Avoid prefetching too much data when offloading data to HDFS (apache#6717)
Fixes apache#6692 

### Motivation
Avoid prefetching too much data when offloading, which may lead to OOM.
Also fix an issue where objects were not closed, which was also mentioned by congbobo184 in apache#6697.
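
In short, the reader takes a permit from a `Semaphore` sized by the new `managedLedgerOffloadPrefetchRounds` setting before handing a batch of entries to the writer, and the writer releases the permit (and closes the entries) when it is done. A minimal, self-contained sketch of that pattern (illustrative only, not the PR code; the class and variable names below are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: bound how many read batches can sit in memory
// while they wait for the (slower) writer, mirroring the idea of this fix.
public class BoundedPrefetchSketch {
    public static void main(String[] args) throws InterruptedException {
        int prefetchRounds = 1;                   // plays the role of managedLedgerOffloadPrefetchRounds
        Semaphore inFlight = new Semaphore(prefetchRounds);
        ExecutorService writer = Executors.newSingleThreadExecutor();

        for (int round = 0; round < 10; round++) {
            inFlight.acquire();                   // stop reading ahead while the writer is behind
            byte[] batch = new byte[1024 * 1024]; // stands in for one round of ledger entries
            writer.submit(() -> {
                try {
                    // write `batch` to the offload target here
                    System.out.println("wrote " + batch.length + " bytes");
                } finally {
                    inFlight.release();           // free the slot; the real code also closes the LedgerEntries
                }
            });
        }
        writer.shutdown();
        writer.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

With the default of 1, the reader stays roughly one batch ahead of the HDFS writer instead of racing through the whole ledger, so memory use is bounded by a few read rounds rather than the ledger size.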


### Does this pull request potentially affect one of the following parts:


  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)
pheecian authored Apr 22, 2020
1 parent b9bdfa1 commit 514b6af
Showing 4 changed files with 70 additions and 17 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
@@ -889,6 +889,9 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2

# Maximum prefetch rounds for ledger reading for offloading
managedLedgerOffloadPrefetchRounds=1

# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true

ServiceConfiguration.java
@@ -1536,6 +1536,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerOffloadMaxThreads = 2;

@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Maximum prefetch rounds for ledger reading for offloading"
)
private int managedLedgerOffloadPrefetchRounds = 1;

/**** --- Transaction config variables --- ****/
@FieldContext(
category = CATEGORY_TRANSACTION,
OffloadPolicies.java
@@ -37,6 +37,7 @@ public class OffloadPolicies {
public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public final static long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = -1;
@@ -46,6 +47,7 @@ public class OffloadPolicies {
private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
private int managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
private long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;

@@ -161,6 +163,7 @@ public int hashCode() {
return Objects.hash(
managedLedgerOffloadDriver,
managedLedgerOffloadMaxThreads,
managedLedgerOffloadPrefetchRounds,
managedLedgerOffloadThresholdInBytes,
managedLedgerOffloadDeletionLagInMillis,
s3ManagedLedgerOffloadRegion,
@@ -190,6 +193,7 @@ public boolean equals(Object obj) {
OffloadPolicies other = (OffloadPolicies) obj;
return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
&& Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
&& Objects.equals(managedLedgerOffloadPrefetchRounds, other.getManagedLedgerOffloadPrefetchRounds())
&& Objects.equals(managedLedgerOffloadThresholdInBytes,
other.getManagedLedgerOffloadThresholdInBytes())
&& Objects.equals(managedLedgerOffloadDeletionLagInMillis,
@@ -222,6 +226,7 @@ public String toString() {
return MoreObjects.toStringHelper(this)
.add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
.add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
.add("managedLedgerOffloadPrefetchRounds", managedLedgerOffloadPrefetchRounds)
.add("managedLedgerOffloadThresholdInBytes", managedLedgerOffloadThresholdInBytes)
.add("managedLedgerOffloadDeletionLagInMillis", managedLedgerOffloadDeletionLagInMillis)
.add("s3ManagedLedgerOffloadRegion", s3ManagedLedgerOffloadRegion)
FileSystemManagedLedgerOffloader.java
@@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.util.Recycler;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -45,6 +46,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
@@ -68,6 +70,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
public static boolean driverSupported(String driver) {
return DRIVER_NAMES.equals(driver);
}

@Override
public String getOffloadDriverName() {
return driverName;
@@ -82,7 +85,7 @@ private FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler
this.configuration = new Configuration();
if (conf.getFileSystemProfilePath() != null) {
String[] paths = conf.getFileSystemProfilePath().split(",");
for (int i =0 ; i < paths.length; i++) {
for (int i = 0; i < paths.length; i++) {
configuration.addResource(new Path(paths[i]));
}
}
@@ -106,6 +109,7 @@ private FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
.name("offload-assignment").build();
}

@VisibleForTesting
public FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException {
this.offloadPolicies = conf;
@@ -137,7 +141,7 @@ public Map<String, String> getOffloadDriverMetadata() {
@Override
public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler));
scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
return promise;
}

@@ -151,16 +155,18 @@ private static class LedgerReader implements Runnable {
private final Configuration configuration;
volatile Exception fileSystemWriteException = null;
private OrderedScheduler assignmentScheduler;
private int managedLedgerOffloadPrefetchRounds = 1;

private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata, CompletableFuture<Void> promise,
String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler) {
String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, int managedLedgerOffloadPrefetchRounds) {
this.readHandle = readHandle;
this.uuid = uuid;
this.extraMetadata = extraMetadata;
this.promise = promise;
this.storageBasePath = storageBasePath;
this.configuration = configuration;
this.assignmentScheduler = assignmentScheduler;
this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
}

@Override
@@ -188,13 +194,17 @@ public void run() {
AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
long needToOffloadFirstEntryNumber = 0;
CountDownLatch countDownLatch;
//avoid prefetch too much data into memory
Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds);
do {
long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed());
log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
semaphore.acquire();
countDownLatch = new CountDownLatch(1);
assignmentScheduler.chooseThread(ledgerId).submit(new FileSystemWriter(ledgerEntriesOnce, dataWriter,
countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {}, Executors.newSingleThreadExecutor());
assignmentScheduler.chooseThread(ledgerId).submit(FileSystemWriter.create(ledgerEntriesOnce, dataWriter, semaphore,
countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {
}, Executors.newSingleThreadExecutor());
needToOffloadFirstEntryNumber = end + 1;
} while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
countDownLatch.await();
@@ -216,24 +226,50 @@ public void run() {

private static class FileSystemWriter implements Runnable {

private final LedgerEntries ledgerEntriesOnce;
private LedgerEntries ledgerEntriesOnce;

private final LongWritable key = new LongWritable();
private final BytesWritable value = new BytesWritable();

private final MapFile.Writer dataWriter;
private final CountDownLatch countDownLatch;
private final AtomicLong haveOffloadEntryNumber;
private final LedgerReader ledgerReader;
private MapFile.Writer dataWriter;
private CountDownLatch countDownLatch;
private AtomicLong haveOffloadEntryNumber;
private LedgerReader ledgerReader;
private Semaphore semaphore;
private Recycler.Handle<FileSystemWriter> recyclerHandle;

private FileSystemWriter(Recycler.Handle<FileSystemWriter> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>() {
@Override
protected FileSystemWriter newObject(Recycler.Handle<FileSystemWriter> handle) {
return new FileSystemWriter(handle);
}
};

private void recycle() {
this.dataWriter = null;
this.countDownLatch = null;
this.haveOffloadEntryNumber = null;
this.ledgerReader = null;
this.ledgerEntriesOnce = null;
this.semaphore = null;
recyclerHandle.recycle(this);
}


private FileSystemWriter(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter,
CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
this.ledgerEntriesOnce = ledgerEntriesOnce;
this.dataWriter = dataWriter;
this.countDownLatch = countDownLatch;
this.haveOffloadEntryNumber = haveOffloadEntryNumber;
this.ledgerReader = ledgerReader;
public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter, Semaphore semaphore,
CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
FileSystemWriter writer = RECYCLER.get();
writer.ledgerReader = ledgerReader;
writer.dataWriter = dataWriter;
writer.countDownLatch = countDownLatch;
writer.haveOffloadEntryNumber = haveOffloadEntryNumber;
writer.ledgerEntriesOnce = ledgerEntriesOnce;
writer.semaphore = semaphore;
return writer;
}

@Override
@@ -255,6 +291,9 @@ public void run() {
}
}
countDownLatch.countDown();
ledgerEntriesOnce.close();
semaphore.release();
this.recycle();
}
}

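A side note on the diff above: `FileSystemWriter` is now pooled through Netty's `Recycler` instead of being allocated once per read round, and `recycle()` nulls its fields so pooled instances do not pin ledger entries. A minimal sketch of that pattern (illustrative only; `PooledTask` and its fields are made-up names, not part of Pulsar):

```java
import io.netty.util.Recycler;

// Illustrative sketch of the Recycler pattern used by FileSystemWriter above.
public final class PooledTask implements Runnable {

    private static final Recycler<PooledTask> RECYCLER = new Recycler<PooledTask>() {
        @Override
        protected PooledTask newObject(Recycler.Handle<PooledTask> handle) {
            return new PooledTask(handle);
        }
    };

    private final Recycler.Handle<PooledTask> recyclerHandle;
    private Runnable work;   // mutable state: set on checkout, cleared on recycle

    private PooledTask(Recycler.Handle<PooledTask> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    public static PooledTask create(Runnable work) {
        PooledTask task = RECYCLER.get();   // reuse a pooled instance when one is available
        task.work = work;
        return task;
    }

    @Override
    public void run() {
        try {
            work.run();
        } finally {
            work = null;                    // drop references so the pool does not hold on to large objects
            recyclerHandle.recycle(this);   // return this instance to the pool
        }
    }
}
```

This avoids allocating a fresh writer task for every read round while keeping recycled instances free of stale references.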
