Skip to content

Commit

Permalink
fix(cache): Fix cache key serialization in search service (datahub-pr…
Browse files Browse the repository at this point in the history
…oject#7858)

Co-authored-by: RyanHolstien <[email protected]>
Co-authored-by: Ryan Holstien <[email protected]>
  • Loading branch information
3 people authored Apr 20, 2023
1 parent b588946 commit 9199c89
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.graph.LineageDirection;
import lombok.Data;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import lombok.Data;


@Data
Expand All @@ -17,27 +15,17 @@ public class EntityLineageResultCacheKey {
private final Long endTimeMillis;
private final Integer maxHops;

// Force use of static method outside of package (for tests)
EntityLineageResultCacheKey(Urn sourceUrn, LineageDirection direction, Long startTimeMillis, Long endTimeMillis, Integer maxHops) {
public EntityLineageResultCacheKey(Urn sourceUrn, LineageDirection direction, Long startTimeMillis,
Long endTimeMillis, Integer maxHops, TemporalUnit resolution) {

this.sourceUrn = sourceUrn;
this.direction = direction;
this.startTimeMillis = startTimeMillis;
this.endTimeMillis = endTimeMillis;
this.maxHops = maxHops;
}

public static EntityLineageResultCacheKey from(Urn sourceUrn, LineageDirection direction, Long startTimeMillis,
Long endTimeMillis, Integer maxHops) {
return EntityLineageResultCacheKey.from(sourceUrn, direction, startTimeMillis, endTimeMillis, maxHops, ChronoUnit.DAYS);
}

public static EntityLineageResultCacheKey from(Urn sourceUrn, LineageDirection direction, Long startTimeMillis,
Long endTimeMillis, Integer maxHops, TemporalUnit resolution) {
long endOffset = resolution.getDuration().getSeconds() * 1000;
return new EntityLineageResultCacheKey(sourceUrn, direction,
startTimeMillis == null ? null
: Instant.ofEpochMilli(startTimeMillis).truncatedTo(resolution).toEpochMilli(),
endTimeMillis == null ? null : Instant.ofEpochMilli(endTimeMillis + endOffset).truncatedTo(resolution).toEpochMilli(),
maxHops);
this.startTimeMillis =
startTimeMillis == null ? null : Instant.ofEpochMilli(startTimeMillis).truncatedTo(resolution).toEpochMilli();
this.endTimeMillis = endTimeMillis == null ? null
: Instant.ofEpochMilli(endTimeMillis + endOffset).truncatedTo(resolution).toEpochMilli();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.opentelemetry.extension.annotations.WithSpan;

import java.net.URISyntaxException;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -122,8 +123,8 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
}

// Cache multihop result for faster performance
final EntityLineageResultCacheKey cacheKey = EntityLineageResultCacheKey.from(sourceUrn, direction, startTimeMillis,
endTimeMillis, maxHops);
final EntityLineageResultCacheKey cacheKey = new EntityLineageResultCacheKey(sourceUrn, direction, startTimeMillis,
endTimeMillis, maxHops, ChronoUnit.DAYS);
CachedEntityLineageResult cachedLineageResult = null;

if (cacheEnabled) {
Expand Down Expand Up @@ -569,8 +570,8 @@ public LineageScrollResult scrollAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
@Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable Long startTimeMillis,
@Nullable Long endTimeMillis, @Nonnull SearchFlags searchFlags) {
// Cache multihop result for faster performance
final EntityLineageResultCacheKey cacheKey =
new EntityLineageResultCacheKey(sourceUrn, direction, startTimeMillis, endTimeMillis, maxHops);
final EntityLineageResultCacheKey cacheKey = new EntityLineageResultCacheKey(sourceUrn, direction, startTimeMillis,
endTimeMillis, maxHops, ChronoUnit.DAYS);
CachedEntityLineageResult cachedLineageResult = cacheEnabled
? cache.get(cacheKey, CachedEntityLineageResult.class) : null;
EntityLineageResult lineageResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CachingEntitySearchService {
private static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
private static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME = "entitySearchServiceAutoComplete";
private static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
private static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";
public static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";

private final CacheManager cacheManager;
private final EntitySearchService entitySearchService; // This is a shared component, also used in search aggregation
Expand Down Expand Up @@ -244,7 +244,10 @@ public ScrollResult getCachedScrollResults(
ScrollResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "scroll_cache_access").time();
Object cacheKey = Sextet.with(entities, query, filters, sortCriterion, scrollId, size);
Object cacheKey = Sextet.with(entities, query,
filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null,
scrollId, size);
result = cache.get(cacheKey, ScrollResult.class);
cacheAccess.stop();
if (result == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
package com.linkedin.metadata.search;

import java.time.temporal.ChronoUnit;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotSame;


public class LineageSearchResultCacheKeyTest extends AbstractTestNGSpringContextTests {

@Test
public void testNulls() {
// ensure no NPE
assertEquals(new EntityLineageResultCacheKey(null, null, null, null, null),
EntityLineageResultCacheKey.from(null, null, null, null, null));
assertEquals(new EntityLineageResultCacheKey(null, null, null, null, null, ChronoUnit.DAYS),
new EntityLineageResultCacheKey(null, null, null, null, null, ChronoUnit.DAYS));
}

@Test
public void testDateTruncation() {
// expect start of day milli
assertEquals(new EntityLineageResultCacheKey(null, null, 1679529600000L, 1679616000000L, null),
EntityLineageResultCacheKey.from(null, null, 1679530293000L, 1679530293001L,
null));
assertEquals(new EntityLineageResultCacheKey(null, null, 1679529600000L,
1679615999999L, null, ChronoUnit.DAYS),
new EntityLineageResultCacheKey(null, null, 1679530293000L,
1679530293001L, null, ChronoUnit.DAYS));
assertNotSame(new EntityLineageResultCacheKey(null, null, 1679529600000L,
1679616000000L, null, ChronoUnit.DAYS),
new EntityLineageResultCacheKey(null, null, 1679530293000L,
1679530293001L, null, ChronoUnit.DAYS));
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
package com.linkedin.gms.factory.search;

import com.google.common.collect.ImmutableList;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.graph.LineageRelationship;
import com.linkedin.metadata.graph.LineageRelationshipArray;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.EntityLineageResultCacheKey;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchResultMetadata;
import com.linkedin.metadata.search.cache.CacheableSearcher;
import com.linkedin.metadata.search.cache.CachedEntityLineageResult;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.javatuples.Quintet;
import org.javatuples.Triplet;
import org.javatuples.Sextet;
import org.springframework.cache.Cache;
import org.testng.Assert;
import org.testng.annotations.Test;

import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;
import static com.linkedin.metadata.search.client.CachingEntitySearchService.*;


public class CacheTest extends JetTestSupport {
Expand Down Expand Up @@ -79,6 +88,55 @@ public void hazelcastTest() {
Assert.assertEquals(cacheableSearcher1.getSearchResults(0, 1), cacheableSearcher2.getSearchResults(0, 1));
}

@Test
public void hazelcastTestScroll() {
CorpuserUrn corpuserUrn = new CorpuserUrn("user");
SearchEntity searchEntity = new SearchEntity().setEntity(corpuserUrn);
SearchResult searchResult = new SearchResult()
.setEntities(new SearchEntityArray(List.of(searchEntity)))
.setNumEntities(1)
.setFrom(0)
.setPageSize(1)
.setMetadata(new SearchResultMetadata());

final Criterion filterCriterion = new Criterion()
.setField("platform")
.setCondition(Condition.EQUAL)
.setValue("hive")
.setValues(new StringArray(ImmutableList.of("hive")));

final Criterion subtypeCriterion = new Criterion()
.setField("subtypes")
.setCondition(Condition.EQUAL)
.setValue("")
.setValues(new StringArray(ImmutableList.of("view")));

final Filter filterWithCondition = new Filter().setOr(
new ConjunctiveCriterionArray(
new ConjunctiveCriterion().setAnd(
new CriterionArray(ImmutableList.of(filterCriterion))),
new ConjunctiveCriterion().setAnd(
new CriterionArray(ImmutableList.of(subtypeCriterion)))
));

Sextet<List<String>, String, String, String, String, Integer>
sextet = Sextet.with(List.of(corpuserUrn.toString()), "*", toJsonString(filterWithCondition), null, null, 1);

Cache cache1 = cacheManager1.getCache(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME);
Cache cache2 = cacheManager2.getCache(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME);

// Cache result
String json = toJsonString(searchResult);
cache1.put(sextet, json);
Assert.assertEquals(instance1.getMap(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME).get(sextet),
instance2.getMap(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME).get(sextet));
String cachedResult1 = cache1.get(sextet, String.class);
String cachedResult2 = cache2.get(sextet, String.class);
Assert.assertEquals(cachedResult1, cachedResult2);
Assert.assertEquals(cache1.get(sextet, String.class), json);
Assert.assertEquals(cache2.get(sextet, String.class), json);
}

@Test
public void testLineageCaching() {
CorpuserUrn corpuserUrn = new CorpuserUrn("user");
Expand All @@ -95,16 +153,17 @@ public void testLineageCaching() {
Cache cache1 = cacheManager1.getCache("relationshipSearchService");
Cache cache2 = cacheManager2.getCache("relationshipSearchService");

Triplet<String, LineageDirection, Integer> triplet = Triplet.with(corpuserUrn.toString(), LineageDirection.DOWNSTREAM, 3);
EntityLineageResultCacheKey key = new EntityLineageResultCacheKey(corpuserUrn, LineageDirection.DOWNSTREAM,
0L, 1L, 1, ChronoUnit.DAYS);

cache1.put(triplet, cachedEntityLineageResult);
cache1.put(key, cachedEntityLineageResult);

Assert.assertEquals(instance1.getMap("relationshipSearchService").get(triplet),
instance2.getMap("relationshipSearchService").get(triplet));
CachedEntityLineageResult cachedResult1 = cache1.get(triplet, CachedEntityLineageResult.class);
CachedEntityLineageResult cachedResult2 = cache2.get(triplet, CachedEntityLineageResult.class);
Assert.assertEquals(instance1.getMap("relationshipSearchService").get(key),
instance2.getMap("relationshipSearchService").get(key));
CachedEntityLineageResult cachedResult1 = cache1.get(key, CachedEntityLineageResult.class);
CachedEntityLineageResult cachedResult2 = cache2.get(key, CachedEntityLineageResult.class);
Assert.assertEquals(cachedResult1, cachedResult2);
Assert.assertEquals(cache1.get(triplet, CachedEntityLineageResult.class), cachedEntityLineageResult);
Assert.assertEquals(cache2.get(triplet, CachedEntityLineageResult.class).getEntityLineageResult(), lineageResult);
Assert.assertEquals(cache1.get(key, CachedEntityLineageResult.class), cachedEntityLineageResult);
Assert.assertEquals(cache2.get(key, CachedEntityLineageResult.class).getEntityLineageResult(), lineageResult);
}
}

0 comments on commit 9199c89

Please sign in to comment.