Skip to content

Commit

Permalink
[FLINK-12556][e2e] Add read-only test FileSystem for end-to-end tests
Browse files Browse the repository at this point in the history
  • Loading branch information
1u0 authored and pnowojski committed Jun 5, 2019
1 parent 16479f4 commit 1c916a4
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 0 deletions.
74 changes: 74 additions & 0 deletions flink-end-to-end-tests/flink-plugins-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-end-to-end-tests</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.9-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-plugins-test</artifactId>

<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>

<!-- this is merely an intermediate build artifact and should not be -->
<!-- deployed to maven central -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>flink-dummy-fs</finalName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.dummy;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
* Factory of dummy FileSystem. See documentation of {@link DummyFSFileSystem}.
*/
public class DummyFSFactory implements FileSystemFactory {

private final FileSystem fileSystem = new DummyFSFileSystem(getData());

@Override
public String getScheme() {
return DummyFSFileSystem.FS_URI.getScheme();
}

@Override
public FileSystem create(URI fsUri) throws IOException {
return fileSystem;
}

private static Map<String, String> getData() {
Map<String, String> data = new HashMap<>();
data.put("/words", "Hello World how are you, my dear dear world\n");
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.dummy;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;

class DummyFSFileStatus implements FileStatus {
private final Path path;
private final int length;

DummyFSFileStatus(Path path, int length) {
this.path = path;
this.length = length;
}

@Override
public long getLen() {
return length;
}

@Override
public long getBlockSize() {
return length;
}

@Override
public short getReplication() {
return 0;
}

@Override
public long getModificationTime() {
return 0;
}

@Override
public long getAccessTime() {
return 0;
}

@Override
public boolean isDir() {
return false;
}

@Override
public Path getPath() {
return path;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.dummy;

import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalBlockLocation;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

/**
* A FileSystem implementation for integration testing purposes. Supports and serves read-only content from static
* key value map.
*/
class DummyFSFileSystem extends FileSystem {

static final URI FS_URI = URI.create("dummy:///");

private static final String HOSTNAME = "localhost";

private final URI workingDir;

private final URI homeDir;

private final Map<String, byte[]> contents;

DummyFSFileSystem(Map<String, String> contents) {
this.workingDir = new File(System.getProperty("user.dir")).toURI();
this.homeDir = new File(System.getProperty("user.home")).toURI();
this.contents = convertToByteArrayMap(contents);
}

// ------------------------------------------------------------------------

@Override
public URI getUri() {
return FS_URI;
}

@Override
public Path getWorkingDirectory() {
return new Path(workingDir);
}

@Override
public Path getHomeDirectory() {
return new Path(homeDir);
}

@Override
public boolean exists(Path f) throws IOException {
return getDataByPath(f) != null;
}

@Override
public FileStatus[] listStatus(final Path f) throws IOException {
byte[] data = getDataByPath(f);
if (data == null) {
return null;
}
return new FileStatus[] { new DummyFSFileStatus(f, data.length) };
}

@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
return new BlockLocation[] {
new LocalBlockLocation(HOSTNAME, file.getLen())
};
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
byte[] data = getDataByPath(f);
if (data == null) {
throw new FileNotFoundException("File " + f + " does not exist.");
}
return new DummyFSFileStatus(f, data.length);
}

@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
return open(f);
}

@Override
public FSDataInputStream open(final Path f) throws IOException {
return DummyFSInputStream.create(getDataByPath(f));
}

@Override
public boolean delete(final Path path, final boolean recursive) throws IOException {
throw new UnsupportedOperationException("Dummy FS doesn't support delete operation");
}

@Override
public boolean mkdirs(final Path path) throws IOException {
throw new UnsupportedOperationException("Dummy FS doesn't support mkdirs operation");
}

@Override
public FSDataOutputStream create(final Path path, final WriteMode overwrite) throws IOException {
throw new UnsupportedOperationException("Dummy FS doesn't support create operation");
}

@Override
public boolean rename(final Path src, final Path dst) throws IOException {
throw new UnsupportedOperationException("Dummy FS doesn't support rename operation");
}

@Override
public boolean isDistributedFS() {
return true;
}

@Override
public FileSystemKind getKind() {
return FileSystemKind.OBJECT_STORE;
}

@Nullable
private byte[] getDataByPath(Path path) {
return contents.get(path.toUri().getPath());
}

private static Map<String, byte[]> convertToByteArrayMap(Map<String, String> content) {
Map<String, byte[]> data = new HashMap<>();
Charset utf8 = Charset.forName("UTF-8");
content.entrySet().forEach(
entry -> data.put(entry.getKey(), entry.getValue().getBytes(utf8))
);
return data;
}
}
Loading

0 comments on commit 1c916a4

Please sign in to comment.