
Commit

Merge pull request LinkedInAttic#15 from smeder/camus-kafka-0.8
Add support for other serialization formats. You'll have to implement th...
ggupta1612 committed Jul 1, 2013
2 parents 9d3c7be + 87917a2 commit e07caf5
Showing 10 changed files with 230 additions and 182 deletions.
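
In use, the decoder and (with this change) the record writer plug-in are selected through the job's properties file. A minimal sketch of the relevant entries: camus.message.decoder.class is named in the MessageDecoder Javadoc further down, etl.record.writer.provider.class is an assumed key for the new RecordWriterProvider hook, and both implementation class names are placeholders.

# Hypothetical camus.properties fragment; verify the exact keys in this branch.
camus.message.decoder.class=com.example.camus.StringMessageDecoder
etl.record.writer.provider.class=com.example.camus.TextRecordWriterProvider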
@@ -7,7 +7,7 @@
/**
* Container for messages. Enables the use of a custom message decoder with knowledge
* of where these values are stored in the message schema
*
*
* @author kgoodhop
*
* @param <R> The type of decoded payload
@@ -20,11 +20,11 @@ public class CamusWrapper<R> {
public CamusWrapper(R record) {
this(record, System.currentTimeMillis());
}

public CamusWrapper(R record, long timestamp) {
this(record, timestamp, "unknown_server", "unknown_service");
}

public CamusWrapper(R record, long timestamp, String server, String service) {
this.record = record;
this.timestamp = timestamp;
@@ -58,14 +58,14 @@ public void put(Writable key, Writable value) {

/**
* Get a value for partitions
* @returns the value for the given key
* @return the value for the given key
*/
public Writable get(Writable key) {
return partitionMap.get(key);
}

/**
* Get all the parititon key/partitionMap
* Get all the partition key/partitionMap
*/
public MapWritable getPartitionMap() {
return partitionMap;
@@ -3,8 +3,8 @@
import java.util.Properties;

/**
* Decoder interface. Implemenations should return a CamusWrapper with timestamp
* set at the very least. Camus will instantiate a descendent of this class
* Decoder interface. Implementations should return a CamusWrapper with timestamp
* set at the very least. Camus will instantiate a descendent of this class
* based on the property camus.message.decoder.class.
* @author kgoodhop
*
@@ -14,7 +14,7 @@
public abstract class MessageDecoder<M,R> {
protected Properties props;
protected String topicName;

public void init(Properties props, String topicName){
this.props = props;
this.topicName = topicName;
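
As a rough illustration of the decoder contract described above (not code from this commit), a minimal implementation could look like the following. The class and payload handling are hypothetical, and it assumes MessageDecoder declares an abstract decode(M) method returning a CamusWrapper<R>; only init(Properties, String) is visible in this hunk.

import java.nio.charset.StandardCharsets;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;

public class SimpleStringMessageDecoder extends MessageDecoder<byte[], String> {
    @Override
    public CamusWrapper<String> decode(byte[] payload) {
        // Treat the raw Kafka payload as UTF-8 text.
        String record = new String(payload, StandardCharsets.UTF_8);
        // A timestamp is the minimum the Javadoc asks for; server/service
        // fall back to "unknown_server"/"unknown_service".
        return new CamusWrapper<String>(record, System.currentTimeMillis());
    }
}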
@@ -0,0 +1,20 @@
package com.linkedin.camus.etl;

import com.linkedin.camus.coders.CamusWrapper;
import java.io.IOException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/**
*
*
*/
public interface RecordWriterProvider {

String getFilenameExtension();

RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
TaskAttemptContext context, String fileName, CamusWrapper data, FileOutputCommitter committer) throws IOException,
InterruptedException;
}
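
Per the commit message, supporting another serialization format means implementing this interface. Below is a sketch of a plain-text provider; it is not part of this commit, the class name is hypothetical, and it reuses EtlMultiOutputFormat.getUniqueFile the same way the Avro provider further down does.

import java.io.IOException;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class TextRecordWriterProvider implements RecordWriterProvider {
    public final static String EXT = ".txt";

    @Override
    public String getFilenameExtension() {
        return EXT;
    }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
            TaskAttemptContext context,
            String fileName,
            CamusWrapper data,
            FileOutputCommitter committer) throws IOException, InterruptedException {
        Path path = new Path(committer.getWorkPath(),
                EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
        final FSDataOutputStream out =
                path.getFileSystem(context.getConfiguration()).create(path);

        return new RecordWriter<IEtlKey, CamusWrapper>() {
            @Override
            public void write(IEtlKey ignore, CamusWrapper value) throws IOException {
                // One decoded record per line; assumes the decoder produced
                // something with a meaningful toString(), e.g. a String.
                out.write(value.getRecord().toString().getBytes("UTF-8"));
                out.write('\n');
            }

            @Override
            public void close(TaskAttemptContext ignored) throws IOException {
                out.close();
            }
        };
    }
}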
@@ -1,5 +1,12 @@
package com.linkedin.camus.etl.kafka;

import com.linkedin.camus.etl.kafka.common.DateUtils;
import com.linkedin.camus.etl.kafka.common.EtlCounts;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import com.linkedin.camus.etl.kafka.common.Source;
import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -15,8 +22,6 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;

import org.apache.avro.mapred.AvroWrapper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -47,21 +52,13 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;

import com.linkedin.camus.etl.kafka.common.DateUtils;
import com.linkedin.camus.etl.kafka.common.EtlCounts;
import com.linkedin.camus.etl.kafka.common.EtlKey;
import com.linkedin.camus.etl.kafka.common.ExceptionWritable;
import com.linkedin.camus.etl.kafka.common.Source;
import com.linkedin.camus.etl.kafka.mapred.EtlInputFormat;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.type.TypeReference;

public class CamusJob extends Configured implements Tool {

public static final String ETL_EXECUTION_BASE_PATH = "etl.execution.base.path";
@@ -234,9 +231,6 @@ public void run() throws Exception {

job.setInputFormatClass(EtlInputFormat.class);
job.setOutputFormatClass(EtlMultiOutputFormat.class);
job.setOutputKeyClass(EtlKey.class);
job.setOutputValueClass(AvroWrapper.class);

job.setNumReduceTasks(0);

stopTiming("pre-setup");
@@ -255,15 +249,15 @@ public void run() throws Exception {

stopTiming("hadoop");
startTiming("commit");

//Send Tracking counts to Kafka
sendTrackingCounts(job, fs,newExecutionOutput);

//Print any potential errors encountered
printErrors(fs, newExecutionOutput);

fs.rename(newExecutionOutput, execHistory);

System.out.println("Job finished");
stopTiming("commit");
stopTiming("total");
@@ -301,7 +295,7 @@ public void printErrors(FileSystem fs, Path newExecutionOutput) throws IOExcepti
reader.close();
}
}

//Posts the tracking counts to Kafka
public void sendTrackingCounts(JobContext job, FileSystem fs, Path newExecutionOutput) throws IOException, URISyntaxException
{
@@ -391,11 +385,11 @@ public void sendTrackingCounts(JobContext job, FileSystem fs, Path newExecutionO
}
}
}

/**
* Creates a diagnostic report mostly focused on timing breakdowns. Useful
* for determining where to optimize.
*
*
* @param job
* @param timingMap
* @throws IOException
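
For reference, a job configured this way is usually launched through the Hadoop tool runner. The jar and properties file names below are placeholders, and the -P (external properties file) flag follows how CamusJob is commonly documented to be invoked, so treat the exact option as an assumption to verify against the option parsing in this class.

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties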
@@ -26,6 +26,5 @@ public String generatePartitionedPath(JobContext context, String topic, int brok
DateTime bucket = new DateTime(Long.valueOf(encodedPartition));
sb.append(bucket.toString(OUTPUT_DATE_FORMAT));
return sb.toString();

}
}
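
To make the bucket-to-path step concrete, here is a small standalone sketch of what the appended segment looks like. The hourly pattern "YYYY/MM/dd/HH" and the UTC zone are assumptions; the real values come from OUTPUT_DATE_FORMAT and the job's configured time zone.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PartitionPathExample {
    public static void main(String[] args) {
        // encodedPartition is an epoch-millis bucket, here midnight UTC on 2013-07-01.
        long encodedPartition = 1372636800000L;
        DateTimeFormatter fmt = DateTimeFormat.forPattern("YYYY/MM/dd/HH");
        DateTime bucket = new DateTime(encodedPartition, DateTimeZone.UTC);
        System.out.println(bucket.toString(fmt)); // prints 2013/07/01/00
    }
}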
@@ -0,0 +1,67 @@
package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.IOException;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
*
*
*/
public class AvroRecordWriterProvider implements RecordWriterProvider {
public final static String EXT = ".avro";

@Override
public String getFilenameExtension() {
return EXT;
}

@Override
public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
TaskAttemptContext context,
String fileName,
CamusWrapper data,
FileOutputCommitter committer) throws IOException, InterruptedException {
final DataFileWriter<Object> writer = new DataFileWriter<Object>(
new SpecificDatumWriter<Object>());

if (FileOutputFormat.getCompressOutput(context)) {
if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
writer.setCodec(CodecFactory.snappyCodec());
} else {
int level = EtlMultiOutputFormat.getEtlDeflateLevel(context);
writer.setCodec(CodecFactory.deflateCodec(level));
}
}

Path path = committer.getWorkPath();
path = new Path(path, EtlMultiOutputFormat.getUniqueFile(context, fileName, EXT));
writer.create(((GenericRecord) data.getRecord()).getSchema(),
path.getFileSystem(context.getConfiguration()).create(path));

writer.setSyncInterval(EtlMultiOutputFormat.getEtlAvroWriterSyncInterval(context));

return new RecordWriter<IEtlKey, CamusWrapper>() {
@Override
public void write(IEtlKey ignore, CamusWrapper data) throws IOException {
writer.append(data.getRecord());
}

@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
writer.close();
}
};
}
}
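
The codec branch above is driven by job configuration. The key names in this sketch are assumptions inferred from the getters used here (getEtlOutputCodec, getEtlDeflateLevel, getEtlAvroWriterSyncInterval), so verify them against EtlMultiOutputFormat before relying on them.

# Assumed property keys; values are illustrative.
mapred.output.compress=true
etl.output.codec=deflate
etl.deflate.level=6
etl.avro.writer.sync.interval=16000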
@@ -23,7 +23,7 @@
/**
* Poorly named class that handles kafka pull events within each
* KafkaRecordReader.
*
*
* @author Richard Park
*/
public class KafkaReader {
@@ -46,7 +46,7 @@ public class KafkaReader {
private int fetchBufferSize;

/**
* Construct using the json represention of the kafka request
* Construct using the json representation of the kafka request
*/
public KafkaReader(TaskAttemptContext context, EtlRequest request,
int clientTimeout, int fetchBufferSize) throws Exception {
@@ -90,7 +90,7 @@ public boolean hasNext() throws IOException {
/**
* Fetches the next Kafka message and stuffs the results into the key and
* value
*
*
* @param key
* @param payload
* @param pKey
@@ -108,7 +108,7 @@ public boolean getNext(EtlKey key, BytesWritable payload ,BytesWritable pKey) th
byte[] bytes = new byte[origSize];
buf.get(bytes, buf.position(), origSize);
payload.set(bytes, 0, origSize);

buf = message.key();
if(buf != null){
origSize = buf.remaining();
@@ -133,7 +133,7 @@ public boolean getNext(EtlKey key, BytesWritable payload ,BytesWritable pKey) th

/**
* Creates a fetch request.
*
*
* @return false if there's no more fetches
* @throws IOException
*/
@@ -200,14 +200,14 @@ public boolean fetch() throws IOException {
System.out.println("Skipping offset : " + skippedMessage.offset());
skipped --;
}

if (!messageIter.hasNext()) {
System.out
.println("No more data left to process. Returning false");
messageIter = null;
return false;
}

return true;
}
} catch (Exception e) {
@@ -220,7 +220,7 @@ public boolean fetch() throws IOException {

/**
* Closes this context
*
*
* @throws IOException
*/
public void close() throws IOException {
@@ -232,7 +232,7 @@ public void close() throws IOException {
/**
* Returns the total bytes that will be fetched. This is calculated by
* taking the diffs of the offsets
*
*
* @return
*/
public long getTotalBytes() {
Expand All @@ -241,7 +241,7 @@ public long getTotalBytes() {

/**
* Returns the total bytes that have been fetched so far
*
*
* @return
*/
public long getReadBytes() {
Expand All @@ -250,7 +250,7 @@ public long getReadBytes() {

/**
* Returns the number of events that have been read
*
*
* @return
*/
public long getCount() {
Expand All @@ -259,7 +259,7 @@ public long getCount() {

/**
* Returns the fetch time of the last fetch in ms
*
*
* @return
*/
public long getFetchTime() {
Expand All @@ -268,7 +268,7 @@ public long getFetchTime() {

/**
* Returns the totalFetchTime in ms
*
*
* @return
*/
public long getTotalFetchTime() {
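
Pieced together from the signatures shown above (the constructor, getNext, the byte and count accessors, and close), a caller such as the ETL record reader would drive this class roughly as follows. This is an illustration, not code from the commit; the timeout and buffer sizes are placeholder values.

// Sketch: "context" and "request" come from the surrounding Hadoop machinery.
void drainPartition(TaskAttemptContext context, EtlRequest request) throws Exception {
    KafkaReader reader = new KafkaReader(context, request,
            30000 /* client timeout, ms */, 1024 * 1024 /* fetch buffer, bytes */);
    EtlKey key = new EtlKey();
    BytesWritable payload = new BytesWritable();
    BytesWritable partitionKey = new BytesWritable();
    while (reader.getNext(key, payload, partitionKey)) {
        // Hand the payload bytes to the configured MessageDecoder and emit the record.
    }
    System.out.println("Read " + reader.getCount() + " events, "
            + reader.getReadBytes() + " of " + reader.getTotalBytes() + " bytes");
    reader.close();
}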
(Diffs for the remaining changed files did not load.)
