Skip to content

Commit

Permalink
[FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan authored and pnowojski committed May 19, 2020
1 parent 5415574 commit 2aacb62
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.nio.ByteBuffer;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
* A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
*
Expand All @@ -52,6 +54,31 @@ public static MemorySegment wrap(byte[] buffer) {
return new HybridMemorySegment(buffer, null);
}

/**
 * Copies the given heap memory region and creates a new memory segment wrapping the copy.
 *
 * @param bytes The heap memory region.
 * @param start starting position, inclusive
 * @param end end position, exclusive
 * @return A new memory segment that targets a copy of the given heap memory region.
 * @throws IllegalArgumentException if {@code start < 0}, {@code start > end} or {@code end > bytes.length}
 */
public static MemorySegment wrapCopy(byte[] bytes, int start, int end) throws IllegalArgumentException {
	// Validate the full range before allocating: a negative start would otherwise
	// pass the two checks below and request a segment of (end - start) bytes.
	checkArgument(start >= 0);
	checkArgument(end >= start);
	checkArgument(end <= bytes.length);
	MemorySegment copy = allocateUnpooledSegment(end - start);
	copy.put(0, bytes, start, copy.size());
	return copy;
}

/**
 * Creates a {@link MemorySegment} over a new four-byte array holding the given number.
 * @see ByteBuffer#putInt(int)
 */
public static MemorySegment wrapInt(int value) {
	ByteBuffer encoded = ByteBuffer.allocate(Integer.BYTES);
	encoded.putInt(value);
	return wrap(encoded.array());
}

/**
* Allocates some unpooled memory and creates a new memory segment that represents
* that memory.
Expand Down Expand Up @@ -161,5 +188,4 @@ public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner,
public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
// Hands the given buffer to the HybridMemorySegment constructor; the second argument
// is left null (NOTE(review): presumably the segment owner - confirm against the constructor).
return new HybridMemorySegment(memory, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Consumer;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -80,6 +82,36 @@ public void close() throws Exception {
};
}

/**
 * Chains the given iterators into a single iterator that yields their elements in argument order.
 * Closing the returned iterator passes all given iterators to {@code IOUtils.closeAll}, including
 * those that were already exhausted.
 *
 * @param iterators the iterators to chain; may be empty
 * @return an iterator over the elements of all given iterators
 */
static <T> CloseableIterator<T> flatten(CloseableIterator<T>... iterators) {
	return new CloseableIterator<T>() {
		// Iterators still to be consumed; exhausted ones are dropped from the head lazily.
		private final Queue<CloseableIterator<T>> queue = removeEmptyHead(new LinkedList<>(asList(iterators)));

		/** Drops leading iterators without remaining elements and returns the same queue. */
		private Queue<CloseableIterator<T>> removeEmptyHead(Queue<CloseableIterator<T>> queue) {
			while (!queue.isEmpty() && !queue.peek().hasNext()) {
				queue.poll();
			}
			return queue;
		}

		@Override
		public boolean hasNext() {
			removeEmptyHead(queue);
			return !queue.isEmpty();
		}

		@Override
		public T next() {
			removeEmptyHead(queue);
			CloseableIterator<T> head = queue.peek();
			if (head == null) {
				// Honour the Iterator contract instead of failing with a NullPointerException.
				throw new NoSuchElementException();
			}
			return head.next();
		}

		@Override
		public void close() throws Exception {
			// Close via the original array: the queue no longer references exhausted iterators.
			IOUtils.closeAll(iterators);
		}
	};
}

@SuppressWarnings("unchecked")
static <T> CloseableIterator<T> empty() {
return (CloseableIterator<T>) EMPTY_INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.memory;

import org.junit.Test;

import static java.lang.System.arraycopy;
import static org.junit.Assert.assertArrayEquals;

/**
 * Tests for {@link MemorySegmentFactory}.
 */
public class MemorySegmentFactoryTest {

	/** The segment must keep the original content when the source array changes afterwards. */
	@Test
	public void testWrapCopyChangingData() {
		byte[] expected = {1, 2, 3, 4, 5};
		byte[] source = new byte[expected.length];
		arraycopy(expected, 0, source, 0, expected.length);

		MemorySegment segment = MemorySegmentFactory.wrapCopy(source, 0, source.length);
		source[0]++;

		assertArrayEquals(expected, segment.heapMemory);
	}

	/** Only the requested prefix of the source array must end up in the segment. */
	@Test
	public void testWrapPartialCopy() {
		byte[] source = {1, 2, 3, 5, 6};
		int end = source.length / 2;

		MemorySegment segment = MemorySegmentFactory.wrapCopy(source, 0, end);

		byte[] expected = new byte[segment.size()];
		arraycopy(source, 0, expected, 0, expected.length);
		assertArrayEquals(expected, segment.heapMemory);
	}

	/** Copying an empty range of an empty array must not fail. */
	@Test
	public void testWrapCopyEmpty() {
		byte[] nothing = new byte[0];
		MemorySegmentFactory.wrapCopy(nothing, 0, 0);
	}

	/** A start position beyond the end position must be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void testWrapCopyWrongStart() {
		byte[] source = new byte[]{1, 2, 3};
		MemorySegmentFactory.wrapCopy(source, 10, 3);
	}

	/** An end position beyond the array length must be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void testWrapCopyWrongEnd() {
		byte[] source = new byte[]{1, 2, 3};
		MemorySegmentFactory.wrapCopy(source, 0, 10);
	}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;

/**
 * Tests for {@link CloseableIterator}.
 */
@SuppressWarnings("unchecked")
public class CloseableIteratorTest {

	private static final String[] ELEMENTS = new String[]{"flink", "blink"};

	/** Flattening nothing, an empty iterator or a nested empty flatten yields no elements. */
	@Test
	public void testFlattenEmpty() throws Exception {
		List<CloseableIterator<?>> emptyVariants = asList(
			CloseableIterator.flatten(),
			CloseableIterator.flatten(CloseableIterator.empty()),
			CloseableIterator.flatten(CloseableIterator.flatten()));
		for (CloseableIterator<?> candidate : emptyVariants) {
			assertFalse(candidate.hasNext());
			candidate.close();
		}
	}

	/** Elements come out in the order of the flattened iterators. */
	@Test
	public void testFlattenIteration() {
		CloseableIterator<String> first = CloseableIterator.ofElement(ELEMENTS[0], unused -> {
		});
		CloseableIterator<String> second = CloseableIterator.ofElement(ELEMENTS[1], unused -> {
		});
		CloseableIterator<String> flattened = CloseableIterator.flatten(first, second);

		List<String> seen = new ArrayList<>();
		while (flattened.hasNext()) {
			seen.add(flattened.next());
		}
		assertArrayEquals(ELEMENTS, seen.toArray());
	}

	/** A failure while closing one iterator must not prevent closing the remaining ones. */
	@Test(expected = TestException.class)
	public void testFlattenErrorHandling() throws Exception {
		List<String> closedElements = new ArrayList<>();
		CloseableIterator<String> failingOnClose = CloseableIterator.ofElement(ELEMENTS[0], e -> {
			closedElements.add(e);
			throw new TestException();
		});
		CloseableIterator<String> closingNormally = CloseableIterator.ofElement(ELEMENTS[1], closedElements::add);
		CloseableIterator<String> flattened = CloseableIterator.flatten(failingOnClose, closingNormally);
		try {
			flattened.close();
		} finally {
			assertArrayEquals(ELEMENTS, closedElements.toArray());
		}
	}

	private static class TestException extends RuntimeException {
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.disk;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.CloseableIterator;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * {@link CloseableIterator} of {@link Buffer buffers} over file content.
 *
 * <p>Reads up to {@code bytesToRead} bytes from the file, returning them in buffers backed by
 * newly allocated arrays of {@code bufferSize} bytes. The file is retained on construction and
 * released in {@link #close()}.
 */
@Internal
public class FileBasedBufferIterator implements CloseableIterator<Buffer> {

	private final RefCountedFile file;
	private final FileInputStream stream;
	private final int bufferSize;

	/** Number of bytes consumed from the stream so far; used for error reporting only. */
	private int offset;
	/** Number of bytes that still have to be read from the stream. */
	private int bytesToRead;

	/**
	 * @param file the file to read from; must not be null
	 * @param bytesToRead total number of bytes to read; must not be negative
	 * @param bufferSize size of the array backing each returned buffer; must be positive
	 * @throws FileNotFoundException if the underlying file cannot be opened for reading
	 */
	public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, int bufferSize) throws FileNotFoundException {
		checkNotNull(file);
		checkArgument(bytesToRead >= 0);
		checkArgument(bufferSize > 0);
		this.stream = new FileInputStream(file.getFile());
		this.file = file;
		this.bufferSize = bufferSize;
		this.bytesToRead = bytesToRead;
		file.retain();
	}

	@Override
	public boolean hasNext() {
		return bytesToRead > 0;
	}

	/**
	 * Reads the next chunk of the file into a new buffer.
	 *
	 * @return a buffer whose size is the number of bytes actually read
	 * @throws IllegalStateException if the file ends before all requested bytes were read
	 */
	@Override
	public Buffer next() {
		byte[] buffer = new byte[bufferSize];
		int bytesRead = read(buffer);
		checkState(bytesRead >= 0, "unexpected end of file, file = " + file.getFile() + ", offset=" + offset);
		offset += bytesRead;
		bytesToRead -= bytesRead;
		return new NetworkBuffer(wrap(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
	}

	/** Reads at most {@code min(buffer.length, bytesToRead)} bytes into the start of the given array. */
	private int read(byte[] buffer) {
		int limit = Math.min(buffer.length, bytesToRead);
		try {
			// The stream keeps its own file position; the second argument is the index into
			// the (fresh) destination array, so it must be 0 and not the file offset.
			return stream.read(buffer, 0, limit);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void close() throws Exception {
		closeAll(stream, file::release);
	}
}
Loading

0 comments on commit 2aacb62

Please sign in to comment.