forked from Ultimaker/Uranium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathOperationStack.py
138 lines (114 loc) · 5.27 KB
/
OperationStack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Copyright (c) 2019 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.
import threading
import time
from UM.Logger import Logger
from UM.Operations.Operation import Operation
from UM.Signal import Signal, signalemitter
from typing import List
## A stack of operations.
#
# This maintains the history of operations, which allows for undoing and
# re-doing these operations.
@signalemitter
class OperationStack():
def __init__(self, controller) -> None:
self._operations = [] # type: List[Operation]
self._current_index = -1 #Index of the most recently executed operation.
self._lock = threading.Lock() #Lock to make sure only one thread can modify the operation stack at a time.
# The merge behaviour must only occur when an operation is in the middle of a user action.
# So, whenever an operation is started or ended, we do not want this auto-merge.
self._merge_operations = False
self._controller = controller
self._controller.toolOperationStarted.connect(self._onToolOperationStarted)
self._controller.toolOperationStopped.connect(self._onToolOperationStopped)
def _onToolOperationStarted(self, tool):
self._merge_operations = False
def _onToolOperationStopped(self, tool):
self._merge_operations = False
## Push an operation on the stack.
#
# This will perform the following things in sequence:
# - If the current index is pointing to an item lower in the stack than
# the top, remove all operations from the current index to the top.
# - Append the operation to the stack.
# - Call redo() on the operation.
# - Perform merging of operations.
#
# \param operation \type{Operation} The operation to push onto the stack.
def push(self, operation):
start_time = time.time()
if not self._lock.acquire(False):
return
try:
if self._current_index < len(self._operations) - 1:
del self._operations[self._current_index + 1:len(self._operations)]
self._operations.append(operation)
operation.redo()
self._current_index += 1
self._doMerge()
self.changed.emit()
finally:
self._lock.release()
elapsed_time = time.time() - start_time
Logger.log("d", " ".join(repr(operation).splitlines()) + ", took {0}ms".format(int(elapsed_time * 1000))) #Don't remove; used in regression-tests.
## Undo the current operation.
#
# This will call undo() on the current operation and decrement the current index.
def undo(self):
with self._lock:
if self._current_index >= 0 and self._current_index < len(self._operations):
self._operations[self._current_index].undo()
self._current_index -= 1
self.changed.emit()
## Redo the next operation.
#
# This will call redo() on the current operation and increment the current index.
def redo(self):
with self._lock:
n = self._current_index + 1
if n >= 0 and n < len(self._operations):
self._operations[n].redo()
self._current_index += 1
self.changed.emit()
## Get the list of operations in the stack.
#
# The end of the list represents the more recent operations.
#
# \return A list of the operations on the stack, in order.
def getOperations(self):
with self._lock:
return self._operations
## Whether we can undo any more operations.
#
# \return True if we can undo any more operations, or False otherwise.
def canUndo(self):
return self._current_index >= 0
## Whether we can redo any more operations.
#
# \return True if we can redo any more operations, or False otherwise.
def canRedo(self):
return self._current_index < len(self._operations) - 1
## Signal for when the operation stack changes.
changed = Signal()
## private:
## Merges two operations at the current position in the stack.
#
# This merges the "most recent" operation with the one before it. The
# "most recent" operation is the one that would be undone if the user
# would trigger an undo, i.e. the one at _current_index.
def _doMerge(self):
if len(self._operations) >= 2:
op1 = self._operations[self._current_index]
op2 = self._operations[self._current_index - 1]
if not op1._always_merge and not op2._always_merge and not self._merge_operations:
if abs(op1._timestamp - op2._timestamp) > self._merge_window: #For normal operations, only merge if the operations were very quickly after each other.
return
self._merge_operations = True #A signal sets this to False again.
merged = op1.mergeWith(op2)
if merged: #Replace the merged operations in the stack with the new one.
del self._operations[self._current_index]
del self._operations[self._current_index - 1]
self._current_index -= 1
self._operations.append(merged)
_merge_window = 1.0 #Don't merge operations that were longer than this amount of seconds apart.