Skip to content

Commit

Permalink
Merge pull request yahoo#287 from yahoo/handle-decode-in-lookupservice
Browse files Browse the repository at this point in the history
handle decode in lookup service
  • Loading branch information
pranavbhole authored Jul 31, 2018
2 parents 5c6e060 + f09316c commit 9d354f0
Show file tree
Hide file tree
Showing 28 changed files with 366 additions and 63 deletions.
2 changes: 1 addition & 1 deletion api-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<artifactId>maha-parent</artifactId>
<groupId>com.yahoo.maha</groupId>
<version>5.179-SNAPSHOT</version>
<version>5.180-SNAPSHOT</version>
</parent>

<name>maha api-example</name>
Expand Down
2 changes: 1 addition & 1 deletion api-jersey/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>com.yahoo.maha</groupId>
<artifactId>maha-parent</artifactId>
<version>5.179-SNAPSHOT</version>
<version>5.180-SNAPSHOT</version>
</parent>

<name>maha api-jersey</name>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.yahoo.maha</groupId>
<artifactId>maha-parent</artifactId>
<version>5.179-SNAPSHOT</version>
<version>5.180-SNAPSHOT</version>
</parent>

<name>maha core</name>
Expand Down
2 changes: 1 addition & 1 deletion db/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.yahoo.maha</groupId>
<artifactId>maha-parent</artifactId>
<version>5.179-SNAPSHOT</version>
<version>5.180-SNAPSHOT</version>
</parent>

<name>maha db</name>
Expand Down
2 changes: 1 addition & 1 deletion druid-lookups/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>maha-parent</artifactId>
<groupId>com.yahoo.maha</groupId>
<version>5.179-SNAPSHOT</version>
<version>5.180-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void process(String dimension,
.forEach(fd -> messageBuilder.setField(fd, String.valueOf(map.get(fd.getName()))));

Message message = messageBuilder.build();
LOGGER.debug("Producing key[%s] val[%s]", dimension, message);
LOGGER.info("Producing key[%s] val[%s]", dimension, message);
ProducerRecord<String, byte[]> producerRecord =
new ProducerRecord<>(producerKafkaTopic, dimension, message.toByteArray());
kafkaProducer.send(producerRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void start(T missingLookupExtractionNamespaceFactory,
LOGGER.error("Bad key/message from topic [%s]", consumerKafkaTopic);
continue;
}
LOGGER.debug("Read key[%s] val[%s]", dimension, new String(extractionNamespaceByteArray));
LOGGER.info("Read key[%s] val[%s]", dimension, new String(extractionNamespaceByteArray));
try {
missingLookupExtractionNamespaceFactory.process(dimension,
extractionNamespaceByteArray,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class DecodeConfig {

@JsonProperty
Expand Down Expand Up @@ -64,21 +66,17 @@ public String toString() {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

DecodeConfig that = (DecodeConfig) o;

if (!columnToCheck.equals(that.columnToCheck)) return false;
if (!valueToCheck.equals(that.valueToCheck)) return false;
if (!columnIfValueMatched.equals(that.columnIfValueMatched)) return false;
return columnIfValueNotMatched.equals(that.columnIfValueNotMatched);
return Objects.equals(columnToCheck, that.columnToCheck) &&
Objects.equals(valueToCheck, that.valueToCheck) &&
Objects.equals(columnIfValueMatched, that.columnIfValueMatched) &&
Objects.equals(columnIfValueNotMatched, that.columnIfValueNotMatched);
}

@Override
public int hashCode() {
int result = columnToCheck.hashCode();
result = 31 * result + valueToCheck.hashCode();
result = 31 * result + columnIfValueMatched.hashCode();
result = 31 * result + columnIfValueNotMatched.hashCode();
return result;

return Objects.hash(columnToCheck, valueToCheck, columnIfValueMatched, columnIfValueNotMatched);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -102,7 +103,7 @@ public String apply(@NotNull String val)

if (!extractionNamespace.isCacheEnabled()) {
byte[] cacheByteValue = lookupService.lookup(new LookupService.LookupData(extractionNamespace,
dimension, valueColumn));
dimension, valueColumn, Optional.empty()));
return (cacheByteValue == null || cacheByteValue.length == 0) ? null : new String(cacheByteValue, UTF_8);
} else {
final RocksDB db = rocksDBManager.getDB(extractionNamespace.getNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -74,8 +75,9 @@ public String apply(@NotNull String val)
}

if (!extractionNamespace.isCacheEnabled()) {
Optional<DecodeConfig> decodeConfigOptional = (decodeConfig == null) ? Optional.empty() : Optional.of(decodeConfig);
byte[] cacheByteValue = lookupService.lookup(new LookupService.LookupData(extractionNamespace,
dimension, valueColumn));
dimension, valueColumn, decodeConfigOptional));
return (cacheByteValue == null || cacheByteValue.length == 0) ? null : new String(cacheByteValue, UTF_8);
} else {
U cacheValueArray = map.get(dimension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.maha_druid_lookups.query.lookup.namespace;

import com.yahoo.maha.maha_druid_lookups.query.lookup.DecodeConfig;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

public interface ExtractionNamespaceCacheFactory<T extends ExtractionNamespace, U>
Expand All @@ -13,7 +16,7 @@ public interface ExtractionNamespaceCacheFactory<T extends ExtractionNamespace,
void updateCache(T extractionNamespace,
final Map<String, U> cache, final String key, final byte[] value);

default byte[] getCacheValue(T extractionNamespace, Map<String, U> cache, String key, String valueColumn) {
default byte[] getCacheValue(T extractionNamespace, Map<String, U> cache, String key, String valueColumn, Optional<DecodeConfig> decodeConfigOptional) {
return new byte[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.metadata.MetadataStorageConnectorConfig;
import org.joda.time.Period;

Expand All @@ -14,6 +15,7 @@
import javax.validation.constraints.NotNull;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Map;

@JsonTypeName("mahajdbc")
public class JDBCExtractionNamespace implements ExtractionNamespace
Expand All @@ -37,6 +39,7 @@ public class JDBCExtractionNamespace implements ExtractionNamespace

private boolean firstTimeCaching = true;
private Timestamp previousLastUpdateTimestamp;
private final Map<String, Integer> columnIndexMap;

@JsonCreator
public JDBCExtractionNamespace(
Expand Down Expand Up @@ -67,6 +70,20 @@ public JDBCExtractionNamespace(
this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod;
this.cacheEnabled = cacheEnabled;
this.lookupName = lookupName;
int index = 0;
ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
for(String col : columnList) {
builder.put(col, index);
index += 1;
}
this.columnIndexMap = builder.build();
}

public int getColumnIndex(String valueColumn) {
if(columnIndexMap != null && valueColumn != null && columnIndexMap.containsKey(valueColumn)) {
return columnIndexMap.get(valueColumn);
}
return -1;
}

public MetadataStorageConnectorConfig getConnectorConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.yahoo.maha.maha_druid_lookups.query.lookup.DecodeConfig;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.ExtractionNamespaceCacheFactory;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.InMemoryDBExtractionNamespace;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.entity.ProtobufSchemaFactory;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
Expand Down Expand Up @@ -107,11 +109,11 @@ public void updateCache(final InMemoryDBExtractionNamespace extractionNamespace,
}

@Override
public byte[] getCacheValue(final InMemoryDBExtractionNamespace extractionNamespace, final Map<String, String> cache, final String key, String valueColumn) {
public byte[] getCacheValue(final InMemoryDBExtractionNamespace extractionNamespace, final Map<String, String> cache, final String key, String valueColumn, final Optional<DecodeConfig> decodeConfigOptional) {

try {
if (!extractionNamespace.isCacheEnabled()) {
return lookupService.lookup(new LookupService.LookupData(extractionNamespace, key, valueColumn));
return lookupService.lookup(new LookupService.LookupData(extractionNamespace, key, valueColumn, decodeConfigOptional));
}

final RocksDB db = rocksDBManager.getDB(extractionNamespace.getNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.maha_druid_lookups.server.lookup.namespace;

import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.yahoo.maha.maha_druid_lookups.query.lookup.DecodeConfig;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.ExtractionNamespaceCacheFactory;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.JDBCExtractionNamespace;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.entity.RowMapper;
Expand All @@ -18,6 +20,7 @@
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -172,22 +175,57 @@ public void updateCache(final JDBCExtractionNamespace extractionNamespace, final
}

@Override
public byte[] getCacheValue(final JDBCExtractionNamespace extractionNamespace, final Map<String, List<String>> cache, final String key, final String valueColumn) {
public byte[] getCacheValue(final JDBCExtractionNamespace extractionNamespace, final Map<String, List<String>> cache, final String key, final String valueColumn, final Optional<DecodeConfig> decodeConfigOptional) {
if (!extractionNamespace.isCacheEnabled()) {
byte[] value = lookupService.lookup(new LookupService.LookupData(extractionNamespace, key, valueColumn));
byte[] value = lookupService.lookup(new LookupService.LookupData(extractionNamespace, key, valueColumn, decodeConfigOptional));
value = (value == null) ? new byte[0] : value;
LOG.info("Cache value [%s]", new String(value));
return value;
}
List<String> cacheValue = cache.get(key);
int index = extractionNamespace.getColumnList().indexOf(valueColumn);
if(cacheValue == null) {
return new byte[0];
}

if(decodeConfigOptional.isPresent()) {
return handleDecode(extractionNamespace, cacheValue, decodeConfigOptional.get());
}

int index = extractionNamespace.getColumnIndex(valueColumn);
if(index == -1) {
LOG.error("invalid valueColumn [%s]", valueColumn);
return new byte[0];
}
String value = cacheValue.get(index);
return (value == null) ? new byte[0] : value.getBytes();
}

private byte[] handleDecode(JDBCExtractionNamespace extractionNamespace, List<String> cacheValue, DecodeConfig decodeConfig) {

final int columnToCheckIndex = extractionNamespace.getColumnIndex(decodeConfig.getColumnToCheck());
if (columnToCheckIndex < 0 || columnToCheckIndex >= cacheValue.size() ) {
return new byte[0];
}

final String valueFromColumnToCheck = cacheValue.get(columnToCheckIndex);

if(valueFromColumnToCheck != null && valueFromColumnToCheck.equals(decodeConfig.getValueToCheck())) {
final int columnIfValueMatchedIndex = extractionNamespace.getColumnIndex(decodeConfig.getColumnIfValueMatched());
if (columnIfValueMatchedIndex < 0) {
return new byte[0];
}
String value = cacheValue.get(columnIfValueMatchedIndex);
return (value == null) ? new byte[0] : value.getBytes();
} else {
final int columnIfValueNotMatchedIndex = extractionNamespace.getColumnIndex(decodeConfig.getColumnIfValueNotMatched());
if (columnIfValueNotMatchedIndex < 0) {
return new byte[0];
}
String value = cacheValue.get(columnIfValueNotMatchedIndex);
return (value == null) ? new byte[0] : value.getBytes();
}
}

@Override
public String getCacheSize(final JDBCExtractionNamespace extractionNamespace, final Map<String, List<String>> cache) {
if (!extractionNamespace.isCacheEnabled()) {
Expand Down
Loading

0 comments on commit 9d354f0

Please sign in to comment.