Skip to content

Commit

Permalink
API, Core: Replace deprecated Counter with new Counter API (apache#5506)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Aug 16, 2022
1 parent c608f16 commit c965af5
Show file tree
Hide file tree
Showing 17 changed files with 159 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.Arrays;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.MetricsContext.Counter;
import org.apache.iceberg.metrics.MetricsContext.Unit;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -49,8 +49,8 @@ public class OSSInputStream extends SeekableInputStream {
private long next = 0;
private boolean closed = false;

private final Counter<Long> readBytes;
private final Counter<Integer> readOperations;
private final Counter readBytes;
private final Counter readOperations;

/** @deprecated moving to package-private in 0.15.0 */
@Deprecated
Expand All @@ -63,9 +63,8 @@ public OSSInputStream(OSS client, OSSURI uri) {
this.uri = uri;
this.createStack = Thread.currentThread().getStackTrace();

this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Long.class, Unit.BYTES);
this.readOperations =
metrics.counter(FileIOMetricsContext.READ_OPERATIONS, Integer.class, Unit.COUNT);
this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS, Unit.COUNT);
}

@Override
Expand Down Expand Up @@ -103,7 +102,7 @@ public int read(byte[] b, int off, int len) throws IOException {
int bytesRead = stream.read(b, off, len);
pos += bytesRead;
next += bytesRead;
readBytes.increment((long) bytesRead);
readBytes.increment(bytesRead);
readOperations.increment();

return bytesRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.MetricsContext.Counter;
import org.apache.iceberg.metrics.MetricsContext.Unit;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -55,8 +55,8 @@ public class OSSOutputStream extends PositionOutputStream {
private long pos = 0;
private boolean closed = false;

private final Counter<Long> writeBytes;
private final Counter<Integer> writeOperations;
private final Counter writeBytes;
private final Counter writeOperations;

OSSOutputStream(
OSS client, OSSURI uri, AliyunProperties aliyunProperties, MetricsContext metrics) {
Expand All @@ -66,9 +66,8 @@ public class OSSOutputStream extends PositionOutputStream {

this.currentStagingFile = newStagingFile(aliyunProperties.ossStagingDirectory());
this.stream = newStream(currentStagingFile);
this.writeBytes = metrics.counter(FileIOMetricsContext.WRITE_BYTES, Long.class, Unit.BYTES);
this.writeOperations =
metrics.counter(FileIOMetricsContext.WRITE_OPERATIONS, Integer.class, Unit.COUNT);
this.writeBytes = metrics.counter(FileIOMetricsContext.WRITE_BYTES, Unit.BYTES);
this.writeOperations = metrics.counter(FileIOMetricsContext.WRITE_OPERATIONS, Unit.COUNT);
}

private static File newStagingFile(String ossStagingDirectory) {
Expand Down Expand Up @@ -122,7 +121,7 @@ public void write(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Already closed.");
stream.write(b, off, len);
pos += len;
writeBytes.increment((long) len);
writeBytes.increment(len);
writeOperations.increment();
}

Expand Down
18 changes: 8 additions & 10 deletions api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -117,17 +117,17 @@ protected boolean shouldKeep(E item) {

/**
* Filters the given {@link CloseableIterable} and counts the number of elements that do not match
* the predicate by incrementing the {@link MetricsContext.Counter}.
* the predicate by incrementing the {@link Counter}.
*
* @param skipCounter The {@link MetricsContext.Counter} instance to increment on each skipped
* item during filtering.
* @param skipCounter The {@link Counter} instance to increment on each skipped item during
* filtering.
* @param iterable The underlying {@link CloseableIterable} to filter.
* @param <E> The underlying type to be iterated.
* @return A filtered {@link CloseableIterable} where the given skipCounter is incremented
* whenever the predicate does not match.
*/
static <E> CloseableIterable<E> filter(
MetricsContext.Counter<?> skipCounter, CloseableIterable<E> iterable, Predicate<E> pred) {
Counter skipCounter, CloseableIterable<E> iterable, Predicate<E> pred) {
Preconditions.checkArgument(null != skipCounter, "Invalid counter: null");
Preconditions.checkArgument(null != iterable, "Invalid iterable: null");
Preconditions.checkArgument(null != pred, "Invalid predicate: null");
Expand All @@ -148,17 +148,15 @@ protected boolean shouldKeep(E item) {

/**
* Counts the number of elements in the given {@link CloseableIterable} by incrementing the {@link
* MetricsContext.Counter} instance for each {@link Iterator#next()} call.
* Counter} instance for each {@link Iterator#next()} call.
*
* @param counter The {@link MetricsContext.Counter} instance to increment on each {@link
* Iterator#next()} call.
* @param counter The {@link Counter} instance to increment on each {@link Iterator#next()} call.
* @param iterable The underlying {@link CloseableIterable} to count
* @param <T> The underlying type to be iterated.
* @return A {@link CloseableIterable} that increments the given counter on each {@link
* Iterator#next()} call.
*/
static <T> CloseableIterable<T> count(
MetricsContext.Counter<?> counter, CloseableIterable<T> iterable) {
static <T> CloseableIterable<T> count(Counter counter, CloseableIterable<T> iterable) {
Preconditions.checkArgument(null != counter, "Invalid counter: null");
Preconditions.checkArgument(null != iterable, "Invalid iterable: null");
return new CloseableIterable<T>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public interface CloseableIterator<T> extends Iterator<T>, Closeable {
Expand Down Expand Up @@ -79,8 +79,7 @@ public O next() {
};
}

static <T> CloseableIterator<T> count(
MetricsContext.Counter<?> counter, CloseableIterator<T> iterator) {
static <T> CloseableIterator<T> count(Counter counter, CloseableIterator<T> iterator) {
return new CloseableIterator<T>() {
@Override
public void close() throws IOException {
Expand Down
55 changes: 26 additions & 29 deletions api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.metrics.MetricsContext.Counter;
import org.apache.iceberg.metrics.MetricsContext.Unit;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
Expand Down Expand Up @@ -215,9 +214,9 @@ public static class CounterResult {
private final MetricsContext.Unit unit;
private final long value;

static CounterResult fromCounter(Counter<Long> counter) {
static CounterResult fromCounter(Counter counter) {
Preconditions.checkArgument(null != counter, "Invalid counter: null");
if (LongCounter.NOOP.equals(counter)) {
if (counter.isNoop()) {
return null;
}
return new CounterResult(counter.name(), counter.unit(), counter.value());
Expand Down Expand Up @@ -418,71 +417,69 @@ public static class ScanMetrics {
public static final String TOTAL_DELETE_FILE_SIZE_IN_BYTES = "total-delete-file-size-in-bytes";
public static final String SKIPPED_DATA_MANIFESTS = "skipped-data-manifests";
private final Timer totalPlanningDuration;
private final Counter<Long> resultDataFiles;
private final Counter<Long> resultDeleteFiles;
private final Counter<Long> totalDataManifests;
private final Counter<Long> totalDeleteManifests;
private final Counter<Long> scannedDataManifests;
private final Counter<Long> skippedDataManifests;
private final Counter<Long> totalFileSizeInBytes;
private final Counter<Long> totalDeleteFileSizeInBytes;
private final Counter resultDataFiles;
private final Counter resultDeleteFiles;
private final Counter totalDataManifests;
private final Counter totalDeleteManifests;
private final Counter scannedDataManifests;
private final Counter skippedDataManifests;
private final Counter totalFileSizeInBytes;
private final Counter totalDeleteFileSizeInBytes;

public ScanMetrics(MetricsContext metricsContext) {
Preconditions.checkArgument(null != metricsContext, "Invalid metrics context: null");
this.totalPlanningDuration =
metricsContext.timer(TOTAL_PLANNING_DURATION, TimeUnit.NANOSECONDS);
this.resultDataFiles =
metricsContext.counter(RESULT_DATA_FILES, Long.class, MetricsContext.Unit.COUNT);
this.resultDataFiles = metricsContext.counter(RESULT_DATA_FILES, MetricsContext.Unit.COUNT);
this.resultDeleteFiles =
metricsContext.counter(RESULT_DELETE_FILES, Long.class, MetricsContext.Unit.COUNT);
metricsContext.counter(RESULT_DELETE_FILES, MetricsContext.Unit.COUNT);
this.scannedDataManifests =
metricsContext.counter(SCANNED_DATA_MANIFESTS, Long.class, MetricsContext.Unit.COUNT);
metricsContext.counter(SCANNED_DATA_MANIFESTS, MetricsContext.Unit.COUNT);
this.totalDataManifests =
metricsContext.counter(TOTAL_DATA_MANIFESTS, Long.class, MetricsContext.Unit.COUNT);
metricsContext.counter(TOTAL_DATA_MANIFESTS, MetricsContext.Unit.COUNT);
this.totalDeleteManifests =
metricsContext.counter(TOTAL_DELETE_MANIFESTS, Long.class, MetricsContext.Unit.COUNT);
metricsContext.counter(TOTAL_DELETE_MANIFESTS, MetricsContext.Unit.COUNT);
this.totalFileSizeInBytes =
metricsContext.counter(TOTAL_FILE_SIZE_IN_BYTES, Long.class, MetricsContext.Unit.BYTES);
metricsContext.counter(TOTAL_FILE_SIZE_IN_BYTES, MetricsContext.Unit.BYTES);
this.totalDeleteFileSizeInBytes =
metricsContext.counter(
TOTAL_DELETE_FILE_SIZE_IN_BYTES, Long.class, MetricsContext.Unit.BYTES);
metricsContext.counter(TOTAL_DELETE_FILE_SIZE_IN_BYTES, MetricsContext.Unit.BYTES);
this.skippedDataManifests =
metricsContext.counter(SKIPPED_DATA_MANIFESTS, Long.class, MetricsContext.Unit.COUNT);
metricsContext.counter(SKIPPED_DATA_MANIFESTS, MetricsContext.Unit.COUNT);
}

public Timer totalPlanningDuration() {
return totalPlanningDuration;
}

public Counter<Long> resultDataFiles() {
public Counter resultDataFiles() {
return resultDataFiles;
}

public Counter<Long> resultDeleteFiles() {
public Counter resultDeleteFiles() {
return resultDeleteFiles;
}

public Counter<Long> scannedDataManifests() {
public Counter scannedDataManifests() {
return scannedDataManifests;
}

public Counter<Long> totalDataManifests() {
public Counter totalDataManifests() {
return totalDataManifests;
}

public Counter<Long> totalDeleteManifests() {
public Counter totalDeleteManifests() {
return totalDeleteManifests;
}

public Counter<Long> totalFileSizeInBytes() {
public Counter totalFileSizeInBytes() {
return totalFileSizeInBytes;
}

public Counter<Long> totalDeleteFileSizeInBytes() {
public Counter totalDeleteFileSizeInBytes() {
return totalDeleteFileSizeInBytes;
}

public Counter<Long> skippedDataManifests() {
public Counter skippedDataManifests() {
return skippedDataManifests;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Predicate;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.io.TestableCloseableIterable.TestableCloseableIterator;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.DefaultMetricsContext;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -203,8 +204,7 @@ public CloseableIterator<Integer> iterator() {

@Test
public void count() {
MetricsContext.Counter<Integer> counter =
new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
Counter counter = new DefaultMetricsContext().counter("x", MetricsContext.Unit.COUNT);
CloseableIterable<Integer> items =
CloseableIterable.count(
counter, CloseableIterable.withNoopClose(Arrays.asList(1, 2, 3, 4, 5)));
Expand All @@ -215,8 +215,7 @@ public void count() {

@Test
public void countSkipped() {
MetricsContext.Counter<Integer> counter =
new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
Counter counter = new DefaultMetricsContext().counter("x", MetricsContext.Unit.COUNT);
CloseableIterable<Integer> items =
CloseableIterable.filter(
counter,
Expand All @@ -233,8 +232,7 @@ public void countNullCheck() {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid counter: null");

MetricsContext.Counter<Integer> counter =
new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
Counter counter = new DefaultMetricsContext().counter("x", MetricsContext.Unit.COUNT);
Assertions.assertThatThrownBy(() -> CloseableIterable.count(counter, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid iterable: null");
Expand All @@ -248,8 +246,7 @@ public void countSkippedNullCheck() {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid counter: null");

MetricsContext.Counter<Integer> counter =
new DefaultMetricsContext().counter("x", Integer.class, MetricsContext.Unit.COUNT);
Counter counter = new DefaultMetricsContext().counter("x", MetricsContext.Unit.COUNT);
Assertions.assertThatThrownBy(
() -> CloseableIterable.filter(counter, null, Predicate.isEqual(true)))
.isInstanceOf(IllegalArgumentException.class)
Expand Down
13 changes: 6 additions & 7 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.MetricsContext.Counter;
import org.apache.iceberg.metrics.MetricsContext.Unit;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -51,8 +51,8 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
private long next = 0;
private boolean closed = false;

private final Counter<Long> readBytes;
private final Counter<Integer> readOperations;
private final Counter readBytes;
private final Counter readOperations;

private int skipSize = 1024 * 1024;

Expand All @@ -65,9 +65,8 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
this.location = location;
this.awsProperties = awsProperties;

this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Long.class, Unit.BYTES);
this.readOperations =
metrics.counter(FileIOMetricsContext.READ_OPERATIONS, Integer.class, Unit.COUNT);
this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
this.readOperations = metrics.counter(FileIOMetricsContext.READ_OPERATIONS, Unit.COUNT);

this.createStack = Thread.currentThread().getStackTrace();
}
Expand Down Expand Up @@ -107,7 +106,7 @@ public int read(byte[] b, int off, int len) throws IOException {
int bytesRead = stream.read(b, off, len);
pos += bytesRead;
next += bytesRead;
readBytes.increment((long) bytesRead);
readBytes.increment(bytesRead);
readOperations.increment();

return bytesRead;
Expand Down
Loading

0 comments on commit c965af5

Please sign in to comment.