Skip to content

Commit

Permalink
[FLINK-3171] [misc] Consolidate zoo of wrapper classes for input/output-stream to data-input/output-view.
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Dec 14, 2015
1 parent 066913e commit d9a061c
Show file tree
Hide file tree
Showing 31 changed files with 355 additions and 1,080 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
package org.apache.flink.api.common.accumulators;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -53,9 +51,7 @@ public void add(T value) {
public void add(T value, TypeSerializer<T> serializer) throws IOException {
try {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
OutputViewDataOutputStreamWrapper out =
new OutputViewDataOutputStreamWrapper(new DataOutputStream(outStream));

DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(outStream);
serializer.serialize(value, out);
localValue.add(outStream.toByteArray());
}
Expand Down Expand Up @@ -93,7 +89,7 @@ public static <T> List<T> deserializeList(ArrayList<byte[]> data, TypeSerializer
List<T> result = new ArrayList<T>(data.size());
for (byte[] bytes : data) {
ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(inStream);
T val = serializer.deserialize(in);
result.add(val);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@

package org.apache.flink.api.common.io;

import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.BlockLocation;
Expand All @@ -36,9 +27,18 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FilterInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without
* configuration, the block size equals the native block size of HDFS.
Expand All @@ -63,9 +63,9 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
*/
private long blockSize = NATIVE_BLOCK_SIZE;

private DataInputStream dataInputStream;
private transient DataInputViewStreamWrapper dataInputStream;

private BlockInfo blockInfo;
private transient BlockInfo blockInfo;

private long readRecords;

Expand Down Expand Up @@ -116,7 +116,7 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
}
}

return inputSplits.toArray(new FileInputSplit[0]);
return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}

protected List<FileStatus> getFiles() throws IOException {
Expand Down Expand Up @@ -213,9 +213,8 @@ protected SequentialStatistics createStatistics(List<FileStatus> files, FileBase

FSDataInputStream fdis = file.getPath().getFileSystem().open(file.getPath(), blockInfo.getInfoSize());
fdis.seek(file.getLen() - blockInfo.getInfoSize());

DataInputStream input = new DataInputStream(fdis);
blockInfo.read(new InputViewDataInputStreamWrapper(input));

blockInfo.read(new DataInputViewStreamWrapper(fdis));
totalCount += blockInfo.getAccumulatedRecordCount();
}

Expand Down Expand Up @@ -250,13 +249,12 @@ public void open(FileInputSplit split) throws IOException {
if (this.splitLength > this.blockInfo.getInfoSize()) {
// TODO: seek not supported by compressed streams. Will throw exception
this.stream.seek(this.splitStart + this.splitLength - this.blockInfo.getInfoSize());
DataInputStream infoStream = new DataInputStream(this.stream);
this.blockInfo.read(new InputViewDataInputStreamWrapper(infoStream));
this.blockInfo.read(new DataInputViewStreamWrapper(this.stream));
}

this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
BlockBasedInput blockBasedInput = new BlockBasedInput(this.stream, (int) blockSize);
this.dataInputStream = new DataInputStream(blockBasedInput);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
this.readRecords = 0;
}

Expand All @@ -271,7 +269,7 @@ public T nextRecord(T record) throws IOException {
return null;
}

record = this.deserialize(record, new InputViewDataInputStreamWrapper(this.dataInputStream));
record = this.deserialize(record, this.dataInputStream);
this.readRecords++;
return record;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,46 @@

package org.apache.flink.api.common.io;

import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;


public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {

private static final long serialVersionUID = 1L;

/**
* The config parameter which defines the fixed length of a record.
*/
/** The config parameter key which defines the output block size. */
public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size";

public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;

/**
* The block size to use.
*/
/** The block size to use. */
private long blockSize = NATIVE_BLOCK_SIZE;

private DataOutputStream dataOutputStream;

private BlockBasedOutput blockBasedInput;
private transient BlockBasedOutput blockBasedOutput;
private transient DataOutputViewStreamWrapper outView;


@Override
public void close() throws IOException {
this.dataOutputStream.close();
super.close();
try {
DataOutputViewStreamWrapper o = this.outView;
if (o != null) {
o.close();
}
}
finally {
super.close();
}
}

protected void complementBlockInfo(BlockInfo blockInfo) throws IOException {
}
protected void complementBlockInfo(BlockInfo blockInfo) {}

@Override
public void configure(Configuration parameters) {
Expand All @@ -80,16 +84,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
this.outputFilePath.getFileSystem().getDefaultBlockSize() : this.blockSize;

this.blockBasedInput = new BlockBasedOutput(this.stream, (int) blockSize);
this.dataOutputStream = new DataOutputStream(this.blockBasedInput);
this.blockBasedOutput = new BlockBasedOutput(this.stream, (int) blockSize);
this.outView = new DataOutputViewStreamWrapper(this.blockBasedOutput);
}

protected abstract void serialize(T record, DataOutputView dataOutput) throws IOException;

@Override
public void writeRecord(T record) throws IOException {
this.blockBasedInput.startRecord();
this.serialize(record, new OutputViewDataOutputStreamWrapper(this.dataOutputStream));
this.blockBasedOutput.startRecord();
this.serialize(record, outView);
}

/**
Expand All @@ -111,11 +115,11 @@ protected class BlockBasedOutput extends FilterOutputStream {

private BlockInfo blockInfo = BinaryOutputFormat.this.createBlockInfo();

private DataOutputStream headerStream;
private DataOutputView headerStream;

public BlockBasedOutput(OutputStream out, int blockSize) {
super(out);
this.headerStream = new DataOutputStream(out);
this.headerStream = new DataOutputViewStreamWrapper(out);
this.maxPayloadSize = blockSize - this.blockInfo.getInfoSize();
}

Expand Down Expand Up @@ -170,7 +174,7 @@ private void writeInfo() throws IOException {
this.blockInfo.setAccumulatedRecordCount(this.totalCount);
this.blockInfo.setFirstRecordStart(this.firstRecordStartPos == NO_RECORD ? 0 : this.firstRecordStartPos);
BinaryOutputFormat.this.complementBlockInfo(this.blockInfo);
this.blockInfo.write(new OutputViewDataOutputStreamWrapper(this.headerStream));
this.blockInfo.write(this.headerStream);
this.blockPos = 0;
this.blockCount = 0;
this.firstRecordStartPos = NO_RECORD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.io.IOException;
import java.io.InputStream;

/**
* Utility class that turns an {@link InputStream} into a {@link DataInputView}.
*/
public class DataInputViewStreamWrapper extends DataInputStream implements DataInputView {

public DataInputViewStreamWrapper(InputStream in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.io.IOException;
import java.io.OutputStream;

/**
* Utility class that turns an {@link OutputStream} into a {@link DataOutputView}.
*/
public class DataOutputViewStreamWrapper extends DataOutputStream implements DataOutputView {

private byte[] tempBuffer;
Expand Down

This file was deleted.

Loading

0 comments on commit d9a061c

Please sign in to comment.