Skip to content

Commit

Permalink
Merge branch 'master' into FSA_codegen
Browse files Browse the repository at this point in the history
Conflicts:
	redelm-column/src/main/java/redelm/io/MessageColumnIO.java
	redelm-column/src/main/java/redelm/io/RecordReader.java
	redelm-column/src/main/java/redelm/schema/PrimitiveType.java
	redelm-column/src/test/java/redelm/io/PerfTest.java
	redelm-column/src/test/java/redelm/io/TestColumnIO.java
	redelm-pig/src/test/java/redelm/hadoop/TestInputFormat.java
	redelm-pig/src/test/java/redelm/pig/TupleConsumerPerfTest.java
  • Loading branch information
julienledem committed Feb 20, 2013
2 parents b89750b + 08df187 commit 951cd2f
Show file tree
Hide file tree
Showing 120 changed files with 6,902 additions and 2,134 deletions.
12 changes: 12 additions & 0 deletions redelm-column/src/main/java/redelm/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,16 @@ public void warn(Object m, Throwable t) {
logger.log(Level.WARNING, String.valueOf(m), t);
}

public void error(Object m) {
if (m instanceof Throwable) {
logger.log(Level.SEVERE, "", (Throwable)m);
} else {
logger.warning(String.valueOf(m));
}
}

public void error(Object m, Throwable t) {
logger.log(Level.SEVERE, String.valueOf(m), t);
}

}
219 changes: 219 additions & 0 deletions redelm-column/src/main/java/redelm/bytes/BytesInput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package redelm.bytes;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import redelm.Log;

/**
*
* A source of bytes capable of writing itself to an output
*
* @author Julien Le Dem
*
*/
abstract public class BytesInput {
private static final Log LOG = Log.getLog(BytesInput.class);
private static final boolean DEBUG = false;//Log.DEBUG;
private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput();

public static BytesInput fromSequence(BytesInput... inputs) {
return new SequenceBytesIn(inputs);
}

public static BytesInput from(InputStream in, int bytes) {
return new StreamBytesInput(in, bytes);
}

public static BytesInput from(byte[] in) {
if (DEBUG) LOG.debug("BytesInput from array of " + in.length + " bytes");
return new ByteArrayBytesInput(in);
}

public static BytesInput fromInt(int intValue) {
return new IntBytesInput(intValue);
}

public static BytesInput from(ByteArrayOutputStream arrayOut) {
return new BAOSBytesInput(arrayOut);
}

public static BytesInput empty() {
return EMPTY_BYTES_INPUT;
}

public static BytesInput copy(BytesInput bytesInput) throws IOException {
return from(bytesInput.toByteArray());
}

abstract public void writeAllTo(OutputStream out) throws IOException;

public byte[] toByteArray() throws IOException {
BAOS baos = new BAOS((int)size());
this.writeAllTo(baos);
if (DEBUG) LOG.debug("converted " + size() + " to byteArray of " + baos.size() + " bytes");
return baos.getBuf();
}

abstract public long size();

private static final class BAOS extends ByteArrayOutputStream {
private BAOS(int size) {
super(size);
}

public byte[] getBuf() {
return this.buf;
}
}

private static class StreamBytesInput extends BytesInput {
private static final Log LOG = Log.getLog(BytesInput.StreamBytesInput.class);
private final InputStream in;
private final int byteCount;

private StreamBytesInput(InputStream in, int byteCount) {
super();
this.in = in;
this.byteCount = byteCount;
}

@Override
public void writeAllTo(OutputStream out) throws IOException {
if (DEBUG) LOG.debug("write All "+ byteCount + " bytes");
// TODO: more efficient
byte[] buf = new byte[byteCount];
new DataInputStream(in).readFully(buf);
out.write(buf);
}

@Override
public long size() {
return byteCount;
}

}

private static class SequenceBytesIn extends BytesInput {
private static final Log LOG = Log.getLog(BytesInput.SequenceBytesIn.class);

public final BytesInput[] inputs;
private final long size;

private SequenceBytesIn(BytesInput[] inputs) {
this.inputs = inputs;
long total = 0;
for (BytesInput input : inputs) {
total += input.size();
}
this.size = total;
}

@Override
public void writeAllTo(OutputStream out) throws IOException {
for (BytesInput input : inputs) {
if (DEBUG) LOG.debug("write " + input.size() + " bytes to out");
if (DEBUG && input instanceof SequenceBytesIn) LOG.debug("{");
input.writeAllTo(out);
if (DEBUG && input instanceof SequenceBytesIn) LOG.debug("}");
}
}

@Override
public long size() {
return size;
}

}

private static class IntBytesInput extends BytesInput {

private final int intValue;

public IntBytesInput(int intValue) {
this.intValue = intValue;
}

@Override
public void writeAllTo(OutputStream out) throws IOException {
BytesUtils.writeIntBigEndian(out, intValue);
}

@Override
public long size() {
return 4;
}

}

private static class EmptyBytesInput extends BytesInput {

@Override
public void writeAllTo(OutputStream out) throws IOException {
}

@Override
public long size() {
return 0;
}

}

private static class BAOSBytesInput extends BytesInput {

private final ByteArrayOutputStream arrayOut;

private BAOSBytesInput(ByteArrayOutputStream arrayOut) {
this.arrayOut = arrayOut;
}

@Override
public void writeAllTo(OutputStream out) throws IOException {
arrayOut.writeTo(out);
}

@Override
public long size() {
return arrayOut.size();
}

}

private static class ByteArrayBytesInput extends BytesInput {

private final byte[] in;

private ByteArrayBytesInput(byte[] in) {
this.in = in;
}

@Override
public void writeAllTo(OutputStream out) throws IOException {
out.write(in);
}

@Override
public long size() {
return in.length;
}

}
}
87 changes: 87 additions & 0 deletions redelm-column/src/main/java/redelm/bytes/BytesUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package redelm.bytes;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;

import redelm.Log;

public class BytesUtils {
private static final Log LOG = Log.getLog(BytesUtils.class);

public static final Charset UTF8 = Charset.forName("UTF-8");

public static int getWidthFromMaxInt(int bound) {
return (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
}

public static int readIntBigEndian(byte[] in, int offset) throws IOException {
int ch1 = in[offset] & 0xff;
int ch2 = in[offset + 1] & 0xff;
int ch3 = in[offset + 2] & 0xff;
int ch4 = in[offset + 3] & 0xff;
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}

public static int readIntLittleEndian(InputStream in) throws IOException {
int ch1 = in.read();
int ch2 = in.read();
int ch3 = in.read();
int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0)
throw new EOFException();
if (Log.DEBUG) LOG.debug("read le int: " + ch1 + " " + ch2 + " " + ch3 + " " + ch4 + " => " + ((ch4 << 24) + (ch3 << 16) + (ch2 << 8) + (ch1 << 0)));
return ((ch4 << 24) + (ch3 << 16) + (ch2 << 8) + (ch1 << 0));
}

public static void writeIntBigEndian(OutputStream out, int v) throws IOException {
out.write((v >>> 24) & 0xFF);
out.write((v >>> 16) & 0xFF);
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
}

public static void writeIntLittleEndian(OutputStream out, int v) throws IOException {
out.write((v >>> 0) & 0xFF);
out.write((v >>> 8) & 0xFF);
out.write((v >>> 16) & 0xFF);
out.write((v >>> 24) & 0xFF);
if (Log.DEBUG) LOG.debug("write le int: " + v + " => "+ ((v >>> 0) & 0xFF) + " " + ((v >>> 8) & 0xFF) + " " + ((v >>> 16) & 0xFF) + " " + ((v >>> 24) & 0xFF));
}

public static int readUnsignedVarInt(InputStream in) throws IOException {
int value = 0;
int i = 0;
int b;
while (((b = in.read()) & 0x80) != 0) {
value |= (b & 0x7F) << i;
i += 7;
}
return value | (b << i);
}

public static void writeUnsignedVarInt(int value, OutputStream out) throws IOException {
while ((value & 0xFFFFFF80) != 0L) {
out.write((value & 0x7F) | 0x80);
value >>>= 7;
}
out.write(value & 0x7F);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package redelm.column;
package redelm.bytes;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.IOException;

public class RedelmByteArrayOutputStream extends ByteArrayOutputStream {
public RedelmByteArrayOutputStream(int initialSize) {
/**
* expose the memory used by a ByteArrayOutputStream
*
* @author Julien Le Dem
*
*/
public class CapacityByteArrayOutputStream extends ByteArrayOutputStream {

public CapacityByteArrayOutputStream(int initialSize) {
super(initialSize);
}

public void writeTo(DataOutput out) throws IOException {
out.write(buf, 0, count);
public int getCapacity() {
return buf.length;
}

}
Loading

0 comments on commit 951cd2f

Please sign in to comment.