Skip to content

Commit

Permalink
Merge pull request apache#9431 from je-ik/BEAM-8092: [BEAM-8092] chan…
Browse files Browse the repository at this point in the history
…ge guava's Optional to java.util in DirectRunner
je-ik authored Oct 4, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 6a3fdfd + bfdf6b8 commit ff8653d
Showing 8 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -18,10 +18,10 @@
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;

/** A {@link TransformResult} that has been committed. */
@AutoValue
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag;
import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningState;
@@ -51,7 +52,6 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;

@@ -162,9 +162,9 @@ private static class CopyOnAccessInMemoryStateTable extends StateTable {
private Optional<Instant> earliestWatermarkHold;

public CopyOnAccessInMemoryStateTable(StateTable underlying) {
this.underlying = Optional.fromNullable(underlying);
this.underlying = Optional.ofNullable(underlying);
binderFactory = new CopyOnBindBinderFactory(this.underlying);
earliestWatermarkHold = Optional.absent();
earliestWatermarkHold = Optional.empty();
}

/**
@@ -193,7 +193,7 @@ private void commit() {
earliestWatermarkHold = Optional.of(earliestHold);
clearEmpty();
binderFactory = new InMemoryStateBinderFactory();
underlying = Optional.absent();
underlying = Optional.empty();
}

/**
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -45,7 +46,6 @@
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
@@ -178,7 +178,7 @@ public void initialize(
completedBundle,
result.getTimerUpdate().withCompletedTimers(completedTimers),
committedResult.getExecutable(),
committedResult.getUnprocessedInputs().orNull(),
committedResult.getUnprocessedInputs().orElse(null),
committedResult.getOutputs(),
result.getWatermarkHold());
return committedResult;
@@ -193,7 +193,7 @@ public void initialize(
private Optional<? extends CommittedBundle<?>> getUnprocessedInput(
CommittedBundle<?> completedBundle, TransformResult<?> result) {
if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) {
return Optional.absent();
return Optional.empty();
}
CommittedBundle<?> residual =
completedBundle.withElements((Iterable) result.getUnprocessedElements());
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -38,7 +39,6 @@
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
@@ -357,7 +357,7 @@ public static VisibleExecutorUpdate cancelled() {
}

private VisibleExecutorUpdate(State newState, @Nullable Throwable exception) {
this.thrown = Optional.fromNullable(exception);
this.thrown = Optional.ofNullable(exception);
this.newState = newState;
}

Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,7 +37,6 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -311,12 +311,12 @@ abstract static class WorkUpdate {
private static WorkUpdate fromBundle(
CommittedBundle<?> bundle, Collection<AppliedPTransform<?, ?, ?>> consumers) {
return new AutoValue_QuiescenceDriver_WorkUpdate(
Optional.of(bundle), consumers, Optional.absent());
Optional.of(bundle), consumers, Optional.empty());
}

private static WorkUpdate fromException(Exception e) {
return new AutoValue_QuiescenceDriver_WorkUpdate(
Optional.absent(), Collections.emptyList(), Optional.of(e));
Optional.empty(), Collections.emptyList(), Optional.of(e));
}

/** Returns the bundle that produced this update. */
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -42,7 +43,6 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
@@ -283,7 +283,7 @@ private class CurrentViewContentsLoader
@Override
public Optional<? extends Iterable<? extends WindowedValue<?>>> load(
PCollectionViewWindow<?> key) {
return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
return Optional.ofNullable(viewByWindows.getUnchecked(key).get());
}
}
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import org.apache.beam.runners.direct.CommittedResult.OutputType;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -34,7 +35,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
@@ -70,7 +70,7 @@ public void getTransformExtractsFromResult() {
CommittedResult<AppliedPTransform<?, ?, ?>> result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
Optional.absent(),
Optional.empty(),
Collections.emptyList(),
EnumSet.noneOf(OutputType.class));

@@ -99,11 +99,11 @@ public void getUncommittedElementsNull() {
CommittedResult<AppliedPTransform<?, ?, ?>> result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
Optional.absent(),
Optional.empty(),
Collections.emptyList(),
EnumSet.noneOf(OutputType.class));

assertThat(result.getUnprocessedInputs(), Matchers.equalTo(Optional.absent()));
assertThat(result.getUnprocessedInputs(), Matchers.equalTo(Optional.empty()));
}

@Test
@@ -129,7 +129,7 @@ public void getOutputsEqualInput() {
CommittedResult<AppliedPTransform<?, ?, ?>> result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
Optional.absent(),
Optional.empty(),
outputs,
EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW));

Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -42,7 +43,6 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.Matchers;
@@ -414,7 +414,7 @@ public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformRes

Optional<? extends CommittedBundle<?>> unprocessedBundle;
if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) {
unprocessedBundle = Optional.absent();
unprocessedBundle = Optional.empty();
} else {
unprocessedBundle =
Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements));

0 comments on commit ff8653d

Please sign in to comment.