Skip to content

Commit

Permalink
[FLINK-8114][py] Fix forwarding of arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 20, 2017
1 parent 316fa1f commit b0a4a67
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ private void startPython() throws IOException {

String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH);

process = Runtime.getRuntime().exec(new String[] {pythonBinaryPath, "-O", "-B", planPath, config.getString(PLAN_ARGUMENTS_KEY, "")});
String arguments = config.getString(PLAN_ARGUMENTS_KEY, "");
process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
outPrinter = new Thread(new StreamPrinter(process.getInputStream()));
outPrinter.start();
errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ protected boolean skipCollectionExecution() {
return true;
}

private static String findUtilsFile() throws Exception {
private static Path getBaseTestPythonDir() {
FileSystem fs = FileSystem.getLocalFileSystem();
return fs.getWorkingDirectory().toString()
+ "/src/test/python/org/apache/flink/python/api/utils/utils.py";
return new Path(fs.getWorkingDirectory(), "src/test/python/org/apache/flink/python/api");
}

private static String findUtilsFile() throws Exception {
return new Path(getBaseTestPythonDir(), "utils/utils.py").toString();
}

private static List<String> findTestFiles() throws Exception {
List<String> files = new ArrayList<>();
FileSystem fs = FileSystem.getLocalFileSystem();
FileStatus[] status = fs.listStatus(
new Path(fs.getWorkingDirectory().toString()
+ "/src/test/python/org/apache/flink/python/api"));
FileStatus[] status = fs.listStatus(getBaseTestPythonDir());
for (FileStatus f : status) {
String file = f.getPath().toString();
if (file.endsWith(".py")) {
Expand Down Expand Up @@ -126,11 +127,13 @@ protected void testProgram() throws Exception {
if (python2 != null) {
log.info("Running python2 tests");
runTestPrograms(python2);
runArgvTestPrograms(python2);
}
String python3 = getPython3Path();
if (python3 != null) {
log.info("Running python3 tests");
runTestPrograms(python3);
runArgvTestPrograms(python3);
}
}

Expand Down Expand Up @@ -177,4 +180,25 @@ private void testBoundCheck() throws Exception {
// we expect this exception to be thrown since no argument was passed
}
}

private void runArgvTestPrograms(String pythonBinary) throws Exception {
log.info("Running runArgvTestPrograms.");
String utils = findUtilsFile();

{
String noArgTestPath = new Path(getBaseTestPythonDir(), "args/no_arg.py").toString();

Configuration configuration = new Configuration();
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
new PythonPlanBinder(configuration).runPlan(new String[]{noArgTestPath, utils});
}

{
String multiArgTestPath = new Path(getBaseTestPythonDir(), "args/multiple_args.py").toString();

Configuration configuration = new Configuration();
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, pythonBinary);
new PythonPlanBinder(configuration).runPlan(new String[]{multiArgTestPath, utils, "-", "hello", "world"});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# ###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from flink.plan.Environment import get_environment
import sys
from utils import Verify

if __name__ == "__main__":
env = get_environment()

d1 = env.from_elements(len(sys.argv))

d1.map_partition(Verify([3], "MultipleArguments")).output()

#Execution
env.set_parallelism(1)

env.execute(local=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# ###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from flink.plan.Environment import get_environment
import sys
from utils import Verify

if __name__ == "__main__":
env = get_environment()

d1 = env.from_elements(len(sys.argv))

d1.map_partition(Verify([1], "NoArgument")).output()

#Execution
env.set_parallelism(1)

env.execute(local=True)

0 comments on commit b0a4a67

Please sign in to comment.