Skip to content

Commit

Permalink
[hotfix][state] Extract PeriodicMaterializationManager into state-bac…
Browse files Browse the repository at this point in the history
…kend-common

This commit is copied from apache#19312 / FLINK-26965.
It makes it possible to:
- test PeriodicMaterializationManager more easily (subsequent commit)
- reuse PeriodicMaterializationManager for non-changelog cases (e.g. FLIP-151)
  • Loading branch information
rkhachatryan committed Aug 26, 2022
1 parent 3149c62 commit a38b852
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 23 deletions.
6 changes: 6 additions & 0 deletions flink-state-backends/flink-statebackend-changelog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ under the License.
<artifactId>flink-shaded-guava</artifactId>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationTarget;

import org.apache.flink.shaded.guava30.com.google.common.io.Closer;

Expand Down Expand Up @@ -95,7 +96,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform;
import static org.apache.flink.state.changelog.PeriodicMaterializationManager.MaterializationRunnable;
import static org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -109,7 +110,8 @@ public class ChangelogKeyedStateBackend<K>
implements CheckpointableKeyedStateBackend<K>,
CheckpointListener,
TestableKeyedStateBackend<K>,
InternalCheckpointListener {
InternalCheckpointListener,
MaterializationTarget {
private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);

/**
Expand Down Expand Up @@ -754,6 +756,7 @@ private ChangelogSnapshotState completeRestore(
* @return a tuple of - future snapshot result from the underlying state backend - a {@link
* SequenceNumber} identifying the latest change in the changelog
*/
@Override
public Optional<MaterializationRunnable> initMaterialization() throws Exception {
SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo();
Expand Down Expand Up @@ -800,7 +803,8 @@ public Optional<MaterializationRunnable> initMaterialization() throws Exception
* This method is not thread safe. It should be called either under a lock or through task
* mailbox executor.
*/
public void updateChangelogSnapshotState(
@Override
public void handleMaterializationResult(
SnapshotResult<KeyedStateHandle> materializedSnapshot,
long materializationID,
SequenceNumber upTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.BaseBackendBuilder;
import org.apache.flink.state.common.ChangelogMaterializationMetricGroup;
import org.apache.flink.state.common.PeriodicMaterializationManager;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testCheckpointConfirmation() throws Exception {
MockKeyedStateBackend<Integer> mock = createMock();
ChangelogKeyedStateBackend<Integer> changelog = createChangelog(mock);
try {
changelog.updateChangelogSnapshotState(
changelog.handleMaterializationResult(
SnapshotResult.empty(), materializationId, SequenceNumber.of(Long.MAX_VALUE));
checkpoint(changelog, checkpointId).get().discardState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.testutils.ExceptionallyDoneFuture;
import org.apache.flink.state.common.ChangelogMaterializationMetricGroup;
import org.apache.flink.state.common.PeriodicMaterializationManager;
import org.apache.flink.util.Preconditions;

import org.junit.Test;
Expand All @@ -48,13 +50,13 @@
import java.util.concurrent.RunnableFuture;

import static org.apache.flink.runtime.state.StateBackendTestUtils.wrapStateBackendWithSnapshotFunction;
import static org.apache.flink.state.changelog.ChangelogMaterializationMetricGroup.COMPLETED_MATERIALIZATION;
import static org.apache.flink.state.changelog.ChangelogMaterializationMetricGroup.FAILED_MATERIALIZATION;
import static org.apache.flink.state.changelog.ChangelogMaterializationMetricGroup.STARTED_MATERIALIZATION;
import static org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_FULL_SIZE_OF_MATERIALIZATION;
import static org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_FULL_SIZE_OF_NON_MATERIALIZATION;
import static org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_INC_SIZE_OF_MATERIALIZATION;
import static org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_INC_SIZE_OF_NON_MATERIALIZATION;
import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.COMPLETED_MATERIALIZATION;
import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.FAILED_MATERIALIZATION;
import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.STARTED_MATERIALIZATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.state.common.ChangelogMaterializationMetricGroup;
import org.apache.flink.state.common.PeriodicMaterializationManager;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.Executors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ private static void startTracking(

private static void materialize(
ChangelogKeyedStateBackend<String> backend, StateChangelogWriter<?> writer) {
backend.updateChangelogSnapshotState(empty(), 0L, writer.nextSequenceNumber());
backend.handleMaterializationResult(empty(), 0L, writer.nextSequenceNumber());
}

private static void truncate(
Expand Down
77 changes: 77 additions & 0 deletions flink-state-backends/flink-statebackend-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-backends</artifactId>
<version>1.16-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-statebackend-common</artifactId>
<name>Flink : State backends : Common</name>

<packaging>jar</packaging>

<dependencies>

<!-- compile-time dependencies; scope "provided" means they are needed to build this module
but are expected on the classpath at runtime instead of being bundled with it -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- NOTE(review): no version is declared here; presumably it is managed by a parent pom - confirm -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</dependency>

<!-- test dependencies -->
<!-- test-jar of flink-runtime, visible to this module's tests only -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,35 @@
* limitations under the License.
*/

package org.apache.flink.state.changelog;
package org.apache.flink.state.common;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.ThreadSafeSimpleCounter;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;

/** Metrics related to the materialization part of Changelog. */
class ChangelogMaterializationMetricGroup extends ProxyMetricGroup<MetricGroup> {
@Internal
public class ChangelogMaterializationMetricGroup extends ProxyMetricGroup<MetricGroup> {

private static final String PREFIX = "ChangelogMaterialization";

@VisibleForTesting
static final String STARTED_MATERIALIZATION = PREFIX + ".startedMaterialization";
public static final String STARTED_MATERIALIZATION = PREFIX + ".startedMaterialization";

@VisibleForTesting
static final String COMPLETED_MATERIALIZATION = PREFIX + ".completedMaterialization";
public static final String COMPLETED_MATERIALIZATION = PREFIX + ".completedMaterialization";

@VisibleForTesting
static final String FAILED_MATERIALIZATION = PREFIX + ".failedMaterialization";
public static final String FAILED_MATERIALIZATION = PREFIX + ".failedMaterialization";

private final Counter startedMaterializationCounter;
private final Counter completedMaterializationCounter;
private final Counter failedMaterializationCounter;

ChangelogMaterializationMetricGroup(MetricGroup parentMetricGroup) {
public ChangelogMaterializationMetricGroup(MetricGroup parentMetricGroup) {
super(parentMetricGroup);
this.startedMaterializationCounter =
counter(STARTED_MATERIALIZATION, new ThreadSafeSimpleCounter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
* limitations under the License.
*/

package org.apache.flink.state.changelog;
package org.apache.flink.state.common;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.fs.FileSystemSafetyNet;
Expand Down Expand Up @@ -46,7 +47,35 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Stateless Materialization Manager. */
class PeriodicMaterializationManager implements Closeable {
@Internal
public class PeriodicMaterializationManager implements Closeable {

/**
* {@link MaterializationRunnable} provider and consumer, i.e. state backend.
*
* <p>{@link PeriodicMaterializationManager} obtains a {@link MaterializationRunnable} from
* {@link #initMaterialization()} and hands the outcome back through {@link
* #handleMaterializationResult(SnapshotResult, long, SequenceNumber)}.
*/
public interface MaterializationTarget {

/**
* Initialize state materialization so that materialized data can be persisted durably and
* included into the checkpoint.
*
* <p>This method is not thread safe. It should be called either under a lock or through
* task mailbox executor.
*
* @return an {@link Optional} holding the {@link MaterializationRunnable} to run for this
*     materialization. NOTE(review): presumably empty when there is nothing to
*     materialize - confirm against the implementations.
* @throws Exception if the materialization cannot be initialized; {@link
*     PeriodicMaterializationManager} reports this as a failed materialization and rethrows
*/
Optional<MaterializationRunnable> initMaterialization() throws Exception;

/**
* Hands the result of a materialization back to the target.
*
* <p>This method is not thread safe. It should be called either under a lock or through task
* mailbox executor.
*
* @param materializedSnapshot the snapshot produced by the materialization
* @param materializationID ID of the materialization the snapshot belongs to
* @param upTo NOTE(review): presumably the changelog {@link SequenceNumber} up to which the
*     state was materialized - confirm against the implementations
* @throws Exception if the result cannot be handled
*/
void handleMaterializationResult(
SnapshotResult<KeyedStateHandle> materializedSnapshot,
long materializationID,
SequenceNumber upTo)
throws Exception;
}

private static final Logger LOG = LoggerFactory.getLogger(PeriodicMaterializationManager.class);

/** task mailbox executor, execute from Task Thread. */
Expand All @@ -70,7 +99,7 @@ class PeriodicMaterializationManager implements Closeable {
/** Number of consecutive materialization failures. */
private final AtomicInteger numberOfConsecutiveFailures;

private final ChangelogKeyedStateBackend<?> keyedStateBackend;
private final MaterializationTarget target;

private final ChangelogMaterializationMetricGroup metrics;

Expand All @@ -79,12 +108,12 @@ class PeriodicMaterializationManager implements Closeable {

private final long initialDelay;

PeriodicMaterializationManager(
public PeriodicMaterializationManager(
MailboxExecutor mailboxExecutor,
ExecutorService asyncOperationsThreadPool,
String subtaskName,
AsyncExceptionHandler asyncExceptionHandler,
ChangelogKeyedStateBackend<?> keyedStateBackend,
MaterializationTarget target,
ChangelogMaterializationMetricGroup metricGroup,
long periodicMaterializeDelay,
int allowedNumberOfFailures,
Expand All @@ -94,7 +123,7 @@ class PeriodicMaterializationManager implements Closeable {
this.subtaskName = checkNotNull(subtaskName);
this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
this.metrics = metricGroup;
this.keyedStateBackend = checkNotNull(keyedStateBackend);
this.target = checkNotNull(target);

this.periodicMaterializeDelay = periodicMaterializeDelay;
this.allowedNumberOfFailures = allowedNumberOfFailures;
Expand Down Expand Up @@ -139,7 +168,7 @@ public void triggerMaterialization() {
metrics.reportStartedMaterialization();
Optional<MaterializationRunnable> materializationRunnableOptional;
try {
materializationRunnableOptional = keyedStateBackend.initMaterialization();
materializationRunnableOptional = target.initMaterialization();
} catch (Exception ex) {
metrics.reportFailedMaterialization();
throw ex;
Expand Down Expand Up @@ -182,7 +211,7 @@ private void asyncMaterializationPhase(
mailboxExecutor.execute(
() -> {
try {
keyedStateBackend.updateChangelogSnapshotState(
target.handleMaterializationResult(
snapshotResult, materializationID, upTo);
metrics.reportCompletedMaterialization();
} catch (Exception ex) {
Expand Down Expand Up @@ -292,7 +321,8 @@ private synchronized void scheduleNextMaterialization(long offset) {
}
}

static class MaterializationRunnable {
/** Holds the {@link RunnableFuture} that performs the materialization and the associated metadata. */
public static class MaterializationRunnable {
private final RunnableFuture<SnapshotResult<KeyedStateHandle>> materializationRunnable;

private final long materializationID;
Expand Down
1 change: 1 addition & 0 deletions flink-state-backends/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ under the License.
<module>flink-statebackend-rocksdb</module>
<module>flink-statebackend-heap-spillable</module>
<module>flink-statebackend-changelog</module>
<module>flink-statebackend-common</module>
</modules>
</project>

0 comments on commit a38b852

Please sign in to comment.