Skip to content

Commit

Permalink
[FLINK-2248] add flag to disable sysout logging from cli
Browse files Browse the repository at this point in the history
This closes apache#957.
  • Loading branch information
sachingoel0101 authored and mxm committed Jul 31, 2015
1 parent a56aad7 commit ce622aa
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 4 deletions.
4 changes: 4 additions & 0 deletions docs/apis/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ The command line can be used to
./bin/flink run -p 16 ./examples/flink-java-examples-{{ site.version }}-WordCount.jar \
file:///home/user/hamlet.txt file:///home/user/wordcount_out

- Run example program with flink log output disabled

./bin/flink run -q ./examples/flink-java-examples-{{ site.version }}-WordCount.jar

- Run example program on a specific JobManager:

./bin/flink run -m myJMHost:6123 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ protected int run(String[] args) {
LOG.debug("User parallelism is set to {}", userParallelism);

Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
if(client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
Expand Down Expand Up @@ -604,7 +605,6 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
LOG.info("Starting execution of program");
JobSubmissionResult execResult;
try {
client.setPrintStatusDuringExecution(true);
execResult = client.run(program, parallelism, wait);
}
catch (ProgramInvocationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class CliFrontendParser {
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");

static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "Whether sysout" +
" " +
"logging is required or not");

static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

Expand Down Expand Up @@ -81,6 +85,8 @@ public class CliFrontendParser {
PARALLELISM_OPTION.setRequired(false);
PARALLELISM_OPTION.setArgName("parallelism");

LOGGING_OPTION.setRequired(false);

ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
Expand All @@ -107,6 +113,7 @@ public static Options getProgramSpecificOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);

// also add the YARN options so that the parser can parse them
yarnSessionCLi.getYARNSessionCLIOptions(options);
Expand All @@ -116,6 +123,7 @@ public static Options getProgramSpecificOptions(Options options) {
private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;

/**
* Base class for command line options that refer to a JAR file program.
Expand All @@ -39,6 +40,8 @@ public abstract class ProgramOptions extends CommandLineOptions {

private final int parallelism;

private final boolean stdoutLogging;

protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);

Expand Down Expand Up @@ -77,6 +80,12 @@ else if (args.length > 0) {
else {
parallelism = -1;
}

if(line.hasOption(LOGGING_OPTION.getOpt())){
stdoutLogging = false;
} else{
stdoutLogging = true;
}
}

public String getJarFilePath() {
Expand All @@ -94,4 +103,8 @@ public String[] getProgramArgs() {
public int getParallelism() {
return parallelism;
}

public boolean getStdoutLogging() {
return stdoutLogging;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir;
import static org.junit.Assert.fail;

public class CliFrontendLoggingTest {

private static LocalFlinkMiniCluster cluster;
private static Configuration config;
private static String hostPort;
private ByteArrayOutputStream stream = new ByteArrayOutputStream();
private CliFrontend cli;
private PrintStream output;

@Before
public void setUp() throws Exception {
stream.reset();
output = System.out;
System.setOut(new PrintStream(stream));

config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false);
hostPort = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ":" +
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);

try {
cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.BATCH_ONLY);
}
catch (Exception e) {
e.printStackTrace();
fail("Setup of test actor system failed.");
}

cli = new CliFrontend(getConfigDir());
}

@After
public void shutDownActorSystem() {
System.setOut(output);
if(cluster != null){
cluster.shutdown();
}
}

@Test
public void verifyLogging(){
try {
int ret = cli.run(new String[]{"-m", hostPort, getTestJarPath()});
System.out.flush();
assert(ret == 0 && checkForLogs(stream.toString()));
} catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
} finally {
if(cluster != null){
cluster.shutdown();
}
}
}

@Test
public void verifyNoLogging(){
try {
int ret = cli.run(new String[]{"-q", "-m", hostPort, getTestJarPath()});
System.out.flush();
assert(ret == 0 && !checkForLogs(stream.toString()));
} catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
} finally {
if(cluster != null){
cluster.shutdown();
}
}
}

private boolean checkForLogs(String output){
return output.indexOf("RUNNING") >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ public static void main(String[] args) throws Exception {
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
counts.print();
}

// execute program
env.execute("WordCount Example");
}

// *************************************************************************
Expand Down

0 comments on commit ce622aa

Please sign in to comment.