Skip to content

Commit

Permalink
Merge pull request yahoo#291 from yahoo/request-log-processor
Browse files Browse the repository at this point in the history
Add codecs and kafka serdes
  • Loading branch information
patelh authored Aug 8, 2018
2 parents 59c3747 + 6d06460 commit dd6bfe3
Show file tree
Hide file tree
Showing 28 changed files with 1,403 additions and 25 deletions.
47 changes: 47 additions & 0 deletions db/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,28 @@
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -68,6 +85,35 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
Expand All @@ -81,6 +127,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
</plugin>

</plugins>
</build>

Expand Down
53 changes: 53 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/ByteBufferOutputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

/**
* Created by shrav87 on 11/21/16.
*/

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

class ByteBufferOutputStream extends OutputStream {
private ByteBuffer buffer = null;

public ByteBufferOutputStream() {
}

public void set(ByteBuffer buffer) {
this.buffer = buffer;
}

public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

public void write(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
}
if(len > this.buffer.capacity() - this.buffer.position()) {
throw new ByteBufferOutputStream.BufferOverflowException("ByteBuffer is not large enough.");
} else {
this.buffer.put(b, off, len);
}
}

public void write(int b) throws IOException {
if(1 > this.buffer.capacity() - this.buffer.position()) {
throw new ByteBufferOutputStream.BufferOverflowException("ByteBuffer is not large enough.");
} else {
this.buffer.put((byte)b);
}
}

public static class BufferOverflowException extends IOException {
private static final long serialVersionUID = 1L;

public BufferOverflowException(String message) {
super(message);
}
}
}
36 changes: 36 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/Compressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Created by hiral on 8/28/14.
*/
public interface Compressor {
public static final String COMPRESSOR_CODEC_PROPERTY = "maha.compressor.codec";
void compressToBB(byte[] bytes, ByteBuffer outputBuffer) throws IOException;
default public int compress(byte[] bytes, byte[] outputBuffer) throws IOException {
return compress(bytes, 0, bytes.length, outputBuffer);
}
int compress(byte[] bytes, int offset, int len, byte[] outputBuffer) throws IOException;
void compressBB(ByteBuffer byteBuffer, ByteBuffer outputBuffer) throws IOException;
void decompressToBB(byte[] bytes, ByteBuffer outputBuffer) throws IOException;
int decompress(byte[] bytes, byte[] outputBuffer) throws IOException;
void decompressBB(ByteBuffer byteBuffer, ByteBuffer outputBuffer) throws IOException;

enum Codec {
GZIP(1,1), LZ4(2,2), LZ4HC(3,3), NONE(4,4);

private final int index;
private final int value;

private Codec(int index, int value) {
this.index = index;
this.value = value;
}
}

Codec codec();
}
22 changes: 22 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/CompressorFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

/**
* Created by hiral on 8/28/14.
*/
public class CompressorFactory {

public static Compressor getCompressor(Compressor.Codec codec) {
switch (codec) {
case GZIP:
return new GZIPCodec();
case LZ4:
return new Lz4Codec();
case LZ4HC:
return new Lz4HCCodec();
default:
return new PassThroughCodec();
}
}
}
13 changes: 13 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/Decoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

import java.nio.ByteBuffer;

/**
* Created by hiral on 8/2/18.
*/
interface Decoder<T> {
T decode(byte[] bytes);
T decode(ByteBuffer byteBuffer);
}
14 changes: 14 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/Encoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

import java.nio.ByteBuffer;

/**
* Created by hiral on 8/2/18.
*/
interface Encoder<T> {
int encode(T t, byte[] outputBuffer);
void encode(T t, ByteBuffer outputBuffer);
byte[] encode(T t);
}
21 changes: 21 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/EventBatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

import java.util.Collection;
import java.util.List;

/**
* Created by hiral on 10/21/14.
*/
public interface EventBatch<T> {

List<T> getEvents();

interface EventBatchBuilder<T> {
EventBatchBuilder<T> add(T t);
EventBatchBuilder<T> addAll(Collection<T> collection);
int size();
EventBatch<T> build();
}
}
12 changes: 12 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/EventBatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

import java.io.Serializable;

/**
* Created by hiral on 10/21/14.
*/
public interface EventBatcher<T> extends Serializable {
EventBatch.EventBatchBuilder<T> newBuilder(int initSize);
}
93 changes: 93 additions & 0 deletions db/src/main/java/com/yahoo/maha/data/GZIPCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.data;

import com.google.common.base.Preconditions;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* Created by hiral on 8/28/14.
*/
public class GZIPCodec implements Compressor {
private final ThreadLocal<ByteBufferOutputStream> compressOutputStream =
ThreadLocal.withInitial(ByteBufferOutputStream::new);
private final ThreadLocal<ByteBufferOutputStream> compressToBBOutputStream =
ThreadLocal.withInitial(ByteBufferOutputStream::new);
private final ThreadLocal<ByteBufferOutputStream> compressBBOutputStream =
ThreadLocal.withInitial(ByteBufferOutputStream::new);

@Override
public void compressToBB(byte[] bytes, ByteBuffer outputBuffer) throws IOException {
int buffSize = outputBuffer.capacity() - outputBuffer.position();
Preconditions.checkArgument((bytes.length + 128) <= buffSize
, "Buffer must be at least 128 bytes greater than input size"
);
ByteBufferOutputStream byteBufferOutputStream = compressToBBOutputStream.get();
byteBufferOutputStream.set(outputBuffer);
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteBufferOutputStream);
gzipOutputStream.write(bytes);
gzipOutputStream.finish();
byteBufferOutputStream.set(null);
}

@Override
public int compress(byte[] bytes, int offset, int len, byte[] outputBuffer) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.wrap(outputBuffer);
ByteBufferOutputStream byteBufferOutputStream = compressOutputStream.get();
byteBufferOutputStream.set(byteBuffer);
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteBufferOutputStream);
gzipOutputStream.write(bytes, offset, len);
gzipOutputStream.finish();
byteBufferOutputStream.set(null);
byteBuffer.flip();
return byteBuffer.limit();
}

@Override
public void compressBB(ByteBuffer byteBuffer, ByteBuffer outputBuffer) throws IOException {
ByteBufferOutputStream byteBufferOutputStream = compressBBOutputStream.get();
byteBufferOutputStream.set(outputBuffer);
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteBufferOutputStream);
gzipOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit());
gzipOutputStream.finish();
byteBufferOutputStream.set(null);
}

@Override
public void decompressToBB(byte[] bytes, ByteBuffer outputBuffer) throws IOException {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream,bytes.length);
int p1 = outputBuffer.position();
int pos = gzipInputStream.read(outputBuffer.array(), outputBuffer.arrayOffset(), outputBuffer.capacity() - p1);
outputBuffer.position(pos + p1);
}

@Override
public int decompress(byte[] bytes, byte[] outputBuffer) throws IOException {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream,bytes.length);
return gzipInputStream.read(outputBuffer, 0, outputBuffer.length);
}

@Override
public void decompressBB(ByteBuffer byteBuffer, ByteBuffer outputBuffer) throws IOException {
int inputSize = byteBuffer.limit();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteBuffer.array(), byteBuffer.arrayOffset(), inputSize);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream, inputSize);
int p1 = outputBuffer.position();
int pos = gzipInputStream.read(outputBuffer.array(), outputBuffer.arrayOffset(), outputBuffer.capacity() - p1);
outputBuffer.position(pos + p1);
}

@Override
public Codec codec() {
return Codec.GZIP;
}


}
Loading

0 comments on commit dd6bfe3

Please sign in to comment.