Skip to content

Commit

Permalink
Issue 5798: (SLTS) Fix close in GarbageCollector, ExtendedS3ChunkStor…
Browse files Browse the repository at this point in the history
…age and HDFSChunkStorage. (pravega#5800)

Fix close in GarbageCollector, ExtendedS3ChunkStorage and HDFSChunkStorage.
Make serializer static in TableBasedMetadataStore.

Signed-off-by: Sachin Joshi <[email protected]>
  • Loading branch information
sachin-j-joshi authored Mar 8, 2021
1 parent 29747d8 commit fa55f21
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public void close() {
if (shouldClose && !this.closed.getAndSet(true)) {
this.client.destroy();
}
super.close();
}

private ChunkStorageException convertException(String chunkName, String message, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void close() {
}
}
}
super.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -248,24 +247,21 @@ CompletableFuture<Boolean> deleteGarbage(boolean isBackground, int maxItems) {
// Find chunks to delete.
val chunksToDelete = new ArrayList<GarbageChunkInfo>();
int count = 0;
try {
// Block until you have at least one item.
GarbageChunkInfo info = garbageChunks.take();
log.trace("{}: deleteGarbage - retrieved {}", traceObjectId, info);
while (null != info ) {
queueSize.decrementAndGet();
chunksToDelete.add(info);

count++;
if (count >= maxItems) {
break;
}
// Do not block
info = garbageChunks.poll();
log.trace("{}: deleteGarbage - retrieved {}", traceObjectId, info);

// Wait until you have at least one item or timeout expires.
GarbageChunkInfo info = Exceptions.handleInterruptedCall(() -> garbageChunks.poll(config.getGarbageCollectionDelay().toMillis(), TimeUnit.MILLISECONDS));
log.trace("{}: deleteGarbage - retrieved {}", traceObjectId, info);
while (null != info ) {
queueSize.decrementAndGet();
chunksToDelete.add(info);

count++;
if (count >= maxItems) {
break;
}
} catch (InterruptedException e) {
throw new CompletionException(e);
// Do not block
info = garbageChunks.poll();
log.trace("{}: deleteGarbage - retrieved {}", traceObjectId, info);
}

// Sleep if no chunks to delete.
Expand Down Expand Up @@ -362,6 +358,7 @@ public void close() {
loopFuture.cancel(true);
}
closed.set(true);
executor.shutdownNow();
super.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
*/
@Slf4j
public class TableBasedMetadataStore extends BaseMetadataStore {
private final static BaseMetadataStore.TransactionData.TransactionDataSerializer SERIALIZER = new BaseMetadataStore.TransactionData.TransactionDataSerializer();

/**
* Instance of the {@link TableStore}.
*/
Expand All @@ -59,7 +61,6 @@ public class TableBasedMetadataStore extends BaseMetadataStore {
private final String tableName;
private final Duration timeout = Duration.ofSeconds(30);
private final AtomicBoolean isTableInitialized = new AtomicBoolean(false);
private final BaseMetadataStore.TransactionData.TransactionDataSerializer serializer = new BaseMetadataStore.TransactionData.TransactionDataSerializer();

/**
* Constructor.
Expand Down Expand Up @@ -93,7 +94,7 @@ protected CompletableFuture<TransactionData> read(String key) {
val entry = entries.get(0);
if (null != entry) {
val arr = entry.getValue();
TransactionData txnData = serializer.deserialize(arr);
TransactionData txnData = SERIALIZER.deserialize(arr);
txnData.setDbObject(entry.getKey().getVersion());
txnData.setPersisted(true);
TABLE_GET_LATENCY.reportSuccessEvent(t.getElapsed());
Expand Down Expand Up @@ -142,7 +143,7 @@ protected CompletableFuture<Void> writeAll(Collection<TransactionData> dataList)
}

try {
val arraySegment = serializer.serialize(txnData);
val arraySegment = SERIALIZER.serialize(txnData);
TableEntry tableEntry = TableEntry.versioned(
new ByteArraySegment(txnData.getKey().getBytes(Charsets.UTF_8)),
arraySegment,
Expand Down

0 comments on commit fa55f21

Please sign in to comment.