Skip to content

Commit

Permalink
[FLINK-4082] Add Setting for enabling/disabling LargeRecordHandler
Browse files Browse the repository at this point in the history
By default this is set to disabled because there are known issues when
users specify a custom TypeInformation.
  • Loading branch information
aljoscha committed Jun 21, 2016
1 parent bc427d5 commit bce3a68
Show file tree
Hide file tree
Showing 19 changed files with 86 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ public final class ConfigConstants {
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";


/**
* Whether to use the LargeRecordHandler when spilling.
*/
public static final String USE_LARGE_RECORD_HANDLER_KEY = "taskmanager.runtime.large-record-handler";


// -------- Common Resource Framework Configuration (YARN & Mesos) --------

/**
Expand Down Expand Up @@ -758,6 +763,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;

/**
* Whether to use the LargeRecordHandler when spilling.
*/
public static final boolean DEFAULT_USE_LARGE_RECORD_HANDLER = false;


// ------ Common Resource Framework Configuration (YARN & Mesos) ------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private final int defaultMaxFan;

private final float defaultSortSpillingThreshold;

private final boolean useLargeRecordHandler;

private int iterationIdEnumerator = 1;

Expand All @@ -143,13 +145,18 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
public JobGraphGenerator() {
this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN;
this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD;
this.useLargeRecordHandler = ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER;
}

public JobGraphGenerator(Configuration config) {
this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
this.useLargeRecordHandler = config.getBoolean(
ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);

}

/**
Expand Down Expand Up @@ -1051,6 +1058,7 @@ private void assignLocalStrategyResources(Channel c, TaskConfig config, int inpu
config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
config.setFilehandlesInput(inputNum, this.defaultMaxFan);
config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
config.setUseLargeRecordHandler(this.useLargeRecordHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,8 @@ private void initInputLocalStrategy(int inputNum) throws Exception {
UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
this.config.getSpillingThresholdInput(inputNum), this.config.getUseLargeRecordHandler(),
this.getExecutionConfig().isObjectReuseEnabled());
// set the input to null such that it will be lazily fetched from the input strategy
this.inputs[inputNum] = null;
this.localStrategies[inputNum] = sorter;
Expand Down Expand Up @@ -962,7 +963,8 @@ this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalSt
(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
this.config.getSpillingThresholdInput(inputNum), this.getTaskConfig().getUseLargeRecordHandler(),
this.getExecutionConfig().isObjectReuseEnabled());
cSorter.setUdfConfiguration(this.config.getStubParameters());

// set the input to null such that it will be lazily fetched from the input strategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void invoke() throws Exception {
this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(),
this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
this.config.getSpillingThresholdInput(0),
this.config.getUseLargeRecordHandler(),
this.getExecutionConfig().isObjectReuseEnabled());

this.localStrategy = sorter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled)
double memoryFraction, int maxNumFileHandles, float startSpillingFraction,
boolean handleLargeRecords, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled);
memoryFraction, -1, maxNumFileHandles, startSpillingFraction, handleLargeRecords, objectReuseEnabled);
}

/**
Expand Down Expand Up @@ -136,12 +137,12 @@ public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, Mem
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean objectReuseEnabled)
float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true,
objectReuseEnabled);
memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false,
handleLargeRecords, objectReuseEnabled);

this.combineStub = combineStub;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,28 +156,28 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
// ------------------------------------------------------------------------

public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int maxNumFileHandles, float startSpillingFraction,
boolean objectReuseEnabled)
boolean handleLargeRecords, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled);
memoryFraction, -1, maxNumFileHandles, startSpillingFraction, handleLargeRecords, objectReuseEnabled);
}

public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean objectReuseEnabled)
float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true,
memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords,
objectReuseEnabled);
}

public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory,
IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
Expand All @@ -202,7 +202,7 @@ protected UnilateralSortMerger(MemoryManager memoryManager,
{
this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)),
ioManager, input, parentTask, serializerFactory, comparator,
numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true,
numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, handleLargeRecords,
objectReuseEnabled);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ public class TaskConfig implements Serializable {
private static final String SORT_SPILLING_THRESHOLD_DRIVER = "sort-spill-threshold.driver";

private static final String SORT_SPILLING_THRESHOLD_INPUT_PREFIX = "sort-spill-threshold.input.";


private static final String USE_LARGE_RECORD_HANDLER = "sort-spill.large-record-handler";

private static final boolean USE_LARGE_RECORD_HANDLER_DEFAULT = false;

// ----------------------------------- Iterations ---------------------------------------------

private static final String NUMBER_OF_ITERATIONS = "iterative.num-iterations";
Expand Down Expand Up @@ -692,6 +696,14 @@ public void setSpillingThresholdInput(int inputNum, float threshold) {
public float getSpillingThresholdInput(int inputNum) {
return this.config.getFloat(SORT_SPILLING_THRESHOLD_INPUT_PREFIX + inputNum, 0.7f);
}

public void setUseLargeRecordHandler(boolean useLargeRecordHandler) {
this.config.setBoolean(USE_LARGE_RECORD_HANDLER, useLargeRecordHandler);
}

public boolean getUseLargeRecordHandler() {
return this.config.getBoolean(USE_LARGE_RECORD_HANDLER, USE_LARGE_RECORD_HANDLER_DEFAULT);
}

// --------------------------------------------------------------------------------------------
// Parameters for Function Chaining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testSingleLevelMergeCombiningReduceTask()
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
this.perSortFractionMem,
2, 0.8f, true);
2, 0.8f, true /* use large record handler */, true);
addInput(sorter.getIterator());

GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
Expand Down Expand Up @@ -182,7 +182,7 @@ public void testMultiLevelMergeCombiningReduceTask() {
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
this.perSortFractionMem,
2, 0.8f, false);
2, 0.8f, true /* use large record handler */, false);
addInput(sorter.getIterator());

GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testCombiningReduceTask() {
sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
4, 0.8f, true);
4, 0.8f, true /* use large record handler */, true);
addInput(sorter.getIterator());

GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testCombine() throws Exception

Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
0.25, 64, 0.7f, false);
0.25, 64, 0.7f, true /* use large record handler */, false);

final Tuple2<Integer, Integer> rec = new Tuple2<>();
rec.setField(1, 1);
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testCombineSpilling() throws Exception {

Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
0.01, 64, 0.005f, true);
0.01, 64, 0.005f, true /* use large record handler */, true);

final Tuple2<Integer, Integer> rec = new Tuple2<>();
rec.setField(1, 1);
Expand Down Expand Up @@ -203,7 +203,7 @@ public void testSortAndValidate() throws Exception

Sorter<Tuple2<Integer, String>> merger = new CombiningUnilateralSortMerger<>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory1, this.comparator1,
0.25, 2, 0.7f, false);
0.25, 2, 0.7f, true /* use large record handler */, false);

// emit data
LOG.debug("emitting data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testInMemorySort() {

Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)64/78, 2, 0.9f, true);
(double)64/78, 2, 0.9f, true /*use large record handler*/, true);

// emit data
LOG.debug("Reading and sorting data...");
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testInMemorySortUsing10Buffers() {

Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)64/78, 10, 2, 0.9f, false);
(double)64/78, 10, 2, 0.9f, true /*use large record handler*/, false);

// emit data
LOG.debug("Reading and sorting data...");
Expand Down Expand Up @@ -212,7 +212,7 @@ public void testSpillingSort() {

Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)16/78, 64, 0.7f, true);
(double)16/78, 64, 0.7f, true /*use large record handler*/, true);

// emit data
LOG.debug("Reading and sorting data...");
Expand Down Expand Up @@ -264,7 +264,7 @@ public void testSpillingSortWithIntermediateMerge() {

Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)64/78, 16, 0.7f, false);
(double)64/78, 16, 0.7f, true /*use large record handler*/, false);

// emit data
LOG.debug("Emitting data...");
Expand Down Expand Up @@ -321,7 +321,8 @@ public void testSpillingSortWithIntermediateMergeIntPair() {
LOG.debug("Initializing sortmerger...");

Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager,
generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f, true);
generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f,
true /*use large record handler*/, true);

// emit data
LOG.debug("Emitting data...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Tuple2<Long, SomeMaybeLongValue> next() {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
comparator, 1.0, 1, 128, 0.7f, false);
comparator, 1.0, 1, 128, 0.7f, true /* use large record handler */ , false);

// check order
MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
Expand Down Expand Up @@ -198,7 +198,7 @@ public Tuple2<Long, SomeMaybeLongValue> next() {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
comparator, 1.0, 1, 128, 0.7f, true);
comparator, 1.0, 1, 128, 0.7f, true /*use large record handler*/, true);

// check order
MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
Expand Down Expand Up @@ -283,7 +283,7 @@ else if (num % MEDIUM_REC_INTERVAL == 0) {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
comparator, 1.0, 1, 128, 0.7f, false);
comparator, 1.0, 1, 128, 0.7f, true /*use large record handler*/, false);

// check order
MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
Expand Down Expand Up @@ -354,7 +354,7 @@ public Tuple2<Long, SmallOrMediumOrLargeValue> next() {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
comparator, 1.0, 1, 128, 0.7f, true);
comparator, 1.0, 1, 128, 0.7f, true /*use large record handler*/, true);

// check order
MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void addInputSorted(MutableObjectIterator<IN> input, TypeSerializer<IN> s
this.perSortFractionMem,
32,
0.8f,
true /*use large record handler*/,
false
);
this.sorters.add(sorter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void addInput(MutableObjectIterator<Record> input) {
public void addInputSorted(MutableObjectIterator<Record> input, RecordComparator comp) throws Exception {
UnilateralSortMerger<Record> sorter = new UnilateralSortMerger<Record>(
this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp,
this.perSortFractionMem, 32, 0.8f, true);
this.perSortFractionMem, 32, 0.8f, true /*use large record handler*/, true);
this.sorters.add(sorter);
this.inputs.add(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ public void addInputSorted(MutableObjectIterator<IN> input,
this.memManager, this.ioManager, input, this.owner,
this.<IN>getInputSerializer(0),
comp,
this.perSortFractionMem, 32, 0.8f, false);
this.perSortFractionMem, 32, 0.8f,
true /*use large record handler*/,
false);
}

public void addDriverComparator(TypeComparator<IN> comparator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ public void testSortBothMerge() {

final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1,
this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f,
true /*use large record handler*/, true);

final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2,
this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f,
true /*use large record handler*/, true);

final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
Expand Down
Loading

0 comments on commit bce3a68

Please sign in to comment.