Skip to content

Commit

Permalink
[FLINK-4265] [dataset api] Add a NoOpOperator
Browse files Browse the repository at this point in the history
Adds a NoOpOperator which is unwound in OperatorTranslation.translate.
This will be first used by Gelly as a placeholder to support implicit
operator reuse.

This closes apache#2294
  • Loading branch information
greghogan committed Sep 6, 2016
1 parent cab76f6 commit 66d4b87
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Preconditions;

/**
* This operator will be ignored during translation.
*
* @param <IN> The type of the data set passed through the operator.
*/
@Internal
public class NoOpOperator<IN> extends DataSet<IN> {

private DataSet<IN> input;

public NoOpOperator(DataSet<IN> input, TypeInformation<IN> resultType) {
super(input.getExecutionEnvironment(), resultType);

this.input = input;
}

public DataSet<IN> getInput() {
return input;
}

public void setInput(DataSet<IN> input) {
Preconditions.checkNotNull(input);

this.input = input;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,23 @@
package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.AbstractUdfOperator;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Internal
public class OperatorTranslation {

Expand Down Expand Up @@ -70,7 +69,10 @@ private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {


private <T> Operator<T> translate(DataSet<T> dataSet) {

while (dataSet instanceof NoOpOperator) {
dataSet = ((NoOpOperator<T>) dataSet).getInput();
}

// check if we have already translated that data set (operation or source)
Operator<?> previous = (Operator<?>) this.translated.get(dataSet);
if (previous != null) {
Expand Down

0 comments on commit 66d4b87

Please sign in to comment.