Skip to content

Commit

Permalink
enhance(sync): add specs for sync-xxx fns
Browse files Browse the repository at this point in the history
  • Loading branch information
RCmerci committed Apr 4, 2022
1 parent b182385 commit f7370de
Showing 1 changed file with 57 additions and 37 deletions.
94 changes: 57 additions & 37 deletions src/main/frontend/fs/sync.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@
(s/def ::TXContent string?)
(s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent]))


(s/def ::succ-map #(= {:succ true} %))
(s/def ::unknown-map (comp some? :unknown))
(s/def ::stop-map #(= {:stop true} %))
(s/def ::need-sync-remote #(= {:need-sync-remote true} %))

(s/def ::sync-local->remote!-result
(s/or :succ ::succ-map
:need-sync-remote ::need-sync-remote
:unknown ::unknown-map))

(s/def ::sync-remote->local!-result
(s/or :succ ::succ-map
:need-remote->local-full-sync
#(= {:need-remote->local-full-sync true} %)
:stop ::stop-map
:unknown ::unknown-map))

(s/def ::sync-local->remote-all-files!-result
(s/or :succ ::succ-map
:stop ::stop-map
:need-sync-remote ::need-sync-remote
:unknown ::unknown-map))

(def ws-addr "wss://og96xf1si7.execute-api.us-east-2.amazonaws.com/production?graphuuid=%s")

Expand Down Expand Up @@ -161,8 +182,8 @@
[path]
(let [parts (string/split path "/")]
(if (and (< 2 (count parts))
(= 36 (count (parts 0)))
(= 36 (count (parts 1))))
(= 36 (count (parts 0)))
(= 36 (count (parts 1))))
(string/join "/" (drop 2 parts))
path)))

Expand Down Expand Up @@ -283,11 +304,11 @@
NOTE: this xf should apply on reversed diffs sequence (sort by txid)"
[n]
(comp
(map diff->filetxns)
cat
distinct-update-filetxns-xf
remove-deleted-filetxns-xf
(partition-filetxns n)))
(map diff->filetxns)
cat
distinct-update-filetxns-xf
remove-deleted-filetxns-xf
(partition-filetxns n)))

(defn- filepath->diff
[index {:keys [relative-path user-uuid graph-uuid]}]
Expand All @@ -307,7 +328,6 @@
(map-indexed filepath->diff)
(diffs->partitioned-filetxns n)))


(deftype FileMetadata [size etag path last-modified remote? ^:mutable normalized-path]
Object
(get-normalized-path [_]
Expand Down Expand Up @@ -460,10 +480,8 @@
(retry-rsapi
#(p->c (ipc/ipc "delete-remote-files" graph-uuid base-path filepaths local-txid token))))))))


(def rsapi (->RSAPI))


(deftype RemoteAPI []
Object

Expand Down Expand Up @@ -555,9 +573,9 @@
:Transactions
(as-> txns
(sort-by :TXId txns)
[txns
(:TXId (last txns))
(:TXId (first txns))]))))))
[txns
(:TXId (last txns))
(:TXId (first txns))]))))))

(create-graph [this graph-name]
(.request this "create_graph" {:GraphName graph-name}))
Expand Down Expand Up @@ -652,6 +670,8 @@


;; type = "change" | "add" | "unlink"


(deftype FileChangeEvent [type dir path stat]
IRelativePath
(-relative-path [_] (remove-dir-prefix dir path))
Expand Down Expand Up @@ -706,8 +726,6 @@
(sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal.
if local-txid != remote-txid, return {:need-sync-remote true}"))



(deftype Remote->LocalSyncer [graph-uuid base-path repo *txid *sync-state
^:mutable local->remote-syncer *stopped]
Object
Expand Down Expand Up @@ -789,12 +807,11 @@
latest-txid (:TXId
(<! (get-remote-graph remoteapi nil graph-uuid)))]
(println "[full-sync(remote->local)]"
(count diff-remote-files) "files need to sync")
(count diff-remote-files) "files need to sync")
(<! (.sync-files-remote->local!
this (map -relative-path diff-remote-files)
latest-txid))))))


(defn- file-changed?
"return true when file changed compared with remote"
[graph-uuid file-path-without-base-path base-path]
Expand Down Expand Up @@ -866,12 +883,13 @@
(async/close! c)))))
c))

(sync-local->remote! [this es]
(if (empty? es)
{:succ true}
(let [type (.-type ^FileChangeEvent (first es))
ignore-files (get-ignore-files this)
es->paths-xf (comp

(sync-local->remote! [this es]
(if (empty? es)
(go {:succ true})
(let [type (.-type ^FileChangeEvent (first es))
ignore-files (get-ignore-files this)
es->paths-xf (comp
(map #(relative-path %))
(filter #(not (contains-path? ignore-files %))))
paths (sequence es->paths-xf es)]
Expand Down Expand Up @@ -932,18 +950,19 @@
(if stopped
{:stop true}
(if (empty? es-partitions)
{:succ true}
(let [{:keys [succ need-sync-remote unknown] :as r}
(<! (sync-local->remote! this (first es-partitions)))]
(cond
succ
(recur (next es-partitions))

{:succ true}
(let [{:keys [succ need-sync-remote unknown] :as r}
(<! (sync-local->remote! this (first es-partitions)))]
(s/assert ::sync-local->remote!-result r)
(cond
succ
(recur (next es-partitions))
(or need-sync-remote unknown) r)))))))))


;;; sync state


(defn sync-state
"create a new sync-state"
[]
Expand Down Expand Up @@ -1003,6 +1022,7 @@

;;; put all stuff together


(deftype ^:large-vars/cleanup-todo
SyncManager [graph-uuid base-path *sync-state
^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer
Expand Down Expand Up @@ -1067,8 +1087,9 @@

(full-sync [this]
(go
(let [{:keys [succ need-sync-remote unknown stop]}
(let [{:keys [succ need-sync-remote unknown stop] :as r}
(<! (sync-local->remote-all-files! local->remote-syncer))]
(s/assert ::sync-local->remote-all-files!-result r)
(cond
succ
(.schedule this ::idle nil)
Expand Down Expand Up @@ -1099,8 +1120,9 @@
(go
(if (some-> remote-val :txid (<= @*txid))
(.schedule this ::idle nil)
(let [{:keys [succ unknown stop need-remote->local-full-sync]}
(let [{:keys [succ unknown stop need-remote->local-full-sync] :as r}
(<! (sync-remote->local! remote->local-syncer))]
(s/assert ::sync-remote->local!-result r)
(cond
need-remote->local-full-sync
(.schedule this ::remote->local-full-sync=>local->remote-full-sync nil)
Expand All @@ -1115,8 +1137,9 @@
(local->remote [this {^FileChangeEvents local-change :local}]
(assert (some? local-change) local-change)
(go
(let [{:keys [succ need-sync-remote unknown]}
(let [{:keys [succ need-sync-remote unknown] :as r}
(<! (sync-local->remote! local->remote-syncer [local-change]))]
(s/assert ::sync-local->remote!-result r)
(cond
succ
(.schedule this ::idle nil)
Expand All @@ -1139,7 +1162,6 @@
(debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
(swap! *sync-state sync-state--update-state ::stop))))


(defn sync-manager [graph-uuid base-path repo txid *sync-state full-sync-chan stop-sync-chan
remote->local-sync-chan local->remote-sync-chan local-changes-chan]
(let [*txid (atom txid)
Expand All @@ -1157,13 +1179,11 @@
full-sync-chan stop-sync-chan
remote->local-sync-chan local->remote-sync-chan local-changes-chan nil *txid nil nil nil false)))


(def full-sync-chan (chan 1))
(def stop-sync-chan (chan 1))
(def remote->local-sync-chan (chan))
(def local->remote-sync-chan (chan))


(defn sync-stop []
(when-let [sm (state/get-file-sync-manager)]
(println "stopping sync-manager")
Expand Down

0 comments on commit f7370de

Please sign in to comment.