Skip to content

Commit

Permalink
[FLINK-3113] Remove unused and unsupported global-order methods from …
Browse files Browse the repository at this point in the history
…GenericDataSinkBase

This closes apache#1435
  • Loading branch information
fhueske committed Dec 7, 2015
1 parent d144e86 commit 9547f08
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Comparator;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
Expand Down Expand Up @@ -53,10 +52,6 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {

private Ordering localOrdering;

private Ordering partitionOrdering;

private DataDistribution distribution;

// --------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -157,31 +152,6 @@ public void addInputs(List<? extends Operator<IN>> inputs) {
}

// --------------------------------------------------------------------------------------------

/**
* Sets the order in which the sink must write its data. For any value other then <tt>NONE</tt>,
* this will cause the system to perform a global sort, or try to reuse an order from a
* previous operation.
*
* @param globalOrder The order to write the data in.
*/
public void setGlobalOrder(Ordering globalOrder) {
this.localOrdering = globalOrder;
setRangePartitioned(globalOrder);
}

/**
* Sets the order in which the sink must write its data. For any value other then <tt>NONE</tt>,
* this will cause the system to perform a global sort, or try to reuse an order from a
* previous operation.
*
* @param globalOrder The order to write the data in.
* @param distribution The distribution to use for the range partitioning.
*/
public void setGlobalOrder(Ordering globalOrder, DataDistribution distribution) {
this.localOrdering = globalOrder;
setRangePartitioned(globalOrder, distribution);
}

/**
* Gets the order, in which the data sink writes its data locally. Local order means that
Expand All @@ -205,53 +175,7 @@ public Ordering getLocalOrder() {
public void setLocalOrder(Ordering localOrder) {
this.localOrdering = localOrder;
}

/**
* Gets the record ordering over which the sink partitions in ranges.
*
* @return The record ordering over which to partition in ranges.
*/
public Ordering getPartitionOrdering() {
return this.partitionOrdering;
}

/**
* Sets the sink to partition the records into ranges over the given ordering.
*
* @param partitionOrdering The record ordering over which to partition in ranges.
*/
public void setRangePartitioned(Ordering partitionOrdering) {
throw new UnsupportedOperationException(
"Range partitioning is currently only supported with a user supplied data distribution.");
}

/**
* Sets the sink to partition the records into ranges over the given ordering.
* The bucket boundaries are determined using the given data distribution.
*
* @param partitionOrdering The record ordering over which to partition in ranges.
* @param distribution The distribution to use for the range partitioning.
*/
public void setRangePartitioned(Ordering partitionOrdering, DataDistribution distribution) {
if (partitionOrdering.getNumberOfFields() != distribution.getNumberOfFields()) {
throw new IllegalArgumentException("The number of keys in the distribution must match number of ordered fields.");
}

// TODO: check compatibility of distribution and ordering (number and order of keys, key types, etc.
// TODO: adapt partition ordering to data distribution (use prefix of ordering)

this.partitionOrdering = partitionOrdering;
this.distribution = distribution;
}

/**
* Gets the distribution to use for the range partitioning.
*
* @return The distribution to use for the range partitioning.
*/
public DataDistribution getDataDistribution() {
return this.distribution;
}


// --------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Ordering;
Expand Down Expand Up @@ -139,22 +138,12 @@ protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
final InterestingProperties iProps = new InterestingProperties();

{
final Ordering partitioning = getOperator().getPartitionOrdering();
final DataDistribution dataDist = getOperator().getDataDistribution();
final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
if (partitioning != null) {
if(dataDist != null) {
partitioningProps.setRangePartitioned(partitioning, dataDist);
} else {
partitioningProps.setRangePartitioned(partitioning);
}
iProps.addGlobalProperties(partitioningProps);
}
iProps.addGlobalProperties(partitioningProps);
}

{
final Ordering localOrder = getOperator().getLocalOrder();
final RequestedLocalProperties orderProps = new RequestedLocalProperties();
Expand Down

0 comments on commit 9547f08

Please sign in to comment.