Skip to content

Commit

Permalink
[FLINK-7017] Remove netty usages in flink-tests
Browse files Browse the repository at this point in the history
This closes apache#4196.
  • Loading branch information
zentol committed Jul 11, 2017
1 parent 748eba1 commit ed3b326
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 54 deletions.
7 changes: 7 additions & 0 deletions flink-runtime-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,55 @@
* limitations under the License.
*/

package org.apache.flink.test.web;
package org.apache.flink.runtime.webmonitor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import org.apache.commons.io.FileUtils;

import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.test.util.TestBaseUtils;

import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.Test;

import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import static org.apache.flink.test.util.TestBaseUtils.getFromHTTP;

/**
* Tests for the WebFrontend.
*/
public class WebFrontendITCase extends TestLogger {

private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;

private static LocalFlinkMiniCluster cluster;

private static int port = -1;

@BeforeClass
public static void initialize() throws Exception {
Configuration config = new Configuration();
Expand All @@ -83,16 +78,16 @@ public static void initialize() throws Exception {
assertTrue("Unable to create temp directory", logDir.mkdir());
File logFile = new File(logDir, "jobmanager.log");
File outFile = new File(logDir, "jobmanager.out");

Files.createFile(logFile.toPath());
Files.createFile(outFile.toPath());

config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());

cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();

port = cluster.webMonitor().get().getServerPort();
}

Expand All @@ -102,8 +97,7 @@ public void getFrontPage() {
String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
String text = "Apache Flink Dashboard";
assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
Expand All @@ -120,8 +114,7 @@ public void getNumberOfTaskManagers() {

assertNotNull(taskManagers);
assertEquals(cluster.numTaskManagers(), taskManagers.size());
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
Expand All @@ -130,7 +123,7 @@ public void getNumberOfTaskManagers() {
@Test
public void getTaskmanagers() {
try {
String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");

ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
Expand All @@ -143,8 +136,7 @@ public void getTaskmanagers() {
assertNotNull(taskManager);
assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
}
catch(Exception e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
Expand All @@ -155,37 +147,36 @@ public void getLogAndStdoutFiles() throws Exception {
WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());

FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
assertTrue(logs.contains("job manager log"));

FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
assertTrue(logs.contains("job manager out"));
}

@Test
public void getTaskManagerLogAndStdoutFiles() {
try {
String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/");
String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");

ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
JsonNode taskManager = taskManagers.get(0);
String id = taskManager.get("id").asText();

WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());

//we check for job manager log files, since no separate taskmanager logs exist
FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
String logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
assertTrue(logs.contains("job manager log"));

FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
logs = getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
assertTrue(logs.contains("job manager out"));
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
Expand All @@ -194,14 +185,13 @@ public void getTaskManagerLogAndStdoutFiles() {
@Test
public void getConfiguration() {
try {
String config = getFromHTTP("http://localhost:" + port + "/jobmanager/config");
String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");

Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
assertEquals(
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
}
catch (Exception e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
Expand All @@ -211,7 +201,7 @@ public void getConfiguration() {
public void testStop() throws Exception {
// this only works if there is no active job at this point
assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());

// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
Expand All @@ -226,7 +216,7 @@ public void testStop() throws Exception {
while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
Thread.sleep(10);
}

final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();

Expand All @@ -235,7 +225,7 @@ public void testStop() throws Exception {
// Request the file from the web server
client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());

assertEquals(HttpResponseStatus.OK, response.getStatus());
assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
assertEquals("{}", response.getContent());
Expand All @@ -252,17 +242,17 @@ public void testStop() throws Exception {

assertEquals(HttpResponseStatus.OK, response.getStatus());
assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable streaming test job\"," +
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
}
}

@Test
public void testStopYarn() throws Exception {
// this only works if there is no active job at this point
assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());

// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
Expand All @@ -285,15 +275,15 @@ public void testStopYarn() throws Exception {
try (HttpTestClient client = new HttpTestClient("localhost", port)) {
// Request the file from the web server
client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());

HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
.getNextResponse(deadline.timeLeft());

assertEquals(HttpResponseStatus.OK, response.getStatus());
assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
assertEquals("{}", response.getContent());
}

Thread.sleep(20);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -866,7 +867,7 @@ public void initializeState(FunctionInitializationContext context) throws Except

private static class CollectionSink<IN> implements SinkFunction<IN> {

private static ConcurrentSet<Object> elements = new ConcurrentSet<Object>();
private static Set<Object> elements = Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());

private static final long serialVersionUID = -1652452958040267745L;

Expand Down

0 comments on commit ed3b326

Please sign in to comment.