Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
Conflicts:
	parquet-column/src/main/java/parquet/column/primitive/DevNullColumnReader.java
	parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnWriter.java
	parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
	parquet-hadoop/src/main/java/parquet/hadoop/ColumnData.java
	parquet-hadoop/src/main/java/parquet/hadoop/PageConsumer.java
	parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
	parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java
  • Loading branch information
toddlipcon committed Mar 6, 2013
2 parents 962dd9e + 76fd1d8 commit 7ea3721
Show file tree
Hide file tree
Showing 56 changed files with 439 additions and 438 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package parquet.column;

import parquet.io.Binary;

/**
* Reader for (repetition level, definition level, values) triplets.
* At any given point in time, a ColumnReader points to a single (r, d, v) triplet.
Expand Down Expand Up @@ -71,7 +73,7 @@ public interface ColumnReader {
/**
* @return the current value
*/
byte[] getBinary();
Binary getBinary();

/**
* @return the current value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package parquet.column;

import parquet.io.Binary;

/**
* writer for (repetition level, definition level, values) triplets
*
Expand Down Expand Up @@ -53,7 +55,7 @@ public interface ColumnWriter {
* @param repetitionLevel
* @param definitionLevel
*/
void write(byte[] value, int repetitionLevel, int definitionLevel);
void write(Binary value, int repetitionLevel, int definitionLevel);

/**
* writes the current value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReadStore;
import parquet.column.ColumnReader;
import parquet.io.Binary;
import parquet.io.ParquetDecodingException;


Expand Down Expand Up @@ -105,14 +106,14 @@ public String getCurrentValueToString() throws IOException {
}

private static final class BINARYMemColumnReader extends MemColumnReader {
private byte[] current;
private Binary current;

public BINARYMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
super(path, pageReader);
}

@Override
public byte[] getBinary() {
public Binary getBinary() {
checkValueRead();
return current;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import parquet.column.primitive.BoundedColumnFactory;
import parquet.column.primitive.PlainColumnReader;
import parquet.column.primitive.PrimitiveColumnReader;
import parquet.io.Binary;
import parquet.io.ParquetDecodingException;

/**
Expand Down Expand Up @@ -131,7 +132,7 @@ public long getLong() {
* @see parquet.column.ColumnReader#getBinary()
*/
@Override
public byte[] getBinary() {
public Binary getBinary() {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import parquet.column.primitive.DataColumnWriter;
import parquet.column.primitive.PlainColumnWriter;
import parquet.column.primitive.PrimitiveColumnWriter;
import parquet.io.Binary;
import parquet.io.ParquetEncodingException;


Expand Down Expand Up @@ -110,7 +111,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
}

@Override
public void write(byte[] value, int repetitionLevel, int definitionLevel) {
public void write(Binary value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
repetitionLevelColumn.writeInteger(repetitionLevel);
definitionLevelColumn.writeInteger(definitionLevel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package parquet.column.primitive;

import java.io.IOException;
import parquet.io.Binary;

/**
* ColumnReader which does not read any actual data, but rather simply produces
Expand All @@ -28,7 +29,7 @@ public class DevNullColumnReader extends PrimitiveColumnReader {
private byte defaultByte = 0;
private float defaultFloat = 0f;
private double defaultDouble = 0.0;
private byte[] defaultBytes = new byte[0];
private Binary defaultBytes = Binary.EMPTY;

// TODO(julien): the setDefault* don't seem to be used anywhere. Can we kill them
// for now, so that this is truly DevNull instead of producing a constant stream of
Expand Down Expand Up @@ -57,7 +58,7 @@ public void setDefaultByte(byte defaultByte) {
this.defaultByte = defaultByte;
}

public void setDefaultBytes(byte[] defaultBytes) {
public void setDefaultBytes(Binary defaultBytes) {
this.defaultBytes = defaultBytes;
}

Expand All @@ -73,7 +74,7 @@ public float readFloat() {
return defaultFloat;
}

public byte[] readBytes() {
public Binary readBytes() {
return defaultBytes;
}

Expand All @@ -94,4 +95,4 @@ public int initFromPage(long valueCount, byte[] in, int offset) throws IOExcepti
return offset;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package parquet.column.primitive;

import parquet.bytes.BytesInput;
import parquet.io.Binary;

/**
* This is a special writer that doesn't write anything. The idea being that
Expand Down Expand Up @@ -45,7 +46,7 @@ public void writeBoolean(boolean v) {
}

@Override
public void writeBytes(byte[] v) {
public void writeBytes(Binary v) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import parquet.Log;
import parquet.bytes.LittleEndianDataInputStream;
import parquet.io.Binary;
import parquet.io.ParquetDecodingException;

/**
Expand All @@ -45,11 +46,12 @@ public float readFloat() {
}

@Override
public byte[] readBytes() {
public Binary readBytes() {
try {
byte[] value = new byte[in.readInt()];
in.readFully(value);
return value;
// TODO: we don't need to read to an array.
return Binary.fromByteArray(value);
} catch (IOException e) {
throw new ParquetDecodingException("could not read bytes", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.bytes.LittleEndianDataOutputStream;
import parquet.column.Encoding;
import parquet.io.Binary;
import parquet.io.ParquetEncodingException;


Expand All @@ -46,10 +47,10 @@ public PlainColumnWriter(int initialSize) {
}

@Override
public final void writeBytes(byte[] v) {
public final void writeBytes(Binary v) {
try {
out.writeInt(v.length);
out.write(v);
out.writeInt(v.length());
v.writeTo(out);
} catch (IOException e) {
throw new ParquetEncodingException("could not write bytes", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.io.IOException;

import parquet.io.Binary;

/**
* Base class to implement an encoding for a given column type.
*
Expand Down Expand Up @@ -78,7 +80,7 @@ public float readFloat() {
/**
* @return the next boolean from the page
*/
public byte[] readBytes() {
public Binary readBytes() {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package parquet.column.primitive;

import parquet.bytes.BytesInput;
import parquet.column.Encoding;
import parquet.io.Binary;

/**
* base class to implement an encoding for a given column
Expand Down Expand Up @@ -66,7 +68,7 @@ public void writeBoolean(boolean v) {
/**
* @param value the value to encode
*/
public void writeBytes(byte[] v) {
public void writeBytes(Binary v) {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -98,4 +100,4 @@ public void writeFloat(float v) {
throw new UnsupportedOperationException();
}

}
}
9 changes: 5 additions & 4 deletions parquet-column/src/main/java/parquet/example/data/Group.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package parquet.example.data;

import parquet.Log;
import parquet.io.Binary;
import parquet.io.RecordConsumer;

abstract public class Group extends GroupValueSource {
Expand All @@ -38,7 +39,7 @@ public void add(String field, boolean value) {
add(getType().getFieldIndex(field), value);
}

public void add(String field, byte[] value) {
public void add(String field, Binary value) {
add(getType().getFieldIndex(field), value);
}

Expand All @@ -59,7 +60,7 @@ public Group getGroup(String field, int index) {

abstract public void add(int fieldIndex, boolean value);

abstract public void add(int fieldIndex, byte[] value);
abstract public void add(int fieldIndex, Binary value);

abstract public void add(int fieldIndex, float value);

Expand All @@ -84,7 +85,7 @@ public Group append(String fieldName, long value) {
}

public Group append(String fieldName, String value) {
add(fieldName, value.getBytes());
add(fieldName, Binary.fromString(value));
return this;
}

Expand All @@ -93,7 +94,7 @@ public Group append(String fieldName, boolean value) {
return this;
}

public Group append(String fieldName, byte[] value) {
public Group append(String fieldName, Binary value) {
add(fieldName, value);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayDeque;
import java.util.Deque;

import parquet.io.Binary;
import parquet.io.RecordMaterializer;


Expand Down Expand Up @@ -82,7 +83,7 @@ public void addBoolean(boolean value) {
}

@Override
public void addBinary(byte[] value) {
public void addBinary(Binary value) {
groups.peek().add(fields.peek(), value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package parquet.example.data;

import parquet.io.Binary;
import parquet.schema.GroupType;

abstract public class GroupValueSource {
Expand All @@ -39,7 +40,7 @@ public boolean getBoolean(String field, int index) {
return getBoolean(getType().getFieldIndex(field), index);
}

public byte[] getBinary(String field, int index) {
public Binary getBinary(String field, int index) {
return getBinary(getType().getFieldIndex(field), index);
}

Expand All @@ -53,7 +54,7 @@ public byte[] getBinary(String field, int index) {

abstract public boolean getBoolean(int fieldIndex, int index);

abstract public byte[] getBinary(int fieldIndex, int index);
abstract public Binary getBinary(int fieldIndex, int index);

abstract public String getValueToString(int fieldIndex, int index);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@
package parquet.example.data.simple;

import parquet.bytes.BytesUtils;
import parquet.io.Binary;
import parquet.io.RecordConsumer;


public class BinaryValue extends Primitive {

private final byte[] binary;
private final Binary binary;

public BinaryValue(byte[] binary) {
public BinaryValue(Binary binary) {
this.binary = binary;
}

@Override
public byte[] getBinary() {
public Binary getBinary() {
return binary;
}

@Override
public String getString() {
return new String(binary);
return binary.toStringUsingUTF8();
}

@Override
Expand All @@ -44,6 +45,6 @@ public void writeValue(RecordConsumer recordConsumer) {

@Override
public String toString() {
return new String(binary, BytesUtils.UTF8);
return getString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package parquet.example.data.simple;

import parquet.io.Binary;
import parquet.io.RecordConsumer;

public abstract class Primitive {
Expand All @@ -35,7 +36,7 @@ public boolean getBoolean() {
throw new UnsupportedOperationException();
}

public byte[] getBinary() {
public Binary getBinary() {
throw new UnsupportedOperationException();
}

Expand Down
Loading

0 comments on commit 7ea3721

Please sign in to comment.