Skip to content

Commit

Permalink
Make ConnectorBucketNodeMap a top level class
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Dec 18, 2018
1 parent 3239311 commit 24f279a
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
Expand All @@ -27,7 +28,7 @@
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.facebook.presto.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
import static java.util.Objects.requireNonNull;

public class BlackHoleNodePartitioningProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.BucketFunction;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
Expand All @@ -26,7 +27,7 @@
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;

import static com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.facebook.presto.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.google.common.collect.ImmutableList.toImmutableList;

public class HiveNodePartitioningProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.facebook.presto.spi.BucketFunction;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.split.EmptySplit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
Expand All @@ -32,7 +33,7 @@
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.facebook.presto.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.google.common.collect.Maps.uniqueIndex;
import static java.util.Objects.requireNonNull;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi.connector;

import com.facebook.presto.spi.Node;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static java.lang.String.format;
import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;

public final class ConnectorBucketNodeMap
{
private final int bucketCount;
private final Optional<Map<Integer, Node>> bucketToNode;

public static ConnectorBucketNodeMap createBucketNodeMap(int bucketCount)
{
return new ConnectorBucketNodeMap(bucketCount, Optional.empty());
}

public static ConnectorBucketNodeMap createBucketNodeMap(Map<Integer, Node> bucketToNode)
{
requireNonNull(bucketToNode, "bucketToNode is null");
int maxBucket = bucketToNode.keySet().stream()
.mapToInt(Integer::intValue)
.peek(bucket -> {
if (bucket < 0) {
throw new IllegalArgumentException("Bucket number must be positive: " + bucket);
}
})
.max()
.orElseThrow(() -> new IllegalArgumentException("bucketToNode is empty"));
return new ConnectorBucketNodeMap(maxBucket + 1, Optional.of(bucketToNode));
}

private ConnectorBucketNodeMap(int bucketCount, Optional<Map<Integer, Node>> bucketToNode)
{
if (bucketCount <= 0) {
throw new IllegalArgumentException("bucketCount must be positive");
}
if (bucketToNode.isPresent() && bucketToNode.get().size() != bucketCount) {
throw new IllegalArgumentException(format("Mismatched bucket count in bucketToNode (%s) and bucketCount (%s)", bucketToNode.get().size(), bucketCount));
}
this.bucketCount = bucketCount;
this.bucketToNode = requireNonNull(bucketToNode, "bucketToNode is null")
.map(mapping -> unmodifiableMap(new HashMap<>(mapping)));
}

public int getBucketCount()
{
return bucketCount;
}

public boolean hasFixedMapping()
{
return bucketToNode.isPresent();
}

public Map<Integer, Node> getFixedMapping()
{
return bucketToNode.orElseThrow(() -> new IllegalArgumentException("No fixed bucket to node mapping"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,13 @@
import com.facebook.presto.spi.BucketFunction;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.type.Type;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;

public interface ConnectorNodePartitioningProvider
{
Expand Down Expand Up @@ -57,58 +50,4 @@ BucketFunction getBucketFunction(
ConnectorPartitioningHandle partitioningHandle,
List<Type> partitionChannelTypes,
int bucketCount);

final class ConnectorBucketNodeMap
{
private final int bucketCount;
private final Optional<Map<Integer, Node>> bucketToNode;

public static ConnectorBucketNodeMap createBucketNodeMap(int bucketCount)
{
return new ConnectorBucketNodeMap(bucketCount, Optional.empty());
}

public static ConnectorBucketNodeMap createBucketNodeMap(Map<Integer, Node> bucketToNode)
{
requireNonNull(bucketToNode, "bucketToNode is null");
int maxBucket = bucketToNode.keySet().stream()
.mapToInt(Integer::intValue)
.peek(bucket -> {
if (bucket < 0) {
throw new IllegalArgumentException("Bucket number must be positive: " + bucket);
}
})
.max()
.orElseThrow(() -> new IllegalArgumentException("bucketToNode is empty"));
return new ConnectorBucketNodeMap(maxBucket + 1, Optional.of(bucketToNode));
}

private ConnectorBucketNodeMap(int bucketCount, Optional<Map<Integer, Node>> bucketToNode)
{
if (bucketCount <= 0) {
throw new IllegalArgumentException("bucketCount must be positive");
}
if (bucketToNode.isPresent() && bucketToNode.get().size() != bucketCount) {
throw new IllegalArgumentException(format("Mismatched bucket count in bucketToNode (%s) and bucketCount (%s)", bucketToNode.get().size(), bucketCount));
}
this.bucketCount = bucketCount;
this.bucketToNode = requireNonNull(bucketToNode, "bucketToNode is null")
.map(mapping -> unmodifiableMap(new HashMap<>(mapping)));
}

public int getBucketCount()
{
return bucketCount;
}

public boolean hasFixedMapping()
{
return bucketToNode.isPresent();
}

public Map<Integer, Node> getFixedMapping()
{
return bucketToNode.orElseThrow(() -> new IllegalArgumentException("No fixed bucket to node mapping"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
Expand All @@ -27,7 +28,7 @@
import java.util.Set;
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.facebook.presto.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.toIntExact;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.connector.ConnectorBucketNodeMap;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
Expand All @@ -28,7 +29,7 @@
import java.util.Set;
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.facebook.presto.spi.connector.ConnectorBucketNodeMap.createBucketNodeMap;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.toIntExact;
Expand Down

0 comments on commit 24f279a

Please sign in to comment.