Skip to content
This repository has been archived by the owner on Dec 27, 2023. It is now read-only.

Commit

Permalink
feat: added class to process asto value as stream (#336)
Browse files Browse the repository at this point in the history
Added class to read storage value as input stream if exists, process the value and write it back to storage.
Such workflow is used to create/update repository metadata files (in conda and debian at least).

Ticket: #333
  • Loading branch information
olenagerasimova authored Sep 21, 2021
1 parent baf65c4 commit ed21e7c
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# v1.5.0

- feat: Added class to process asto value as input/output streams (#334)

# v1.4.0

- feat: process storage item as input stream (#334)
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/com/artipie/asto/streams/StorageValuePipeline.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* The MIT License (MIT) Copyright (c) 2020-2021 artipie.com
* https://github.com/artipie/asto/LICENSE.txt
*/
package com.artipie.asto.streams;

import com.artipie.asto.ArtipieIOException;
import com.artipie.asto.Content;
import com.artipie.asto.Key;
import com.artipie.asto.Storage;
import com.artipie.asto.misc.UncheckedIOConsumer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import org.cqfn.rio.Buffers;
import org.cqfn.rio.WriteGreed;
import org.cqfn.rio.stream.ReactiveInputStream;
import org.cqfn.rio.stream.ReactiveOutputStream;

/**
* Processes storage value content as optional input stream and
* saves the result back as output stream.
* @since 1.5
*/
public final class StorageValuePipeline {

/**
* Abstract storage.
*/
private final Storage asto;

/**
* Storage item key to process.
*/
private final Key key;

/**
* Ctor.
* @param asto Abstract storage
* @param key Item key
*/
public StorageValuePipeline(final Storage asto, final Key key) {
this.asto = asto;
this.key = key;
}

/**
* Process storage item and save it back.
* @param action Action to perform with storage content if exists and write back as
* output stream.
* @return Completion action
* @throws ArtipieIOException On Error
*/
CompletionStage<Void> process(final BiConsumer<Optional<InputStream>, OutputStream> action) {
return this.asto.exists(this.key).thenCompose(
exists -> {
final CompletionStage<Void> future;
Optional<InputStream> oinput = Optional.empty();
Optional<PipedOutputStream> oout = Optional.empty();
final CompletableFuture<Void> tmp;
try (PipedOutputStream resout = new PipedOutputStream()) {
if (exists) {
oinput = Optional.of(new PipedInputStream());
final PipedOutputStream tmpout =
new PipedOutputStream((PipedInputStream) oinput.get());
oout = Optional.of(tmpout);
tmp = this.asto.value(this.key).thenCompose(
input -> new ReactiveOutputStream(tmpout)
.write(input, WriteGreed.SYSTEM)
);
} else {
tmp = CompletableFuture.allOf();
oinput = Optional.empty();
}
final PipedInputStream src = new PipedInputStream(resout);
future = tmp.thenCompose(
nothing -> this.asto.save(
this.key,
new Content.From(
new ReactiveInputStream(src).read(Buffers.Standard.K8)
)
)
);
action.accept(oinput, resout);
} catch (final IOException err) {
throw new ArtipieIOException(err);
} finally {
oinput.ifPresent(new UncheckedIOConsumer<>(InputStream::close));
oout.ifPresent(new UncheckedIOConsumer<>(PipedOutputStream::close));
}
return future;
}
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* The MIT License (MIT) Copyright (c) 2020-2021 artipie.com
* https://github.com/artipie/asto/LICENSE.txt
*/
package com.artipie.asto.streams;

import com.artipie.asto.ArtipieIOException;
import com.artipie.asto.Content;
import com.artipie.asto.Key;
import com.artipie.asto.Storage;
import com.artipie.asto.blocking.BlockingStorage;
import com.artipie.asto.memory.InMemoryStorage;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;

/**
* Test for {@link StorageValuePipeline}.
* @since 1.5
*/
class StorageValuePipelineTest {

/**
* Test storage.
*/
private Storage asto;

@BeforeEach
void init() {
this.asto = new InMemoryStorage();
}

@Test
void processesExistingItem() {
final Key key = new Key.From("test.txt");
final Charset charset = StandardCharsets.US_ASCII;
this.asto.save(key, new Content.From("one\ntwo\nfour".getBytes(charset))).join();
new StorageValuePipeline(this.asto, key).process(
(input, out) -> {
try {
final List<String> list = IOUtils.readLines(input.get(), charset);
list.add(2, "three");
IOUtils.writeLines(list, "\n", out, charset);
} catch (final IOException err) {
throw new ArtipieIOException(err);
}
}
).toCompletableFuture().join();
MatcherAssert.assertThat(
new String(new BlockingStorage(this.asto).value(key), charset),
new IsEqual<>("one\ntwo\nthree\nfour\n")
);
}

@Test
void writesNewItem() {
final Key key = new Key.From("my_test.txt");
final Charset charset = StandardCharsets.US_ASCII;
final String text = "Hello world!";
new StorageValuePipeline(this.asto, key).process(
(input, out) -> {
MatcherAssert.assertThat(
"Input should be absent",
input.isPresent(),
new IsEqual<>(false)
);
try {
IOUtils.write(text, out, charset);
} catch (final IOException err) {
throw new ArtipieIOException(err);
}
}
).toCompletableFuture().join();
MatcherAssert.assertThat(
"test.txt does not contain text `Hello world!`",
new String(new BlockingStorage(this.asto).value(key), charset),
new IsEqual<>(text)
);
}

}

0 comments on commit ed21e7c

Please sign in to comment.