Skip to content

Commit

Permalink
[FLINK-3860] [connector-wikiedits] Add retry loop to WikipediaEditsSo…
Browse files Browse the repository at this point in the history
…urceTest

This closes apache#1964.
  • Loading branch information
uce committed May 4, 2016
1 parent e7586c3 commit af20655
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,85 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.Socket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

public class WikipediaEditsSourceTest {

private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditsSourceTest.class);

/**
* NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test
* NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test.
*
* We first check the connection to the IRC server. If it fails, this test
* is effectively ignored.
*
* @see <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html">Socks Proxy</a>
*/
@Test(timeout = 120 * 1000)
public void testWikipediaEditsSource() throws Exception {
final int numRetries = 5;
final int waitBetweenRetriesMillis = 2000;
final int connectTimeout = 1000;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
boolean success = false;

DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
for (int i = 0; i < numRetries && !success; i++) {
// Check connection
boolean canConnect = false;

edits.addSink(new SinkFunction<WikipediaEditEvent>() {
@Override
public void invoke(WikipediaEditEvent value) throws Exception {
throw new Exception("Expected test exception");
String host = WikipediaEditsSource.DEFAULT_HOST;
int port = WikipediaEditsSource.DEFAULT_PORT;

try (Socket s = new Socket()) {
s.connect(new InetSocketAddress(host, port), connectTimeout);
canConnect = s.isConnected();
} catch (Throwable ignored) {
}
});

try {
env.execute();
fail("Did not throw expected Exception.");
if (canConnect) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();

DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

edits.addSink(new SinkFunction<WikipediaEditEvent>() {
@Override
public void invoke(WikipediaEditEvent value) throws Exception {
throw new Exception("Expected test exception");
}
});

try {
env.execute();
fail("Did not throw expected Exception.");
} catch (Exception e) {
assertNotNull(e.getCause());
assertEquals("Expected test exception", e.getCause().getMessage());
}

success = true;
} else {
LOG.info("Failed to connect to IRC server ({}/{}). Retrying in {} ms.",
i + 1,
numRetries,
waitBetweenRetriesMillis);

Thread.sleep(waitBetweenRetriesMillis);
}
}
catch (Exception e) {
assertNotNull(e.getCause());
assertEquals("Expected test exception", e.getCause().getMessage());

if (success) {
LOG.info("Successfully ran test.");
} else {
LOG.info("Skipped test, because not able to connect to IRC server.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This file ensures that tests executed from the IDE show log output

log4j.rootLogger=OFF, console

# Log all infos in the given file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target = System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
</configuration>

0 comments on commit af20655

Please sign in to comment.