[hotfix] Fix access to temp file directories in SpillingAdaptiveSpanningRecordDeserializer
StephanEwen committed May 26, 2016
1 parent 5a7f4e3 commit bf256c7
Showing 30 changed files with 212 additions and 93 deletions.

@@ -63,7 +63,7 @@ public interface RuntimeContext {
* Returns the metric group for this parallel subtask.
*
* @return The metric group for this parallel subtask.
*/
*/
MetricGroup getMetricGroup();

/**

@@ -21,14 +21,14 @@
import org.apache.flink.annotation.PublicEvolving;

/**
* A MetricGroup is a named container for {@link Metric Metrics} and {@link MetricGroup MetricGroups}.
* A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
*
* <p>Instances of this class can be used to register new metrics with Flink and to create a nested
* hierarchy based on the group names.
*
* <p>A MetricGroup is uniquely identified by its place in the hierarchy and name.
*
* <p>Metrics groups can be {@link #close() closed}. Upon closing, they de-register all metrics
* <p>Metrics groups can be {@link #close() closed}. Upon closing, the group de-registers all metrics
* from any metrics reporter and any internal maps. Note that even closed metrics groups
* return Counters, Gauges, etc. to the code, to prevent exceptions in the monitored code.
* Metrics created on a closed group simply do not get reported any more.
@@ -39,7 +39,7 @@ public interface MetricGroup {
// ------------------------------------------------------------------------
// Closing
// ------------------------------------------------------------------------

/**
* Marks the group as closed.
* Recursively unregisters all {@link Metric Metrics} contained in this group.
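
The registration and nesting described in the javadoc above can be illustrated with a short sketch. This is illustrative only and not part of this commit; it assumes the addGroup(String) and counter(String) methods of the Flink metrics API (package org.apache.flink.metrics) together with the getMetricGroup() accessor on RuntimeContext shown in the first hunk.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Illustrative sketch: register a counter in a nested metric group from a rich function.
public class CountingMapper extends RichMapFunction<String, String> {

	private transient Counter numProcessed;

	@Override
	public void open(Configuration parameters) {
		// the group is identified by its place in the hierarchy and its name
		this.numProcessed = getRuntimeContext()
				.getMetricGroup()
				.addGroup("MyMetrics")
				.counter("numProcessed");
	}

	@Override
	public String map(String value) {
		numProcessed.inc();
		return value;
	}
}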

@@ -45,23 +45,11 @@ public final String getResourceIdString() {
return resourceId;
}

/**
* Generate a random resource id.
* @return A random resource id.
*/
public static ResourceID generate() {
return new ResourceID(new AbstractID().toString());
}

@Override
public final boolean equals(Object o) {
if (this == o) {
return true;
} else if (o == null || !(o instanceof ResourceID)) {
return false;
} else {
return resourceId.equals(((ResourceID) o).resourceId);
}
return this == o ||
(o != null && o.getClass() == ResourceID.class &&
this.resourceId.equals(((ResourceID) o).resourceId));
}

@Override
@@ -71,8 +59,18 @@ public final int hashCode() {

@Override
public String toString() {
return "ResourceID{" +
"resourceId='" + resourceId + '\'' +
'}';
return "ResourceID (" + resourceId + ')';
}

// ------------------------------------------------------------------------
// factory
// ------------------------------------------------------------------------

/**
* Generate a random resource id.
* @return A random resource id.
*/
public static ResourceID generate() {
return new ResourceID(new AbstractID().toString());
}
}
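
A minimal usage sketch for the reshuffled class (illustrative, not part of the commit; it assumes the String constructor that generate() uses is accessible): equality is based on the wrapped id string and a strict class check, and the factory produces a fresh random id each time.

// Illustrative only: two ResourceIDs wrapping the same string are equal,
// while any other type is rejected by the exact-class comparison in equals().
ResourceID a = new ResourceID("worker-1");
ResourceID b = new ResourceID("worker-1");
ResourceID c = ResourceID.generate();

assert a.equals(b);            // same id string, same class
assert !a.equals(c);           // different random id
assert !a.equals("worker-1");  // strict class check: never equal to other types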

@@ -21,6 +21,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -281,7 +282,19 @@ public int getNumberOfSpillingDirectories() {
public File[] getSpillingDirectories() {
return this.paths;
}


/**
* Gets the directories that the I/O manager spills to, as path strings.
*
* @return The directories that the I/O manager spills to, as path strings.
*/
public String[] getSpillingDirectoriesPaths() {
String[] strings = new String[this.paths.length];
for (int i = 0; i < strings.length; i++) {
strings[i] = paths[i].getAbsolutePath();
}
return strings;
}

protected int getNextPathNum() {
final int next = this.nextPath;
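
A brief sketch of the new accessor (illustrative, not part of the commit; it assumes the IOManagerAsync implementation with its default constructor, which falls back to the system temp directory, and the existing shutdown() method):

// Illustrative only: read back the spill directories as plain path strings.
IOManager ioManager = new IOManagerAsync();
try {
	for (String dir : ioManager.getSpillingDirectoriesPaths()) {
		System.out.println("spilling to: " + dir);
	}
} finally {
	ioManager.shutdown();
}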

@@ -45,14 +45,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra

private boolean isFinished;

/**
* Creates a new AbstractRecordReader that de-serializes records from the given input gate and
* can spill partial records to disk, if they grow large.
*
* @param inputGate The input gate to read from.
* @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
* reconstructs multiple large records.
*/
@SuppressWarnings("unchecked")
protected AbstractRecordReader(InputGate inputGate) {
protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
super(inputGate);

// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
}
}


@@ -25,8 +25,16 @@

public class MutableRecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements MutableReader<T> {

public MutableRecordReader(InputGate inputGate) {
super(inputGate);
/**
* Creates a new MutableRecordReader that de-serializes records from the given input gate and
* can spill partial records to disk, if they grow large.
*
* @param inputGate The input gate to read from.
* @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
* reconstructs multiple large records.
*/
public MutableRecordReader(InputGate inputGate, String[] tmpDirectories) {
super(inputGate, tmpDirectories);
}

@Override

@@ -29,8 +29,16 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe

private T currentRecord;

public RecordReader(InputGate inputGate, Class<T> recordType) {
super(inputGate);
/**
* Creates a new RecordReader that de-serializes records from the given input gate and
* can spill partial records to disk, if they grow large.
*
* @param inputGate The input gate to read from.
* @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
* reconstructs multiple large records.
*/
public RecordReader(InputGate inputGate, Class<T> recordType, String[] tmpDirectories) {
super(inputGate, tmpDirectories);

this.recordType = recordType;
}
@@ -73,10 +81,7 @@ private T instantiateRecordType() {
try {
return recordType.newInstance();
}
catch (InstantiationException e) {
throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
}
catch (IllegalAccessException e) {
catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
}
}
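
The reader constructors above now take the temp directories explicitly. A hedged sketch of how a task wires them in, mirroring the BatchTask changes further down in this diff (MyRecord is a placeholder for any IOReadableWritable type, and the snippet is assumed to run inside an invokable task that has getEnvironment()):

// Pass the TaskManager's temp directories so that oversized partial records
// can be spilled to disk during deserialization.
RecordReader<MyRecord> reader = new RecordReader<MyRecord>(
		getEnvironment().getInputGate(0),
		MyRecord.class,
		getEnvironment().getTaskManagerInfo().getTmpDirectories());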

@@ -18,8 +18,6 @@

package org.apache.flink.runtime.io.network.api.serialization;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -65,18 +63,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit

private AccumulatorRegistry.Reporter reporter;

private transient Counter numRecordsIn;
private transient Counter numBytesIn;
private Counter numRecordsIn;
private Counter numBytesIn;

public SpillingAdaptiveSpanningRecordDeserializer() {

String tempDirString = GlobalConfiguration.getString(
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
String[] directories = tempDirString.split(",|" + File.pathSeparator);

public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) {
this.nonSpanningWrapper = new NonSpanningWrapper();
this.spanningWrapper = new SpanningWrapper(directories);
this.spanningWrapper = new SpanningWrapper(tmpDirectories);
}

@Override
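
With this change the deserializer no longer consults GlobalConfiguration for the TaskManager temp directories; callers pass them in. A minimal construction, mirroring the updated tests at the end of this diff:

// Spill directories are now an explicit constructor argument
// (here the JVM temp dir, as in the updated serialization tests).
RecordDeserializer<SerializationTestType> deserializer =
		new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
				new String[] { System.getProperty("java.io.tmpdir") });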

@@ -72,7 +72,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen

@Override
public void invoke() throws Exception {
this.headEventReader = new MutableRecordReader<IntValue>(getEnvironment().getInputGate(0));
this.headEventReader = new MutableRecordReader<IntValue>(
getEnvironment().getInputGate(0),
getEnvironment().getTaskManagerInfo().getTmpDirectories());

TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());

@@ -184,7 +186,7 @@ private void readHeadEventChannel(IntValue rec) throws IOException {

// read (and thereby process all events in the handler's event handling functions)
try {
while (this.headEventReader.next(rec)) {
if (this.headEventReader.next(rec)) {
throw new RuntimeException("Synchronization task must not see any records!");
}
} catch (InterruptedException iex) {

@@ -659,14 +659,18 @@ protected void initInputReaders() throws Exception {

if (groupSize == 1) {
// non-union case
inputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
inputReaders[i] = new MutableRecordReader<IOReadableWritable>(
getEnvironment().getInputGate(currentReaderOffset),
getEnvironment().getTaskManagerInfo().getTmpDirectories());
} else if (groupSize > 1){
// union case
InputGate[] readers = new InputGate[groupSize];
for (int j = 0; j < groupSize; ++j) {
readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
}
inputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
inputReaders[i] = new MutableRecordReader<IOReadableWritable>(
new UnionInputGate(readers),
getEnvironment().getTaskManagerInfo().getTmpDirectories());
} else {
throw new Exception("Illegal input group size in task configuration: " + groupSize);
}
@@ -701,14 +705,18 @@ protected void initBroadcastInputReaders() throws Exception {
final int groupSize = this.config.getBroadcastGroupSize(i);
if (groupSize == 1) {
// non-union case
broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
getEnvironment().getInputGate(currentReaderOffset),
getEnvironment().getTaskManagerInfo().getTmpDirectories());
} else if (groupSize > 1){
// union case
InputGate[] readers = new InputGate[groupSize];
for (int j = 0; j < groupSize; ++j) {
readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
}
broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
new UnionInputGate(readers),
getEnvironment().getTaskManagerInfo().getTmpDirectories());
} else {
throw new Exception("Illegal input group size in task configuration: " + groupSize);
}
@@ -765,8 +773,6 @@ protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exc
*
* NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and
* {@code #initInputSerializersAndComparators(int)}!
*
* @param numInputs
*/
protected void initLocalStrategies(int numInputs) throws Exception {


@@ -332,10 +332,14 @@ private void initInputReaders() throws Exception {
numGates += groupSize;
if (groupSize == 1) {
// non-union case
inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(getEnvironment().getInputGate(0));
inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(
getEnvironment().getInputGate(0),
getEnvironment().getTaskManagerInfo().getTmpDirectories());
} else if (groupSize > 1){
// union case
inputReader = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(getEnvironment().getAllInputGates()));
inputReader = new MutableRecordReader<IOReadableWritable>(
new UnionInputGate(getEnvironment().getAllInputGates()),
getEnvironment().getTaskManagerInfo().getTmpDirectories());
} else {
throw new Exception("Illegal input group size in task configuration: " + groupSize);
}

@@ -20,6 +20,9 @@

import org.apache.flink.configuration.Configuration;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;

/**
* Encapsulation of TaskManager runtime information, like hostname and configuration.
*/
@@ -33,14 +36,32 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
/** configuration that the TaskManager was started with */
private final Configuration configuration;

/** list of temporary file directories */
private final String[] tmpDirectories;

/**
* Creates a runtime info.
*
* @param hostname The host name of the interface that the TaskManager uses to communicate.
* @param configuration The configuration that the TaskManager was started with.
* @param tmpDirectory The temporary file directory.
*/
public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
this.hostname = hostname;
this.configuration = configuration;
public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) {
this(hostname, configuration, new String[] { tmpDirectory });
}

/**
* Creates a runtime info.
* @param hostname The host name of the interface that the TaskManager uses to communicate.
* @param configuration The configuration that the TaskManager was started with.
* @param tmpDirectories The list of temporary file directories.
*/
public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) {
checkArgument(tmpDirectories.length > 0);
this.hostname = checkNotNull(hostname);
this.configuration = checkNotNull(configuration);
this.tmpDirectories = tmpDirectories;

}

/**
@@ -58,4 +79,12 @@ public String getHostname() {
public Configuration getConfiguration() {
return configuration;
}

/**
* Gets the list of temporary file directories.
* @return The list of temporary file directories.
*/
public String[] getTmpDirectories() {
return tmpDirectories;
}
}
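
A short sketch of the extended runtime info (illustrative values, not part of the commit): the single-directory constructor simply wraps its argument in a one-element array, and the new getter exposes the list to record readers.

// Illustrative only: construct runtime info with an explicit list of temp directories.
Configuration config = new Configuration();
TaskManagerRuntimeInfo info = new TaskManagerRuntimeInfo(
		"localhost", config, new String[] { System.getProperty("java.io.tmpdir") });

String[] tmpDirs = info.getTmpDirectories();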

@@ -187,7 +187,8 @@ class TaskManager(

private val runtimeInfo = new TaskManagerRuntimeInfo(
connectionInfo.getHostname(),
new UnmodifiableConfiguration(config.configuration))
new UnmodifiableConfiguration(config.configuration),
config.tmpDirPaths)
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------

@@ -106,7 +106,9 @@ private void testNonSpillingDeserializer(Util.MockRecords records, int segmentSi

private void testSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception {
RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
new String[] { System.getProperty("java.io.tmpdir") });

test(records, segmentSize, serializer, deserializer);
}

@@ -147,7 +147,10 @@ public void testHandleMixedLargeRecordsSpillingAdaptiveSerializer() {
final int SEGMENT_SIZE = 32 * 1024;

final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();

final RecordDeserializer<SerializationTestType> deserializer =
new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
new String[] { System.getProperty("java.io.tmpdir") } );

final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
