Skip to content

Latest commit

 

History

History
 
 

17.3 - Pipelined

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 
 
 

Example 17.3 - Pipelined

Pipelined version of our two-process file synchronizer, minimizing the chattiness of the protocol

./mill -i sync.test

Upstream Example: 17.2 - FileSyncer:

Diff:

diff --git a/17.2 - FileSyncer/sync/src/Sync.scala b/17.3 - Pipelined/sync/src/Sync.scala
index 30e1a48..367c163 100644
--- a/17.2 - FileSyncer/sync/src/Sync.scala	
+++ b/17.3 - Pipelined/sync/src/Sync.scala	
@@ -6,29 +6,28 @@ object Sync {
     val agentExecutable = os.temp(os.read.bytes(os.resource / "agent.jar"))
     os.perms.set(agentExecutable, "rwx------")
     val agent = os.proc(agentExecutable).spawn(cwd = dest)
-    def callAgent[T: upickle.default.Reader](rpc: Rpc): T = {
+    def callAgent[T: upickle.default.Reader](rpc: Rpc): () => T = {
       Shared.send(agent.stdin.data, rpc)
-      Shared.receive[T](agent.stdout.data)
+      () => Shared.receive[T](agent.stdout.data)
     }
-    for (srcSubPath <- os.walk(src)) {
-      val subPath = srcSubPath.subRelativeTo(src)
-      val destSubPath = dest / subPath
-      (os.isDir(srcSubPath), callAgent[Boolean](Rpc.IsDir(subPath))) match {
-        case (false, true) =>
-          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
-        case (true, false) =>
-          for (p <- os.walk(srcSubPath) if os.isFile(p)) {
-            callAgent[Unit](Rpc.WriteOver(os.read.bytes(p), p.subRelativeTo(src)))
+    val subPaths = os.walk(src).map(_.subRelativeTo(src))
+    def pipelineCalls[T: upickle.default.Reader](rpcFor: os.SubPath => Option[Rpc]) = {
+      val buffer = collection.mutable.Buffer.empty[(os.RelPath, () => T)]
+      for (p <- subPaths; rpc <- rpcFor (p)) buffer.append((p, callAgent[T](rpc)))
+      buffer.map{case (k, v) => (k, v())}.toMap
     }
-        case (false, false)
-          if !callAgent[Boolean](Rpc.Exists(subPath))
-            || !os.read.bytes(srcSubPath).sameElements(
-            callAgent[Array[Byte]](Rpc.ReadBytes(subPath))
-          ) =>
-
-          callAgent[Unit](Rpc.WriteOver(os.read.bytes(srcSubPath), subPath))
-
-        case _ => // do nothing
+    val existsMap = pipelineCalls[Boolean](p => Some(Rpc.Exists(p)))
+    val isDirMap = pipelineCalls[Boolean](p => Some(Rpc.IsDir(p)))
+    val readMap = pipelineCalls[Array[Byte]]{p =>
+      if (existsMap(p) && !isDirMap(p)) Some(Rpc.ReadBytes(p))
+      else None
+    }
+    pipelineCalls[Unit]{ p =>
+      if (os.isDir(src / p)) None
+      else {
+        val localBytes = os.read.bytes(src / p)
+        if (readMap.get(p).exists(java.util.Arrays.equals(_, localBytes))) None
+        else Some(Rpc.WriteOver(localBytes, p))
       }
     }
   }

Downstream Examples