From 5ec8da2b288c0e886202c4023d5c09f4b8353c89 Mon Sep 17 00:00:00 2001
From: Andrew Otto
Date: Mon, 23 Dec 2013 15:10:00 -0500
Subject: [PATCH] Adding SequenceFileRecordWriterProvider.java

This class provides a RecordWriter that writes sequence files to HDFS.
Compression is configured using the usual Hadoop properties.
---
 .../SequenceFileRecordWriterProvider.java | 121 ++++++++++++++++++
 1 file changed, 121 insertions(+)
 create mode 100644 camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java

diff --git a/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java
new file mode 100644
index 000000000..010e1627b
--- /dev/null
+++ b/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java
@@ -0,0 +1,121 @@
+package com.linkedin.camus.etl.kafka.common;
+
+import com.linkedin.camus.coders.CamusWrapper;
+import com.linkedin.camus.etl.IEtlKey;
+import com.linkedin.camus.etl.RecordWriterProvider;
+import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Provides a RecordWriter that uses SequenceFile.Writer to write
+ * SequenceFile records to HDFS.  Compression settings are controlled via
+ * the usual Hadoop configuration values:
+ *
+ *   - mapred.output.compress           - true or false
+ *   - mapred.output.compression.codec  - org.apache.hadoop.io.compress.SnappyCodec, etc.
+ *   - mapred.output.compression.type   - BLOCK or RECORD
+ *
+ * These properties are deprecated in newer Hadoop versions, where
+ * mapreduce.output.fileoutputformat.compress,
+ * mapreduce.output.fileoutputformat.compress.codec and
+ * mapreduce.output.fileoutputformat.compress.type should be used instead.
+ * Which set applies depends on the version of Hadoop you are using.
+ */
+public class SequenceFileRecordWriterProvider implements RecordWriterProvider {
+    public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
+    public static final String DEFAULT_RECORD_DELIMITER    = "";
+
+    private static Logger log = Logger.getLogger(SequenceFileRecordWriterProvider.class);
+
+    protected String recordDelimiter = null;
+
+    // TODO: Make this configurable somehow.
+    // To do this, we'd have to make SequenceFileRecordWriterProvider have an
+    // init(JobContext context) method signature that EtlMultiOutputFormat would always call.
+    @Override
+    public String getFilenameExtension() {
+        return "";
+    }
+
+    @Override
+    public RecordWriter getDataRecordWriter(
+            TaskAttemptContext context,
+            String fileName,
+            CamusWrapper camusWrapper,
+            FileOutputCommitter committer) throws IOException, InterruptedException {
+
+        Configuration conf = context.getConfiguration();
+
+        // If recordDelimiter hasn't been initialized, do so now.
+        if (recordDelimiter == null) {
+            recordDelimiter = conf.get(
+                ETL_OUTPUT_RECORD_DELIMITER,
+                DEFAULT_RECORD_DELIMITER
+            );
+        }
+
+        CompressionCodec compressionCodec = null;
+        CompressionType compressionType = CompressionType.NONE;
+
+        // Determine compression type (BLOCK or RECORD) and compression codec to use.
+        if (SequenceFileOutputFormat.getCompressOutput(context)) {
+            compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
+            Class<? extends CompressionCodec> codecClass =
+                SequenceFileOutputFormat.getOutputCompressorClass(context, DefaultCodec.class);
+            // Instantiate the CompressionCodec class.
+            compressionCodec = ReflectionUtils.newInstance(codecClass, conf);
+        }
+
+        // Get the filename for this RecordWriter.
+        Path path = new Path(
+            committer.getWorkPath(),
+            EtlMultiOutputFormat.getUniqueFile(
+                context, fileName, getFilenameExtension()
+            )
+        );
+
+        log.info("Creating new SequenceFile.Writer with compression type " + compressionType
+            + " and compression codec "
+            + (compressionCodec == null ? "none" : compressionCodec.getClass().getName()));
+
+        final SequenceFile.Writer writer = SequenceFile.createWriter(
+            path.getFileSystem(conf),
+            conf,
+            path,
+            LongWritable.class,
+            Text.class,
+            compressionType,
+            compressionCodec,
+            context
+        );
+
+        // Return a new anonymous RecordWriter that uses the
+        // SequenceFile.Writer to write data to HDFS.
+        return new RecordWriter<IEtlKey, CamusWrapper>() {
+            @Override
+            public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
+                String record = (String) data.getRecord() + recordDelimiter;
+                // Use the timestamp from the EtlKey as the key for this record.
+                // TODO: Is there a better key to use here?
+                writer.append(new LongWritable(key.getTime()), new Text(record));
+            }
+
+            @Override
+            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+                writer.close();
+            }
+        };
+    }
+}
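
For reference, a minimal driver-side sketch of how the compression settings described in the class javadoc could be supplied. It assumes the Hadoop 2.x property names (on Hadoop 1.x the mapred.output.compress* keys listed in the javadoc apply) and assumes an etl.record.writer.provider.class key for selecting the provider in EtlMultiOutputFormat; not part of the patch itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.SnappyCodec;

public class SequenceFileProviderConfigSketch {
    public static Configuration configure() {
        Configuration conf = new Configuration();

        // Select this RecordWriterProvider. The property name is an assumption
        // about how EtlMultiOutputFormat looks up its provider class.
        conf.set("etl.record.writer.provider.class",
                 "com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider");

        // Enable block compression with Snappy (Hadoop 2.x key names; use the
        // mapred.output.compress* keys from the javadoc on Hadoop 1.x).
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        conf.set("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class.getName());
        conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString());

        // Optional delimiter appended to every record before it is written
        // (SequenceFileRecordWriterProvider.ETL_OUTPUT_RECORD_DELIMITER).
        conf.set("etl.output.record.delimiter", "\n");

        return conf;
    }
}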
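
The files this provider writes are ordinary SequenceFiles keyed by the EtlKey timestamp, so they can be inspected with a plain SequenceFile.Reader. A small hypothetical dump tool, only to illustrate the LongWritable(timestamp) / Text(record + delimiter) layout produced by write():

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDumpSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);   // a file produced by SequenceFileRecordWriterProvider
        FileSystem fs = path.getFileSystem(conf);

        // Each entry is keyed by the timestamp taken from the EtlKey, and the
        // value is the message payload plus the configured record delimiter.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            LongWritable timestamp = new LongWritable();
            Text record = new Text();
            while (reader.next(timestamp, record)) {
                System.out.println(timestamp.get() + "\t" + record);
            }
        } finally {
            reader.close();
        }
    }
}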