Cleanup WorkflowExecutorCache, rearrange checks in a flaky test (temp…
Spikhalskiy authored Oct 12, 2022
1 parent 40f9212 commit f55f610
Showing 7 changed files with 86 additions and 63 deletions.
2 changes: 1 addition & 1 deletion gradle/linting.gradle
@@ -15,7 +15,7 @@ subprojects {
       // needs to be aligned with .editorconfig
       // https://github.com/diffplug/spotless/tree/main/plugin-gradle#ktlint
       // reenable filename rule after https://github.com/pinterest/ktlint/issues/1521
-      .editorConfigOverride(['indent_size': '2', 'disabled_rules': 'filename'])
+      .editorConfigOverride(['indent_size': '2', 'ktlint_disabled_rules': 'filename'])
   }
 }

2 changes: 1 addition & 1 deletion temporal-kotlin/.editorconfig
@@ -1,4 +1,4 @@
 [*.{kt,kts}]
 indent_size = 2
 # reenable filename rule after https://github.com/pinterest/ktlint/issues/1521
-disabled_rules = filename
+ktlint_disabled_rules = filename

WorkflowExecutorCache.java

@@ -23,17 +23,16 @@
 import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;

 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.uber.m3.tally.Scope;
 import io.temporal.api.common.v1.WorkflowExecution;
 import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
 import io.temporal.internal.replay.WorkflowRunTaskHandler;
 import io.temporal.worker.MetricsType;
 import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,7 @@
 public final class WorkflowExecutorCache {
   private final Logger log = LoggerFactory.getLogger(WorkflowExecutorCache.class);
   private final WorkflowRunLockManager runLockManager;
-  private final LoadingCache<String, WorkflowRunTaskHandler> cache;
+  private final Cache<String, WorkflowRunTaskHandler> cache;
   private final Scope metricsScope;

   public WorkflowExecutorCache(
@@ -73,13 +72,7 @@ public WorkflowExecutorCache(
                 }
               }
             })
-            .build(
-                new CacheLoader<String, WorkflowRunTaskHandler>() {
-                  @Override
-                  public WorkflowRunTaskHandler load(String key) {
-                    return null;
-                  }
-                });
+            .build();
     this.metricsScope = Objects.requireNonNull(scope);
     this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
   }
@@ -90,47 +83,30 @@ public WorkflowRunTaskHandler getOrCreate(
       Callable<WorkflowRunTaskHandler> workflowExecutorFn)
       throws Exception {
     WorkflowExecution execution = workflowTask.getWorkflowExecution();
+    String runId = execution.getRunId();
     if (isFullHistory(workflowTask)) {
-      // no need to call a full-blown #invalidate, because we don't need to unmark from processing
-      // yet
-      cache.invalidate(execution.getRunId());
-      metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
+      invalidate(execution, metricsScope, "full history", null);
       log.trace(
           "New Workflow Executor {}-{} has been created for a full history run",
           execution.getWorkflowId(),
-          execution.getRunId());
+          runId);
       return workflowExecutorFn.call();
     }

-    WorkflowRunTaskHandler workflowRunTaskHandler = getForProcessing(execution, workflowTypeScope);
+    @Nullable WorkflowRunTaskHandler workflowRunTaskHandler = cache.getIfPresent(runId);

     if (workflowRunTaskHandler != null) {
+      workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
       return workflowRunTaskHandler;
     }

     log.trace(
         "Workflow Executor {}-{} wasn't found in cache and a new executor has been created",
         execution.getWorkflowId(),
-        execution.getRunId());
-    return workflowExecutorFn.call();
-  }
-
-  public WorkflowRunTaskHandler getForProcessing(
-      WorkflowExecution workflowExecution, Scope metricsScope) throws ExecutionException {
-    String runId = workflowExecution.getRunId();
-    try {
-      WorkflowRunTaskHandler workflowRunTaskHandler = cache.get(runId);
-      log.trace(
-          "Workflow Execution {}-{} has been marked as in-progress",
-          workflowExecution.getWorkflowId(),
-          workflowExecution.getRunId());
-      metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
-      return workflowRunTaskHandler;
-    } catch (CacheLoader.InvalidCacheLoadException e) {
-      // We don't have a default loader and don't want to have one. So it's ok to get null value.
-      metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
-      return null;
-    }
+        runId);
+    workflowTypeScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
+
+    return workflowExecutorFn.call();
   }

   public void addToCache(
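For context on the change above: Guava's LoadingCache is not allowed to load null, so the old dummy CacheLoader made every cache miss surface as a CacheLoader.InvalidCacheLoadException that had to be caught and counted as a miss. A plain Cache queried with getIfPresent simply returns null on a miss. Below is a minimal standalone sketch of the two behaviors; it is not SDK code, and String stands in for WorkflowRunTaskHandler.

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class GuavaCacheMissSketch {
      public static void main(String[] args) throws Exception {
        // Old style: a LoadingCache whose loader returns null.
        // Guava forbids null loads, so get() on a missing key throws
        // CacheLoader.InvalidCacheLoadException instead of returning null.
        LoadingCache<String, String> loadingCache =
            CacheBuilder.newBuilder()
                .build(
                    new CacheLoader<String, String>() {
                      @Override
                      public String load(String key) {
                        return null;
                      }
                    });
        try {
          loadingCache.get("missing-run-id");
        } catch (CacheLoader.InvalidCacheLoadException e) {
          System.out.println("miss surfaced as an exception: " + e.getMessage());
        }

        // New style: a plain Cache has no loader; getIfPresent returns null on a miss.
        Cache<String, String> cache = CacheBuilder.newBuilder().build();
        String value = cache.getIfPresent("missing-run-id"); // null, no exception
        System.out.println("miss surfaced as null: " + value);

        cache.put("run-id", "handler");
        System.out.println("hit: " + cache.getIfPresent("run-id"));
      }
    }

With getIfPresent there is also no checked ExecutionException to propagate, which is why that import disappears from this file.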
@@ -143,7 +119,13 @@ public void addToCache(
     this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
   }

-  public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Scope metricsScope) {
+  /**
+   * @param workflowTypeScope accepts workflow metric scope (tagged with task queue and workflow
+   *     type)
+   */
+  @SuppressWarnings("deprecation")
+  public boolean evictAnyNotInProcessing(
+      WorkflowExecution inFavorOfExecution, Scope workflowTypeScope) {
     try {
       String inFavorOfRunId = inFavorOfExecution.getRunId();
       for (String key : cache.asMap().keySet()) {
@@ -159,8 +141,8 @@
                 inFavorOfRunId,
                 key);
             cache.invalidate(key);
-            metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
-            metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
+            workflowTypeScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
+            workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
             return true;
           } finally {
             runLockManager.unlock(key);
@@ -175,22 +157,23 @@
       }
     }

+  @SuppressWarnings("deprecation")
   public void invalidate(
       WorkflowExecution execution, Scope workflowTypeScope, String reason, Throwable cause) {
     try {
-      String runId = execution.getRunId();
-      if (log.isTraceEnabled()) {
-        log.trace(
-            "Invalidating {}-{} because of '{}', value is present in the cache: {}",
-            execution.getWorkflowId(),
-            runId,
-            reason,
-            cache.getIfPresent(runId),
-            cause);
-      }
-      cache.invalidate(runId);
+      String runId = execution.getRunId();
+      @Nullable WorkflowRunTaskHandler present = cache.getIfPresent(runId);
+      if (log.isTraceEnabled()) {
+        log.trace(
+            "Invalidating {}-{} because of '{}', value is present in the cache: {}",
+            execution.getWorkflowId(),
+            runId,
+            reason,
+            present,
+            cause);
+      }
+      cache.invalidate(runId);
+      if (present != null) {
+        workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
+      }
     } finally {
       this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
     }
   }
22 changes: 20 additions & 2 deletions temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java
@@ -129,7 +129,7 @@ private MetricsType() {}
   public static final String LOCAL_ACTIVITY_FAILED_COUNTER =
       TEMPORAL_METRICS_PREFIX + "local_activity_failed";

-  // Worker internals, tagged with worker_type
+  // Worker internals, tagged with namespace, task_queue, worker_type
   public static final String WORKER_START_COUNTER = TEMPORAL_METRICS_PREFIX + "worker_start";
   public static final String POLLER_START_COUNTER = TEMPORAL_METRICS_PREFIX + "poller_start";
   // gauge
@@ -140,13 +140,31 @@ private MetricsType() {}
   // Worker Factory
   //

+  // tagged with namespace, task_queue, worker_type, workflow_type
   public static final String STICKY_CACHE_HIT = TEMPORAL_METRICS_PREFIX + "sticky_cache_hit";
+  // tagged with namespace, task_queue, worker_type, workflow_type
   public static final String STICKY_CACHE_MISS = TEMPORAL_METRICS_PREFIX + "sticky_cache_miss";
+  // tagged with namespace, task_queue, worker_type, workflow_type
+  @Deprecated
+  // This metric in its current form is useless; it's not possible for users to interpret it for
+  // any meaningful purpose.
+  // We count in workflows that are getting evicted because we are out of threads in the workflow
+  // thread pool. (makes sense)
+  // We count in workflows that are getting "evicted" because a full history from the server is
+  // received. (kinda makes sense)
+  // We count in workflows that are getting "evicted" because they are done. But only if they were
+  // added to the cache. (doesn't make sense)
+  // We DON'T count in workflows that are getting "evicted" because the cache overflows. (doesn't
+  // make sense)
+  // TODO revisit implementation of this metric in Go and, if it makes sense there, fix the Java
+  // version.
+  // Otherwise deprecate it everywhere and remove from docs.
   public static final String STICKY_CACHE_TOTAL_FORCED_EVICTION =
       TEMPORAL_METRICS_PREFIX + "sticky_cache_total_forced_eviction";
+  // tagged with namespace, task_queue, worker_type, workflow_type
   public static final String STICKY_CACHE_THREAD_FORCED_EVICTION =
       TEMPORAL_METRICS_PREFIX + "sticky_cache_thread_forced_eviction";
-  // gauge
+  // gauge, tagged with namespace
   public static final String STICKY_CACHE_SIZE = TEMPORAL_METRICS_PREFIX + "sticky_cache_size";
   // gauge
   public static final String WORKFLOW_ACTIVE_THREAD_COUNT =
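To make the tag comments above concrete, here is a hedged sketch of reporting one of these counters against a com.uber.m3.tally scope tagged the way the comments describe. Only the MetricsType constant and the tag keys come from this commit; the helper method, the scope wiring, and the tag values are invented for illustration.

    import com.uber.m3.tally.Counter;
    import com.uber.m3.tally.Scope;
    import io.temporal.worker.MetricsType;
    import java.util.HashMap;
    import java.util.Map;

    public class StickyCacheMetricsSketch {
      // Emits a single sticky_cache_hit increment on a scope carrying the four tags
      // documented above. The tag values here are placeholders.
      static void reportStickyCacheHit(Scope rootScope) {
        Map<String, String> tags = new HashMap<>();
        tags.put("namespace", "default");
        tags.put("task_queue", "my-task-queue");
        tags.put("worker_type", "WorkflowWorker");
        tags.put("workflow_type", "TestWorkflow1");

        Scope workflowTypeScope = rootScope.tagged(tags);
        Counter hits = workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT);
        hits.inc(1);
      }
    }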
(test metrics reporter utility)

@@ -55,12 +55,18 @@ public synchronized void assertNoMetric(String name, Map<String, String> tags) {
       fail(
           "Metric '"
               + metricName
-              + "', all reported metrics: \n "
-              + String.join("\n ", counters.keySet()));
+              + "' was reported, with value: '"
+              + counters.get(metricName).get()
+              + "'");
     }
   }

   public synchronized void assertCounter(String name, Map<String, String> tags, long expected) {
+    assertCounter(name, tags, actual -> actual == expected);
+  }
+
+  public synchronized void assertCounter(
+      String name, Map<String, String> tags, Predicate<Long> expected) {
     String metricName = getMetricName(name, tags);
     AtomicLong accumulator = counters.get(metricName);
     if (accumulator == null) {
@@ -70,7 +76,8 @@
               + "', reported metrics: \n "
               + String.join("\n ", counters.keySet()));
     }
-    assertEquals(String.valueOf(accumulator.get()), expected, accumulator.get());
+    long actual = accumulator.get();
+    assertTrue("" + actual, expected.test(actual));
   }

   public synchronized void assertGauge(String name, Map<String, String> tags, double expected) {
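The Predicate-based overload added here is what lets the flaky test below accept a small range of counts instead of a single exact value. A simplified, self-contained sketch of the same pattern follows; it is not the SDK's actual test reporter, just an illustration of the exact-value overload delegating to the predicate one.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Predicate;

    public class PredicateCounterAssertionSketch {
      private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

      void record(String name, long delta) {
        counters.computeIfAbsent(name, k -> new AtomicLong()).addAndGet(delta);
      }

      // Exact-value assertion delegates to the predicate-based one.
      void assertCounter(String name, long expected) {
        assertCounter(name, actual -> actual == expected);
      }

      // Predicate-based assertion tolerates legitimate run-to-run variation.
      void assertCounter(String name, Predicate<Long> expected) {
        AtomicLong accumulator = counters.get(name);
        if (accumulator == null) {
          throw new AssertionError("Counter '" + name + "' was never reported");
        }
        long actual = accumulator.get();
        if (!expected.test(actual)) {
          throw new AssertionError("Unexpected value for '" + name + "': " + actual);
        }
      }

      public static void main(String[] args) {
        PredicateCounterAssertionSketch reporter = new PredicateCounterAssertionSketch();
        reporter.record("sticky_cache_hit", 2);
        reporter.assertCounter("sticky_cache_hit", 2);                     // exact
        reporter.assertCounter("sticky_cache_hit", a -> a == 1 || a == 2); // range
      }
    }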
(sticky cache test)

@@ -259,6 +259,7 @@
   }

   @Test
+  @SuppressWarnings("deprecation")
   public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Exception {
     // Arrange
     String taskQueueName = "cachedStickyTest_ChildWorkflows";
@@ -291,8 +292,21 @@
             .put(MetricsTag.WORKFLOW_TYPE, "TestWorkflow1")
             .build();
     metricsScope.close(); // Flush metrics
-    reporter.assertCounter(MetricsType.STICKY_CACHE_HIT, tags, 2);

+    // making sure no workflow tasks came with a full history (after a timeout from the sticky
+    // queue), which would cause a forced eviction from the cache. 1 eviction comes from the
+    // finishing of the parent workflow and its eviction from the cache.
+    // TODO feel free to remove this assertion if refactoring out STICKY_CACHE_TOTAL_FORCED_EVICTION
+    // metric. It has been added just as an additional verification to investigate a flaky test
+    reporter.assertCounter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION, tags, 1);
+
+    reporter.assertNoMetric(MetricsType.STICKY_CACHE_MISS, tags);
+    // It's valid for the server to schedule a workflow task
+    // - after the child is started and before the child is completed
+    // - or after it was completed
+    // Depending on that, there will be one or two additional workflow tasks.
+    // So both 1 and 2 are valid here. The main thing that matters is 0 cache misses.
+    reporter.assertCounter(MetricsType.STICKY_CACHE_HIT, tags, a -> a == 1 || a == 2);
     // Finish Workflow
     wrapper.close();
   }
MetricsTag.java

@@ -29,16 +29,17 @@
 import java.util.concurrent.ConcurrentMap;

 public class MetricsTag {
-  public static final String ACTIVITY_TYPE = "activity_type";
   public static final String NAMESPACE = "namespace";
   public static final String TASK_QUEUE = "task_queue";
+  public static final String WORKER_TYPE = "worker_type";
+
+  public static final String ACTIVITY_TYPE = "activity_type";
   public static final String WORKFLOW_TYPE = "workflow_type";
   public static final String SIGNAL_NAME = "signal_name";
   public static final String QUERY_TYPE = "query_type";
   public static final String STATUS_CODE = "status_code";
   public static final String EXCEPTION = "exception";
   public static final String OPERATION_NAME = "operation";
-  public static final String WORKER_TYPE = "worker_type";

   /** Used to pass metrics scope to the interceptor */
   public static final CallOptions.Key<Scope> METRICS_TAGS_CALL_OPTIONS_KEY =
