From af206555be4749c050d09cf5d91fd2286c49b23d Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 4 May 2016 10:38:04 +0200 Subject: [PATCH] [FLINK-3860] [connector-wikiedits] Add retry loop to WikipediaEditsSourceTest This closes #1964. --- .../wikiedits/WikipediaEditsSourceTest.java | 75 +++++++++++++++---- .../src/test/resources/log4j-test.properties | 27 +++++++ .../src/test/resources/log4j.properties | 27 +++++++ .../src/test/resources/logback-test.xml | 30 ++++++++ 4 files changed, 144 insertions(+), 15 deletions(-) create mode 100644 flink-contrib/flink-connector-wikiedits/src/test/resources/log4j-test.properties create mode 100644 flink-contrib/flink-connector-wikiedits/src/test/resources/log4j.properties create mode 100644 flink-contrib/flink-connector-wikiedits/src/test/resources/logback-test.xml diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java index 0c5d93a5b8f2e..c10c56a3a4795 100644 --- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java +++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java @@ -22,6 +22,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.Socket; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -29,33 +34,73 @@ public class WikipediaEditsSourceTest { + private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditsSourceTest.class); + /** - * NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test + * NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test. + * + * We first check the connection to the IRC server. If it fails, this test + * is effectively ignored. * * @see Socks Proxy */ @Test(timeout = 120 * 1000) public void testWikipediaEditsSource() throws Exception { + final int numRetries = 5; + final int waitBetweenRetriesMillis = 2000; + final int connectTimeout = 1000; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); + boolean success = false; - DataStream edits = env.addSource(new WikipediaEditsSource()); + for (int i = 0; i < numRetries && !success; i++) { + // Check connection + boolean canConnect = false; - edits.addSink(new SinkFunction() { - @Override - public void invoke(WikipediaEditEvent value) throws Exception { - throw new Exception("Expected test exception"); + String host = WikipediaEditsSource.DEFAULT_HOST; + int port = WikipediaEditsSource.DEFAULT_PORT; + + try (Socket s = new Socket()) { + s.connect(new InetSocketAddress(host, port), connectTimeout); + canConnect = s.isConnected(); + } catch (Throwable ignored) { } - }); - try { - env.execute(); - fail("Did not throw expected Exception."); + if (canConnect) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + + DataStream edits = env.addSource(new WikipediaEditsSource()); + + edits.addSink(new SinkFunction() { + @Override + public void invoke(WikipediaEditEvent value) throws Exception { + throw new Exception("Expected test exception"); + } + }); + + try { + env.execute(); + fail("Did not throw expected Exception."); + } catch (Exception e) { + assertNotNull(e.getCause()); + assertEquals("Expected test exception", e.getCause().getMessage()); + } + + success = true; + } else { + LOG.info("Failed to connect to IRC server ({}/{}). Retrying in {} ms.", + i + 1, + numRetries, + waitBetweenRetriesMillis); + + Thread.sleep(waitBetweenRetriesMillis); + } } - catch (Exception e) { - assertNotNull(e.getCause()); - assertEquals("Expected test exception", e.getCause().getMessage()); + + if (success) { + LOG.info("Successfully ran test."); + } else { + LOG.info("Skipped test, because not able to connect to IRC server."); } } } diff --git a/flink-contrib/flink-connector-wikiedits/src/test/resources/log4j-test.properties b/flink-contrib/flink-connector-wikiedits/src/test/resources/log4j-test.properties new file mode 100644 index 0000000000000..aaf810167afa4 --- /dev/null +++ b/flink-contrib/flink-connector-wikiedits/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=INFO, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-contrib/flink-connector-wikiedits/src/test/resources/log4j.properties b/flink-contrib/flink-connector-wikiedits/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..ed2bbcbbd8167 --- /dev/null +++ b/flink-contrib/flink-connector-wikiedits/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=OFF, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target = System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file diff --git a/flink-contrib/flink-connector-wikiedits/src/test/resources/logback-test.xml b/flink-contrib/flink-connector-wikiedits/src/test/resources/logback-test.xml new file mode 100644 index 0000000000000..4f56748368989 --- /dev/null +++ b/flink-contrib/flink-connector-wikiedits/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n + + + + + + + + \ No newline at end of file