Adding SequenceFileRecordWriterProvider.java
This class provides a RecordWriter that writes sequence files
to HDFS.  Compression is configured using the usual Hadoop properties.
ottomata committed Dec 23, 2013
1 parent 6f89813 commit 5ec8da2
Showing 1 changed file with 121 additions and 0 deletions.
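For reference, the compression behavior described in the commit message is driven entirely by job configuration. A minimal sketch of the relevant settings (this setup code is not part of the commit; the class name is hypothetical and SnappyCodec is just one example codec, while etl.output.record.delimiter is the property this class itself defines):

import org.apache.hadoop.conf.Configuration;

public class CamusSequenceFileConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Enable output compression (pre-Hadoop-2 property names, as in the
        // Javadoc below; newer Hadoop uses mapreduce.output.fileoutputformat.*).
        conf.setBoolean("mapred.output.compress", true);
        conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapred.output.compression.type", "BLOCK");
        // Optional delimiter appended after each record (defaults to the empty string).
        conf.set("etl.output.record.delimiter", "\n");
    }
}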
@@ -0,0 +1,121 @@
package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.conf.Configuration;

import org.apache.log4j.Logger;

/**
 * Provides a RecordWriter that uses SequenceFile.Writer to write
 * SequenceFile records to HDFS.  Compression settings are controlled via
 * the usual Hadoop configuration values:
 * - mapred.output.compress          - true or false
 * - mapred.output.compression.codec - org.apache.hadoop.io.compress.SnappyCodec, etc.
 * - mapred.output.compression.type  - BLOCK or RECORD
 *
 * Note that the above properties are deprecated as of Hadoop 2; there one should use
 * mapreduce.output.fileoutputformat.compress, mapreduce.output.fileoutputformat.compress.codec,
 * and mapreduce.output.fileoutputformat.compress.type instead.
 */
public class SequenceFileRecordWriterProvider implements RecordWriterProvider {
    public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
    public static final String DEFAULT_RECORD_DELIMITER = "";

    private static Logger log = Logger.getLogger(SequenceFileRecordWriterProvider.class);

    protected String recordDelimiter = null;

    // TODO: Make the filename extension configurable.
    // To do this, SequenceFileRecordWriterProvider would need an
    // init(JobContext context) method that EtlMultiOutputFormat always calls.
    @Override
    public String getFilenameExtension() {
        return "";
    }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(
            TaskAttemptContext context,
            String fileName,
            CamusWrapper camusWrapper,
            FileOutputCommitter committer) throws IOException, InterruptedException {

        Configuration conf = context.getConfiguration();

        // If recordDelimiter hasn't been initialized, do so now.
        if (recordDelimiter == null) {
            recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
        }

        CompressionCodec compressionCodec = null;
        CompressionType compressionType = CompressionType.NONE;

        // Determine compression type (BLOCK or RECORD) and compression codec to use.
        if (SequenceFileOutputFormat.getCompressOutput(context)) {
            compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
            Class<?> codecClass = SequenceFileOutputFormat.getOutputCompressorClass(context, DefaultCodec.class);
            // Instantiate the CompressionCodec class.
            compressionCodec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        }

        // Get the filename for this RecordWriter.
        Path path = new Path(
            committer.getWorkPath(),
            EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension())
        );

        // compressionCodec is null when compression is disabled, so guard the log message.
        log.info("Creating new SequenceFile.Writer with compression type " + compressionType
                + " and compression codec "
                + (compressionCodec == null ? "none" : compressionCodec.getClass().getName()));

        final SequenceFile.Writer writer = SequenceFile.createWriter(
            path.getFileSystem(conf),
            conf,
            path,
            LongWritable.class,
            Text.class,
            compressionType,
            compressionCodec,
            context
        );

        // Return a new anonymous RecordWriter that uses the
        // SequenceFile.Writer to write data to HDFS.
        return new RecordWriter<IEtlKey, CamusWrapper>() {
            @Override
            public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
                String record = (String) data.getRecord() + recordDelimiter;
                // Use the timestamp from the EtlKey as the key for this record.
                // TODO: Is there a better key to use here?
                writer.append(new LongWritable(key.getTime()), new Text(record));
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                writer.close();
            }
        };
    }
}
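Because the writer emits plain SequenceFiles keyed by the record timestamp (LongWritable) with Text values, the output can be verified with the standard SequenceFile.Reader API. A minimal sketch, assuming a path to one of the written files is passed as the first argument (it uses the same Hadoop-1-era SequenceFile API as the writer above; the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // a file produced by SequenceFileRecordWriterProvider
        FileSystem fs = path.getFileSystem(conf);

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            LongWritable key = new LongWritable(); // message timestamp from the EtlKey
            Text value = new Text();               // the record payload
            while (reader.next(key, value)) {
                System.out.println(key.get() + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}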
