Skip to content

Commit

Permalink
[FLINK-11775][table-runtime-blink] Use MemorySegmentWritable to Binar…
Browse files Browse the repository at this point in the history
…yRowSerializer

This closes apache#8775
  • Loading branch information
JingsongLi authored and KurtYoung committed Jul 4, 2019
1 parent 99a94ed commit f54f774
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ void insertIntoTable(long key, int hashCode, BinaryRow row) throws IOException {
if (row.getSegments().length == 1) {
buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
} else {
buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
buildSideSerializer.serializeWithoutLengthSlow(row, buildSideWriteBuffer);
}
} else {
serializeToPages(row);
Expand All @@ -642,7 +642,7 @@ public void serializeToPages(BinaryRow row) throws IOException {
if (row.getSegments().length == 1) {
buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
} else {
buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
buildSideSerializer.serializeWithoutLengthSlow(row, buildSideWriteBuffer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentWritable;
import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.dataformat.BinaryRow;
Expand Down Expand Up @@ -83,12 +84,13 @@ public int getLength() {
@Override
public void serialize(BinaryRow record, DataOutputView target) throws IOException {
target.writeInt(record.getSizeInBytes());
SegmentsUtil.copyToView(
record.getSegments(),
record.getOffset(),
record.getSizeInBytes(),
target
);
if (target instanceof MemorySegmentWritable) {
serializeWithoutLength(record, (MemorySegmentWritable) target);
} else {
SegmentsUtil.copyToView(
record.getSegments(), record.getOffset(),
record.getSizeInBytes(), target);
}
}

@Override
Expand Down Expand Up @@ -133,25 +135,23 @@ public int serializeToPages(
BinaryRow record,
AbstractPagedOutputView headerLessView) throws IOException {
checkArgument(headerLessView.getHeaderLength() == 0);
int sizeInBytes = record.getSizeInBytes();
int skip = checkSkipWriteForFixLengthPart(headerLessView);
headerLessView.writeInt(record.getSizeInBytes());
serializeWithoutLength(record, headerLessView);
return skip;
}

private static void serializeWithoutLength(
BinaryRow record, MemorySegmentWritable writable) throws IOException {
if (record.getSegments().length == 1) {
headerLessView.writeInt(sizeInBytes);
headerLessView.write(record.getSegments()[0], record.getOffset(), sizeInBytes);
writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes());
} else {
headerLessView.writeInt(record.getSizeInBytes());
serializeToPagesWithoutLength(record, headerLessView);
serializeWithoutLengthSlow(record, writable);
}
return skip;
}

/**
* Serialize row to pages without row length. The caller should make sure that the fixed-length
* parit can fit in output's current segment, no skip check will be done here.
*/
public void serializeToPagesWithoutLength(
BinaryRow record,
AbstractPagedOutputView out) throws IOException {
public static void serializeWithoutLengthSlow(
BinaryRow record, MemorySegmentWritable out) throws IOException {
int remainSize = record.getSizeInBytes();
int posInSegOfRecord = record.getOffset();
int segmentSize = record.getSegments()[0].size();
Expand Down

0 comments on commit f54f774

Please sign in to comment.