Skip to content

Commit

Permalink
Teach EtlMultiOutputFormat to use snappy compression
Browse files Browse the repository at this point in the history
Set etl.output.codec=snappy to enable snappy compression
  • Loading branch information
Jonathan Bryant authored and Bob Cotton committed Mar 5, 2013
1 parent f9c5234 commit 886939f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 4 deletions.
2 changes: 1 addition & 1 deletion camus-etl-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class EtlMultiOutputFormat extends FileOutputFormat<EtlKey, Object> {

public static final String KAFKA_MONITOR_TIME_GRANULARITY_MS = "kafka.monitor.time.granularity";
public static final String ETL_DEFAULT_PARTITIONER_CLASS = "etl.partitioner.class";
public static final String ETL_OUTPUT_CODEC = "etl.output.codec";
public static final String ETL_DEFAULT_OUTPUT_CODEC = "deflate";

public static final DateTimeFormatter FILE_DATE_FORMATTER = DateUtils
.getDateTimeFormatter("YYYYMMddHH");
Expand Down Expand Up @@ -83,8 +85,12 @@ private RecordWriter<EtlKey, AvroWrapper<Object>> getDataRecordWriter(
new SpecificDatumWriter<Object>());

if (FileOutputFormat.getCompressOutput(context)) {
int level = getEtlDeflateLevel(context);
writer.setCodec(CodecFactory.deflateCodec(level));
if ("snappy".equals(getEtlOutputCodec(context))) {
writer.setCodec(CodecFactory.snappyCodec());
} else {
int level = getEtlDeflateLevel(context);
writer.setCodec(CodecFactory.deflateCodec(level));
}
}

Path path = committer.getWorkPath();
Expand Down Expand Up @@ -159,7 +165,15 @@ public static int getEtlAvroWriterSyncInterval(JobContext job) {
/**
 * Records the deflate compression level in the job's configuration under the
 * {@code ETL_DEFLATE_LEVEL} key.
 *
 * @param job job context whose configuration is updated
 * @param val deflate level to store
 */
public static void setEtlDeflateLevel(JobContext job, int val) {
job.getConfiguration().setInt(ETL_DEFLATE_LEVEL, val);
}

/**
 * Records the name of the output compression codec in the job's configuration
 * under the {@code ETL_OUTPUT_CODEC} key ("etl.output.codec").
 *
 * @param job   job context whose configuration is updated
 * @param codec codec name to store; the value is stored as given, without validation
 */
public static void setEtlOutputCodec(JobContext job, String codec) {
job.getConfiguration().set(ETL_OUTPUT_CODEC, codec);
}

/**
 * Looks up the output compression codec name configured for the job.
 *
 * @param job job context whose configuration is consulted
 * @return the value stored under {@code ETL_OUTPUT_CODEC}, or
 *         {@code ETL_DEFAULT_OUTPUT_CODEC} ("deflate") when the key is not set
 */
public static String getEtlOutputCodec(JobContext job) {
    String codecName = job.getConfiguration()
            .get(ETL_OUTPUT_CODEC, ETL_DEFAULT_OUTPUT_CODEC);
    return codecName;
}
/**
 * Looks up the deflate compression level configured for the job.
 *
 * @param job job context whose configuration is consulted
 * @return the value stored under {@code ETL_DEFLATE_LEVEL}, or 6 when the key
 *         is not set
 */
public static int getEtlDeflateLevel(JobContext job) {
    int deflateLevel = job.getConfiguration().getInt(ETL_DEFLATE_LEVEL, 6);
    return deflateLevel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ public void testPerTopicPartitioner() throws IOException {
assertTrue(EtlMultiOutputFormat.getPartitioner(taskAttemptContext, "test-topic") instanceof EtlMultiOutputCommitterTest);
}

/** With no codec configured, the output codec lookup must fall back to "deflate". */
@Test
public void testDefaultOutputCodecIsDeflate() {
    String actualCodec = EtlMultiOutputFormat.getEtlOutputCodec(taskAttemptContext);
    assertEquals("deflate", actualCodec);
}

/** A codec name written through the setter must be returned unchanged by the getter. */
@Test
public void testSetOutputCodec() {
    final String requestedCodec = "snappy";
    EtlMultiOutputFormat.setEtlOutputCodec(taskAttemptContext, requestedCodec);

    String actualCodec = EtlMultiOutputFormat.getEtlOutputCodec(taskAttemptContext);
    assertEquals(requestedCodec, actualCodec);
}

public long convertTime(long time) {
long outfilePartitionMs = EtlMultiOutputFormat.getEtlOutputFileTimePartitionMins(taskAttemptContext) * 60000L;
return DateUtils.getPartition(outfilePartitionMs, time);
Expand Down
6 changes: 5 additions & 1 deletion camus-example/src/main/resources/camus.properties
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

etl.default.timezone=America/Los_Angeles
# Output compression codec: either deflate or snappy. Defaults to deflate.
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8
Expand Down

0 comments on commit 886939f

Please sign in to comment.