Skip to content

Commit

Permalink
[FLINK-21353][state] Add DFS-based StateChangelog
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Jul 28, 2021
1 parent e653487 commit 717a552
Show file tree
Hide file tree
Showing 29 changed files with 2,033 additions and 17 deletions.
87 changes: 87 additions & 0 deletions flink-dstl/flink-dstl-dfs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!-- Maven build descriptor for the DFS (file system) based state changelog module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<!-- Parent module (flink-dstl), resolved from the directory above via relativePath. -->
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-dstl</artifactId>
<version>1.14-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<!-- The artifact id carries the Scala binary version as a suffix. -->
<artifactId>flink-dstl-dfs_${scala.binary.version}</artifactId>
<name>Flink : DSTL : DFS</name>

<packaging>jar</packaging>

<dependencies>

<!-- Provided scope: available at compile time, not packaged as a transitive runtime dependency. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- NOTE(review): this test-jar declares no scope; unless a parent POM manages it, Maven
     defaults to compile scope. Confirm whether test scope was intended. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<!-- Main flink-runtime artifact (regular jar). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
<type>jar</type>
</dependency>

<!-- Main flink-streaming-java artifact for the configured Scala binary version. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!-- NOTE(review): as with the flink-runtime test-jar above, no scope is declared here;
     confirm whether test scope was intended. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<!-- No version declared here; presumably managed by a parent POM (verify). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</dependency>

<!-- No version or scope declared here; presumably managed by a parent POM (verify). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.changelog.fs;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

/**
 * Configuration options ({@link ConfigOptions}) consumed by {@link FsStateChangelogStorage}.
 *
 * <p>Every key defined here starts with {@code dstl.dfs.}.
 */
@Experimental
public class FsStateChangelogOptions {

    /** Description text of {@link #PREEMPTIVE_PERSIST_THRESHOLD}, kept separate for readability. */
    private static final String PREEMPTIVE_PERSIST_THRESHOLD_DESCRIPTION =
            "Size threshold for state changes of a single operator beyond which "
                    + "they are persisted pre-emptively without waiting for a checkpoint.  "
                    + "Improves checkpointing time by allowing quasi-continuous uploading "
                    + "of state changes (as opposed to uploading all accumulated changes "
                    + "on checkpoint).";

    /** Base path under which changelog files are stored; declared without a default value. */
    public static final ConfigOption<String> BASE_PATH =
            ConfigOptions.key("dstl.dfs.base-path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Base path to store changelog files.");

    /** Toggles compression of the serialized changelog; defaults to {@code false}. */
    public static final ConfigOption<Boolean> COMPRESSION_ENABLED =
            ConfigOptions.key("dstl.dfs.compression.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to enable compression when serializing changelog.");

    /**
     * Size of accumulated state changes of a single operator beyond which they are persisted
     * without waiting for a checkpoint; the default is parsed from {@code "5Mb"}.
     */
    public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD =
            ConfigOptions.key("dstl.dfs.preemptive-persist-threshold")
                    .memoryType()
                    .defaultValue(MemorySize.parse("5Mb"))
                    .withDescription(PREEMPTIVE_PERSIST_THRESHOLD_DESCRIPTION);

    /** Buffer size used while uploading change sets; the default is parsed from {@code "1Mb"}. */
    public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =
            ConfigOptions.key("dstl.dfs.upload.buffer-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("1Mb"))
                    .withDescription("Buffer size used when uploading change sets");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.changelog.fs;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;

/**
 * {@link StateChangelogStorage} implementation backed by a file system.
 *
 * <p>Writers created by this storage share a single {@link StateChangeUploader}, which is closed
 * together with the storage.
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogStorage.class);

    /**
     * Source of log ids. A log id is only required when writing, to tell apart the changes of
     * different backends (i.e. operators) inside the resulting file.
     */
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);

    /** Uploader shared by all writers created from this storage. */
    private final StateChangeUploader uploader;

    /** Threshold, in bytes, handed to every writer created by {@link #createWriter}. */
    private final long preEmptivePersistThresholdInBytes;

    /**
     * Creates a storage whose uploader and pre-emptive persist threshold are both taken from the
     * given configuration.
     *
     * @param config configuration to read the uploader settings and the threshold from
     * @throws IOException if the uploader cannot be created from the configuration
     */
    public FsStateChangelogStorage(Configuration config) throws IOException {
        this(StateChangeUploader.fromConfig(config), configuredThresholdInBytes(config));
    }

    /**
     * Creates a storage that uploads below {@code basePath} and uses the default pre-emptive
     * persist threshold.
     *
     * @param basePath directory to upload to; its file system is obtained from the path itself
     * @param compression compression flag forwarded to the uploader
     * @param bufferSize buffer size forwarded to the uploader
     * @throws IOException if the file system of {@code basePath} cannot be obtained
     */
    @VisibleForTesting
    public FsStateChangelogStorage(Path basePath, boolean compression, int bufferSize)
            throws IOException {
        this(
                new StateChangeFsUploader(
                        basePath, basePath.getFileSystem(), compression, bufferSize),
                defaultThresholdInBytes());
    }

    private FsStateChangelogStorage(
            StateChangeUploader uploader, long preEmptivePersistThresholdInBytes) {
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
        this.uploader = uploader;
    }

    /** Reads the pre-emptive persist threshold, in bytes, from the given configuration. */
    private static long configuredThresholdInBytes(Configuration config) {
        return config.get(PREEMPTIVE_PERSIST_THRESHOLD).getBytes();
    }

    /** Returns the default value of the pre-emptive persist threshold, in bytes. */
    private static long defaultThresholdInBytes() {
        return PREEMPTIVE_PERSIST_THRESHOLD.defaultValue().getBytes();
    }

    /**
     * Creates a writer with a fresh log id, built from the next value of the internal counter.
     *
     * @param operatorID operator identifier; only used for logging here
     * @param keyGroupRange key group range passed on to the writer
     * @return a new writer bound to this storage's uploader and threshold
     */
    @Override
    public FsStateChangelogWriter createWriter(String operatorID, KeyGroupRange keyGroupRange) {
        final long sequenceNumber = logIdGenerator.getAndIncrement();
        final UUID logId = new UUID(0L, sequenceNumber);
        LOG.info("createWriter for operator {}/{}: {}", operatorID, keyGroupRange, logId);
        return new FsStateChangelogWriter(
                logId, keyGroupRange, uploader, preEmptivePersistThresholdInBytes);
    }

    /** Creates a reader that decodes handles using a new {@link StateChangeFormat}. */
    @Override
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader() {
        final StateChangeFormat format = new StateChangeFormat();
        return new StateChangelogHandleStreamHandleReader(format);
    }

    /** Closes the underlying uploader. */
    @Override
    public void close() throws Exception {
        uploader.close();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.changelog.fs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;

import java.io.IOException;

/**
 * {@link StateChangelogStorageFactory} that creates {@link FsStateChangelogStorage} instances.
 *
 * <p>The class is public, and therefore so is its implicit no-argument constructor, so that it
 * can be instantiated reflectively from outside this package. {@link java.util.ServiceLoader}
 * requires a public no-argument provider constructor.
 *
 * <p>NOTE(review): assumes factories are discovered via {@code ServiceLoader} or similar
 * reflection; confirm against the loader and the {@code META-INF/services} registration.
 */
public class FsStateChangelogStorageFactory implements StateChangelogStorageFactory {

    /** Identifier returned by {@link #getIdentifier()}. */
    public static final String IDENTIFIER = "filesystem";

    /** Returns the identifier of this factory, {@value #IDENTIFIER}. */
    @Override
    public String getIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Creates a new {@link FsStateChangelogStorage} from the given configuration.
     *
     * @param configuration configuration passed to the storage constructor
     * @return a new file system based changelog storage
     * @throws IOException if the storage cannot be created
     */
    @Override
    public StateChangelogStorage<?> createStorage(Configuration configuration) throws IOException {
        return new FsStateChangelogStorage(configuration);
    }
}
Loading

0 comments on commit 717a552

Please sign in to comment.