Defer loading hive partition if number of partitions crosses limit
We defer the initial loading of HivePartition information if the number of
partitions crosses a limit. This allows further invocations of
applyFilter, which may reduce the number of partitions to be scanned.
Praveen2112 committed Mar 3, 2022
1 parent 628b146 commit e07c7c3
Showing 6 changed files with 163 additions and 51 deletions.
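Before the diff itself, a minimal self-contained sketch of the policy this commit introduces (illustrative names, not the connector's actual classes): when the number of matching partition names is at or below a configured maxPartitions threshold, the partitions are materialized right away; otherwise only the names are kept on the table handle so that a later applyFilter call can narrow them before loading.

```java
import java.util.List;
import java.util.Optional;

// Either the fully loaded partitions or just their names (loading deferred).
record PartitionState(Optional<List<String>> deferredPartitionNames, Optional<List<String>> loadedPartitions) {}

class DeferredPartitionLoadingSketch
{
    private final int maxPartitions;

    DeferredPartitionLoadingSketch(int maxPartitions)
    {
        this.maxPartitions = maxPartitions;
    }

    // Mirrors the new HivePartitionManager#canPartitionsBeLoaded check:
    // partitions are materialized only when the name count is within the limit.
    boolean canPartitionsBeLoaded(List<String> partitionNames)
    {
        return partitionNames.size() <= maxPartitions;
    }

    PartitionState plan(List<String> matchingPartitionNames)
    {
        if (canPartitionsBeLoaded(matchingPartitionNames)) {
            // Small enough: load the partitions now (stubbed here as the names themselves).
            return new PartitionState(Optional.empty(), Optional.of(List.copyOf(matchingPartitionNames)));
        }
        // Too many partitions: remember only the names and wait for a later applyFilter
        // call, whose tighter constraint may shrink the set below the threshold.
        return new PartitionState(Optional.of(List.copyOf(matchingPartitionNames)), Optional.empty());
    }
}
```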
HiveMetadata.java
@@ -288,6 +288,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
import static io.trino.spi.statistics.TableStatisticType.ROW_COUNT;
@@ -485,7 +486,7 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi

handle = handle.withAnalyzePartitionValues(list);
HivePartitionResult partitions = partitionManager.getPartitions(handle, list);
handle = partitionManager.applyPartitionResult(handle, partitions, Optional.empty());
handle = partitionManager.applyPartitionResult(handle, partitions, alwaysTrue());
}

if (analyzeColumnNames.isPresent()) {
@@ -765,8 +766,14 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
Map<String, Type> columnTypes = columns.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> getColumnMetadata(session, tableHandle, entry.getValue()).getType()));
HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, tableHandle, constraint);
List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult);
return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions);
// If partitions are not loaded, then don't generate table statistics.
// Note that the computation is not persisted in the table handle, so can be redone many times
// TODO: https://github.com/trinodb/trino/issues/10980.
if (partitionManager.canPartitionsBeLoaded(partitionResult)) {
List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult);
return hiveStatisticsProvider.getTableStatistics(session, ((HiveTableHandle) tableHandle).getSchemaTableName(), columns, columnTypes, partitions);
}
return TableStatistics.empty();
}

private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
@@ -2540,17 +2547,40 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
HiveTableHandle hiveTable = (HiveTableHandle) table;

List<ColumnHandle> partitionColumns = ImmutableList.copyOf(hiveTable.getPartitionColumns());
List<HivePartition> partitions = partitionManager.getOrLoadPartitions(metastore, hiveTable);

TupleDomain<ColumnHandle> predicate = createPredicate(partitionColumns, partitions);

TupleDomain<ColumnHandle> predicate = TupleDomain.all();
Optional<DiscretePredicates> discretePredicates = Optional.empty();
if (!partitionColumns.isEmpty()) {
// Do not create tuple domains for every partition at the same time!
// There can be a huge number of partitions so use an iterable so
// all domains do not need to be in memory at the same time.
Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(partitions, hivePartition -> TupleDomain.fromFixedValues(hivePartition.getKeys()));
discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains));

// If only partition names are loaded, then the predicates are partially enforced,
// so the computation of predicate and discretePredicates here would not be valid.
if (hiveTable.getPartitionNames().isEmpty()) {
Optional<List<HivePartition>> partitions = hiveTable.getPartitions()
// If the partitions are not loaded, try out if they can be loaded.
.or(() -> {
// We load the partitions to compute the predicates enforced by the table.
// Note that the computation is not persisted in the table handle, so can be redone many times
// TODO: https://github.com/trinodb/trino/issues/10980.
HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, table, new Constraint(hiveTable.getEnforcedConstraint()));
if (partitionManager.canPartitionsBeLoaded(partitionResult)) {
return Optional.of(partitionManager.getPartitionsAsList(partitionResult));
}
return Optional.empty();
});

if (partitions.isPresent()) {
List<HivePartition> hivePartitions = partitions.orElseThrow();
// Since the partitions are fully loaded now, we need to compute the predicate from them
predicate = createPredicate(partitionColumns, hivePartitions);

// Un-partitioned tables can have a partition with ID - UNPARTITIONED;
// this check ensures that the table is partitioned
if (!partitionColumns.isEmpty()) {
// Do not create tuple domains for every partition at the same time!
// There can be a huge number of partitions so use an iterable so
// all domains do not need to be in memory at the same time.
Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(hivePartitions, hivePartition -> TupleDomain.fromFixedValues(hivePartition.getKeys()));
discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains));
}
}
}

Optional<ConnectorTablePartitioning> tablePartitioning = Optional.empty();
@@ -2599,16 +2629,23 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
checkArgument(handle.getAnalyzePartitionValues().isEmpty() || constraint.getSummary().isAll(), "Analyze should not have a constraint");

HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, handle, constraint);
HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, constraint.getPredicateColumns());
HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, constraint);

if (handle.getPartitions().equals(newHandle.getPartitions()) &&
handle.getPartitionNames().equals(newHandle.getPartitionNames()) &&
handle.getCompactEffectivePredicate().equals(newHandle.getCompactEffectivePredicate()) &&
handle.getBucketFilter().equals(newHandle.getBucketFilter()) &&
handle.getConstraintColumns().equals(newHandle.getConstraintColumns())) {
return Optional.empty();
}

return Optional.of(new ConstraintApplicationResult<>(newHandle, partitionResult.getUnenforcedConstraint(), false));
TupleDomain<ColumnHandle> unenforcedConstraint = partitionResult.getEffectivePredicate();
if (newHandle.getPartitions().isPresent()) {
List<HiveColumnHandle> partitionColumns = partitionResult.getPartitionColumns();
unenforcedConstraint = partitionResult.getEffectivePredicate().filter((column, domain) -> !partitionColumns.contains(column));
}

return Optional.of(new ConstraintApplicationResult<>(newHandle, unenforcedConstraint, false));
}

@Override
@@ -2842,6 +2879,7 @@ public ConnectorTableHandle makeCompatiblePartitioning(ConnectorSession session,
hiveTable.getTableParameters(),
hiveTable.getPartitionColumns(),
hiveTable.getDataColumns(),
hiveTable.getPartitionNames(),
hiveTable.getPartitions(),
hiveTable.getCompactEffectivePredicate(),
hiveTable.getEnforcedConstraint(),
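To make the applyFilter change above concrete, here is a toy sketch using plain maps instead of Trino's TupleDomain: partition-column domains are dropped from the unenforced constraint only once the partitions have actually been loaded, because only then is partition pruning fully enforced by the connector. The class and method names are made up for illustration.

```java
import java.util.Map;
import java.util.Set;
import static java.util.stream.Collectors.toMap;

class UnenforcedConstraintSketch
{
    // domains: column name -> textual description of its domain
    static Map<String, String> unenforced(Map<String, String> domains, Set<String> partitionColumns, boolean partitionsLoaded)
    {
        if (!partitionsLoaded) {
            // Loading was deferred, so the partition-column part of the predicate is not
            // fully enforced yet: return the whole predicate as unenforced.
            return domains;
        }
        // Partitions were loaded and pruned, so partition-key domains are fully enforced
        // and can be dropped from the unenforced remainder handed back to the engine.
        return domains.entrySet().stream()
                .filter(entry -> !partitionColumns.contains(entry.getKey()))
                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```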
HivePartitionManager.java
@@ -38,7 +38,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkArgument;
@@ -86,7 +85,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
List<HiveColumnHandle> partitionColumns = hiveTableHandle.getPartitionColumns();

if (effectivePredicate.isNone()) {
return new HivePartitionResult(partitionColumns, ImmutableList.of(), none(), none(), none(), hiveBucketHandle, Optional.empty());
return new HivePartitionResult(partitionColumns, Optional.empty(), ImmutableList.of(), none(), none(), hiveBucketHandle, Optional.empty());
}

Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(hiveTableHandle, effectivePredicate);
@@ -97,10 +96,10 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
if (partitionColumns.isEmpty()) {
return new HivePartitionResult(
partitionColumns,
Optional.empty(),
ImmutableList.of(new HivePartition(tableName)),
compactEffectivePredicate,
effectivePredicate,
TupleDomain.all(),
compactEffectivePredicate,
hiveBucketHandle,
bucketFilter);
}
@@ -109,6 +108,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
.map(HiveColumnHandle::getType)
.collect(toList());

Optional<List<String>> partitionNames = Optional.empty();
Iterable<HivePartition> partitionsIterable;
Predicate<Map<ColumnHandle, NullableValue>> predicate = constraint.predicate().orElse(value -> true);
if (hiveTableHandle.getPartitions().isPresent()) {
@@ -117,19 +117,18 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
.collect(toImmutableList());
}
else {
List<String> partitionNames = getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate);
partitionsIterable = () -> partitionNames.stream()
List<String> partitionNamesList = hiveTableHandle.getPartitionNames()
.orElseGet(() -> getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate));
partitionsIterable = () -> partitionNamesList.stream()
// Apply extra filters which could not be done by getFilteredPartitionNames
.map(partitionName -> parseValuesAndFilterPartition(tableName, partitionName, partitionColumns, partitionTypes, effectivePredicate, predicate))
.filter(Optional::isPresent)
.map(Optional::get)
.iterator();
partitionNames = Optional.of(partitionNamesList);
}

// All partition key domains will be fully evaluated, so we don't need to include those
TupleDomain<ColumnHandle> remainingTupleDomain = effectivePredicate.filter((column, domain) -> !partitionColumns.contains(column));
TupleDomain<ColumnHandle> enforcedTupleDomain = effectivePredicate.filter((column, domain) -> partitionColumns.contains(column));
return new HivePartitionResult(partitionColumns, partitionsIterable, compactEffectivePredicate, remainingTupleDomain, enforcedTupleDomain, hiveBucketHandle, bucketFilter);
return new HivePartitionResult(partitionColumns, partitionNames, partitionsIterable, effectivePredicate, compactEffectivePredicate, hiveBucketHandle, bucketFilter);
}

public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<List<String>> partitionValuesList)
@@ -153,7 +152,7 @@ public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<
.map(partition -> partition.orElseThrow(() -> new VerifyException("partition must exist")))
.collect(toImmutableList());

return new HivePartitionResult(partitionColumns, partitionList, TupleDomain.all(), TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty());
return new HivePartitionResult(partitionColumns, Optional.empty(), partitionList, TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty());
}

public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResult)
@@ -175,22 +174,38 @@ public List<HivePartition> getPartitionsAsList(HivePartitionResu
return partitionList.build();
}

public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Optional<Set<ColumnHandle>> columns)
public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Constraint constraint)
{
Optional<List<String>> partitionNames = partitions.getPartitionNames();
Optional<List<HivePartition>> partitionList = Optional.empty();
TupleDomain<ColumnHandle> enforcedConstraint = handle.getEnforcedConstraint();

// Partitions will be loaded if
// 1. the number of partitionNames is less than or equal to the threshold value, thereby generating additional filter criteria
// that can be applied on the other join side (if the join is based on a partition column), or
// 2. an additional predicate is passed as part of the Constraint; the deferral has delayed the partition checks
// until this additional filtering based on the Constraint is available.
if (canPartitionsBeLoaded(partitions) || constraint.predicate().isPresent()) {
partitionNames = Optional.empty();
partitionList = Optional.of(getPartitionsAsList(partitions));
List<HiveColumnHandle> partitionColumns = partitions.getPartitionColumns();
enforcedConstraint = partitions.getEffectivePredicate().filter((column, domain) -> partitionColumns.contains(column));
}
return new HiveTableHandle(
handle.getSchemaName(),
handle.getTableName(),
handle.getTableParameters(),
ImmutableList.copyOf(partitions.getPartitionColumns()),
handle.getDataColumns(),
Optional.of(getPartitionsAsList(partitions)),
partitionNames,
partitionList,
partitions.getCompactEffectivePredicate(),
partitions.getEnforcedConstraint(),
enforcedConstraint,
partitions.getBucketHandle(),
partitions.getBucketFilter(),
handle.getAnalyzePartitionValues(),
handle.getAnalyzeColumnNames(),
Sets.union(handle.getConstraintColumns(), columns.orElseGet(ImmutableSet::of)),
Sets.union(handle.getConstraintColumns(), constraint.getPredicateColumns().orElseGet(ImmutableSet::of)),
handle.getProjectedColumns(),
handle.getTransaction(),
handle.isRecordScannedFiles(),
@@ -199,8 +214,21 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio

public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
{
// In case of partitions not being loaded, their permissible values are specified in HiveTableHandle#getCompactEffectivePredicate,
// so we do an intersection of getCompactEffectivePredicate and the table handle's enforced constraint
TupleDomain<ColumnHandle> summary = table.getEnforcedConstraint().intersect(
table.getCompactEffectivePredicate()
.transformKeys(ColumnHandle.class::cast));
return table.getPartitions().orElseGet(() ->
getPartitionsAsList(getPartitions(metastore, table, new Constraint(table.getEnforcedConstraint()))));
getPartitionsAsList(getPartitions(metastore, table, new Constraint(summary))));
}

public boolean canPartitionsBeLoaded(HivePartitionResult partitionResult)
{
if (partitionResult.getPartitionNames().isPresent()) {
return partitionResult.getPartitionNames().orElseThrow().size() <= maxPartitions;
}
return true;
}

private Optional<HivePartition> parseValuesAndFilterPartition(
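A toy illustration of the getOrLoadPartitions change above, using plain sets instead of TupleDomain: when loading was deferred, the partition-key values admitted by the compact effective predicate are intersected with the handle's enforced constraint before the metastore is asked for partitions, so a tighter filter applied later genuinely shrinks the listing. The dates below are made-up sample values.

```java
import java.util.HashSet;
import java.util.Set;

class GetOrLoadPartitionsSketch
{
    public static void main(String[] args)
    {
        // Enforced constraint captured when the table handle was created (partition key "ds").
        Set<String> enforced = Set.of("2022-03-01", "2022-03-02", "2022-03-03");
        // Compact effective predicate narrowed by a later applyFilter call.
        Set<String> compactEffective = Set.of("2022-03-02");

        // getOrLoadPartitions intersects the two before listing partitions.
        Set<String> summary = new HashSet<>(enforced);
        summary.retainAll(compactEffective);

        System.out.println(summary); // [2022-03-02] -> only this partition is listed and loaded
    }
}
```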
HivePartitionResult.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.predicate.TupleDomain;
Expand All @@ -35,26 +36,26 @@ public class HivePartitionResult
{
private final List<HiveColumnHandle> partitionColumns;
private final Iterable<HivePartition> partitions;
private final TupleDomain<ColumnHandle> effectivePredicate;
private final TupleDomain<HiveColumnHandle> compactEffectivePredicate;
private final TupleDomain<ColumnHandle> unenforcedConstraint;
private final TupleDomain<ColumnHandle> enforcedConstraint;
private final Optional<HiveBucketHandle> bucketHandle;
private final Optional<HiveBucketFilter> bucketFilter;
private final Optional<List<String>> partitionNames;

public HivePartitionResult(
List<HiveColumnHandle> partitionColumns,
Optional<List<String>> partitionNames,
Iterable<HivePartition> partitions,
TupleDomain<ColumnHandle> effectivePredicate,
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
TupleDomain<ColumnHandle> unenforcedConstraint,
TupleDomain<ColumnHandle> enforcedConstraint,
Optional<HiveBucketHandle> bucketHandle,
Optional<HiveBucketFilter> bucketFilter)
{
this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
this.partitionNames = requireNonNull(partitionNames, "partitionNames is null").map(ImmutableList::copyOf);
this.partitions = requireNonNull(partitions, "partitions is null");
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.compactEffectivePredicate = requireNonNull(compactEffectivePredicate, "compactEffectivePredicate is null");
this.unenforcedConstraint = requireNonNull(unenforcedConstraint, "unenforcedConstraint is null");
this.enforcedConstraint = requireNonNull(enforcedConstraint, "enforcedConstraint is null");
this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null");
this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null");
}
@@ -64,24 +65,24 @@ public List<HiveColumnHandle> getPartitionColumns()
return partitionColumns;
}

public Iterator<HivePartition> getPartitions()
public Optional<List<String>> getPartitionNames()
{
return partitions.iterator();
return partitionNames;
}

public TupleDomain<HiveColumnHandle> getCompactEffectivePredicate()
public Iterator<HivePartition> getPartitions()
{
return compactEffectivePredicate;
return partitions.iterator();
}

public TupleDomain<ColumnHandle> getUnenforcedConstraint()
public TupleDomain<ColumnHandle> getEffectivePredicate()
{
return unenforcedConstraint;
return effectivePredicate;
}

public TupleDomain<ColumnHandle> getEnforcedConstraint()
public TupleDomain<HiveColumnHandle> getCompactEffectivePredicate()
{
return enforcedConstraint;
return compactEffectivePredicate;
}

public Optional<HiveBucketHandle> getBucketHandle()