diff --git a/src/main/frontend/components/encryption.cljs b/src/main/frontend/components/encryption.cljs index e20ed2528f1..2762f51c62d 100644 --- a/src/main/frontend/components/encryption.cljs +++ b/src/main/frontend/components/encryption.cljs @@ -116,7 +116,7 @@ (if (instance? js/Error persist-r) (js/console.error persist-r) (when (fn? after-input-password) - (async/ false diff --git a/src/main/frontend/components/header.cljs b/src/main/frontend/components/header.cljs index 81329fc3a23..b4ce14068c6 100644 --- a/src/main/frontend/components/header.cljs +++ b/src/main/frontend/components/header.cljs @@ -197,8 +197,7 @@ (ui/icon "chevron-left" {:size 26})])))] [:div.r.flex - (when (and sync-enabled? - current-repo + (when (and current-repo (not (config/demo-graph? current-repo)) (user-handler/alpha-or-beta-user?)) (fs-sync/indicator)) diff --git a/src/main/frontend/components/settings.cljs b/src/main/frontend/components/settings.cljs index 53e76dcf622..eadcffd246e 100644 --- a/src/main/frontend/components/settings.cljs +++ b/src/main/frontend/components/settings.cljs @@ -635,9 +635,7 @@ [enabled?] (ui/toggle enabled? (fn [] - (let [value (not enabled?)] - (storage/set :logseq-sync-enabled value) - (state/set-state! :feature/enable-sync? value))) + (file-sync-handler/set-sync-enabled! (not enabled?))) true)) (defn sync-switcher-row [enabled?] diff --git a/src/main/frontend/core.cljs b/src/main/frontend/core.cljs index 5f7efe45962..2bcb61a009c 100644 --- a/src/main/frontend/core.cljs +++ b/src/main/frontend/core.cljs @@ -49,7 +49,7 @@ (display-welcome-message) (persist-var/load-vars) (when config/dev? - (js/setTimeout #(sync/sync-start) 1000)))) + (js/setTimeout #(sync/FileTxn (first %) (first %) update? delete? TXId (last %)))) + (comp + (remove #(or (empty? (first %)) + (empty? (last %)))) + (map #(->FileTxn (first %) (first %) update? delete? TXId (last %)))) delete-xf - (comp - (remove #(empty? (first %))) - (map #(->FileTxn (first %) (first %) update? delete? TXId nil))) + (comp + (remove #(empty? (first %))) + (map #(->FileTxn (first %) (first %) update? delete? TXId nil))) rename-xf - (comp - (remove #(or (empty? (first %)) - (empty? (second %)))) - (map #(->FileTxn (second %) (first %) false false TXId nil))) + (comp + (remove #(or (empty? (first %)) + (empty? (second %)))) + (map #(->FileTxn (second %) (first %) false false TXId nil))) xf (case TXType "delete_files" delete-xf "update_files" update-xf @@ -618,21 +618,21 @@ #{} s1)) (comment - (defn map->FileMetadata [m] - (apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m))) - - (assert - (= - #{(map->FileMetadata {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2})} - (diff-file-metadata-sets - (into #{} - (map map->FileMetadata) - [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1} - {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2}]) - (into #{} - (map map->FileMetadata) - [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1} - {:size 1 :etag 1 :path 2 :encrypted-path 2 :last-modified 1}]))))) + (defn map->FileMetadata [m] + (apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m))) + + (assert + (= + #{(map->FileMetadata {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2})} + (diff-file-metadata-sets + (into #{} + (map map->FileMetadata) + [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1} + {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2}]) + (into #{} + (map map->FileMetadata) + [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1} + {:size 1 :etag 1 :path 2 :encrypted-path 2 :last-modified 1}]))))) (extend-protocol IChecksum FileMetadata @@ -1066,8 +1066,8 @@ (go-loop [] (let [{:keys [val stop]} (async/alt! - debug-print-sync-events-loop-stop-chan {:stop true} - out-ch ([v] {:val v}))] + debug-print-sync-events-loop-stop-chan {:stop true} + out-ch ([v] {:val v}))] (cond stop (do (async/unmix-all out-mix) (doseq [[topic ch] topic&chs] @@ -1083,28 +1083,28 @@ (comment - ;; sub one type event example: - (def c1 (chan 10)) - (async/sub sync-events-publication :created-local-version-file c1) - (offer! sync-events-chan {:event :created-local-version-file :data :xxx}) - (poll! c1) - - ;; sub multiple type events example: - ;; sub :created-local-version-file and :finished-remote->local events, - ;; output into channel c4-out - (def c2 (chan 10)) - (def c3 (chan 10)) - (def c4-out (chan 10)) - (def mix-out (async/mix c4-out)) - (async/admix mix-out c2) - (async/admix mix-out c3) - (async/sub sync-events-publication :created-local-version-file c2) - (async/sub sync-events-publication :finished-remote->local c3) - (offer! sync-events-chan {:event :created-local-version-file :data :xxx}) - (offer! sync-events-chan {:event :finished-remote->local :data :xxx}) - (poll! c4-out) - (poll! c4-out) - ) + ;; sub one type event example: + (def c1 (chan 10)) + (async/sub sync-events-publication :created-local-version-file c1) + (offer! sync-events-chan {:event :created-local-version-file :data :xxx}) + (poll! c1) + + ;; sub multiple type events example: + ;; sub :created-local-version-file and :finished-remote->local events, + ;; output into channel c4-out + (def c2 (chan 10)) + (def c3 (chan 10)) + (def c4-out (chan 10)) + (def mix-out (async/mix c4-out)) + (async/admix mix-out c2) + (async/admix mix-out c3) + (async/sub sync-events-publication :created-local-version-file c2) + (async/sub sync-events-publication :finished-remote->local c3) + (offer! sync-events-chan {:event :created-local-version-file :data :xxx}) + (offer! sync-events-chan {:event :finished-remote->local :data :xxx}) + (poll! c4-out) + (poll! c4-out) + ) ;;; sync events ends @@ -1162,27 +1162,27 @@ (let [file-meta-list (transient #{}) encrypted-path-list (transient []) exp-r - (path-map - (zipmap - encrypted-paths - (path-map %))) - txns-with-encrypted-paths)] + (mapv + (fn [txn] (update txn :path #(get encrypted-path->path-map %))) + txns-with-encrypted-paths)] txns))))) ( from-path remove-user-graph-uuid-prefix) - checksum]) - (:TXContent txn)))) - txns-with-encrypted-paths) + (mapv + (fn [txn] + (assoc txn :TXContent + (mapv + (fn [[to-path from-path checksum]] + [(remove-user-graph-uuid-prefix to-path) + (some-> from-path remove-user-graph-uuid-prefix) + checksum]) + (:TXContent txn)))) + txns-with-encrypted-paths) encrypted-paths - (mapcat - (fn [txn] - (remove - #(or (nil? %) (not (string/starts-with? % "e."))) - (mapcat - (fn [[to-path from-path _checksum]] [to-path from-path]) - (:TXContent txn)))) - txns-with-encrypted-paths*) + (mapcat + (fn [txn] + (remove + #(or (nil? %) (not (string/starts-with? % "e."))) + (mapcat + (fn [[to-path from-path _checksum]] [to-path from-path]) + (:TXContent txn)))) + txns-with-encrypted-paths*) encrypted-path->path-map - (zipmap - encrypted-paths - (path-map to-path to-path) - (some->> from-path (get encrypted-path->path-map)) - checksum]) - (:TXContent txn)))) - txns-with-encrypted-paths*)] + (mapv + (fn [txn] + (assoc + txn :TXContent + (mapv + (fn [[to-path from-path checksum]] + [(get encrypted-path->path-map to-path to-path) + (some->> from-path (get encrypted-path->path-map)) + checksum]) + (:TXContent txn)))) + txns-with-encrypted-paths*)] [txns (:TXId (last txns)) (:TXId (first txns))]))))) @@ -1435,12 +1435,12 @@ (.-deleted? e) :delete-filetxns (.renamed? e) :rename-filetxns)) filetxns) update-file-items (map - (fn [filetxn] - (let [path (relative-path filetxn)] - {:remote->local-type :update - :checksum (-checksum filetxn) - :path path})) - update-filetxns) + (fn [filetxn] + (let [path (relative-path filetxn)] + {:remote->local-type :update + :checksum (-checksum filetxn) + :path path})) + update-filetxns) rename-file-items (mapcat (fn [^FileTxn filetxn] (let [to-path (relative-path filetxn) @@ -1453,12 +1453,12 @@ :path from-path}])) rename-filetxns) delete-file-items (map - (fn [filetxn] - (let [path (relative-path filetxn)] - {:remote->local-type :delete - :checksum (-checksum filetxn) - :path path})) - delete-filetxns)] + (fn [filetxn] + (let [path (relative-path filetxn)] + {:remote->local-type :delete + :checksum (-checksum filetxn) + :path path})) + delete-filetxns)] (set (concat update-file-items rename-file-items delete-file-items)))) (defn- apply-filetxns @@ -1493,8 +1493,8 @@ [recent-remote->local-file-item]) (local-files - [recent-remote->local-file-item]))))) + (swap! *sync-state sync-state--remove-recent-remote->local-files + [recent-remote->local-file-item]))))) (let [update-local-files-ch (local-file-items 5s later (go (local-files - recent-remote->local-file-items)) + (swap! *sync-state sync-state--remove-recent-remote->local-files + recent-remote->local-file-items)) (cond (instance? ExceptionInfo r) r @*paused {:pause true} @@ -1647,7 +1647,7 @@ path (relative-path e)] {:remote->local-type tp :checksum (if (= tp :delete) nil - (val (first (! ch (->FileChangeEvent "unlink" repo-dir (:old-path rename-event*) nil nil)) (>! ch (->FileChangeEvent "add" repo-dir (:new-path rename-event*) @@ -1867,8 +1867,10 @@ :input-pwd-remote {:GraphUUID graph-uuid :init-graph-keys init-graph-keys - :after-input-password #(go ( (ex-data r) :err :status (= 404)) pwd (local-files % [])) (LocalSync (stop-remote->local! [_] (vreset! *stopped true)) @@ -2384,23 +2385,23 @@ local-files-meta-map)))) (defrecord ^:large-vars/cleanup-todo - Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi - ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused + Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi + ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused ;; control chans - private-immediately-local->remote-chan private-recent-edited-chan] + private-immediately-local->remote-chan private-recent-edited-chan] Object (filter-file-change-events-fn [_] (fn [^FileChangeEvent e] (go (and (instance? FileChangeEvent e) (if-let [mtime (:mtime (.-stat e))] - ;; if mtime is not nil, it should be after (- now 1min) - ;; ignore events too early + ;; if mtime is not nil, it should be after (- now 1min) + ;; ignore events too early (> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1)))) true) (or (string/starts-with? (.-dir e) base-path) (string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix (not (ignored? e)) ;not ignored - ;; download files will also trigger file-change-events, ignore them + ;; download files will also trigger file-change-events, ignore them (not (contains? (:recent-remote->local-files @*sync-state) (recent-remote->local-file-item graph-uuid e)))))))) @@ -2476,7 +2477,7 @@ {:need-sync-remote true}) (need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true, - ;; but some potential bugs cause local-txid > remote-txid + ;; but some potential bugs cause local-txid > remote-txid (let [remote-graph-info-or-ex (remote! update txid" r*) - ;; persist txid + ;; persist txid (FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) {:size (:size %)} (:etag %))) (remove ignored?)) @@ -2556,7 +2557,7 @@ _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events)) change-events-partitions (sequence - ;; partition FileChangeEvents + ;; partition FileChangeEvents (partition-file-change-events upload-batch-size) distinct-change-events)] (println "[full-sync(local->remote)]" @@ -2567,7 +2568,7 @@ :graph-uuid graph-uuid :full-sync? true :epoch (tc/to-epoch (t/now))}}) - ;; 1. delete local files + ;; 1. delete local files (loop [[f & fs] delete-local-files] (when f (let [relative-p (relative-path f)] @@ -2583,7 +2584,7 @@ [fake-recent-remote->local-file-item]))))) (recur fs))) - ;; 2. upload local files + ;; 2. upload local files (loop [es-partitions change-events-partitions] (if @*stopped {:stop true} @@ -2600,14 +2601,14 @@ ;;; ### put all stuff together (defrecord ^:large-vars/cleanup-todo - SyncManager [graph-uuid base-path *sync-state - ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi - ^:mutable ratelimit-local-changes-chan - *txid ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused? - ^:mutable ops-chan - ;; control chans - private-full-sync-chan private-stop-sync-chan private-remote->local-sync-chan - private-remote->local-full-sync-chan private-pause-resume-chan] + SyncManager [graph-uuid base-path *sync-state + ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi + ^:mutable ratelimit-local-changes-chan + *txid ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused? + ^:mutable ops-chan + ;; control chans + private-full-sync-chan private-stop-sync-chan private-remote->local-sync-chan + private-remote->local-full-sync-chan private-pause-resume-chan] Object (schedule [this next-state args reason] {:pre [(s/valid? ::state next-state)]} @@ -2648,19 +2649,19 @@ (go-loop [] (let [{:keys [stop remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause]} (async/alt! - private-stop-sync-chan {:stop true} - private-remote->local-full-sync-chan {:remote->local-full-sync true} - private-remote->local-sync-chan {:remote->local true} - private-full-sync-chan {:local->remote-full-sync true} - private-pause-resume-chan ([v] (if v {:resume true} {:pause true})) - remote-change-chan ([v] (println "remote change:" v) {:remote->local v}) - ratelimit-local-changes-chan ([v] - (let [rest-v (util/drain-chan ratelimit-local-changes-chan) - vs (cons v rest-v)] - (println "local changes:" vs) - {:local->remote vs})) - (timeout (* 20 60 1000)) {:local->remote-full-sync true} - :priority true)] + private-stop-sync-chan {:stop true} + private-remote->local-full-sync-chan {:remote->local-full-sync true} + private-remote->local-sync-chan {:remote->local true} + private-full-sync-chan {:local->remote-full-sync true} + private-pause-resume-chan ([v] (if v {:resume true} {:pause true})) + remote-change-chan ([v] (println "remote change:" v) {:remote->local v}) + ratelimit-local-changes-chan ([v] + (let [rest-v (util/drain-chan ratelimit-local-changes-chan) + vs (cons v rest-v)] + (println "local changes:" vs) + {:local->remote vs})) + (timeout (* 20 60 1000)) {:local->remote-full-sync true} + :priority true)] (cond stop (do (util/drain-chan ops-chan) @@ -2853,13 +2854,13 @@ (.schedule this ::idle nil nil))))))) (local->remote [this {local-changes :local}] - ;; local-changes:: list of FileChangeEvent + ;; local-changes:: list of FileChangeEvent (assert (some? local-changes) local-changes) (go (let [distincted-local-changes (distinct-file-change-events local-changes) _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes)) change-events-partitions - (sequence (partition-file-change-events upload-batch-size) distincted-local-changes) + (sequence (partition-file-change-events upload-batch-size) distincted-local-changes) _ (put-sync-event! {:event :start :data {:type :local->remote :graph-uuid graph-uuid @@ -2967,6 +2968,9 @@ (reset! current-sm-graph-uuid graph-uuid) (sync-manager user-uuid graph-uuid base-path repo txid *sync-state))) +;; Avoid sync reentrancy +(defonce *sync-entered? (atom false)) + (defn > (:Graphs r) - (mapv :GraphUUID) - set - (#(contains? % local-graph-uuid)))))] + (or + ;; if api call failed, assume this remote graph still exists + (instance? ExceptionInfo r) + (and + (contains? r :Graphs) + (->> (:Graphs r) + (mapv :GraphUUID) + set + (#(contains? % local-graph-uuid)))))] (when-not result (notification/show! (t :file-sync/graph-deleted) :warning false)) @@ -3034,26 +3040,28 @@ (declare network-online-cursor) -(defn sync-start +(defn c (persist-var/-load graphs-txid))) - [user-uuid graph-uuid txid] @graphs-txid - txid (or txid 0) - repo (state/get-current-repo)] - (when (and (graph-sync-off? repo) @network-online-cursor) - (when (and user-uuid graph-uuid txid + (when (false? @*sync-entered?) + (reset! *sync-entered? true) + (let [*sync-state (atom (sync-state)) + current-user-uuid (user/user-uuid) + ;; put @graph-uuid & get-current-repo together, + ;; prevent to get older repo dir and current graph-uuid. + _ (c (persist-var/-load graphs-txid))) + [user-uuid graph-uuid txid] @graphs-txid + txid (or txid 0) + repo (state/get-current-repo)] + (when (and repo + (graph-sync-off? repo) @network-online-cursor + user-uuid graph-uuid txid (user/logged-in?) - repo (not (config/demo-graph? repo))) (try - (when-some [sm (sync-manager-singleton current-user-uuid graph-uuid - (config/get-repo-dir repo) repo - txid *sync-state)] + (when-let [sm (sync-manager-singleton current-user-uuid graph-uuid + (config/get-repo-dir repo) repo + txid *sync-state)] (when (check-graph-belong-to-current-user current-user-uuid user-uuid) (if-not (c (persist-var/load-vars))) (async/ (sync/sync-start) async/ (sync/c (persist-var/load-vars))) @@ -86,6 +86,7 @@ (state/set-state! :user/info result) (let [status (if (user-handler/alpha-or-beta-user?) :welcome :unavailable)] (when (and (= status :welcome) (user-handler/logged-in?)) + (file-sync-handler/set-sync-enabled! true) (async/ mins 30) "calculating..." :else (str mins " mins left")))))) + +(defn set-sync-enabled! + [value] + (storage/set :logseq-sync-enabled value) + (state/set-state! :feature/enable-sync? value))