forked from NaiboWang/EasySpider
-
Notifications
You must be signed in to change notification settings - Fork 0
/
easyspider_executestage.py
2495 lines (2438 loc) · 135 KB
/
easyspider_executestage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
# import atexit
import atexit
import copy
import platform
import shutil
import string
import threading
# import undetected_chromedriver as uc
from utils import detect_optimizable, download_image, extract_text_from_html, get_output_code, isnotnull, lowercase_tags_in_xpath, myMySQL, new_line, \
on_press_creator, on_release_creator, readCode, rename_downloaded_file, replace_field_values, send_email, split_text_by_lines, write_to_csv, write_to_excel, write_to_json
from constants import WriteMode, DataWriteMode
from myChrome import MyChrome
from threading import Thread, Event
from PIL import Image
from commandline_config import Config
import os
import csv
from openpyxl import load_workbook, Workbook
import random
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.common.exceptions import StaleElementReferenceException, InvalidSelectorException
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from datetime import datetime
import io # 遇到错误退出时应执行的代码
import json
# from lib2to3.pgen2 import driver
import re
# import shutil
import subprocess
import sys
# from urllib import parse
# import base64
# import hashlib
import time
import requests
from multiprocessing import freeze_support
# multiprocessing support for frozen executables: prevents the program from
# endlessly re-launching new copies of itself when it is run as a bundled
# executable.
freeze_support()
try:
from ddddocr import DdddOcr
import onnxruntime
onnxruntime.set_default_logger_severity(3) # 隐藏onnxruntime的日志
except:
print("OCR识别无法在当前环境下使用(ddddocr库缺失),请使用完整版执行器easyspider_executestage_full来运行需要OCR识别的任务。")
print("OCR recognition cannot be used in the current environment (ddddocr library is missing), please use the executor with ddddocr 'easyspider_executestage_full' to run the task which requires OCR recognition.")
from urllib.parse import urljoin
from lxml import etree, html
try:
import pandas as pd
except:
print("数据去重无法在当前环境下使用(pandas库缺失),请使用完整版执行器easyspider_executestage_full来运行需要去重的任务。")
print("Data deduplication cannot be used in the current environment (pandas library is missing), please use the executor with pandas 'easyspider_executestage_full' to run the task which requires data deduplication.")
time.sleep(1)
# import numpy
# import pytesseract
# import uuid
if sys.platform != "darwin":
from myChrome import MyUCChrome
# Chrome capabilities with pageLoadStrategy "none": navigation commands return
# without waiting for the page to finish loading (presumably consumed where the
# driver is created elsewhere in this script).
# NOTE(review): this aliases and mutates Selenium's shared
# DesiredCapabilities.CHROME dict instead of a copy, so anything else reading
# those defaults sees the change too — confirm that is intended before
# switching to DesiredCapabilities.CHROME.copy().
desired_capabilities = DesiredCapabilities.CHROME
desired_capabilities["pageLoadStrategy"] = "none"
class BrowserThread(Thread):
def __init__(self, browser_t, id, service, version, event, saveName, config, option):
    """Prepare one task run: save name, output folders, browser hooks and output settings.

    Args:
        browser_t: WebDriver running the task. Besides the standard Selenium API it
            must accept the ``iframe=`` keyword on ``find_element(s)`` used by this
            class (presumably a ``MyChrome`` / ``MyUCChrome`` instance).
        id: Task ID; all files are written below ``Data/Task_<id>``.
        service: Task definition dict. Reads ``graph``, ``outputParameters`` and
            ``containJudge`` plus optional keys such as ``saveName``, ``links``,
            ``url``, ``outputFormat``, ``dataWriteMode``, ``saveThreshold``,
            ``startFromExit``, ``version`` and ``inputExcel``.
        version: Version string of this executor, compared with ``service["version"]``.
        event: Event that a "pause" custom operation clears to pause the task.
        saveName: Command-line save-name override; "" keeps the task's own name.
        config: Command-line configuration; ``config["mysql_config_path"]`` is read
            when the output format is ``mysql``.
        option: Browser option dict; ``option["tmp_user_data_folder"]`` is deleted
            when the task finishes.

    The process exits (``sys.exit``) when the task version is incompatible.
    Reads the global ``driver_path`` (defined outside this class) to locate
    ``stealth.min.js``.
    """
    Thread.__init__(self)
    self.logs = io.StringIO()  # in-memory log buffer; saveData() appends it to <saveName>.log
    self.log = bool(service.get("recordLog", True))  # whether the log buffer is persisted
    self.browser = browser_t
    self.option = option
    self.config = config
    self.version = version
    self.totalSteps = 0
    self.id = id
    self.event = event
    now = datetime.now()
    self.saveName = service.get("saveName", now.strftime("%Y_%m_%d_%H_%M_%S"))  # output file base name
    self.OUTPUT = ""
    self.SAVED = False
    self.BREAK = False
    self.CONTINUE = False
    if service.get("maximizeWindow") == 1:
        self.browser.maximize_window()
    # Save name: a non-empty command-line value overrides the task's own name,
    # then the placeholder "current_time" is replaced by the current timestamp.
    if saveName != "":
        self.saveName = saveName
    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    self.saveName = self.saveName.replace("current_time", timestamp)
    self.print_and_log("任务ID", id, "的保存文件名为:", self.saveName)
    self.print_and_log("Save Name for task ID", id, "is:", self.saveName)
    # Folder of this run, with files/ and images/ subfolders for downloads and screenshots.
    self.downloadFolder = "Data/Task_" + str(id) + "/" + self.saveName
    os.makedirs(self.downloadFolder + "/files", exist_ok=True)
    os.makedirs(self.downloadFolder + "/images", exist_ok=True)
    self.getDataStep = 0
    self.startSteps = 0
    try:
        if service.get("startFromExit", 0) == 1:
            # Resume mode: read the number of steps executed by the previous run.
            with open("Data/Task_" + str(self.id) + "/" + self.saveName + '_steps.txt', 'r',
                      encoding='utf-8-sig') as file_obj:
                self.startSteps = int(file_obj.read())
    except Exception as e:
        self.print_and_log(f"读取steps.txt失败,原因:{str(e)}")
    if self.startSteps != 0:
        self.print_and_log("此模式下,任务ID", self.id, "将从上次退出的步骤开始执行,之前已采集条数为",
                           self.startSteps, "条。")
        self.print_and_log("In this mode, task ID", self.id,
                           "will start from the last step, before we already collected", self.startSteps, " items.")
    else:
        self.print_and_log("此模式下,任务ID", self.id,
                           "将从头开始执行,如果需要从上次退出的步骤开始执行,请在保存任务时设置是否从上次保存位置开始执行为“是”。")
        self.print_and_log("In this mode, task ID", self.id,
                           "will start from the beginning, if you want to start from the last step, please set the option 'start from the last step' to 'yes' when saving the task.")
    # Inject stealth.min.js (stored next to chromedriver) into every new document
    # as an anti-bot-detection measure (originally added for TMALL).
    stealth_path = driver_path[:driver_path.find("chromedriver")] + "stealth.min.js"
    with open(stealth_path, 'r', encoding='utf-8') as f:
        js = f.read()
    self.print_and_log("Loading stealth.min.js")
    self.browser.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': js})
    # Hide navigator.webdriver on every new document.
    self.browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
"""
    })
    # Register Chromium's send_command endpoint and redirect downloads into this
    # run's files/ folder.
    self.browser.command_executor._commands["send_command"] = ("POST", '/session/$sessionId/chromium/send_command')
    path = os.path.join(os.path.abspath("./"), "Data", "Task_" + str(self.id), self.saveName, "files")
    self.paramss = {'cmd': 'Page.setDownloadBehavior', 'params': {'behavior': 'allow', 'downloadPath': path}}
    self.browser.execute("send_command", self.paramss)
    # Background thread renaming downloaded files; run() sets monitor_event when the task ends.
    self.monitor_event = threading.Event()
    self.monitor_thread = threading.Thread(target=rename_downloaded_file, args=(path, self.monitor_event))
    self.monitor_thread.start()
    self.procedure = service["graph"]  # flow nodes to execute
    self.maxViewLength = service.get("maxViewLength", 15)  # maximum displayed length
    self.outputFormat = service.get("outputFormat", "csv")
    self.save_threshold = service.get("saveThreshold", 10)  # flush after this many buffered rows
    self.dataWriteMode = service.get("dataWriteMode", DataWriteMode.Append.value)  # 1 append, 2 cover, 3 rename file
    self.task_version = service.get("version", "")
    if not self.task_version:
        self.print_and_log("版本不一致,请使用v0.2.0版本的EasySpider运行该任务!")
        self.print_and_log("Version not match, please use EasySpider v0.2.0 to run this task!")
        self.browser.quit()
        sys.exit()
    # Tasks from EasySpider >= 0.3.1 run on any executor from 0.3.1 on; older tasks
    # need exactly the same version.
    # NOTE(review): versions are compared as plain strings (lexicographically).
    if self.task_version < "0.3.1" and self.task_version != version:
        self.print_and_log(f"版本不一致,请使用{self.task_version}版本的EasySpider运行该任务!")
        self.print_and_log(f"Version not match, please use EasySpider {self.task_version} to run this task!")
        self.browser.quit()
        sys.exit()
    service_links = service.get("links")
    if service_links:
        self.links = list(filter(isnotnull, service_links.split("\n")))  # one link per line
    else:
        # NOTE(review): if service["url"] is a plain string, filter() iterates its
        # characters — confirm the expected type of "url".
        self.links = list(filter(isnotnull, service["url"]))
    self.OUTPUT = []  # buffered rows; starts with a header row when a new csv/txt/xlsx file is created
    if self.outputFormat in ["csv", "txt", "xlsx", "json"]:
        output_file = "Data/Task_" + str(self.id) + "/" + self.saveName + '.' + self.outputFormat
        if os.path.exists(output_file):
            if self.dataWriteMode == DataWriteMode.Cover.value:
                os.remove(output_file)
            elif self.dataWriteMode == DataWriteMode.Rename.value:
                # Pick the first free "<saveName>_<i>" with i >= 2.
                i = 2
                while os.path.exists("Data/Task_" + str(self.id) + "/" + self.saveName + '_' + str(i) + '.' + self.outputFormat):
                    i += 1
                self.saveName = self.saveName + '_' + str(i)
                self.print_and_log("文件已存在,已重命名为", self.saveName)
    # Write mode: 0 = create a new file (header row buffered), 1 = append to an existing file.
    self.writeMode = WriteMode.Create.value
    if self.outputFormat in ['csv', 'txt', 'xlsx']:
        if not os.path.exists(f"Data/Task_{str(self.id)}/{self.saveName}.{self.outputFormat}"):
            self.OUTPUT.append([])  # header row, filled with field names below
            self.writeMode = WriteMode.Create.value
        else:
            # Appending to an existing file (which presumably already has its header):
            # no header row is buffered, so writeMode must not stay Create, otherwise
            # OUTPUT[0] below raises IndexError.
            self.writeMode = WriteMode.Append.value
    elif self.outputFormat == "json":
        self.writeMode = WriteMode.Json.value  # JSON output does not check for an existing file
    elif self.outputFormat == "mysql":
        self.mysql = myMySQL(config["mysql_config_path"])
        self.mysql.create_table(self.saveName, service["outputParameters"],
                                remove_if_exists=self.dataWriteMode == DataWriteMode.Cover.value)
        self.writeMode = WriteMode.MySQL.value
    if self.writeMode == WriteMode.Create.value:
        self.print_and_log("新建模式|Create Mode")
    elif self.writeMode == WriteMode.Append.value:
        self.print_and_log("追加模式|Append Mode")
    elif self.writeMode == WriteMode.MySQL.value:
        self.print_and_log("MySQL模式|MySQL Mode")
    elif self.writeMode == WriteMode.Json.value:
        self.print_and_log("JSON模式|JSON Mode")
    self.containJudge = service["containJudge"]  # whether the flow contains condition nodes
    self.outputParameters = {}  # current value of every output field
    self.service = service
    self.outputParametersTypes = []  # field types, in field order
    self.outputParametersRecord = []  # whether each field is recorded, in field order
    self.dataNotFoundKeys = {}  # fields for which no data was found
    self.history = {"index": 0, "handle": None}  # current position in the page history
    self.SAVED = False  # whether data has been saved
    for param in service["outputParameters"]:
        if param["name"] not in self.outputParameters:
            self.outputParameters[param["name"]] = ""
            self.dataNotFoundKeys[param["name"]] = False
            self.outputParametersTypes.append(param.get("type", "text"))
            self.outputParametersRecord.append(bool(param.get("recordASField", True)))
            # Header columns only when a new file is created, never when appending.
            if self.outputFormat in ["csv", "txt", "xlsx"] and self.writeMode == WriteMode.Create.value:
                self.OUTPUT[0].append(param["name"])
    self.urlId = 0
    self.preprocess()  # normalize nodes and detect quick-extractable loops
    self.inputExcel = service.get("inputExcel", "")
    self.readFromExcel()  # override input parameters from the Excel file, if any
# Normalize task nodes and detect loops whose data can be extracted in one quick pass.
def preprocess(self):
    """Normalize every node of ``self.procedure`` in place before the task runs.

    For each node this fills defaults for parameters missing from older task
    files and lowercases tag names in XPaths. For loop nodes (option 8) whose
    only child is an extract-data node (option 3) iterating an element list
    (loopType 1 or 2), it decides whether the data can be extracted in one quick
    pass (``quickExtractable``) and precomputes ``baseXPath`` / ``quickParams``.
    """
    for node in self.procedure:
        parameters = node["parameters"]
        # Keep a configured iframe flag as-is; a missing or falsy one becomes False.
        parameters["iframe"] = parameters.get("iframe") or False
        if parameters.get("xpath"):
            parameters["xpath"] = lowercase_tags_in_xpath(parameters["xpath"])
        try:
            # 0 is a valid iframe index, so test convertibility, not truthiness.
            parameters["waitElementIframeIndex"] = int(parameters["waitElementIframeIndex"])
        except (KeyError, TypeError, ValueError):
            # No usable wait-element settings: fall back to "no element to wait for".
            parameters["waitElement"] = ""
            parameters["waitElementTime"] = 10
            parameters["waitElementIframeIndex"] = 0
        option = node["option"]
        if option == 1:  # open web page
            parameters.setdefault("cookies", "")
        elif option == 2:  # click element
            parameters.setdefault("alertHandleType", 0)
            if parameters["useLoop"] and self.task_version <= "0.3.5":
                # Loop clicks in tasks from EasySpider <= 0.3.5 do not support relative
                # XPaths, so fall back to the loop's own XPath.
                parameters["xpath"] = ""
                self.print_and_log("您的任务版本号为" + self.task_version +
                                   ",循环点击不支持相对XPath写法,已自动切换为纯循环的XPath")
        elif option == 3:  # extract data
            parameters["recordASField"] = 0
            if "params" not in parameters:
                # Tasks from EasySpider <= 0.5.0 store the field list under "paras".
                parameters["params"] = parameters["paras"]
            params = parameters["params"]
            parameters.setdefault("clear", 0)
            parameters.setdefault("newLine", 1)
            for param in params:
                param.setdefault("iframe", False)
                try:
                    param["relativeXPath"] = lowercase_tags_in_xpath(param["relativeXPath"])
                except Exception:
                    pass  # missing or unprocessable relative XPath: leave it unchanged
                # The node-level flag ends up reflecting the last field's setting.
                parameters["recordASField"] = param.get("recordASField", 1)
                try:
                    int(param["splitLine"])
                except (KeyError, TypeError, ValueError):
                    param["splitLine"] = 0
                if param["contentType"] == 8:  # OCR recognition
                    self.print_and_log(
                        "默认的ddddocr识别功能如果觉得不好用,可以自行修改源码get_content函数->contentType == 8的位置换成自己想要的OCR模型然后自己编译运行;或者可以先设置采集内容类型为“元素截图”把图片保存下来,然后用自定义操作调用自己写的程序,程序的功能是读取这个最新生成的图片,然后用好用的模型,如PaddleOCR把图片识别出来,然后把返回值返回给程序作为参数输出。")
                    self.print_and_log(
                        "If you think the default ddddocr function is not good enough, you can modify the source code get_content function -> contentType == 8 position to your own OCR model and then compile and run it; or you can first set the content type of the crawler to \"Element Screenshot\" to save the picture, and then call your own program with custom operations. The function of the program is to read the latest generated picture, then use a good model, such as PaddleOCR to recognize the picture, and then return the return value as a parameter output to the program.")
                param["optimizable"] = detect_optimizable(param)
        elif option == 4:  # input text
            parameters.setdefault("index", 0)
        elif option == 5:  # custom operation
            parameters.setdefault("clear", 0)
            parameters.setdefault("newLine", 1)
        elif option == 7:  # move to element
            if parameters["useLoop"] and self.task_version <= "0.3.5":
                # Same relative-XPath limitation as loop clicks in EasySpider <= 0.3.5.
                parameters["xpath"] = ""
                self.print_and_log("您的任务版本号为" + self.task_version +
                                   ",循环点击不支持相对XPath写法,已自动切换为纯循环的XPath")
        elif option == 8:  # loop
            if parameters.get("exitElement", "") == "":
                parameters["exitElement"] = "//body"
            parameters["quickExtractable"] = False
            parameters.setdefault("skipCount", 0)
            sequence = node["sequence"]
            # Quick extraction only for a (fixed or variable) element-list loop whose
            # single child is an extract-data node.
            if (len(sequence) == 1
                    and self.procedure[sequence[0]]["option"] == 3
                    and int(parameters["loopType"]) in (1, 2)):
                child_parameters = self.procedure[sequence[0]]["parameters"]
                if "params" in child_parameters:
                    params = child_parameters["params"]
                else:
                    params = child_parameters["paras"]  # EasySpider <= 0.5.0
                waitElement = child_parameters.get("waitElement", "")
                # Assume quick extraction works unless the loop is inside an iframe,
                # skips elements, or some field cannot be optimized.
                parameters["quickExtractable"] = not parameters["iframe"]
                if parameters["skipCount"] > 0:
                    parameters["quickExtractable"] = False
                for param in params:
                    optimizable = detect_optimizable(param, ignoreWaitElement=False, waitElement=waitElement)
                    param.setdefault("iframe", False)
                    if param["iframe"] and not param["relative"]:
                        optimizable = False  # absolute XPath inside an iframe
                    if not optimizable:
                        parameters["quickExtractable"] = False
                        break
                if parameters["quickExtractable"]:
                    self.print_and_log("循环操作<" + node["title"] + ">可以快速提取数据")
                    self.print_and_log("Loop operation <" + node["title"] + "> can extract data quickly")
                    parameters["clear"] = child_parameters.get("clear", 0)
                    parameters["newLine"] = child_parameters.get("newLine", 1)
                    loop_type = int(parameters["loopType"])
                    if loop_type == 1:  # variable element list
                        parameters["baseXPath"] = parameters["xpath"]
                    elif loop_type == 2:  # fixed element list
                        parameters["baseXPath"] = parameters["pathList"]
                    parameters["quickParams"] = []
                    for param in params:
                        relative_xpath = param["relativeXPath"]
                        if ("/@href" in relative_xpath or "/text()" in relative_xpath
                                or "::text()" in relative_xpath):
                            content_type = ""  # the XPath already selects an attribute/text node
                        elif param["nodeType"] == 2:
                            content_type = "//@href"
                        elif param["nodeType"] == 4:  # image link
                            content_type = "//@src"
                        elif param["contentType"] == 1:
                            content_type = "/text()"
                        elif param["contentType"] == 0:
                            content_type = "//text()"
                        else:
                            content_type = ""
                        # Relative XPaths are evaluated from each loop element.
                        prefix = "." if param["relative"] else ""
                        parameters["quickParams"].append({
                            "name": param["name"],
                            "relative": param["relative"],
                            "xpath": prefix + relative_xpath + content_type,
                            "nodeType": param["nodeType"],
                            "default": param["default"],
                        })
    self.print_and_log("预处理完成|Preprocess completed")
def readFromExcel(self):
    """Override task input parameters with values from the Excel file ``self.inputExcel``.

    The first worksheet is read column by column: the first cell of a column is
    an input-parameter name and the non-empty cells below it are its values.
    Columns sharing a name are merged cell-by-cell with "~". The merged values
    are joined with "\\r\\n" and written into the node referenced by the first
    ``service["inputParameters"]`` entry with that name: option 1 -> ``links``,
    option 4 -> ``value``, option 8 with loopType 0 -> ``exitCount`` (int),
    other option-8 loops -> ``textList``. A column named ``urlList_0`` replaces
    ``self.links``.

    Returns:
        0 when no Excel file is configured or it cannot be opened, otherwise None.
    """
    if self.inputExcel == "":
        return 0
    try:
        workbook = load_workbook(self.inputExcel)
    except Exception:
        self.print_and_log("读取Excel失败,将会使用默认参数执行任务,请检查文件路径是否正确:",
                           os.path.abspath(self.inputExcel))
        self.print_and_log(
            "Failed to read Excel, will execute the task with default parameters, please check if the file path is correct: ",
            os.path.abspath(self.inputExcel))
        time.sleep(5)
        return 0
    sheet = workbook[workbook.sheetnames[0]]
    # Transpose the sheet so each tuple is one column, header cell first.
    columns = zip(*sheet.iter_rows(values_only=True))
    grouped = {}
    for column in columns:
        cells = [str(cell) for cell in column[1:] if cell is not None]
        grouped.setdefault(column[0], []).append(cells)
    # Merge same-named columns row by row (zip stops at the shortest column).
    data = {
        name: ["~".join(row) for row in zip(*cell_lists)]
        for name, cell_lists in grouped.items()
    }
    if "urlList_0" in data:
        self.links = data["urlList_0"]
    for key, values in data.items():
        # Only the first input parameter with a matching name is overridden.
        param = next((p for p in self.service["inputParameters"] if p["name"] == key), None)
        if param is None:
            continue
        node = self.service["graph"][int(param["nodeId"])]
        value = "\r\n".join(values)
        if node["option"] == 1:  # open web page
            node["parameters"]["links"] = value
        elif node["option"] == 4:  # input text
            node["parameters"]["value"] = value
        elif node["option"] == 8 and int(node["parameters"]["loopType"]) == 0:
            node["parameters"]["exitCount"] = int(value)
        elif node["option"] == 8:
            node["parameters"]["textList"] = value
    self.print_and_log("已从Excel读取输入参数,覆盖了原有输入参数。")
    self.print_and_log(
        "Already read input parameters from Excel and overwrite the original input parameters.")
def removeDuplicateData(self):
    """Drop duplicate rows from this run's output when ``service["removeDuplicate"] == 1``.

    csv/txt/xlsx/json files are rewritten in place through pandas. MySQL output
    delegates to ``self.mysql.remove_duplicate_data()``. Deduplication is a
    best-effort post-processing step, so any failure (for example pandas not
    being installed, which the module tolerates at import time) is logged
    instead of raised. That way the caller still quits the browser and cleans up.
    """
    if self.service.get("removeDuplicate", 0) != 1:
        return
    self.print_and_log("正在去除重复数据,请稍后……")
    self.print_and_log("Removing duplicate data, please wait...")
    try:
        if self.outputFormat in ("csv", "txt", "json", "xlsx"):
            file_name = "Data/Task_" + str(self.id) + "/" + self.saveName + '.' + self.outputFormat
            if self.outputFormat in ("csv", "txt"):
                df = pd.read_csv(file_name)
                df.drop_duplicates(inplace=True)
                df.to_csv(file_name, index=False)
            elif self.outputFormat == "xlsx":
                df = pd.read_excel(file_name)
                df.drop_duplicates(inplace=True)
                df.to_excel(file_name, index=False)
            elif self.outputFormat == "json":
                df = pd.read_json(file_name)
                df.drop_duplicates(inplace=True)
                df.to_json(file_name, orient="records", force_ascii=False)
        elif self.outputFormat == "mysql":
            self.mysql.remove_duplicate_data()
    except Exception as e:
        self.print_and_log("去重失败|Failed to remove duplicate data:", e)
        return
    self.print_and_log("去重完成。")
    self.print_and_log("Duplicate data removed.")
def run(self):
    """Thread body: execute the flow once per link, then persist data and clean up.

    After the last link this flushes the remaining data, runs the optional
    deduplication and closes the MySQL connection for mysql output. It then
    waits ``service["quitWaitTime"]`` seconds (default 60), quits the browser,
    deletes the temporary user-data folder and sets ``monitor_event`` (the event
    handed to the download-rename monitor thread). Errors while quitting the
    browser or deleting the folder are ignored.
    """
    for position in range(1, len(self.links) + 1):
        self.print_and_log("正在执行第", position, "/", len(self.links), "个链接")
        self.print_and_log("Executing link", position, "/", len(self.links))
        self.executeNode(0)  # start the flow from its root node
        self.urlId += 1
    self.print_and_log("Done!")
    self.print_and_log("执行完成!")
    self.saveData(exit=True)
    self.removeDuplicateData()
    if self.outputFormat == "mysql":
        self.mysql.close()
    wait_seconds = self.service.get("quitWaitTime", 60)
    self.print_and_log(f"任务执行完毕,将在{wait_seconds}秒后自动退出浏览器并清理临时用户目录,等待时间可在保存任务对话框中设置。")
    self.print_and_log(f"The task is completed, the browser will exit automatically and the temporary user directory will be cleaned up after {wait_seconds} seconds, the waiting time can be set in the save task dialog.")
    time.sleep(wait_seconds)
    try:
        self.browser.quit()
    except BaseException:
        pass  # best effort: the browser may already be gone
    self.print_and_log("正在清理临时用户目录……|Cleaning up temporary user directory...")
    try:
        shutil.rmtree(self.option["tmp_user_data_folder"])
    except BaseException:
        pass  # best effort: the folder may be missing or still locked
    self.monitor_event.set()
    self.print_and_log("清理完成!|Clean up completed!")
    self.print_and_log("您现在可以安全的关闭此窗口了。|You can safely close this window now.")
def recordLog(self, *args, **kwargs):
    """Append ``args`` as one timestamped line to ``self.logs`` without printing to the console.

    ``kwargs`` such as ``sep``/``end`` are forwarded to ``print``.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S.%f}:"
    print(stamp, *args, file=self.logs, **kwargs)
# print() replacement: writes to the console and to the in-memory log buffer.
def print_and_log(self, *args, **kwargs):
    """Print ``args`` to stdout, then append the same line with a timestamp prefix to ``self.logs``.

    ``kwargs`` (``sep``, ``end``, ...) are forwarded to both ``print`` calls.
    """
    prefix = f"{datetime.now():%Y-%m-%d %H:%M:%S.%f}:"
    print(*args, **kwargs)  # console
    print(prefix, *args, file=self.logs, **kwargs)  # log buffer
def saveData(self, exit=False):
    """Flush buffered rows, the log buffer and the step counter to disk.

    Nothing happens unless ``exit`` is True or ``self.OUTPUT`` holds at least
    ``self.save_threshold`` rows. A flush does the following:
      * appends the log buffer to ``<saveName>.log`` when ``self.log`` is set;
      * overwrites ``<saveName>_steps.txt`` with ``totalSteps + 1``;
      * writes ``self.OUTPUT`` with the writer matching ``self.outputFormat``;
      * empties ``self.OUTPUT`` and the log buffer.

    Args:
        exit: Force a flush regardless of the threshold (task end / exit).
    """
    if exit != True and len(self.OUTPUT) < self.save_threshold:
        return
    base = "Data/Task_" + str(self.id) + "/" + self.saveName
    if self.log:
        with open(base + '.log', 'a', encoding='utf-8-sig') as log_file:
            log_file.write(self.logs.getvalue())
    # Executed-step counter, read back on the next run in resume mode.
    with open(base + '_steps.txt', 'w', encoding='utf-8-sig') as steps_file:
        steps_file.write(str(self.totalSteps + 1))
    fmt = self.outputFormat
    if fmt in ("csv", "txt"):
        write_to_csv(base + '.' + fmt, self.OUTPUT, self.outputParametersRecord)
    elif fmt == "xlsx":
        write_to_excel(base + '.xlsx', self.OUTPUT,
                       self.outputParametersTypes, self.outputParametersRecord)
    elif fmt == "json":
        write_to_json(base + '.json', self.OUTPUT, self.outputParametersTypes,
                      self.outputParametersRecord, self.outputParameters.keys())
    elif fmt == "mysql":
        self.mysql.write_to_mysql(self.OUTPUT, self.outputParametersRecord, self.outputParametersTypes)
    self.OUTPUT = []
    # Reset the log buffer: drop its contents, then rewind the write position.
    self.logs.truncate(0)
    self.logs.seek(0)
def scrollDown(self, param, rt=""):
    """Scroll the page as configured in ``param``, then call ``rt.end()`` if ``rt`` is given.

    Keys read from ``param``:
        scrollWaitTime: seconds to sleep before scrolling and after each step
            (skipped when missing or invalid).
        scrollType: 0 = none, 1 = PAGE_DOWN per step, 2 = END per step,
            3 = press END until the body text (plus iframe text when
            ``param["iframe"]``) stops changing.
        scrollCount: number of steps for types 1/2; coerced to ``int`` in place
            (1 when missing/invalid). Nothing scrolls unless it is > 0.
        iframe: passed as ``iframe=`` to the driver's ``find_element``.

    If scrolling raises, the error is logged, page loading is stopped with
    ``window.stop()`` and the scroll is retried once; an error in the retry
    propagates. Only ``rt.end()`` is used (``rt`` is presumably a timer object).
    """
    try:
        time.sleep(param["scrollWaitTime"])  # wait before scrolling
    except (KeyError, TypeError, ValueError, OverflowError):
        pass
    scroll_type = int(param["scrollType"])
    try:
        param["scrollCount"] = int(param["scrollCount"])
    except (KeyError, TypeError, ValueError, OverflowError):
        param["scrollCount"] = 1
    if scroll_type != 0 and param["scrollCount"] > 0:
        try:
            self._scroll_page(param, scroll_type)
        except Exception as e:
            self.print_and_log("滚动屏幕时出错|Error scrolling screen:", e)
            try:
                self.browser.execute_script('window.stop()')
            except Exception:
                pass
            # Retry once now that page loading has been stopped.
            self._scroll_page(param, scroll_type)
    if rt != "":
        rt.end()


def _scroll_page(self, param, scroll_type):
    """Perform one scrolling pass for ``scroll_type`` 1, 2 or 3 (helper of scrollDown)."""

    def pause():
        try:
            time.sleep(param["scrollWaitTime"])  # wait after each scroll step
        except (KeyError, TypeError, ValueError, OverflowError):
            pass

    if scroll_type in (1, 2):
        key = Keys.PAGE_DOWN if scroll_type == 1 else Keys.END
        for i in range(param["scrollCount"]):
            body = self.browser.find_element(By.CSS_SELECTOR, "body", iframe=param["iframe"])
            body.send_keys(key)
            pause()
            self.print_and_log("向下滚动,第", i + 1, "次。")
            self.print_and_log("Scroll down, the", i + 1, "time.")
    elif scroll_type == 3:
        previous_text = ""
        i = 0
        while True:
            page_text = self.browser.find_element(By.CSS_SELECTOR, "body", iframe=False).text
            if param["iframe"]:
                # Include every iframe's text so new content inside frames counts too.
                iframes = self.browser.find_elements(By.CSS_SELECTOR, "iframe", iframe=False)
                for frame in iframes:
                    self.browser.switch_to.default_content()
                    self.browser.switch_to.frame(frame)
                    # Call the parent-class find_element via super() to search the
                    # currently selected frame directly.
                    page_text += super(self.browser.__class__, self.browser).find_element(
                        By.CSS_SELECTOR, "body").text
                    self.browser.switch_to.default_content()
            if page_text == previous_text:
                self.print_and_log("页面已检测不到新内容,停止滚动。")
                self.print_and_log("No new content detected on the page, stop scrolling.")
                break
            previous_text = page_text
            body = self.browser.find_element(By.CSS_SELECTOR, "body", iframe=param["iframe"])
            body.send_keys(Keys.END)
            self.print_and_log("滚动到底部,第", i + 1, "次。")
            self.print_and_log("Scroll to the bottom, the", i + 1, "time.")
            i += 1
            pause()
def execute_code(self, codeMode, code, max_wait_time, element=None, iframe=False):
    """Run one piece of custom-operation code and return its result as a string.

    Args:
        codeMode: Anything ``int()`` accepts. 0 = JavaScript in the page,
            2 = JavaScript with ``element`` as its argument, 5 = Python via
            ``exec`` (so a successful run yields "None"), 6 = Python expression
            via ``eval``, 1 = shell command (its stdout is returned).
        code: Source text. ``Field["..."]`` references are substituted first via
            replace_field_values; modes 5/6 also pass it through readCode.
        max_wait_time: Timeout in seconds for JavaScript and shell commands;
            0 is treated as 999999.
        element: Target element for mode 2.
        iframe: Run inside the first iframe that can be entered instead of the
            top-level document (state tracked in ``browser.iframe_env``).

    Returns:
        ``str`` of the result; "" when ``code`` is empty or execution failed.

    NOTE(security): modes 1, 5 and 6 execute task-supplied code (shell=True,
    exec, eval) with this process's privileges — only run trusted tasks.
    """
    if code == "":
        return ""
    output = ""
    if max_wait_time == 0:
        max_wait_time = 999999
    code = replace_field_values(code, self.outputParameters, self)
    if iframe and self.browser.iframe_env == False:
        self.browser.switch_to.default_content()
        iframes = self.browser.find_elements(By.CSS_SELECTOR, "iframe", iframe=False)
        for frame in iframes:
            try:
                self.browser.switch_to.default_content()
                self.browser.switch_to.frame(frame)
                self.browser.iframe_env = True
                break  # stay in the first iframe that could be entered
            except BaseException:
                self.print_and_log("Iframe switch failed")
    elif not iframe and self.browser.iframe_env == True:
        self.browser.switch_to.default_content()
        self.browser.iframe_env = False
    mode = int(codeMode)
    if mode in (0, 2):
        if mode == 0:
            self.recordLog("Execute JavaScript:" + code)
            self.recordLog("执行JavaScript:" + code)
        else:
            self.recordLog("Execute JavaScript for element:" + code)
            self.recordLog("对元素执行JavaScript:" + code)
        self.browser.set_script_timeout(max_wait_time)
        try:
            if mode == 0:
                output = self.browser.execute_script(code)
            else:
                output = self.browser.execute_script(code, element)
        except BaseException:
            output = ""
            self.recordLog("JavaScript execution failed")
    elif mode == 1:
        self.recordLog("Execute System Call:" + code)
        self.recordLog("执行系统命令:" + code)
        try:
            completed = subprocess.run(
                code, capture_output=True, text=True, timeout=max_wait_time, shell=True)
            output = completed.stdout
            self.print_and_log(output)
        except subprocess.TimeoutExpired:
            self.recordLog("Command timed out")
            self.recordLog("命令执行超时")
        except Exception as e:
            self.print_and_log(e)
            self.recordLog("Command execution failed")
            self.recordLog("命令执行失败")
    elif mode == 5:
        try:
            code = readCode(code)
            output = exec(code)  # exec always returns None
            self.recordLog("执行下面的代码:" + code)
            self.recordLog("Execute the following code:" + code)
        except Exception as e:
            self.print_and_log("执行下面的代码时出错:" + code, ",错误为:", e)
            self.print_and_log("Error executing the following code:" +
                               code, ", error is:", e)
    elif mode == 6:
        try:
            code = readCode(code)
            output = eval(code)
            self.recordLog("获得下面的代码返回值:" + code)
            self.recordLog(
                "Get the return value of the following code:" + code)
        except Exception as e:
            self.print_and_log("获得下面的代码返回值时出错:" + code, ",错误为:", e)
            self.print_and_log(
                "Error executing and getting return value the following code:" + code, ", error is:", e)
    try:
        output = str(output)
    except BaseException:
        output = "无法转换为字符串|Unable to convert to string"
        self.print_and_log("无法转换为字符串|Unable to convert to string", output)
    return output
def customOperation(self, node, loopValue, loopPath, index):
    """Run a "custom operation" node and store its result as an output field.

    The behaviour is selected by ``int(node["parameters"]["codeMode"])``:

    * 0, 1, 5, 6 -- delegated to ``self.execute_code`` without an element.
    * 2 -- JavaScript on the ``index``-th element matched by ``loopPath``
      (after field substitution), via ``self.execute_code``.
    * 3 / 4 -- set ``self.BREAK`` / ``self.CONTINUE`` for the enclosing loop.
    * 7 -- pause the task by clearing ``self.event``.
    * 8 -- refresh the current page.
    * 9 -- send an e-mail configured by ``params["emailConfig"]``.
    * 10 -- clear all output fields.
    * 11 -- append a new data row built from the current output fields.
    * 12 -- save data, quit the browser, try to delete the temporary
      user-data folder, then terminate the process with ``os._exit(0)``.

    Args:
        node: Procedure node; ``node["title"]`` names the output field and
            ``node["parameters"]`` configures the operation.
        loopValue: Current loop value (not used by this operation).
        loopPath: XPath of the enclosing loop's elements (used by mode 2).
        index: Position of the current loop element among ``loopPath``
            matches (used by mode 2).

    Side effects:
        If ``params["clear"] == 1`` the output fields are cleared first.
        ``self.outputParameters[node["title"]]`` is always overwritten with
        the operation's output ("" for modes that produce none). When both
        ``recordASField`` and ``newLine`` are set, a new row is appended to
        ``self.OUTPUT``.
    """
    params = node["parameters"]
    if params["clear"] == 1:
        self.clearOutputParameters()
    codeMode = int(params["codeMode"])
    code = params["code"]
    output = ""
    max_wait_time = int(params["waitTime"])
    if codeMode == 2:  # inside a loop, loopPath is the actual XPath of the loop elements
        try:
            loopPath = replace_field_values(
                loopPath, self.outputParameters, self)
            elements = self.browser.find_elements(
                By.XPATH, loopPath, iframe=params["iframe"])
            element = elements[index]
            output = self.execute_code(
                codeMode, code, max_wait_time, element, iframe=params["iframe"])
        except Exception as e:
            # execute_code already handles JavaScript errors itself, so this
            # mostly catches failures to locate the loop element; log the cause
            # instead of swallowing it silently.
            output = ""
            self.print_and_log("JavaScript execution failed", e)
    elif codeMode == 3:
        self.BREAK = True
        self.recordLog("跳出循环|Break the loop")
    elif codeMode == 4:
        self.CONTINUE = True
        self.recordLog("跳过本次循环|Skip this loop")
    elif codeMode == 7:  # pause task execution
        self.event.clear()
        self.print_and_log(
            f"根据设置的自定义操作,任务已暂停,长按{self.service['pauseKey']}键继续执行...|Task paused according to custom operation, long press '{self.service['pauseKey']}' to continue...")
    elif codeMode == 8:  # refresh the page
        self.browser.refresh()
        self.print_and_log("根据设置的自定义操作,任务已刷新页面|Task refreshed page according to custom operation")
    elif codeMode == 9:  # send an e-mail
        send_email(params["emailConfig"])
    elif codeMode == 10:  # clear all field values
        self.clearOutputParameters()
    elif codeMode == 11:  # emit a new data row
        line = new_line(self.outputParameters,
                        self.maxViewLength, self.outputParametersRecord)
        self.OUTPUT.append(line)
    elif codeMode == 12:  # exit the program
        self.print_and_log("根据设置的自定义操作,任务已退出|Task exited according to custom operation")
        self.saveData(exit=True)
        self.browser.quit()
        self.print_and_log("正在清理临时用户目录……|Cleaning up temporary user directory...")
        try:
            shutil.rmtree(self.option["tmp_user_data_folder"])
        except Exception:
            # Best-effort cleanup: the process exits right after, so a missing
            # or locked folder must not prevent termination.
            pass
        self.print_and_log("清理完成!|Clean up completed!")
        os._exit(0)
    else:  # 0 1 5 6
        output = self.execute_code(
            codeMode, code, max_wait_time, iframe=params["iframe"])
    recordASField = bool(params["recordASField"])
    # The result is stored under the node title regardless of recordASField;
    # recordASField only gates whether a new row is emitted below.
    self.outputParameters[node["title"]] = output
    if recordASField and params["newLine"]:
        line = new_line(self.outputParameters,
                        self.maxViewLength, self.outputParametersRecord)
        self.OUTPUT.append(line)
def switchSelect(self, param, loopValue):
    """Change the selected option of a ``<select>`` drop-down element.

    ``optionMode`` chooses how the option is picked:
    0 -- advance to the option after the currently selected one (wrapping
    around to the first), 1 -- by index (``int(optionValue)``),
    2 -- by ``value`` attribute, 3 -- by visible text.

    When ``param["useLoop"]`` is set, the option value is taken from the
    current loop value instead of ``param["optionValue"]``. With
    ``param["index"] == 0`` the whole loop value is used. Otherwise the
    ``index``-th (1-based) piece of ``loopValue.split("~")`` is used, and if
    that piece does not exist the whole loop value is used. In loop mode the
    option mode is forced to 1 (select by index).

    Failures to locate the element or to change the option are logged, not
    raised.

    Args:
        param: Node parameters (``optionMode``, ``optionValue``, ``useLoop``,
            ``index``, ``xpath``, ``iframe``).
        loopValue: Current loop value; presumably a string when ``useLoop``
            is set -- confirm against callers.
    """
    optionMode = param["optionMode"]
    optionValue = param["optionValue"]
    if param["useLoop"]:
        index = param["index"]
        if index != 0:
            try:
                optionValue = loopValue.split("~")[index - 1]
            except Exception:
                # Fall back to the whole loop value, as the log message states.
                optionValue = loopValue
                self.print_and_log("取值失败,可能是因为取值索引超出范围,将使用整个文本值")
                self.print_and_log(
                    "Failed to get value, maybe because the index is out of range, will use the entire text value")
        else:
            optionValue = loopValue
        optionMode = 1

    # Bind xpath before the try so the error log below cannot hit an unbound
    # local when the field substitution itself fails.
    xpath = param["xpath"]
    try:
        xpath = replace_field_values(
            param["xpath"], self.outputParameters, self)
        dropdown = Select(self.browser.find_element(
            By.XPATH, xpath, iframe=param["iframe"]))
    except Exception:
        self.print_and_log("找不到下拉框元素:", xpath)
        self.print_and_log("Cannot find drop-down box element:", xpath)
        return

    try:
        if optionMode == 0:
            # Select the option following the currently selected one, wrapping
            # around at the end of the list.
            current_index = dropdown.options.index(
                dropdown.first_selected_option)
            next_index = (current_index + 1) % len(dropdown.options)
            dropdown.select_by_index(next_index)
        elif optionMode == 1:
            dropdown.select_by_index(int(optionValue))
        elif optionMode == 2:
            dropdown.select_by_value(optionValue)
        elif optionMode == 3:
            dropdown.select_by_visible_text(optionValue)
    except Exception:
        self.print_and_log("切换下拉框选项失败:", xpath,
                           param["optionMode"], param["optionValue"])
        self.print_and_log("Failed to change drop-down box option:",
                           xpath, param["optionMode"], param["optionValue"])
def moveToElement(self, param, loopElement=None, loopPath="", index=0):
    """Hover the mouse over an element using ``ActionChains.move_to_element``.

    Target selection:

    * ``useLoop`` with an empty ``xpath`` -- the ``index``-th match of
      ``loopPath``.
    * ``useLoop`` with a non-empty ``xpath`` -- ``xpath`` is treated as
      relative to the ``index``-th loop element
      (``(loopPath)[index + 1]xpath``) and its first match is used.
    * otherwise -- the first match of ``xpath``.

    Lookup and hover failures are logged (with the XPath actually used),
    not raised.

    Args:
        param: Node parameters (``xpath``, ``useLoop``, ``iframe``).
        loopElement: Unused; kept for call-site compatibility.
        loopPath: XPath of the enclosing loop's elements.
        index: Position of the current loop element among ``loopPath``
            matches.
    """
    time.sleep(0.1)  # wait 0.1 s before moving the mouse
    loopPath = replace_field_values(loopPath, self.outputParameters, self)
    xpath = replace_field_values(
        param["xpath"], self.outputParameters, self)
    if param["useLoop"]:
        if xpath == "":
            path = loopPath
        else:
            # Relative XPath: the path already narrows to the current loop
            # element, so take the first match inside it.
            path = f"({loopPath})[{index + 1}]{xpath}"
            index = 0
    else:
        index = 0
        path = xpath  # otherwise use the element's own XPath
    # Field substitution is applied once more to the assembled path, as before.
    path = replace_field_values(path, self.outputParameters, self)

    try:
        elements = self.browser.find_elements(
            By.XPATH, path, iframe=param["iframe"])
        element = elements[index]
    except Exception:
        # Log the full path actually searched, not just the (possibly
        # relative or empty) configured xpath.
        self.print_and_log("找不到元素:", path)
        self.print_and_log("Cannot find element:", path)
        return

    try:
        ActionChains(self.browser).move_to_element(element).perform()
    except Exception:
        self.print_and_log("移动鼠标到元素失败:", path)
        self.print_and_log("Failed to move mouse to element:", path)
# 执行节点关键函数部分
def executeNode(self, nodeId, loopValue="", loopPath="", index=0):
node = self.procedure[nodeId]
# WebDriverWait(self.browser, 10).until
# # 等待元素出现才进行操作,10秒内未出现则报错
# (EC.visibility_of_element_located(
# (By.XPATH, node["parameters"]["xpath"])))
try:
if node["parameters"]["waitElement"] != "":
waitElement = replace_field_values(
node["parameters"]["waitElement"], self.outputParameters, self)
waitElementTime = float(node["parameters"]["waitElementTime"])
waitElementIframeIndex = node["parameters"]["waitElementIframeIndex"]
self.print_and_log("等待元素出现:", waitElement)
self.print_and_log(
"Waiting for element to appear:", waitElement)
if waitElementIframeIndex > 0:
iframes = self.browser.find_elements(
By.CSS_SELECTOR, "iframe", iframe=False)
iframe = iframes[waitElementIframeIndex - 1]
self.browser.switch_to.frame(iframe)
WebDriverWait(self.browser, waitElementTime).until(
EC.presence_of_element_located((By.XPATH, waitElement))
)
if waitElementIframeIndex > 0:
self.browser.switch_to.default_content()
except Exception as e:
if waitElement != "":
self.print_and_log("等待元素出现超时:", waitElement, ",将继续执行。")
self.print_and_log("Timeout waiting for element to appear:",
waitElement, ", will continue to execute.")
self.recordLog(e)
self.recordLog("Wait element not found")
self.recordLog("执行节点|Execute node:", node["title"])
try:
# 根据不同选项执行不同操作
if node["option"] == 0 or node["option"] == 10: # root操作,条件分支操作
for i in node["sequence"]: # 从根节点开始向下读取
self.executeNode(i, loopValue, loopPath, index)
elif node["option"] == 1: # 打开网页操作
# if not (nodeId == 1 and self.service["cloudflare"] == 1):
self.openPage(node["parameters"], loopValue)
elif node["option"] == 2: # 点击元素
self.clickElement(node["parameters"], loopValue, loopPath, index)
elif node["option"] == 3: # 提取数据
# 针对提取数据操作,设置操作开始的步骤,用于不小心关闭后的恢复的增量采集
if self.totalSteps >= self.startSteps:
self.getData(node["parameters"], loopValue, node["isInLoop"],
parentPath=loopPath, index=index)
self.saveData()
else:
# self.getDataStep += 1
self.print_and_log("跳过第" + str(self.totalSteps) + "次提取数据。")
self.print_and_log(