Skip to content

Commit

Permalink
Fixing the issue with Default Partitioner
Browse files Browse the repository at this point in the history
The code in the 08 branch was forcing the use of the Default partitioner. Syncing this with the 07 branch.
  • Loading branch information
ggupta1612 committed Aug 12, 2013
1 parent 2b94ef4 commit 9ab26ab
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,23 @@ public String getWorkingFileName(JobContext context, EtlKey key) throws IOExcept
return "data." + key.getTopic().replaceAll("\\.", "_") + "." + key.getLeaderId() + "." + key.getPartition() + "." + partitioner.encodePartition(context, key);
}

public static Partitioner getDefaultPartitioner(JobContext job) {
/* public static Partitioner getDefaultPartitioner(JobContext job) {
if(partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS) == null) {
//List<Partitioner> partitioners = job.getConfiguration().getInstances(ETL_DEFAULT_PARTITIONER_CLASS, com.linkedin.camus.coders.Partitioner.class);
List<Partitioner> partitioners = new ArrayList<Partitioner>();
partitioners.add(new DefaultPartitioner());
partitionersByTopic.put(ETL_DEFAULT_PARTITIONER_CLASS, partitioners.get(0));
}
return partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS);
}
}*/

public static Partitioner getDefaultPartitioner(JobContext job) {
if(partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS) == null) {
List<Partitioner> partitioners = job.getConfiguration().getInstances(ETL_DEFAULT_PARTITIONER_CLASS, com.linkedin.camus.coders.Partitioner.class);
partitionersByTopic.put(ETL_DEFAULT_PARTITIONER_CLASS, partitioners.get(0));
}
return partitionersByTopic.get(ETL_DEFAULT_PARTITIONER_CLASS);
}

public static Partitioner getPartitioner(JobContext job, String topicName) throws IOException {
String customPartitionerProperty = ETL_DEFAULT_PARTITIONER_CLASS + "." + topicName;
Expand Down

0 comments on commit 9ab26ab

Please sign in to comment.