Skip to content

Commit

Permalink
S3 plugin fully working now!
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Aug 16, 2019
1 parent 5b6d5cb commit bb954e4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
5 changes: 3 additions & 2 deletions src/peergos/server/storage/IpfsInstaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Object toJson(Multihash nodeId) {
String s3PathPrefix = nodeId.toString() + "/" + path;
child.put("path", s3PathPrefix);
child.put("bucket", bucket);
child.put("accesKey", accessKey);
child.put("accessKey", accessKey);
child.put("secretKey", secretKey);
child.put("region", region);
child.put("regionEndpoint", regionEndpoint);
Expand Down Expand Up @@ -125,6 +125,7 @@ public void configure(IpfsWrapper ipfs) {
// Do the configuration dance..
System.out.println("Configuring S3 datastore IPFS plugin");
Multihash nodeId = ipfs.nodeId();

// update the config file
List<Object> mount = Arrays.asList(
toJson(nodeId),
Expand All @@ -140,7 +141,7 @@ public void configure(IpfsWrapper ipfs) {
" }")
);
String mounts = JSONParser.toString(mount);
ipfs.setConfig("Datastore.Spec.Mounts", mounts);
ipfs.setConfig("Datastore.Spec.mounts", mounts);

// replace the datastore spec file
String newDataStoreSpec = "{\"mounts\":[{\"bucket\":\"" + bucket +
Expand Down
62 changes: 47 additions & 15 deletions src/peergos/server/storage/IpfsWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
import peergos.shared.user.*;
import peergos.shared.util.*;

import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -154,7 +153,7 @@ public synchronized Multihash nodeId() {
}

public synchronized void setConfig(String key, String val) {
runIpfsCmd("config", key, val);
runIpfsCmd("config", "--json", key, val);
}

private synchronized void start() {
Expand Down Expand Up @@ -254,7 +253,7 @@ private Process startIpfsCmdRetry(int retryCount, String... subCmd) {
long sleepMs = 100;
Process process = null;
for (int i = 0; i < retryCount; i++) {
process = startIpfsCmdOnce(subCmd);
process = startIpfsCmdOnce(true, subCmd);
try {
Thread.sleep(sleepMs);
} catch (InterruptedException ie){}
Expand All @@ -270,7 +269,7 @@ private Process startIpfsCmdRetry(int retryCount, String... subCmd) {
return process;
}

private Process startIpfsCmdOnce(String... subCmd) {
private Process startIpfsCmdOnce(boolean log, String... subCmd) {
LinkedList<String> list = new LinkedList<>(Arrays.asList(subCmd));
list.addFirst(ipfsPath.toString());
ProcessBuilder pb = new ProcessBuilder(list);
Expand All @@ -279,10 +278,12 @@ private Process startIpfsCmdOnce(String... subCmd) {
String command = Arrays.stream(subCmd).collect(Collectors.joining(" "));
System.out.println(command);
Process started = pb.start();
new Thread(() -> Logging.log(started.getInputStream(),
"$(ipfs " + command + ") out: "), "IPFS output stream").start();
new Thread(() -> Logging.log(started.getErrorStream(),
"$(ipfs " + command + ") err: "), "IPFS error stream").start();
if (log) {
new Thread(() -> Logging.log(started.getInputStream(),
"$(ipfs " + command + ") out: "), "IPFS output stream").start();
new Thread(() -> Logging.log(started.getErrorStream(),
"$(ipfs " + command + ") err: "), "IPFS error stream").start();
}
return started;
} catch (IOException ioe) {
throw new IllegalStateException(ioe.getMessage(), ioe);
Expand All @@ -295,14 +296,45 @@ private void runIpfsCmd(String... subCmd) {
}

private String runIpfsCmdAndGetOutput(String... subCmd) {
Process process = startIpfsCmd(subCmd);
Process process = startIpfsCmdOnce(false, subCmd);
try {
int rc = process.waitFor();
File stderrFile = File.createTempFile("tmpErr", "out");
File stdoutFile = File.createTempFile("tmpStd", "out");
try {
Thread logThread = new Thread(() -> {
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
try {
BufferedWriter writer = new BufferedWriter(new FileWriter(stdoutFile));
while ((line = reader.readLine()) != null) {
writer.write(line);
writer.flush();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
logThread.start();
int exitCode = -1;
boolean done = false;
while (! done) {
try {
exitCode = process.waitFor();
done = true;
} catch (InterruptedException ie) {
System.out.println("Interrupted waiting for process to exit.");
}
}
logThread.join();

if (rc == 0) {
return new String(Serialize.readFully(process.getInputStream(), 1024*1024));
} else {
throw new IllegalStateException("ipfs " + Arrays.asList(subCmd) + " returned exit-code " + rc);
if (exitCode == 0) {
return new String(Serialize.readFully(new FileInputStream(stdoutFile), 1024 * 1024));
} else {
throw new IllegalStateException("ipfs " + Arrays.asList(subCmd) + " returned exit-code " + exitCode);
}
} finally {
stderrFile.delete();
stdoutFile.delete();
}
} catch (Exception ioe) {
throw new IllegalStateException(ioe.getMessage(), ioe);
Expand Down

0 comments on commit bb954e4

Please sign in to comment.