diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3.java index 10c093745d..06b11eeb3d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3.java @@ -22,7 +22,6 @@ import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; @@ -35,24 +34,29 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.SerializableCoder; -import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.Sink; import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation; import com.google.cloud.dataflow.sdk.io.Sink.Writer; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.Values; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Builder; import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer; +import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PBegin; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; -import com.google.common.primitives.Ints; import com.google.datastore.v1beta3.CommitRequest; import com.google.datastore.v1beta3.Entity; import com.google.datastore.v1beta3.EntityResult; @@ -77,7 +81,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -97,7 +100,8 @@ *

To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to * specify the project to query and the query to read from. You can optionally provide a namespace - * to query within using {@link V1Beta3.Read#withNamespace}. + * to query within using {@link V1Beta3.Read#withNamespace}. You can also optionally specify + * how many splits you want for the query using {@link V1Beta3.Read#withNumQuerySplits}. * *

For example: * @@ -155,7 +159,7 @@ @Experimental(Experimental.Kind.SOURCE_SINK) public class V1Beta3 { - // A package-private constructor to prevent direct instantiation from outside of this package + // A package-private constructor to prevent direct instantiation from outside of this package V1Beta3() {} /** * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId}, - * {@code query}, and optionally {@code namespace} using {@link V1Beta3.Read#withProjectId}, - * {@link V1Beta3.Read#withQuery}, and {@link V1Beta3.Read#withNamespace}. + * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using + * {@link V1Beta3.Read#withProjectId}, {@link V1Beta3.Read#withQuery}, + * {@link V1Beta3.Read#withNamespace}, and {@link V1Beta3.Read#withNumQuerySplits}. */ public V1Beta3.Read read() { - return new V1Beta3.Read(null, null, null); + return new V1Beta3.Read(null, null, null, 0); } /** @@ -180,6 +185,24 @@ public V1Beta3.Read read() { * @see DatastoreIO */ public static class Read extends PTransform<PBegin, PCollection<Entity>> { + private static final Logger LOG = LoggerFactory.getLogger(Read.class); + + /** An upper bound on the number of splits for a query. */ + public static final int NUM_QUERY_SPLITS_MAX = 50000; + + /** A lower bound on the number of splits for a query. */ + static final int NUM_QUERY_SPLITS_MIN = 12; + + /** Default bundle size of 64MB. */ + static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024; + + /** + * Maximum number of results to request per query. + * + *

Must be set, or it may result in an I/O error when querying Cloud Datastore. + */ + static final int QUERY_BATCH_LIMIT = 500; + @Nullable private final String projectId; @@ -189,15 +212,100 @@ public static class Read extends PTransform<PBegin, PCollection<Entity>> { @Nullable private final String namespace; + private final int numQuerySplits; + + /** + * Computes the number of splits to be performed on the given query by querying the estimated + * size from Datastore. + */ + static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) { + int numSplits; + try { + long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace); + numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX, + Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES)); + } catch (Exception e) { + LOG.warn("Failed to fetch estimatedSizeBytes for query: {}", query, e); + // Fallback in case estimated size is unavailable. + numSplits = NUM_QUERY_SPLITS_MIN; + } + return Math.max(numSplits, NUM_QUERY_SPLITS_MIN); + } + + /** + * Gets the estimated size of the data returned by the given query. + * + *

Datastore provides no way to get a good estimate of how large the result of a query + * will be. As a rough approximation, we attempt to fetch the statistics of the whole + * entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind + * is specified in the query. + * + *
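+ * <p>The estimate drives {@code getEstimatedNumSplits}: as an illustrative example, an
+ * estimated 6400MB of data for a kind, with the default 64MB bundle size, yields
+ * {@code round(6400 / 64) = 100} splits; the result is always clamped to the range
+ * [{@code NUM_QUERY_SPLITS_MIN}, {@code NUM_QUERY_SPLITS_MAX}].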

See https://cloud.google.com/datastore/docs/concepts/stats. + */ + static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) + throws DatastoreException { + String ourKind = query.getKind(0).getName(); + Query.Builder queryBuilder = Query.newBuilder(); + if (namespace == null) { + queryBuilder.addKindBuilder().setName("__Stat_Kind__"); + } else { + queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__"); + } + queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build())); + + // Get the latest statistics + queryBuilder.addOrder(makeOrder("timestamp", DESCENDING)); + queryBuilder.setLimit(Int32Value.newBuilder().setValue(1)); + + RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); + + long now = System.currentTimeMillis(); + RunQueryResponse response = datastore.runQuery(request); + LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now); + + QueryResultBatch batch = response.getBatch(); + if (batch.getEntityResultsCount() == 0) { + throw new NoSuchElementException( + "Datastore statistics for kind " + ourKind + " unavailable"); + } + Entity entity = batch.getEntityResults(0).getEntity(); + return entity.getProperties().get("entity_bytes").getIntegerValue(); + } + + /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */ + static RunQueryRequest makeRequest(Query query, @Nullable String namespace) { + RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query); + if (namespace != null) { + requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace); + } + return requestBuilder.build(); + } + + /** + * A helper function to get the split queries, taking into account the optional + * {@code namespace}. + */ + private static List splitQuery(Query query, @Nullable String namespace, + Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException { + // If namespace is set, include it in the split request so splits are calculated accordingly. + PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); + if (namespace != null) { + partitionBuilder.setNamespaceId(namespace); + } + + return querySplitter.getSplits(query, partitionBuilder.build(), numSplits, datastore); + } + /** * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be * {@code null} as a matter of build order, but if they are {@code null} at instantiation time, * an error will be thrown. 
*/ - private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace) { + private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace, + int numQuerySplits) { this.projectId = projectId; this.query = query; this.namespace = namespace; + this.numQuerySplits = numQuerySplits; } /** @@ -205,7 +313,7 @@ private Read(@Nullable String projectId, @Nullable Query query, @Nullable String */ public V1Beta3.Read withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new V1Beta3.Read(projectId, query, namespace); + return new V1Beta3.Read(projectId, query, namespace, numQuerySplits); } /** @@ -220,14 +328,35 @@ public V1Beta3.Read withQuery(Query query) { checkNotNull(query, "query"); checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0, "Invalid query limit %s: must be positive", query.getLimit().getValue()); - return new V1Beta3.Read(projectId, query, namespace); + return new V1Beta3.Read(projectId, query, namespace, numQuerySplits); } /** * Returns a new {@link V1Beta3.Read} that reads from the given namespace. */ public V1Beta3.Read withNamespace(String namespace) { - return new V1Beta3.Read(projectId, query, namespace); + return new V1Beta3.Read(projectId, query, namespace, numQuerySplits); + } + + /** + * Returns a new {@link V1Beta3.Read} that reads by splitting the given {@code query} into + * {@code numQuerySplits}. + * + *

The semantics for the query splitting are defined below: + *
+ *   1. Any value less than or equal to 0 will be ignored, and the number of splits
+ *      will be chosen dynamically at runtime based on the query data size.
+ *   2. Any value greater than {@code NUM_QUERY_SPLITS_MAX} will be capped at
+ *      {@code NUM_QUERY_SPLITS_MAX}.
+ *   3. If the query has a user limit set, then {@code numQuerySplits} will be
+ *      ignored and no split will be performed.
+ *   4. In some cases Cloud Datastore cannot split a query into the requested number
+ *      of sub-queries; the splits it does return are then used as-is.
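+ *
+ * <p>A rough usage sketch ({@code "my-project"} and {@code query} are placeholders):
+ * <pre>{@code
+ * V1Beta3.Read read = DatastoreIO.v1beta3().read()
+ *     .withProjectId("my-project")
+ *     .withQuery(query)
+ *     .withNumQuerySplits(100);
+ * }</pre>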

+ */ + public V1Beta3.Read withNumQuerySplits(int numQuerySplits) { + return new V1Beta3.Read(projectId, query, namespace, + Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX)); } @Nullable @@ -245,9 +374,45 @@ public String getNamespace() { return namespace; } + + /** + * {@inheritDoc} + */ @Override public PCollection<Entity> apply(PBegin input) { - return input.apply(com.google.cloud.dataflow.sdk.io.Read.from(getSource())); + V1Beta3Options v1Beta3Options = V1Beta3Options.from(getProjectId(), getQuery(), + getNamespace()); + + /* + * This composite transform involves the following steps: + * 1. Create a singleton of the user-provided {@code query} and apply a {@link ParDo} that + * splits the query into {@code numQuerySplits} and assigns each split query a unique + * {@code Integer} as the key. The resulting output is of type + * {@code PCollection<KV<Integer, Query>>}. + * + * If the value of {@code numQuerySplits} is less than or equal to 0, then the number of + * splits will be computed dynamically based on the size of the data for the {@code query}. + * + * 2. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The + * queries are extracted from the {@code KV<Integer, Iterable<Query>>} and flattened to + * output a {@code PCollection<Query>}. + * + * 3. In the third step, a {@code ParDo} reads entities for each query and outputs + * a {@code PCollection<Entity>}. + */ + PCollection<KV<Integer, Query>> queries = input + .apply(Create.of(query)) + .apply(ParDo.of(new SplitQueryFn(v1Beta3Options, numQuerySplits))); + + PCollection<Query> shardedQueries = queries + .apply(GroupByKey.<Integer, Query>create()) + .apply(Values.<Iterable<Query>>create()) + .apply(Flatten.<Query>iterables()); + + PCollection<Entity> entities = shardedQueries + .apply(ParDo.of(new ReadFn(v1Beta3Options))); + + return entities; } @Override @@ -277,482 +442,294 @@ public String toString() { .toString(); } - @VisibleForTesting - DatastoreSource getSource() { - return new DatastoreSource(projectId, query, namespace); - } - } - - /** - * Returns an empty {@link V1Beta3.Write} builder. Configure the destination - * {@code projectId} using {@link V1Beta3.Write#withProjectId}. - */ - public Write write() { - return new Write(null); - } - - /** - * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore. - * - * @see DatastoreIO - */ - public static class Write extends PTransform<PCollection<Entity>, PDone> { - @Nullable - private final String projectId; - - /** - * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if - * it is {@code null} at instantiation time, an error will be thrown. - */ - public Write(@Nullable String projectId) { - this.projectId = projectId; - } - /** - * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project. + * A class for v1beta3 Datastore-related options.
*/ - public Write withProjectId(String projectId) { - checkNotNull(projectId, "projectId"); - return new Write(projectId); - } - - @Override - public PDone apply(PCollection input) { - return input.apply( - com.google.cloud.dataflow.sdk.io.Write.to(new DatastoreSink(projectId))); - } - - @Override - public void validate(PCollection input) { - checkNotNull(projectId, "projectId"); - } - - @Nullable - public String getProjectId() { - return projectId; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("projectId", projectId) - .toString(); - } + @VisibleForTesting + static class V1Beta3Options implements Serializable { + private final Query query; + private final String projectId; + @Nullable + private final String namespace; + + private V1Beta3Options(String projectId, Query query, @Nullable String namespace) { + this.projectId = checkNotNull(projectId, "projectId"); + this.query = checkNotNull(query, "query"); + this.namespace = namespace; + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")); - } - } + public static V1Beta3Options from(String projectId, Query query, @Nullable String namespace) { + return new V1Beta3Options(projectId, query, namespace); + } - /** - * A {@link com.google.cloud.dataflow.sdk.io.Source} that reads data from Datastore. - */ - static class DatastoreSource extends BoundedSource { + public Query getQuery() { + return query; + } - @Override - public Coder getDefaultOutputCoder() { - return ProtoCoder.of(Entity.class); - } + public String getProjectId() { + return projectId; + } - @Override - public boolean producesSortedKeys(PipelineOptions options) { - return false; + @Nullable + public String getNamespace() { + return namespace; + } } - @Override - public List splitIntoBundles(long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { - // Users may request a limit on the number of results. We can currently support this by - // simply disabling parallel reads and using only a single split. - if (query.hasLimit()) { - return ImmutableList.of(this); + /** + * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique keys + * and outputs them as {@link KV}. + */ + @VisibleForTesting + static class SplitQueryFn extends DoFn> { + private final V1Beta3Options options; + // number of splits to make for a given query + private final int numSplits; + + private final V1Beta3DatastoreFactory datastoreFactory; + // Datastore client + private transient Datastore datastore; + // Query splitter + private transient QuerySplitter querySplitter; + + public SplitQueryFn(V1Beta3Options options, int numSplits) { + this(options, numSplits, new V1Beta3DatastoreFactory()); } - long numSplits; - try { - numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes); - } catch (Exception e) { - // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible. - numSplits = 12; + @VisibleForTesting + SplitQueryFn(V1Beta3Options options, int numSplits, + V1Beta3DatastoreFactory datastoreFactory) { + this.options = options; + this.numSplits = numSplits; + this.datastoreFactory = datastoreFactory; } - // If the desiredBundleSize or number of workers results in 1 split, simply return - // a source that reads from the original query. 
- if (numSplits <= 1) { - return ImmutableList.of(this); + @Override + public void startBundle(Context c) throws Exception { + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId); + querySplitter = datastoreFactory.getQuerySplitter(); } - List datastoreSplits; - try { - datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options); - } catch (IllegalArgumentException | DatastoreException e) { - LOG.warn("Unable to parallelize the given query: {}", query, e); - return ImmutableList.of(this); - } + @Override + public void processElement(ProcessContext c) throws Exception { + int key = 1; + Query query = c.element(); - ImmutableList.Builder splits = ImmutableList.builder(); - for (Query splitQuery : datastoreSplits) { - splits.add(new DatastoreSource(projectId, splitQuery, namespace)); - } - return splits.build(); - } + // If query has a user set limit, then do not split. + if (query.hasLimit()) { + c.output(KV.of(key, query)); + return; + } - @Override - public BoundedReader createReader(PipelineOptions pipelineOptions) throws IOException { - return new DatastoreReader(this, getDatastore(pipelineOptions)); - } + int estimatedNumSplits; + // Compute the estimated numSplits if numSplits is not specified by the user. + if (numSplits <= 0) { + estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace()); + } else { + estimatedNumSplits = numSplits; + } - @Override - public void validate() { - checkNotNull(query, "query"); - checkNotNull(projectId, "projectId"); - } + List querySplits; + try { + querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter, + estimatedNumSplits); + } catch (Exception e) { + LOG.warn("Unable to parallelize the given query: {}", query, e); + querySplits = ImmutableList.of(query); + } - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - // Datastore provides no way to get a good estimate of how large the result of a query - // will be. As a rough approximation, we attempt to fetch the statistics of the whole - // entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind - // is specified in the query. - // - // See https://cloud.google.com/datastore/docs/concepts/stats - if (mockEstimateSizeBytes != null) { - return mockEstimateSizeBytes; + // assign unique keys to query splits. + for (Query subquery : querySplits) { + c.output(KV.of(key++, subquery)); + } } - Datastore datastore = getDatastore(options); - if (query.getKindCount() != 1) { - throw new UnsupportedOperationException( - "Can only estimate size for queries specifying exactly 1 kind."); + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("projectId", options.getProjectId()) + .withLabel("ProjectId")) + .addIfNotNull(DisplayData.item("namespace", options.getNamespace()) + .withLabel("Namespace")) + .addIfNotNull(DisplayData.item("query", options.getQuery().toString()) + .withLabel("Query")); } - String ourKind = query.getKind(0).getName(); - long latestTimestamp = queryLatestStatisticsTimestamp(datastore); - Query.Builder query = Query.newBuilder(); - if (namespace == null) { - query.addKindBuilder().setName("__Stat_Kind__"); - } else { - query.addKindBuilder().setName("__Ns_Stat_Kind__"); + } + + /** + * A {@link DoFn} that reads entities from Datastore for each query. 
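+ *
+ * <p>Entities are read in batches of at most {@code QUERY_BATCH_LIMIT} results, each request
+ * resuming from the previous batch's end cursor, until Cloud Datastore indicates that no more
+ * results remain or a user-provided query limit is exhausted. For example, a query matching
+ * 1200 entities is fetched in three batches of 500, 500 and 200 results.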
+ */ + @VisibleForTesting + static class ReadFn extends DoFn { + private final V1Beta3Options options; + private final V1Beta3DatastoreFactory datastoreFactory; + // Datastore client + private transient Datastore datastore; + + public ReadFn(V1Beta3Options options) { + this(options, new V1Beta3DatastoreFactory()); } - query.setFilter(makeAndFilter( - makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(), - makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build())); - RunQueryRequest request = makeRequest(query.build()); - long now = System.currentTimeMillis(); - RunQueryResponse response = datastore.runQuery(request); - LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now); + @VisibleForTesting + ReadFn(V1Beta3Options options, V1Beta3DatastoreFactory datastoreFactory) { + this.options = options; + this.datastoreFactory = datastoreFactory; + } - QueryResultBatch batch = response.getBatch(); - if (batch.getEntityResultsCount() == 0) { - throw new NoSuchElementException( - "Datastore statistics for kind " + ourKind + " unavailable"); + @Override + public void startBundle(Context c) throws Exception { + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId()); } - Entity entity = batch.getEntityResults(0).getEntity(); - return entity.getProperties().get("entity_bytes").getIntegerValue(); - } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("ProjectId")) - .addIfNotNull(DisplayData.item("namespace", namespace) - .withLabel("Namespace")) - .addIfNotNull(DisplayData.item("query", query.toString()) - .withLabel("Query")); - } + /** Read and output entities for the given query. */ + @Override + public void processElement(ProcessContext context) throws Exception { + Query query = context.element(); + String namespace = options.getNamespace(); + int userLimit = query.hasLimit() + ? query.getLimit().getValue() : Integer.MAX_VALUE; - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("projectId", projectId) - .add("query", query) - .add("namespace", namespace) - .toString(); - } + boolean moreResults = true; + QueryResultBatch currentBatch = null; - private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class); - private final String projectId; - private final Query query; - @Nullable - private final String namespace; + while (moreResults) { + Query.Builder queryBuilder = query.toBuilder().clone(); + queryBuilder.setLimit(Int32Value.newBuilder().setValue( + Math.min(userLimit, QUERY_BATCH_LIMIT))); - /** For testing only. TODO: This could be much cleaner with dependency injection. */ - @Nullable - private QuerySplitter mockSplitter; - @Nullable - private Long mockEstimateSizeBytes; + if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) { + queryBuilder.setStartCursor(currentBatch.getEndCursor()); + } - DatastoreSource(String projectId, Query query, @Nullable String namespace) { - this.projectId = projectId; - this.query = query; - this.namespace = namespace; - } + RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); + RunQueryResponse response = datastore.runQuery(request); - /** - * A helper function to get the split queries, taking into account the optional - * {@code namespace} and whether there is a mock splitter. 
- */ - private List getSplitQueries(int numSplits, PipelineOptions options) - throws DatastoreException { - // If namespace is set, include it in the split request so splits are calculated accordingly. - PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); - if (namespace != null) { - partitionBuilder.setNamespaceId(namespace); - } + currentBatch = response.getBatch(); - if (mockSplitter != null) { - // For testing. - return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null); - } + // MORE_RESULTS_AFTER_LIMIT is not implemented yet: + // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so + // use result count to determine if more results might exist. + int numFetch = currentBatch.getEntityResultsCount(); + if (query.hasLimit()) { + verify(userLimit >= numFetch, + "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit", + userLimit, numFetch, query.getLimit()); + userLimit -= numFetch; + } - return DatastoreHelper.getQuerySplitter().getSplits( - query, partitionBuilder.build(), numSplits, getDatastore(options)); - } + // output all the entities from the current batch. + for (EntityResult entityResult : currentBatch.getEntityResultsList()) { + context.output(entityResult.getEntity()); + } - /** - * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this - * {@code DatastoreSource}. For example, sets the {@code namespace} for the request. - */ - private RunQueryRequest makeRequest(Query query) { - RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query); - if (namespace != null) { - requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace); + // Check if we have more entities to be read. + moreResults = + // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied + (userLimit > 0) + // All indications from the API are that there are/may be more results. + && ((numFetch == QUERY_BATCH_LIMIT) + || (currentBatch.getMoreResults() == NOT_FINISHED)); + } } - return requestBuilder.build(); } /** - * Datastore system tables with statistics are periodically updated. This method fetches - * the latest timestamp of statistics update using the {@code __Stat_Total__} table. + * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and + * {@link QuerySplitter} + * + *

{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence + * wrapping them under this class, which implements {@link Serializable}. */ - private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException { - Query.Builder query = Query.newBuilder(); - query.addKindBuilder().setName("__Stat_Total__"); - query.addOrder(makeOrder("timestamp", DESCENDING)); - query.setLimit(Int32Value.newBuilder().setValue(1)); - RunQueryRequest request = makeRequest(query.build()); + @VisibleForTesting + static class V1Beta3DatastoreFactory implements Serializable { + + /** Builds a Datastore client for the given pipeline options and project. */ + public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + DatastoreOptions.Builder builder = + new DatastoreOptions.Builder() + .projectId(projectId) + .initializer( + new RetryHttpRequestInitializer() + ); + + Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + if (credential != null) { + builder.credential(credential); + } - long now = System.currentTimeMillis(); - RunQueryResponse response = datastore.runQuery(request); - LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId, - System.currentTimeMillis() - now); - QueryResultBatch batch = response.getBatch(); - if (batch.getEntityResultsCount() == 0) { - throw new NoSuchElementException( - "Datastore total statistics for project " + projectId + " unavailable"); + return DatastoreFactory.get().create(builder.build()); } - Entity entity = batch.getEntityResults(0).getEntity(); - return entity.getProperties().get("timestamp").getTimestampValue().getNanos(); - } - - private Datastore getDatastore(PipelineOptions pipelineOptions) { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .projectId(projectId) - .initializer( - new RetryHttpRequestInitializer() - ); - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); + /** Builds a Datastore {@link QuerySplitter}. */ + public QuerySplitter getQuerySplitter() { + return DatastoreHelper.getQuerySplitter(); } - return DatastoreFactory.get().create(builder.build()); - } - - /** For testing only. */ - DatastoreSource withMockSplitter(QuerySplitter splitter) { - DatastoreSource res = new DatastoreSource(projectId, query, namespace); - res.mockSplitter = splitter; - res.mockEstimateSizeBytes = mockEstimateSizeBytes; - return res; - } - - /** For testing only. */ - DatastoreSource withMockEstimateSizeBytes(Long estimateSizeBytes) { - DatastoreSource res = new DatastoreSource(projectId, query, namespace); - res.mockSplitter = mockSplitter; - res.mockEstimateSizeBytes = estimateSizeBytes; - return res; } + } - @VisibleForTesting - Query getQuery() { - return query; - } + /** + * Returns an empty {@link V1Beta3.Write} builder. Configure the destination + * {@code projectId} using {@link V1Beta3.Write#withProjectId}. + */ + public Write write() { + return new Write(null); } /** - * A {@link DatastoreSource.Reader} over the records from a query of the datastore. + * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore. * - *

Timestamped records are currently not supported. - * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}. + * @see DatastoreIO */ - @VisibleForTesting - static class DatastoreReader extends BoundedSource.BoundedReader { - private final DatastoreSource source; - - /** - * Datastore to read from. - */ - private final Datastore datastore; - - /** - * True if more results may be available. - */ - private boolean moreResults; - - /** - * Iterator over records. - */ - private java.util.Iterator entities; - - /** - * Current batch of query results. - */ - private QueryResultBatch currentBatch; - - /** - * Maximum number of results to request per query. - * - *

Must be set, or it may result in an I/O error when querying - * Cloud Datastore. - */ - private static final int QUERY_BATCH_LIMIT = 500; + public static class Write extends PTransform, PDone> { + @Nullable + private final String projectId; /** - * Remaining user-requested limit on the number of sources to return. If the user did not set a - * limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will never - * be decremented. + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if + * it is {@code null} at instantiation time, an error will be thrown. */ - private int userLimit; - - private volatile boolean done = false; - - private Entity currentEntity; + public Write(@Nullable String projectId) { + this.projectId = projectId; + } /** - * Returns a DatastoreReader with DatastoreSource and Datastore object set. - * - * @param datastore a datastore connection to use. + * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project. */ - public DatastoreReader(DatastoreSource source, Datastore datastore) { - this.source = source; - this.datastore = datastore; - // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE. - userLimit = source.query.hasLimit() - ? source.query.getLimit().getValue() : Integer.MAX_VALUE; - } - - @Override - public Entity getCurrent() { - return currentEntity; - } - - @Override - public final long getSplitPointsConsumed() { - return done ? 1 : 0; - } - - @Override - public final long getSplitPointsRemaining() { - return done ? 0 : 1; - } - - @Override - public boolean start() throws IOException { - return advance(); + public Write withProjectId(String projectId) { + checkNotNull(projectId, "projectId"); + return new Write(projectId); } @Override - public boolean advance() throws IOException { - if (entities == null || (!entities.hasNext() && moreResults)) { - try { - entities = getIteratorAndMoveCursor(); - } catch (DatastoreException e) { - throw new IOException(e); - } - } - - if (entities == null || !entities.hasNext()) { - currentEntity = null; - done = true; - return false; - } - - currentEntity = entities.next().getEntity(); - return true; + public PDone apply(PCollection input) { + return input.apply( + com.google.cloud.dataflow.sdk.io.Write.to(new DatastoreSink(projectId))); } @Override - public void close() throws IOException { - // Nothing + public void validate(PCollection input) { + checkNotNull(projectId, "projectId"); } - @Override - public DatastoreSource getCurrentSource() { - return source; + @Nullable + public String getProjectId() { + return projectId; } @Override - public DatastoreSource splitAtFraction(double fraction) { - // Not supported. - return null; + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("projectId", projectId) + .toString(); } @Override - public Double getFractionConsumed() { - // Not supported. - return null; - } - - /** - * Returns an iterator over the next batch of records for the query - * and updates the cursor to get the next batch as needed. - * Query has specified limit and offset from InputSplit. 
- */ - private Iterator getIteratorAndMoveCursor() throws DatastoreException { - Query.Builder query = source.query.toBuilder().clone(); - query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT))); - if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) { - query.setStartCursor(currentBatch.getEndCursor()); - } - - RunQueryRequest request = source.makeRequest(query.build()); - RunQueryResponse response = datastore.runQuery(request); - - currentBatch = response.getBatch(); - - // MORE_RESULTS_AFTER_LIMIT is not implemented yet: - // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so - // use result count to determine if more results might exist. - int numFetch = currentBatch.getEntityResultsCount(); - if (source.query.hasLimit()) { - verify(userLimit >= numFetch, - "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit", - userLimit, numFetch, query.getLimit()); - userLimit -= numFetch; - } - moreResults = - // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied. - (userLimit > 0) - // All indications from the API are that there are/may be more results. - && ((numFetch == QUERY_BATCH_LIMIT) - || (currentBatch.getMoreResults() == NOT_FINISHED)); - - // May receive a batch of 0 results if the number of records is a multiple - // of the request limit. - if (numFetch == 0) { - return null; - } - - return currentBatch.getEntityResultsList().iterator(); + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("projectId", projectId) + .withLabel("Output Project")); } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3Test.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3Test.java index 0f9733eda5..7f668565e7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3Test.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/datastore/V1Beta3Test.java @@ -16,9 +16,18 @@ package com.google.cloud.dataflow.sdk.io.datastore; +import static com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES; +import static com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT; +import static com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.getEstimatedSizeBytes; +import static com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.makeRequest; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; +import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -28,37 +37,32 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; 
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.DatastoreReader; -import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.DatastoreSource; import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.DatastoreWriter; -import com.google.cloud.dataflow.sdk.options.GcpOptions; +import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.ReadFn; +import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.SplitQueryFn; +import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.V1Beta3DatastoreFactory; +import com.google.cloud.dataflow.sdk.io.datastore.V1Beta3.Read.V1Beta3Options; import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester.CloningBehavior; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; -import com.google.cloud.dataflow.sdk.util.TestCredential; +import com.google.cloud.dataflow.sdk.values.KV; import com.google.common.collect.Lists; import com.google.datastore.v1beta3.Entity; import com.google.datastore.v1beta3.EntityResult; import com.google.datastore.v1beta3.Key; -import com.google.datastore.v1beta3.KindExpression; import com.google.datastore.v1beta3.PartitionId; -import com.google.datastore.v1beta3.PropertyFilter; import com.google.datastore.v1beta3.Query; import com.google.datastore.v1beta3.QueryResultBatch; import com.google.datastore.v1beta3.RunQueryRequest; import com.google.datastore.v1beta3.RunQueryResponse; -import com.google.datastore.v1beta3.Value; import com.google.datastore.v1beta3.client.Datastore; -import com.google.datastore.v1beta3.client.DatastoreHelper; import com.google.datastore.v1beta3.client.QuerySplitter; import com.google.protobuf.Int32Value; @@ -75,8 +79,10 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; -import java.util.NoSuchElementException; +import java.util.Set; /** * Tests for {@link V1Beta3}. @@ -87,36 +93,36 @@ public class V1Beta3Test { private static final String NAMESPACE = "testNamespace"; private static final String KIND = "testKind"; private static final Query QUERY; + private static final V1Beta3Options v1Beta3Options; static { Query.Builder q = Query.newBuilder(); q.addKindBuilder().setName(KIND); QUERY = q.build(); + v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE); } private V1Beta3.Read initialRead; @Mock Datastore mockDatastore; + @Mock + QuerySplitter mockQuerySplitter; + @Mock + V1Beta3DatastoreFactory mockDatastoreFactory; @Rule public final ExpectedException thrown = ExpectedException.none(); - @Rule public final ExpectedLogs logged = ExpectedLogs.none(DatastoreSource.class); - @Before public void setUp() { MockitoAnnotations.initMocks(this); initialRead = DatastoreIO.v1beta3().read() .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); - } - /** - * Helper function to create a test {@code DataflowPipelineOptions}. 
- */ - static final GcpOptions testPipelineOptions() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setGcpCredential(new TestCredential()); - return options; + when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class))) + .thenReturn(mockDatastore); + when(mockDatastoreFactory.getQuerySplitter()) + .thenReturn(mockQuerySplitter); } @Test @@ -229,182 +235,6 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); } - @Test - public void testQuerySplitBasic() throws Exception { - KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); - Query query = Query.newBuilder().addKind(mykind).build(); - - List mockSplits = new ArrayList<>(); - for (int i = 0; i < 8; ++i) { - mockSplits.add( - Query.newBuilder() - .addKind(mykind) - .setFilter( - DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL, - Value.newBuilder().setIntegerValue(i).build())) - .build()); - } - - QuerySplitter splitter = mock(QuerySplitter.class); - /* No namespace */ - PartitionId partition = PartitionId.newBuilder().build(); - when(splitter.getSplits(any(Query.class), eq(partition), eq(8), any(Datastore.class))) - .thenReturn(mockSplits); - - DatastoreSource io = initialRead - .withNamespace(null) - .withQuery(query) - .getSource() - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(8 * 1024L); - - List bundles = io.splitIntoBundles(1024, testPipelineOptions()); - assertEquals(8, bundles.size()); - for (int i = 0; i < 8; ++i) { - DatastoreSource bundle = bundles.get(i); - Query bundleQuery = bundle.getQuery(); - assertEquals("mykind", bundleQuery.getKind(0).getName()); - assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue()); - } - } - - /** - * Verifies that when namespace is set in the source, the split request includes the namespace. 
- */ - @Test - public void testSourceWithNamespace() throws Exception { - QuerySplitter splitter = mock(QuerySplitter.class); - DatastoreSource io = initialRead - .getSource() - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(8 * 1024L); - - io.splitIntoBundles(1024, testPipelineOptions()); - - PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build(); - verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class)); - verifyNoMoreInteractions(splitter); - } - - @Test - public void testQuerySplitWithZeroSize() throws Exception { - KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); - Query query = Query.newBuilder().addKind(mykind).build(); - - List mockSplits = Lists.newArrayList( - Query.newBuilder() - .addKind(mykind) - .build()); - - QuerySplitter splitter = mock(QuerySplitter.class); - when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class))) - .thenReturn(mockSplits); - - DatastoreSource io = initialRead - .withQuery(query) - .getSource() - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(0L); - - List bundles = io.splitIntoBundles(1024, testPipelineOptions()); - assertEquals(1, bundles.size()); - verify(splitter, never()) - .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)); - DatastoreSource bundle = bundles.get(0); - Query bundleQuery = bundle.getQuery(); - assertEquals("mykind", bundleQuery.getKind(0).getName()); - assertFalse(bundleQuery.hasFilter()); - } - - /** - * Tests that a query with a user-provided limit field does not split, and does not even - * interact with a query splitter. - */ - @Test - public void testQueryDoesNotSplitWithLimitSet() throws Exception { - // Minimal query with a limit - Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build(); - - // Mock query splitter, should not be invoked. - QuerySplitter splitter = mock(QuerySplitter.class); - when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class))) - .thenThrow(new AssertionError("Splitter should not be invoked")); - - List bundles = - initialRead - .withQuery(query) - .getSource() - .withMockSplitter(splitter) - .splitIntoBundles(1024, testPipelineOptions()); - - assertEquals(1, bundles.size()); - assertEquals(query, bundles.get(0).getQuery()); - verifyNoMoreInteractions(splitter); - } - - /** - * Tests that when {@link QuerySplitter} cannot split a query, {@link V1Beta3} falls back to - * a single split. 
- */ - @Test - public void testQuerySplitterThrows() throws Exception { - // Mock query splitter that throws IllegalArgumentException - IllegalArgumentException exception = - new IllegalArgumentException("query not supported by splitter"); - QuerySplitter splitter = mock(QuerySplitter.class); - when( - splitter.getSplits( - any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class))) - .thenThrow(exception); - - Query query = Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build(); - List bundles = - initialRead - .withQuery(query) - .getSource() - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(10240L) - .splitIntoBundles(1024, testPipelineOptions()); - - assertEquals(1, bundles.size()); - assertEquals(query, bundles.get(0).getQuery()); - verify(splitter, times(1)) - .getSplits( - any(Query.class), any(PartitionId.class), any(Integer.class), any(Datastore.class)); - logged.verifyWarn("Unable to parallelize the given query", exception); - } - - @Test - public void testQuerySplitSizeUnavailable() throws Exception { - KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); - Query query = Query.newBuilder().addKind(mykind).build(); - - List mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build()); - - QuerySplitter splitter = mock(QuerySplitter.class); - when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(12), any(Datastore.class))) - .thenReturn(mockSplits); - - DatastoreSource io = initialRead - .withQuery(query) - .getSource() - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(8 * 1024L); - - DatastoreSource spiedIo = spy(io); - when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class))) - .thenThrow(new NoSuchElementException()); - - List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions()); - assertEquals(1, bundles.size()); - verify(splitter, never()) - .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)); - DatastoreSource bundle = bundles.get(0); - Query bundleQuery = bundle.getQuery(); - assertEquals("mykind", bundleQuery.getKind(0).getName()); - assertFalse(bundleQuery.hasFilter()); - } - /** * Test building a Write using builder methods. */ @@ -489,22 +319,146 @@ public void testAddingEntities() throws Exception { assertThat(writer.entities, containsInAnyOrder(expected.toArray())); } - /** Datastore batch API limit in number of records per query. */ - private static final int DATASTORE_QUERY_BATCH_LIMIT = 500; + /** + * Tests {@link V1Beta3.Read#getEstimatedSizeBytes} to fetch and return estimated size for a + * query. + */ + @Test + public void testEstimatedSizeBytes() throws Exception { + long entityBytes = 100L; + // Per Kind statistics request and response + RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); + RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + + when(mockDatastore.runQuery(statRequest)) + .thenReturn(statResponse); + + assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE)); + verify(mockDatastore, times(1)).runQuery(statRequest); + } + + /** + * Tests {@link SplitQueryFn} when number of query splits is specified. 
+ */ + @Test + public void testSplitQueryFnWithNumSplits() throws Exception { + int numSplits = 100; + when(mockQuerySplitter.getSplits( + eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class))) + .thenReturn(splitQuery(QUERY, numSplits)); + + SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory); + DoFnTester> doFnTester = DoFnTester.of(splitQueryFn); + /** + * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through + * mock factory using a when clause for unit testing purposes, it is not serializable + * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the + * doFn from being serialized. + */ + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List> queries = doFnTester.processBatch(QUERY); + + assertEquals(queries.size(), numSplits); + verifyUniqueKeys(queries); + verify(mockQuerySplitter, times(1)).getSplits( + eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)); + verifyZeroInteractions(mockDatastore); + } + + /** + * Tests {@link SplitQueryFn} when no query splits is specified. + */ + @Test + public void testSplitQueryFnWithoutNumSplits() throws Exception { + // Force SplitQueryFn to compute the number of query splits + int numSplits = 0; + int expectedNumSplits = 20; + long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES; + + // Per Kind statistics request and response + RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); + RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + + when(mockDatastore.runQuery(statRequest)) + .thenReturn(statResponse); + when(mockQuerySplitter.getSplits( + eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class))) + .thenReturn(splitQuery(QUERY, expectedNumSplits)); + + SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory); + DoFnTester> doFnTester = DoFnTester.of(splitQueryFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List> queries = doFnTester.processBatch(QUERY); + + assertEquals(queries.size(), expectedNumSplits); + verifyUniqueKeys(queries); + verify(mockQuerySplitter, times(1)).getSplits( + eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)); + verify(mockDatastore, times(1)).runQuery(statRequest); + } + + /** + * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user specified limit. + */ + @Test + public void testSplitQueryFnWithQueryLimit() throws Exception { + Query queryWithLimit = QUERY.toBuilder().clone() + .setLimit(Int32Value.newBuilder().setValue(1)) + .build(); + + SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory); + DoFnTester> doFnTester = DoFnTester.of(splitQueryFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List> queries = doFnTester.processBatch(queryWithLimit); + + assertEquals(queries.size(), 1); + verifyUniqueKeys(queries); + verifyNoMoreInteractions(mockDatastore); + verifyNoMoreInteractions(mockQuerySplitter); + } + + /** Tests {@link ReadFn} with a query limit less than one batch. */ + @Test + public void testReadFnWithOneBatch() throws Exception { + readFnTest(5); + } + + /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. 
*/ + @Test + public void testReadFnWithMultipleBatches() throws Exception { + readFnTest(QUERY_BATCH_LIMIT + 5); + } + + /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */ + @Test + public void testReadFnWithBatchesExactMultiple() throws Exception { + readFnTest(5 * QUERY_BATCH_LIMIT); + } + + /** Helper Methods */ + + /** A helper function that verifies if all the queries have unique keys. */ + private void verifyUniqueKeys(List> queries) { + Set keys = new HashSet<>(); + for (KV kv: queries) { + keys.add(kv.getKey()); + } + assertEquals(keys.size(), queries.size()); + } /** * A helper function that creates mock {@link Entity} results in response to a query. Always * indicates that more results are available, unless the batch is limited to fewer than - * {@link #DATASTORE_QUERY_BATCH_LIMIT} results. + * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results. */ private static RunQueryResponse mockResponseForQuery(Query q) { // Every query V1Beta3 sends should have a limit. assertTrue(q.hasLimit()); - // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT] + // The limit should be in the range [1, QUERY_BATCH_LIMIT] int limit = q.getLimit().getValue(); assertThat(limit, greaterThanOrEqualTo(1)); - assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT)); + assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT)); // Create the requested number of entities. List entities = new ArrayList<>(limit); @@ -521,62 +475,80 @@ private static RunQueryResponse mockResponseForQuery(Query q) { .addAllEntityResults(entities) .setEntityResultType(EntityResult.ResultType.FULL) .setMoreResults( - limit == DATASTORE_QUERY_BATCH_LIMIT + limit == QUERY_BATCH_LIMIT ? QueryResultBatch.MoreResultsType.NOT_FINISHED : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS); return ret.build(); } - /** Helper function to run a test reading from a limited-result query. */ - private void runQueryLimitReadTest(int numEntities) throws Exception { + /** Helper function to run a test reading from a {@link ReadFn}. */ + private void readFnTest(int numEntities) throws Exception { // An empty query to read entities. Query query = Query.newBuilder().setLimit( Int32Value.newBuilder().setValue(numEntities)).build(); - V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(query).withProjectId("mockProject"); // Use mockResponseForQuery to generate results. when(mockDatastore.runQuery(any(RunQueryRequest.class))) - .thenAnswer( - new Answer() { - @Override - public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable { - Query q = ((RunQueryRequest) invocation.getArguments()[0]).getQuery(); - return mockResponseForQuery(q); - } - }); - - // Actually instantiate the reader. - DatastoreReader reader = new DatastoreReader(read.getSource(), mockDatastore); - - // Simply count the number of results returned by the reader. 
- assertTrue(reader.start()); - int resultCount = 1; - while (reader.advance()) { - resultCount++; - } - reader.close(); - + .thenAnswer(new Answer() { + @Override + public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable { + Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery(); + return mockResponseForQuery(q); + } + }); + + ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory); + DoFnTester doFnTester = DoFnTester.of(readFn); + /** + * Although Datastore client is marked transient in {@link ReadFn}, when injected through + * mock factory using a when clause for unit testing purposes, it is not serializable + * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the + * test object from being serialized. + */ + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List entities = doFnTester.processBatch(query); + + int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT); + verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class)); // Validate the number of results. - assertEquals(numEntities, resultCount); + assertEquals(numEntities, entities.size()); } - /** Tests reading with a query limit less than one batch. */ - @Test - public void testReadingWithLimitOneBatch() throws Exception { - runQueryLimitReadTest(5); + /** Builds a per-kind statistics response with the given entity size. */ + private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) { + RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder(); + Entity.Builder entity = Entity.newBuilder(); + entity.setKey(makeKey("dummyKind", "dummyId")); + entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build()); + EntityResult.Builder entityResult = EntityResult.newBuilder(); + entityResult.setEntity(entity); + QueryResultBatch.Builder batch = QueryResultBatch.newBuilder(); + batch.addEntityResults(entityResult); + timestampResponse.setBatch(batch); + return timestampResponse.build(); } - /** Tests reading with a query limit more than one batch, and not a multiple. */ - @Test - public void testReadingWithLimitMultipleBatches() throws Exception { - runQueryLimitReadTest(DATASTORE_QUERY_BATCH_LIMIT + 5); + /** Builds a per-kind statistics query for the given timestamp and namespace. */ + private static Query makeStatKindQuery(String namespace) { + Query.Builder statQuery = Query.newBuilder(); + if (namespace == null) { + statQuery.addKindBuilder().setName("__Stat_Kind__"); + } else { + statQuery.addKindBuilder().setName("__Ns_Stat_Kind__"); + } + statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build()); + statQuery.addOrder(makeOrder("timestamp", DESCENDING)); + statQuery.setLimit(Int32Value.newBuilder().setValue(1)); + return statQuery.build(); } - /** Tests reading several batches, using an exact multiple of batch size results. */ - @Test - public void testReadingWithLimitMultipleBatchesExactMultiple() throws Exception { - runQueryLimitReadTest(5 * DATASTORE_QUERY_BATCH_LIMIT); + /** Generate dummy query splits. */ + private List splitQuery(Query query, int numSplits) { + List queries = new LinkedList<>(); + for (int i = 0; i < numSplits; i++) { + queries.add(query.toBuilder().clone().build()); + } + return queries; } } -
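
A minimal end-to-end sketch of the new read and write paths, for reference. The project id, namespace, kind, and split count below are placeholder assumptions, and runner configuration and error handling are omitted:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import com.google.datastore.v1beta3.Entity;
    import com.google.datastore.v1beta3.Query;

    public class DatastoreV1Beta3Example {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Query all entities of a placeholder kind.
        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName("myKind");

        // Read: the query is split into sub-queries (100 here; a value <= 0 falls back
        // to the size-based estimate), which are then read in parallel.
        PCollection<Entity> entities = p.apply(
            DatastoreIO.v1beta3().read()
                .withProjectId("my-project")
                .withNamespace("my-namespace")  // optional
                .withQuery(q.build())
                .withNumQuerySplits(100));      // optional

        // Write the same entities back (illustration only).
        entities.apply(DatastoreIO.v1beta3().write().withProjectId("my-project"));

        p.run();
      }
    }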