# puppeteer.py
# Forked from microsoft/UFO.
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Dict, List, Optional, Type, Union
from pywinauto.controls.uiawrapper import UIAWrapper
from ufo.automator.app_apis.basic import WinCOMReceiverBasic
from ufo.automator.basic import CommandBasic, ReceiverBasic, ReceiverFactory
if TYPE_CHECKING:
    # Needed only for type annotations; this branch is skipped at runtime, so
    # the module is never actually imported when the program runs.
    # NOTE(review): presumably guarded to avoid a circular import — confirm.
    from ufo.automator.ui_control.controller import ControlReceiver
class AppPuppeteer:
    """
    The class for the app puppeteer to automate the app in the Windows environment.
    Commands are resolved to their receivers through the receiver manager and are
    either executed immediately or queued for a later batch run.
    """

    def __init__(self, process_name: str, app_root_name: str) -> None:
        """
        Initialize the app puppeteer.
        :param process_name: The process name of the app.
        :param app_root_name: The app root name, e.g., WINWORD.EXE.
        """

        self._process_name = process_name
        self._app_root_name = app_root_name
        # Commands waiting to be run by execute_all_commands(), in FIFO order.
        self.command_queue: Deque[CommandBasic] = deque()
        self.receiver_manager = ReceiverManager()

    def create_command(
        self, command_name: str, params: Dict[str, Any], *args, **kwargs
    ) -> Optional[CommandBasic]:
        """
        Create the command.
        :param command_name: The command name.
        :param params: The arguments for the command.
        :return: The command object bound to its receiver and the given arguments.
        :raises ValueError: If no receiver or no command is found for the command name.
        """

        receiver = self.receiver_manager.get_receiver_from_command_name(command_name)

        # Validate the receiver before touching its registry; otherwise a missing
        # receiver would surface as an AttributeError instead of this ValueError.
        if receiver is None:
            raise ValueError(f"Receiver for command {command_name} is not found.")

        # The receiver's command registry is looked up with the lower-cased name.
        command = receiver.command_registry.get(command_name.lower(), None)

        if command is None:
            raise ValueError(f"Command {command_name} is not supported.")

        return command(receiver, params, *args, **kwargs)

    def get_command_types(self, command_name: str) -> str:
        """
        Get the command types.
        :param command_name: The command name.
        :return: The type name of the receiver mapped to the command.
        """

        receiver = self.receiver_manager.get_receiver_from_command_name(command_name)

        return receiver.type_name

    def execute_command(
        self, command_name: str, params: Dict[str, Any], *args, **kwargs
    ) -> str:
        """
        Create the command and execute it immediately, bypassing the queue.
        :param command_name: The command name.
        :param params: The arguments.
        :return: The execution result.
        """

        command = self.create_command(command_name, params, *args, **kwargs)

        return command.execute()

    def execute_all_commands(self) -> List[Any]:
        """
        Execute all the commands in the command queue.
        Commands are removed from the queue as they run, in the order they were added.
        :return: The execution results.
        """

        results = []
        while self.command_queue:
            command = self.command_queue.popleft()
            results.append(command.execute())

        return results

    def add_command(
        self, command_name: str, params: Dict[str, Any], *args, **kwargs
    ) -> None:
        """
        Add the command to the command queue.
        :param command_name: The command name.
        :param params: The arguments.
        """

        command = self.create_command(command_name, params, *args, **kwargs)
        self.command_queue.append(command)

    def get_command_queue_length(self) -> int:
        """
        Get the length of the command queue.
        :return: The length of the command queue.
        """

        return len(self.command_queue)

    @property
    def full_path(self) -> str:
        """
        Get the full path of the process. Only works for COM receiver.
        :return: The full path of the process, or an empty string without a COM receiver.
        """

        com_receiver = self.receiver_manager.com_receiver
        if com_receiver is not None:
            return com_receiver.full_path

        return ""

    def save(self) -> None:
        """
        Save the current state of the app. Only works for COM receiver.
        """

        com_receiver = self.receiver_manager.com_receiver
        if com_receiver is not None:
            com_receiver.save()

    def save_to_xml(self, file_path: str) -> None:
        """
        Save the current state of the app to XML. Only works for COM receiver.
        The parent directory of the file is created when it does not exist yet.
        :param file_path: The file path to save the XML.
        """

        com_receiver = self.receiver_manager.com_receiver

        # A bare file name has an empty directory part, and os.makedirs("") raises,
        # so only create the directory when there is one to create. exist_ok=True
        # avoids failing if the directory appears between a check and the creation.
        dir_path = os.path.dirname(file_path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

        if com_receiver is not None:
            com_receiver.save_to_xml(file_path)

    def close(self) -> None:
        """
        Close the app. Only works for COM receiver.
        """

        com_receiver = self.receiver_manager.com_receiver
        if com_receiver is not None:
            com_receiver.close()

    @staticmethod
    def get_command_string(command_name: str, params: Dict[str, str]) -> str:
        """
        Generate a function call string.
        :param command_name: The function name.
        :param params: The arguments as a dictionary.
        :return: The function call string, e.g. click(button='left').
        """

        # Format the arguments as key=repr(value) pairs
        args_str = ", ".join(f"{k}={v!r}" for k, v in params.items())

        # Return the function call string
        return f"{command_name}({args_str})"
class ReceiverManager:
    """
    The class for the receiver manager.
    It creates receivers from the registered receiver factories and maps command
    names to the receiver that handles them.
    """

    # Class-level registry shared by all managers, filled by register():
    # factory name -> {"factory": factory instance, "is_api": factory's is_api() value}.
    _receiver_factory_registry: Dict[str, Dict[str, Union[str, ReceiverFactory]]] = {}

    def __init__(self):
        """
        Initialize the receiver manager.
        """

        # Maps a command name to the receiver that handles it.
        self.receiver_registry = {}
        self.ui_control_receiver: Optional["ControlReceiver"] = None

        self._receiver_list: List[ReceiverBasic] = []

    def create_ui_control_receiver(
        self, control: UIAWrapper, application: UIAWrapper
    ) -> "ControlReceiver":
        """
        Build the UI controller.
        :param control: The control element.
        :param application: The application wrapper handed to the factory along with the control.
        :return: The UI controller receiver.
        :raises ValueError: If no "UIControl" receiver factory is registered.
        """

        # Fail with an explicit error instead of an AttributeError on None when
        # the UI control factory has not been registered.
        factory_entry = self.receiver_factory_registry.get("UIControl")
        if factory_entry is None:
            raise ValueError("Receiver factory UIControl is not registered.")

        factory: ReceiverFactory = factory_entry.get("factory")
        self.ui_control_receiver = factory.create_receiver(control, application)
        self.receiver_list.append(self.ui_control_receiver)
        self._update_receiver_registry()

        return self.ui_control_receiver

    def create_api_receiver(self, app_root_name: str, process_name: str) -> None:
        """
        Create the API receivers from every registered API factory.
        :param app_root_name: The app root name.
        :param process_name: The process name.
        """

        for receiver_factory_dict in self.receiver_factory_registry.values():

            # Check if the receiver is API
            if receiver_factory_dict.get("is_api"):
                receiver = receiver_factory_dict.get("factory").create_receiver(
                    app_root_name, process_name
                )
                # Factories may return None; only real receivers are kept.
                if receiver is not None:
                    self.receiver_list.append(receiver)

        self._update_receiver_registry()

    def _update_receiver_registry(self) -> None:
        """
        Update the receiver registry. A receiver registry is a dictionary that maps the command name to the receiver.
        """

        # Receivers later in the list override earlier ones for the same command name.
        for receiver in self.receiver_list:
            if receiver is not None:
                self.receiver_registry.update(receiver.self_command_mapping())

    def get_receiver_from_command_name(self, command_name: str) -> ReceiverBasic:
        """
        Get the receiver from the command name.
        :param command_name: The command name.
        :return: The mapped receiver.
        :raises ValueError: If no receiver is mapped to the command name.
        """
        receiver = self.receiver_registry.get(command_name, None)
        if receiver is None:
            raise ValueError(f"Receiver for command {command_name} is not found.")
        return receiver

    @property
    def receiver_list(self) -> List[ReceiverBasic]:
        """
        Get the receiver list.
        :return: The receiver list.
        """
        return self._receiver_list

    @property
    def receiver_factory_registry(
        self,
    ) -> Dict[str, Dict[str, Union[str, ReceiverFactory]]]:
        """
        Get the receiver factory registry.
        :return: The receiver factory registry.
        """
        return self._receiver_factory_registry

    @property
    def com_receiver(self) -> Optional[WinCOMReceiverBasic]:
        """
        Get the COM receiver.
        :return: The first COM receiver in the receiver list, or None if there is none.
        """
        for receiver in self.receiver_list:
            if isinstance(receiver, WinCOMReceiverBasic):
                return receiver

        return None

    @classmethod
    def register(cls, receiver_factory_class: Type[ReceiverFactory]) -> ReceiverFactory:
        """
        Decorator to register the receiver factory class to the receiver manager.
        :param receiver_factory_class: The receiver factory class to be registered.
        :return: The receiver factory class instance.
        """

        cls._receiver_factory_registry[receiver_factory_class.name()] = {
            "factory": receiver_factory_class(),
            "is_api": receiver_factory_class.is_api(),
        }

        # A second, separate instance is returned; when used as a decorator the
        # decorated name is therefore bound to this instance rather than the class.
        return receiver_factory_class()