Skip to content

Commit

Permalink
[FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to al…
Browse files Browse the repository at this point in the history
…low custom Configuration instances

This closes apache#6670.
  • Loading branch information
Nico Kruber authored and NicoK committed Sep 13, 2018
1 parent fbe816e commit f04af47
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@
*/
public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {

private static final int BUFFER_SIZE =
checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());

private static final int NUM_SLOTS_AND_THREADS = 1;

private static final InetAddress LOCAL_ADDRESS;

static {
Expand All @@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {

protected ResultPartitionID[] partitionIds;

public void setUp(
int writers,
int channels,
boolean localMode,
int senderBufferPoolSize,
int receiverBufferPoolSize) throws Exception {
setUp(
writers,
channels,
localMode,
senderBufferPoolSize,
receiverBufferPoolSize,
new Configuration());
}

/**
* Sets up the environment including buffer pools and netty threads.
*
Expand All @@ -115,7 +125,8 @@ public void setUp(
int channels,
boolean localMode,
int senderBufferPoolSize,
int receiverBufferPoolSize) throws Exception {
int receiverBufferPoolSize,
Configuration config) throws Exception {
this.localMode = localMode;
this.channels = channels;
this.partitionIds = new ResultPartitionID[writers];
Expand All @@ -128,13 +139,13 @@ public void setUp(

ioManager = new IOManagerAsync();

senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
senderEnv.start();
if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
receiverEnv = senderEnv;
}
else {
receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config);
receiverEnv.start();
}

Expand Down Expand Up @@ -179,12 +190,25 @@ private void generatePartitionIds() throws Exception {
}

private NetworkEnvironment createNettyNetworkEnvironment(
@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
@SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception {

int segmentSize =
checkedDownCast(
MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
.getBytes());

// we need this because many configs have been written with a "-1" entry
// similar to TaskManagerServicesConfiguration#fromConfiguration()
// -> please note that this directly influences the number of netty threads!
int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
if (slots == -1) {
slots = 1;
}

final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);

final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));

return new NetworkEnvironment(
bufferPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.io.benchmark;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.types.LongValue;

Expand Down Expand Up @@ -61,16 +62,20 @@ public void executeBenchmark(long records, boolean flushAfterLastEmit) throws Ex
recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
}

public void setUp(long flushTimeout) throws Exception {
setUp(flushTimeout, new Configuration());
}

/**
* Initializes the throughput benchmark with the given parameters.
*
* @param flushTimeout
* output flushing interval of the
* {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
*/
public void setUp(long flushTimeout) throws Exception {
public void setUp(long flushTimeout, Configuration config) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
environment.setUp(1, 1, false, -1, -1);
environment.setUp(1, 1, false, -1, -1, config);

receiver = environment.createReceiver();
recordWriter = environment.createRecordWriter(0, flushTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.io.benchmark;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.LongValue;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -63,6 +64,24 @@ public void setUp(int recordWriters, int channels, int flushTimeout, boolean loc
setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
}

public void setUp(
int recordWriters,
int channels,
int flushTimeout,
boolean localMode,
int senderBufferPoolSize,
int receiverBufferPoolSize) throws Exception {
setUp(
recordWriters,
channels,
flushTimeout,
localMode,
senderBufferPoolSize,
receiverBufferPoolSize,
new Configuration()
);
}

/**
* Initializes the throughput benchmark with the given parameters.
*
Expand All @@ -78,9 +97,10 @@ public void setUp(
int flushTimeout,
boolean localMode,
int senderBufferPoolSize,
int receiverBufferPoolSize) throws Exception {
int receiverBufferPoolSize,
Configuration config) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
receiver = environment.createReceiver();
writerThreads = new LongRecordWriterThread[recordWriters];
for (int writer = 0; writer < recordWriters; writer++) {
Expand Down

0 comments on commit f04af47

Please sign in to comment.