Skip to content

Commit

Permalink
[Functions] Close InputStreams properly (apache#9568)
Browse files Browse the repository at this point in the history
### Motivation

InputStreams should always be closed properly. There were a few locations related to function
upload/download where streams weren't closed. This PR also fixes other locations which showed up to not close InputStreams properly.

### Modifications

try-with-resources block is used to handle an InputStream
  • Loading branch information
lhotari authored Feb 12, 2021
1 parent 8b45a65 commit 547ab3b
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,13 @@ public synchronized void start() throws Exception {
}

private void resetDefaultRealm() throws IOException {
InputStream templateResource = new FileInputStream(
getKrb5conf().getAbsolutePath());
String content = IOUtil.readInput(templateResource);
content = content.replaceAll("default_realm = .*\n",
"default_realm = " + getRealm() + "\n");
IOUtil.writeFile(content, getKrb5conf());
try (InputStream templateResource = new FileInputStream(
getKrb5conf().getAbsolutePath())) {
String content = IOUtil.readInput(templateResource);
content = content.replaceAll("default_realm = .*\n",
"default_realm = " + getRealm() + "\n");
IOUtil.writeFile(content, getKrb5conf());
}
}

private void prepareKdcServer() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public class PulsarConfigurationLoader {
public static <T extends PulsarConfiguration> T create(String configFile,
Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
checkNotNull(configFile);
return create(new FileInputStream(configFile), clazz);
try (InputStream inputStream = new FileInputStream(configFile)) {
return create(inputStream, clazz);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.text.DateFormat;
Expand Down Expand Up @@ -61,10 +62,12 @@ public class PulsarBrokerStarter {
private static ServiceConfiguration loadConfig(String configFile) throws Exception {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
ServiceConfiguration config = create((new FileInputStream(configFile)), ServiceConfiguration.class);
// it validates provided configuration is completed
isComplete(config);
return config;
try (InputStream inputStream = new FileInputStream(configFile)) {
ServiceConfiguration config = create(inputStream, ServiceConfiguration.class);
// it validates provided configuration is completed
isComplete(config);
return config;
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
return;
}

this.config = PulsarConfigurationLoader.create(
(new FileInputStream(this.getConfigFile())), ServiceConfiguration.class);
try (FileInputStream inputStream = new FileInputStream(this.getConfigFile())) {
this.config = PulsarConfigurationLoader.create(
inputStream, ServiceConfiguration.class);
}

String zkServers = "127.0.0.1";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,11 @@ public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception
}

private ByteArrayInputStream createByteInputStream(String filePath) throws IOException {
InputStream inStream = new FileInputStream(filePath);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IOUtils.copy(inStream, baos);
return new ByteArrayInputStream(baos.toByteArray());
try (InputStream inStream = new FileInputStream(filePath)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IOUtils.copy(inStream, baos);
return new ByteArrayInputStream(baos.toByteArray());
}
}

private ByteArrayInputStream getStream(AtomicInteger index, ByteArrayInputStream... streams) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,8 @@ public CompletableFuture<Void> downloadAsync(String packageName, String path) {
@Override
public void completed(Response response) {
if (response.getStatusInfo().equals(Response.Status.OK)) {
InputStream inputStream = response.readEntity(InputStream.class);
Path destinyPath = Paths.get(path);
try {
try (InputStream inputStream = response.readEntity(InputStream.class)) {
Path destinyPath = Paths.get(path);
if (destinyPath.getParent() != null) {
Files.createDirectories(destinyPath.getParent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.InputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -218,11 +222,9 @@ public static Class<?> getSinkType(Class sinkClass) {

public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
URL website = new URL(destPkgUrl);

ReadableByteChannel rbc = Channels.newChannel(website.openStream());
log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
try (FileOutputStream fos = new FileOutputStream(targetFile)) {
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
try (InputStream in = website.openStream()) {
log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
Files.copy(in, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public final class WorkerUtils {
private WorkerUtils(){}

public static void uploadFileToBookkeeper(String packagePath, File sourceFile, Namespace dlogNamespace) throws IOException {
FileInputStream uploadedInputStream = new FileInputStream(sourceFile);
uploadToBookeeper(dlogNamespace, uploadedInputStream, packagePath);
try (FileInputStream uploadedInputStream = new FileInputStream(sourceFile)) {
uploadToBookeeper(dlogNamespace, uploadedInputStream, packagePath);
}
}

public static void uploadToBookeeper(Namespace dlogNamespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.nio.file.Files;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
Expand Down Expand Up @@ -1261,24 +1262,24 @@ public StreamingOutput downloadFunction(String tenant, String namespace, String
String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName)
.getPackageLocation().getPackagePath();

return getStreamingOutput(pkgPath);
}

private StreamingOutput getStreamingOutput(String pkgPath) {
final StreamingOutput streamingOutput = output -> {
if (pkgPath.startsWith(Utils.HTTP)) {
URL url = new URL(pkgPath);
IOUtils.copy(url.openStream(), output);
} else if (pkgPath.startsWith(Utils.FILE)) {
URL url = new URL(pkgPath);
File file;
try {
file = new File(url.toURI());
IOUtils.copy(new FileInputStream(file), output);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("invalid file url path: " + pkgPath);
URL url = URI.create(pkgPath).toURL();
try (InputStream inputStream = url.openStream()) {
IOUtils.copy(inputStream, output);
}
} else if (pkgPath.startsWith(Utils.FILE)) {
URI url = URI.create(pkgPath);
File file = new File(url.getPath());
Files.copy(file.toPath(), output);
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
}
};

return streamingOutput;
}

Expand Down Expand Up @@ -1314,25 +1315,7 @@ public StreamingOutput downloadFunction(final String path, String clientRole, Au
}
}

final StreamingOutput streamingOutput = output -> {
if (path.startsWith(Utils.HTTP)) {
URL url = new URL(path);
IOUtils.copy(url.openStream(), output);
} else if (path.startsWith(Utils.FILE)) {
URL url = new URL(path);
File file;
try {
file = new File(url.toURI());
IOUtils.copy(new FileInputStream(file), output);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("invalid file url path: " + path);
}
} else {
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
}
};

return streamingOutput;
return getStreamingOutput(path);
}

private void validateListFunctionRequestParams(final String tenant, final String namespace) throws IllegalArgumentException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ public static ReadHandle open(ScheduledExecutorService executor,
Blob blob = blobStore.getBlob(bucket, indexKey);
versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
final InputStream payLoadStream = blob.getPayload().openStream();
OffloadIndexBlock index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream);
payLoadStream.close();
OffloadIndexBlock index;
try (InputStream payLoadStream = blob.getPayload().openStream()) {
index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream);
}

BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
versionCheck,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ public static ReadHandle open(ScheduledExecutorService executor,
log.debug("indexKey blob: {} {}", indexKey, blob);
versionCheck.check(indexKey, blob);
OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
final InputStream payloadStream = blob.getPayload().openStream();
OffloadIndexBlockV2 index = indexBuilder.fromStream(payloadStream);
payloadStream.close();
OffloadIndexBlockV2 index;
try (InputStream payloadStream = blob.getPayload().openStream()) {
index = indexBuilder.fromStream(payloadStream);
}

BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
versionCheck,
Expand Down

0 comments on commit 547ab3b

Please sign in to comment.