Skip to content

Commit

Permalink
Add leader & follower logic in its own namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
ryankwagner committed Jul 10, 2019
1 parent 92c961f commit 3862ff2
Show file tree
Hide file tree
Showing 16 changed files with 488 additions and 238 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2017, Yahoo Holdings Inc.
// Licensed under the terms of the Apache License 2.0. Please see LICENSE file in project root for terms.
package com.yahoo.maha.maha_druid_lookups.query.lookup;

import com.metamx.common.logger.Logger;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.JDBCExtractionNamespace;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.LookupService;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;

/**
 * JDBC-backed lookup extractor variant used when the lookup runs in
 * leader/follower mode. All extraction behavior is inherited from
 * {@link OnlineDatastoreLookupExtractor}; this subclass only supplies
 * its own class-scoped logger.
 *
 * @param <U> row-value type stored in the backing map (a list of column values)
 */
public class JDBCLookupExtractorWithLeaderAndFollower<U extends List<String>> extends OnlineDatastoreLookupExtractor<U> {

    // Resolved via MethodHandles so the logger name always tracks this class.
    private static final Logger logger = new Logger(MethodHandles.lookup().lookupClass());

    /**
     * @param extractionNamespace JDBC namespace configuration for this lookup
     * @param map                 backing map passed through to the parent extractor
     * @param lookupService       service used by the parent to resolve lookups
     */
    public JDBCLookupExtractorWithLeaderAndFollower(JDBCExtractionNamespace extractionNamespace,
                                                    Map<String, U> map,
                                                    LookupService lookupService) {
        super(extractionNamespace, map, lookupService);
    }

    @Override
    protected Logger LOGGER() {
        return logger;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "mahajdbcleaderfollower", value = JDBCExtractionNamespaceWithLeaderAndFollower.class),
@JsonSubTypes.Type(name = "mahajdbc", value = JDBCExtractionNamespace.class),
@JsonSubTypes.Type(name = "maharocksdb", value = RocksDBExtractionNamespace.class),
@JsonSubTypes.Type(name = "mahainmemorydb", value = RocksDBExtractionNamespace.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.entity.ProtobufSchemaFactory;
import io.druid.metadata.MetadataStorageConnectorConfig;
import org.joda.time.Period;

Expand All @@ -17,21 +18,42 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;

@JsonTypeName("mahajdbcleaderfollower")
public class JDBCExtractionNamespaceWithLeaderAndFollower extends JDBCExtractionNamespace {
@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
@JsonProperty
private final String table;
@JsonProperty
private final String tsColumn;
@JsonProperty
private final Period pollPeriod;
@JsonProperty
private final ImmutableList<String> columnList;
@JsonProperty
private final String primaryKeyColumn;

@JsonProperty
private boolean cacheEnabled = true;

@JsonProperty
private final String lookupName;

private boolean firstTimeCaching = true;
private Timestamp previousLastUpdateTimestamp;
private final ImmutableMap<String, Integer> columnIndexMap;

@JsonProperty
private final String kafkaTopic;

@JsonProperty
private final boolean isLeader;

@JsonProperty
private final Properties kafkaProperties;

@JsonCreator
public JDBCExtractionNamespaceWithLeaderAndFollower(
@NotNull @JsonProperty(value = "connectorConfig", required = true) final MetadataStorageConnectorConfig connectorConfig,
Expand All @@ -43,14 +65,53 @@ public JDBCExtractionNamespaceWithLeaderAndFollower(
@JsonProperty(value = "cacheEnabled", required = false) final boolean cacheEnabled,
@NotNull @JsonProperty(value = "lookupName", required = true) final String lookupName,
@JsonProperty(value = "kafkaTopic", required = true) final String kafkaTopic,
@JsonProperty(value = "isLeader", required = true) final boolean isLeader
) {
@JsonProperty(value = "isLeader", required = true) final boolean isLeader,
@JsonProperty(value = "kafkaProperties", required = true) final Properties kafkaProperties
) {

super(connectorConfig, table, columnList, primaryKeyColumn, tsColumn, pollPeriod, cacheEnabled, lookupName);

this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI");
this.table = Preconditions.checkNotNull(table, "table");
this.columnList = ImmutableList.copyOf(Preconditions.checkNotNull(columnList, "columnList"));
this.primaryKeyColumn = Preconditions.checkNotNull(primaryKeyColumn, "primaryKeyColumn");
this.tsColumn = tsColumn;
this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod;
this.cacheEnabled = cacheEnabled;
this.lookupName = lookupName;
int index = 0;
ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
for (String col : columnList) {
builder.put(col, index);
index += 1;
}
this.columnIndexMap = builder.build();

this.kafkaTopic = Objects.nonNull(kafkaTopic) ? kafkaTopic : "unassigned";

this.isLeader = Objects.nonNull(isLeader) ? isLeader : false;

this.kafkaProperties = kafkaProperties;
}

@Override
public String toString() {
    // Fixed the kafkaProperties entry: it previously read
    // ", kafkaProperties" + kafkaProperties.toString(), which (a) omitted the
    // '=' separator every other field uses, and (b) threw NPE when
    // kafkaProperties was null (the constructor stores it without a null
    // check). Plain string concatenation renders null safely as "null".
    return "JDBCExtractionNamespaceWithLeaderAndFollower{" +
            "connectorConfig=" + connectorConfig +
            ", table='" + table + '\'' +
            ", tsColumn='" + tsColumn + '\'' +
            ", pollPeriod=" + pollPeriod +
            ", columnList=" + columnList +
            ", primaryKeyColumn='" + primaryKeyColumn + '\'' +
            ", cacheEnabled=" + cacheEnabled +
            ", lookupName='" + lookupName + '\'' +
            ", firstTimeCaching=" + firstTimeCaching +
            ", previousLastUpdateTimestamp=" + previousLastUpdateTimestamp +
            ", kafkaProperties=" + kafkaProperties +
            ", isLeader=" + isLeader +
            ", kafkaTopic=" + kafkaTopic +
            '}';
}

public String getKafkaTopic() {
Expand All @@ -60,5 +121,8 @@ public String getKafkaTopic() {
/**
 * @return the leader flag supplied at construction. Presumably true means this
 *         node produces lookup updates and false means it consumes them —
 *         TODO(review): confirm against the Kafka producer/consumer wiring.
 */
public boolean getIsLeader() {
return isLeader;
}

public Properties getKafkaProperties() { return kafkaProperties; }

}

Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,19 @@
package com.yahoo.maha.maha_druid_lookups.server.lookup.namespace;

import com.google.inject.Inject;
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.yahoo.maha.maha_druid_lookups.query.lookup.DecodeConfig;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.ExtractionNamespaceCacheFactory;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.JDBCExtractionNamespace;
import com.yahoo.maha.maha_druid_lookups.query.lookup.namespace.JDBCExtractionNamespaceWithLeaderAndFollower;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.entity.ProtobufSchemaFactory;
import com.yahoo.maha.maha_druid_lookups.server.lookup.namespace.entity.RowMapper;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.DefaultMapper;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.TimestampMapper;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -44,8 +35,6 @@ public class JDBCExtractionNamespaceCacheFactory
LookupService lookupService;
@Inject
ServiceEmitter emitter;
@Inject
ProtobufSchemaFactory protobufSchemaFactory;

@Override
public Callable<String> getCachePopulator(
Expand Down Expand Up @@ -88,7 +77,7 @@ public Void withHandle(Handle handle) throws Exception {
extractionNamespace.getTable()
);

populateRowListFromJDBC(extractionNamespace, query, cache, lastDBUpdate, handle);
populateRowListFromJDBC(extractionNamespace, query, lastDBUpdate, handle, new RowMapper(extractionNamespace, cache));
return null;
}
}
Expand All @@ -101,35 +90,30 @@ public Void withHandle(Handle handle) throws Exception {
};
}

protected List<Map<String, Object>> populateRowListFromJDBC(
protected Void populateRowListFromJDBC(
JDBCExtractionNamespace extractionNamespace,
String query,
Map<String, List<String>> cache,
Timestamp lastDBUpdate,
Handle handle
Handle handle,
RowMapper rm
) {
List<Map<String, Object>> rowList;
Timestamp updateTS;
if (extractionNamespace.isFirstTimeCaching()) {
extractionNamespace.setFirstTimeCaching(false);
query = String.format("%s %s", query, FIRST_TIME_CACHING_WHERE_CLAUSE);
rowList = handle.createQuery(query).map(
new RowMapper(extractionNamespace, cache))
.setFetchSize(FETCH_SIZE)
.bind("lastUpdatedTimeStamp", lastDBUpdate)
.map(new DefaultMapper())
.list();
updateTS = lastDBUpdate;

} else {
query = String.format("%s %s", query, SUBSEQUENT_CACHING_WHERE_CLAUSE);
rowList = handle.createQuery(query).map(
new RowMapper(extractionNamespace, cache))
.setFetchSize(FETCH_SIZE)
.bind("lastUpdatedTimeStamp", extractionNamespace.getPreviousLastUpdateTimestamp())
.map(new DefaultMapper())
.list();
updateTS = extractionNamespace.getPreviousLastUpdateTimestamp();
}

return rowList;
handle.createQuery(query).map(rm)
.setFetchSize(FETCH_SIZE)
.bind("lastUpdatedTimeStamp", updateTS)
.list();

return null;
}

protected DBI ensureDBI(String id, JDBCExtractionNamespace namespace) {
Expand Down
Loading

0 comments on commit 3862ff2

Please sign in to comment.