Skip to content

Commit

Permalink
Instantiating flat buffer schema factory using optional configs
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavbhole committed Jun 26, 2020
1 parent 5d58484 commit 4944fd5
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface ExtractionNamespace {
String getTsColumn();

boolean isCacheEnabled();

boolean isFlatBufferNamespace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public boolean isCacheEnabled() {
return cacheEnabled;
}

@Override
public boolean isFlatBufferNamespace() {
return false;
}

@Override
public long getPollMs() {
return pollPeriod.toStandardDuration().getMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public boolean isCacheEnabled() {
return cacheEnabled;
}

@Override
public boolean isFlatBufferNamespace() {
return false;
}

@Override
public long getPollMs() {
return pollPeriod.toStandardDuration().getMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public boolean isCacheEnabled() {
return cacheEnabled;
}

@Override
public boolean isFlatBufferNamespace() {
return this.cacheActionRunner.flatBufferSupport();
}

public boolean isLookupAuditingEnabled() {
return lookupAuditingEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.yahoo.maha.maha_druid_lookups.server.lookup.namespace;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.inject.internal.cglib.proxy.$FixedValue;

import java.util.Properties;
import javax.annotation.Nullable;
Expand All @@ -24,6 +25,9 @@ public class MahaNamespaceExtractionConfig {
@NotNull @JsonProperty(value = "authHeaderFactory")
private String authHeaderFactoryClass;

@Nullable @JsonProperty(value="flatBufferSchemaFactory")
private String flatBufferSchemaFactory;

public Properties getKafkaProperties() {
return kafkaProperties;
}
Expand All @@ -43,4 +47,7 @@ public String getProtobufSchemaFactoryClass() {
public String getAuthHeaderFactoryClass() {
return authHeaderFactoryClass;
}

public String getFlatBufferSchemaFactory() { return flatBufferSchemaFactory; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.*;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.cache.MahaNamespaceExtractionCacheManager;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.cache.OnHeapMahaNamespaceExtractionCacheManager;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.flatbuffer.FlatBufferSchemaFactory;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.flatbuffer.FlatBufferSchemaFactoryProvider;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.protobuf.ProtobufSchemaFactory;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.protobuf.ProtobufSchemaFactoryProvider;
import org.apache.druid.guice.Jerseys;
Expand Down Expand Up @@ -146,6 +148,8 @@ public void configure(Binder binder)

binder.bind(ProtobufSchemaFactory.class).toProvider(ProtobufSchemaFactoryProvider.class);

binder.bind(FlatBufferSchemaFactory.class).toProvider(FlatBufferSchemaFactoryProvider.class);

binder.bind(AuthHeaderFactory.class).toProvider(AuthHeaderFactoryProvider.class);

getNamespaceFactoryMapBinder(binder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
package com.yahoo.maha.maha_druid_lookups.server.lookup.namespace;

import com.google.inject.Inject;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.ExtractionNamespace;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.BaseSchemaFactory;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.flatbuffer.FlatBufferSchemaFactory;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.protobuf.ProtobufSchemaFactory;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
Expand All @@ -30,7 +33,9 @@ public class RocksDBExtractionNamespaceCacheFactory
@Inject
RocksDBManager rocksDBManager;
@Inject
BaseSchemaFactory schemaFactory;
ProtobufSchemaFactory protobufSchemaFactory;
@Inject
FlatBufferSchemaFactory flatBufferSchemaFactory;
@Inject
ServiceEmitter emitter;

Expand Down Expand Up @@ -73,13 +78,13 @@ public void updateCache(final RocksDBExtractionNamespace extractionNamespace,
final Map<String, String> cache, final String key, final byte[] value) {

RocksDB db = rocksDBManager.getDB(extractionNamespace.getNamespace());
extractionNamespace.getCacheActionRunner().updateCache(schemaFactory, key, value, db, emitter, extractionNamespace);
extractionNamespace.getCacheActionRunner().updateCache(getSchemaFactory(extractionNamespace), key, value, db, emitter, extractionNamespace);
}

@Override
public byte[] getCacheValue(final RocksDBExtractionNamespace extractionNamespace, final Map<String, String> cache, final String key, String valueColumn, final Optional<DecodeConfig> decodeConfigOptional) {
RocksDB db = rocksDBManager.getDB(extractionNamespace.getNamespace());
return extractionNamespace.getCacheActionRunner().getCacheValue(key, Optional.of(valueColumn), decodeConfigOptional, db, schemaFactory, lookupService, emitter, extractionNamespace);
return extractionNamespace.getCacheActionRunner().getCacheValue(key, Optional.of(valueColumn), decodeConfigOptional, db, getSchemaFactory(extractionNamespace), lookupService, emitter, extractionNamespace);
}

@Override
Expand Down Expand Up @@ -110,7 +115,15 @@ public Long getLastUpdatedTime(final RocksDBExtractionNamespace extractionNamesp
public void updateCacheWithDb(final RocksDBExtractionNamespace extractionNamespace,
RocksDB db, final String key, final byte[] value) {

extractionNamespace.getCacheActionRunner().updateCache(schemaFactory, key, value, db, emitter, extractionNamespace);
extractionNamespace.getCacheActionRunner().updateCache(getSchemaFactory(extractionNamespace), key, value, db, emitter, extractionNamespace);
}

private BaseSchemaFactory getSchemaFactory(ExtractionNamespace extractionNamespace) {
if (extractionNamespace.isFlatBufferNamespace()) {
return flatBufferSchemaFactory;
} else {
return protobufSchemaFactory;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.*;
import com.google.inject.Inject;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.flatbuffer.FlatBufferSchemaFactory;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.protobuf.ProtobufSchemaFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -71,6 +72,8 @@ public NamespaceImplData(
KafkaManager kafkaManager;
@Inject
ProtobufSchemaFactory protobufSchemaFactory;
@Inject
FlatBufferSchemaFactory flatBufferSchemaFactory;

public MahaNamespaceExtractionCacheManager(
Lifecycle lifecycle,
Expand Down Expand Up @@ -387,6 +390,8 @@ private LookupExtractor getLookupExtractor(final ExtractionNamespace extractionN
return new JDBCLookupExtractorWithLeaderAndFollower((JDBCExtractionNamespaceWithLeaderAndFollower) extractionNamespace, map, lookupService);
} else if (extractionNamespace instanceof JDBCExtractionNamespace) {
return new JDBCLookupExtractor((JDBCExtractionNamespace) extractionNamespace, map, lookupService);
} else if (extractionNamespace instanceof RocksDBExtractionNamespace && extractionNamespace.isFlatBufferNamespace()) {
return new RocksDBLookupExtractor((RocksDBExtractionNamespace) extractionNamespace, map, lookupService, rocksDBManager, kafkaManager, flatBufferSchemaFactory, serviceEmitter);
} else if (extractionNamespace instanceof RocksDBExtractionNamespace) {
return new RocksDBLookupExtractor((RocksDBExtractionNamespace) extractionNamespace, map, lookupService, rocksDBManager, kafkaManager, protobufSchemaFactory, serviceEmitter);
} else if (extractionNamespace instanceof MongoExtractionNamespace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ void updateCache(BaseSchemaFactory schemaFactory
, RocksDBExtractionNamespace extractionNamespace);

void validateSchemaFactory(BaseSchemaFactory schemaFactory);

boolean flatBufferSupport();
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public void validateSchemaFactory(BaseSchemaFactory schemaFactory) {
}
}

@Override
public boolean flatBufferSupport() {
return false;
}

@Override
public String toString() {
return "CacheActionRunner{}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void validateSchemaFactory(BaseSchemaFactory schemaFactory) {
}
}

@Override
public boolean flatBufferSupport() {
return true;
}

@Override
public String toString() {
return "CacheActionRunner{}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import java.util.Optional;

public class NoopCacheActionRunner extends CacheActionRunner {
public class NoopCacheActionRunner implements BaseCacheActionRunner {

private static final Logger LOG = new Logger(NoopCacheActionRunner.class);

Expand All @@ -37,6 +37,15 @@ public void updateCache(BaseSchemaFactory schemaFactory
LOG.error("Noop called, no update to make.");
}

@Override
public void validateSchemaFactory(BaseSchemaFactory schemaFactory) {
}

@Override
public boolean flatBufferSupport() {
return false;
}

@Override
public String toString() {
return "NoopCacheActionRunner{}";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.flatbuffer;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.MahaNamespaceExtractionConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;

public class FlatBufferSchemaFactoryProvider implements Provider<FlatBufferSchemaFactory> {

private static final Logger LOG = new Logger(FlatBufferSchemaFactoryProvider.class);

private FlatBufferSchemaFactory flatBufferSchemaFactory = new DefaultFlatBufferSchemaFactory(ImmutableMap.<String, FlatBufferWrapper>builder().build());

@Inject
public FlatBufferSchemaFactoryProvider(MahaNamespaceExtractionConfig config) {
String schemaFactoryClass = config.getFlatBufferSchemaFactory();
if (StringUtils.isNotBlank(schemaFactoryClass)) {
Class clazz;
try {
clazz = Class.forName(schemaFactoryClass);
if (FlatBufferSchemaFactory.class.isAssignableFrom(clazz)) {
flatBufferSchemaFactory = (FlatBufferSchemaFactory) clazz.newInstance();
}
} catch (Exception e) {
LOG.error(e, "Failed to instantiate FlatBufferSchemaFactory from className {}", schemaFactoryClass);
throw new IllegalArgumentException(e);
}
} else {
LOG.warn("Implementation of FlatBufferSchemaFactory class name is black in the MahaNamespaceExtractionConfig, considering default implementation");
}
}

@Override
public FlatBufferSchemaFactory get() {
return flatBufferSchemaFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ public class RocksDBExtractionNamespaceCacheFactoryFlatBufferTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
obj.rocksDBManager = rocksDBManager;
obj.schemaFactory = new TestFlatBufferSchemaFactory();
obj.protobufSchemaFactory = new TestProtobufSchemaFactory();
obj.flatBufferSchemaFactory = new TestFlatBufferSchemaFactory();
obj.emitter = serviceEmitter;

noopObj.rocksDBManager = rocksDBManager;
noopObj.schemaFactory = new TestFlatBufferSchemaFactory();
noopObj.flatBufferSchemaFactory = new TestFlatBufferSchemaFactory();
noopObj.protobufSchemaFactory = new TestProtobufSchemaFactory();
noopObj.emitter = serviceEmitter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.schema.flatbuffer.TestFlatBufferSchemaFactory;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.RocksDBExtractionNamespace;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.entity.AdProtos;
Expand Down Expand Up @@ -49,10 +50,13 @@ public class RocksDBExtractionNamespaceCacheFactoryTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
obj.rocksDBManager = rocksDBManager;
obj.schemaFactory = new TestProtobufSchemaFactory();
obj.protobufSchemaFactory = new TestProtobufSchemaFactory();
obj.flatBufferSchemaFactory = new TestFlatBufferSchemaFactory();
obj.emitter = serviceEmitter;

noopObj.rocksDBManager = rocksDBManager;
noopObj.schemaFactory = new TestProtobufSchemaFactory();
noopObj.flatBufferSchemaFactory = new TestFlatBufferSchemaFactory();
noopObj.protobufSchemaFactory = new TestProtobufSchemaFactory();
noopObj.emitter = serviceEmitter;
}

Expand Down

0 comments on commit 4944fd5

Please sign in to comment.