diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/EntityLineageResultCacheKey.java b/metadata-io/src/main/java/com/linkedin/metadata/search/EntityLineageResultCacheKey.java index 794aebcbb1689..75375df77ed6f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/EntityLineageResultCacheKey.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/EntityLineageResultCacheKey.java @@ -2,11 +2,9 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.graph.LineageDirection; -import lombok.Data; - import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalUnit; +import lombok.Data; @Data @@ -17,27 +15,17 @@ public class EntityLineageResultCacheKey { private final Long endTimeMillis; private final Integer maxHops; - // Force use of static method outside of package (for tests) - EntityLineageResultCacheKey(Urn sourceUrn, LineageDirection direction, Long startTimeMillis, Long endTimeMillis, Integer maxHops) { + public EntityLineageResultCacheKey(Urn sourceUrn, LineageDirection direction, Long startTimeMillis, + Long endTimeMillis, Integer maxHops, TemporalUnit resolution) { + this.sourceUrn = sourceUrn; this.direction = direction; - this.startTimeMillis = startTimeMillis; - this.endTimeMillis = endTimeMillis; this.maxHops = maxHops; - } - - public static EntityLineageResultCacheKey from(Urn sourceUrn, LineageDirection direction, Long startTimeMillis, - Long endTimeMillis, Integer maxHops) { - return EntityLineageResultCacheKey.from(sourceUrn, direction, startTimeMillis, endTimeMillis, maxHops, ChronoUnit.DAYS); - } - - public static EntityLineageResultCacheKey from(Urn sourceUrn, LineageDirection direction, Long startTimeMillis, - Long endTimeMillis, Integer maxHops, TemporalUnit resolution) { long endOffset = resolution.getDuration().getSeconds() * 1000; - return new EntityLineageResultCacheKey(sourceUrn, direction, - startTimeMillis == null ? null - : Instant.ofEpochMilli(startTimeMillis).truncatedTo(resolution).toEpochMilli(), - endTimeMillis == null ? null : Instant.ofEpochMilli(endTimeMillis + endOffset).truncatedTo(resolution).toEpochMilli(), - maxHops); + this.startTimeMillis = + startTimeMillis == null ? null : Instant.ofEpochMilli(startTimeMillis).truncatedTo(resolution).toEpochMilli(); + this.endTimeMillis = endTimeMillis == null ? null + : Instant.ofEpochMilli(endTimeMillis + endOffset).truncatedTo(resolution).toEpochMilli(); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java index 7e4e8abc35625..c7cdd604968a5 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java @@ -29,6 +29,7 @@ import io.opentelemetry.extension.annotations.WithSpan; import java.net.URISyntaxException; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -122,8 +123,8 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull } // Cache multihop result for faster performance - final EntityLineageResultCacheKey cacheKey = EntityLineageResultCacheKey.from(sourceUrn, direction, startTimeMillis, - endTimeMillis, maxHops); + final EntityLineageResultCacheKey cacheKey = new EntityLineageResultCacheKey(sourceUrn, direction, startTimeMillis, + endTimeMillis, maxHops, ChronoUnit.DAYS); CachedEntityLineageResult cachedLineageResult = null; if (cacheEnabled) { @@ -569,8 +570,8 @@ public LineageScrollResult scrollAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull @Nullable SortCriterion sortCriterion, @Nullable String scrollId, @Nonnull String keepAlive, int size, @Nullable Long startTimeMillis, @Nullable Long endTimeMillis, @Nonnull SearchFlags searchFlags) { // Cache multihop result for faster performance - final EntityLineageResultCacheKey cacheKey = - new EntityLineageResultCacheKey(sourceUrn, direction, startTimeMillis, endTimeMillis, maxHops); + final EntityLineageResultCacheKey cacheKey = new EntityLineageResultCacheKey(sourceUrn, direction, startTimeMillis, + endTimeMillis, maxHops, ChronoUnit.DAYS); CachedEntityLineageResult cachedLineageResult = cacheEnabled ? cache.get(cacheKey, CachedEntityLineageResult.class) : null; EntityLineageResult lineageResult; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java index 058387c29b22d..842f8be01789a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/client/CachingEntitySearchService.java @@ -30,7 +30,7 @@ public class CachingEntitySearchService { private static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch"; private static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME = "entitySearchServiceAutoComplete"; private static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse"; - private static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll"; + public static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll"; private final CacheManager cacheManager; private final EntitySearchService entitySearchService; // This is a shared component, also used in search aggregation @@ -244,7 +244,10 @@ public ScrollResult getCachedScrollResults( ScrollResult result; if (enableCache(flags)) { Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "scroll_cache_access").time(); - Object cacheKey = Sextet.with(entities, query, filters, sortCriterion, scrollId, size); + Object cacheKey = Sextet.with(entities, query, + filters != null ? toJsonString(filters) : null, + sortCriterion != null ? toJsonString(sortCriterion) : null, + scrollId, size); result = cache.get(cacheKey, ScrollResult.class); cacheAccess.stop(); if (result == null) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchResultCacheKeyTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchResultCacheKeyTest.java index 816a8f1203c6d..1757883f1a5a9 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchResultCacheKeyTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchResultCacheKeyTest.java @@ -1,9 +1,11 @@ package com.linkedin.metadata.search; +import java.time.temporal.ChronoUnit; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.Test; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotSame; public class LineageSearchResultCacheKeyTest extends AbstractTestNGSpringContextTests { @@ -11,15 +13,20 @@ public class LineageSearchResultCacheKeyTest extends AbstractTestNGSpringContext @Test public void testNulls() { // ensure no NPE - assertEquals(new EntityLineageResultCacheKey(null, null, null, null, null), - EntityLineageResultCacheKey.from(null, null, null, null, null)); + assertEquals(new EntityLineageResultCacheKey(null, null, null, null, null, ChronoUnit.DAYS), + new EntityLineageResultCacheKey(null, null, null, null, null, ChronoUnit.DAYS)); } @Test public void testDateTruncation() { // expect start of day milli - assertEquals(new EntityLineageResultCacheKey(null, null, 1679529600000L, 1679616000000L, null), - EntityLineageResultCacheKey.from(null, null, 1679530293000L, 1679530293001L, - null)); + assertEquals(new EntityLineageResultCacheKey(null, null, 1679529600000L, + 1679615999999L, null, ChronoUnit.DAYS), + new EntityLineageResultCacheKey(null, null, 1679530293000L, + 1679530293001L, null, ChronoUnit.DAYS)); + assertNotSame(new EntityLineageResultCacheKey(null, null, 1679529600000L, + 1679616000000L, null, ChronoUnit.DAYS), + new EntityLineageResultCacheKey(null, null, 1679530293000L, + 1679530293001L, null, ChronoUnit.DAYS)); } } diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/CacheTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/CacheTest.java index 55b4df5384e6a..7337a52b12283 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/CacheTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/CacheTest.java @@ -1,31 +1,40 @@ package com.linkedin.gms.factory.search; +import com.google.common.collect.ImmutableList; import com.hazelcast.config.Config; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.jet.core.JetTestSupport; import com.hazelcast.spring.cache.HazelcastCacheManager; import com.linkedin.common.urn.CorpuserUrn; +import com.linkedin.data.template.StringArray; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.LineageDirection; import com.linkedin.metadata.graph.LineageRelationship; import com.linkedin.metadata.graph.LineageRelationshipArray; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.ConjunctiveCriterion; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.CriterionArray; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.search.EntityLineageResultCacheKey; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchResult; import com.linkedin.metadata.search.SearchResultMetadata; import com.linkedin.metadata.search.cache.CacheableSearcher; import com.linkedin.metadata.search.cache.CachedEntityLineageResult; +import java.time.temporal.ChronoUnit; import java.util.List; import org.javatuples.Quintet; -import org.javatuples.Triplet; +import org.javatuples.Sextet; import org.springframework.cache.Cache; import org.testng.Assert; import org.testng.annotations.Test; import static com.datahub.util.RecordUtils.*; -import static com.linkedin.metadata.search.utils.GZIPUtil.*; +import static com.linkedin.metadata.search.client.CachingEntitySearchService.*; public class CacheTest extends JetTestSupport { @@ -79,6 +88,55 @@ public void hazelcastTest() { Assert.assertEquals(cacheableSearcher1.getSearchResults(0, 1), cacheableSearcher2.getSearchResults(0, 1)); } + @Test + public void hazelcastTestScroll() { + CorpuserUrn corpuserUrn = new CorpuserUrn("user"); + SearchEntity searchEntity = new SearchEntity().setEntity(corpuserUrn); + SearchResult searchResult = new SearchResult() + .setEntities(new SearchEntityArray(List.of(searchEntity))) + .setNumEntities(1) + .setFrom(0) + .setPageSize(1) + .setMetadata(new SearchResultMetadata()); + + final Criterion filterCriterion = new Criterion() + .setField("platform") + .setCondition(Condition.EQUAL) + .setValue("hive") + .setValues(new StringArray(ImmutableList.of("hive"))); + + final Criterion subtypeCriterion = new Criterion() + .setField("subtypes") + .setCondition(Condition.EQUAL) + .setValue("") + .setValues(new StringArray(ImmutableList.of("view"))); + + final Filter filterWithCondition = new Filter().setOr( + new ConjunctiveCriterionArray( + new ConjunctiveCriterion().setAnd( + new CriterionArray(ImmutableList.of(filterCriterion))), + new ConjunctiveCriterion().setAnd( + new CriterionArray(ImmutableList.of(subtypeCriterion))) + )); + + Sextet, String, String, String, String, Integer> + sextet = Sextet.with(List.of(corpuserUrn.toString()), "*", toJsonString(filterWithCondition), null, null, 1); + + Cache cache1 = cacheManager1.getCache(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME); + Cache cache2 = cacheManager2.getCache(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME); + + // Cache result + String json = toJsonString(searchResult); + cache1.put(sextet, json); + Assert.assertEquals(instance1.getMap(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME).get(sextet), + instance2.getMap(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME).get(sextet)); + String cachedResult1 = cache1.get(sextet, String.class); + String cachedResult2 = cache2.get(sextet, String.class); + Assert.assertEquals(cachedResult1, cachedResult2); + Assert.assertEquals(cache1.get(sextet, String.class), json); + Assert.assertEquals(cache2.get(sextet, String.class), json); + } + @Test public void testLineageCaching() { CorpuserUrn corpuserUrn = new CorpuserUrn("user"); @@ -95,16 +153,17 @@ public void testLineageCaching() { Cache cache1 = cacheManager1.getCache("relationshipSearchService"); Cache cache2 = cacheManager2.getCache("relationshipSearchService"); - Triplet triplet = Triplet.with(corpuserUrn.toString(), LineageDirection.DOWNSTREAM, 3); + EntityLineageResultCacheKey key = new EntityLineageResultCacheKey(corpuserUrn, LineageDirection.DOWNSTREAM, + 0L, 1L, 1, ChronoUnit.DAYS); - cache1.put(triplet, cachedEntityLineageResult); + cache1.put(key, cachedEntityLineageResult); - Assert.assertEquals(instance1.getMap("relationshipSearchService").get(triplet), - instance2.getMap("relationshipSearchService").get(triplet)); - CachedEntityLineageResult cachedResult1 = cache1.get(triplet, CachedEntityLineageResult.class); - CachedEntityLineageResult cachedResult2 = cache2.get(triplet, CachedEntityLineageResult.class); + Assert.assertEquals(instance1.getMap("relationshipSearchService").get(key), + instance2.getMap("relationshipSearchService").get(key)); + CachedEntityLineageResult cachedResult1 = cache1.get(key, CachedEntityLineageResult.class); + CachedEntityLineageResult cachedResult2 = cache2.get(key, CachedEntityLineageResult.class); Assert.assertEquals(cachedResult1, cachedResult2); - Assert.assertEquals(cache1.get(triplet, CachedEntityLineageResult.class), cachedEntityLineageResult); - Assert.assertEquals(cache2.get(triplet, CachedEntityLineageResult.class).getEntityLineageResult(), lineageResult); + Assert.assertEquals(cache1.get(key, CachedEntityLineageResult.class), cachedEntityLineageResult); + Assert.assertEquals(cache2.get(key, CachedEntityLineageResult.class).getEntityLineageResult(), lineageResult); } }