# !/usr/local/bin/python3
# -*- coding:utf-8 -*-

__author__ = 'jerry'

from collections import defaultdict
import sys
import json

from lib.common.basic import getExtension, getDomain
from lib.third.nyawc.Crawler import Crawler
from lib.third.nyawc.CrawlerActions import CrawlerActions
from lib.third.nyawc.Options import Options
from lib.third.nyawc.http.Request import Request
from lib.utils.extension import IGNORED_EXTESIONS, EXCEL_EXTENSIONS, WORD_EXTENSIONS, PDF_EXTENSIONS

import config
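
# config is expected to provide the crawler tuning values referenced below.
# A minimal sketch, with illustrative values only (not the project's defaults):
#   CRAWLER_MAX_THREADS = 10       # worker threads used by nyawc
#   CRAWLER_REQUEST_TIMEOUT = 15   # per-request timeout in seconds
#   CRAWLER_MAX_DEPTH = 3          # maximum crawl depth from the start URL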


class LinksCrawler():
    def __init__(self, subdomain, file_links_path):
        self.subdomain = subdomain
        self.file_links_path = file_links_path
        self.options = Options()
        self.crawled_urls_to_check_dups = []
        self.file_links = {'word': [], 'excel': [], 'pdf': []}
        self.other_links = defaultdict(list)

    def prepare(self):
        '''
        Preprocess the target URL.
        '''
        self.subdomain_url = 'http://' + self.subdomain if 'http' not in self.subdomain else self.subdomain
        self.subdomain_name = getDomain(self.subdomain_url)

    def setOptions(self):
        self._setPerformanceOptions()
        self._setScopeOptions()
        self._setIdentityOptions()
        self._setMiscOptions()
        self._setIgnoredExtensions()
        # self._setFocusExtensions()

    def _setPerformanceOptions(self):
        '''
        Performance options.
        refs: https://tijme.github.io/not-your-average-web-crawler/latest/options_performance.html
        '''
        self.options.performance.max_threads = config.CRAWLER_MAX_THREADS  # number of crawler threads
        self.options.performance.request_timeout = config.CRAWLER_REQUEST_TIMEOUT  # request timeout

    def _setScopeOptions(self):
        '''
        Scope options.
        :return:
        '''
        self.options.scope.protocol_must_match = False  # protocol
        self.options.scope.subdomain_must_match = True  # subdomain
        self.options.scope.hostname_must_match = True  # hostname
        self.options.scope.tld_must_match = True  # top-level domain
        self.options.scope.max_depth = config.CRAWLER_MAX_DEPTH  # crawl depth
        self.options.scope.request_methods = [  # allowed HTTP methods
            Request.METHOD_GET,
            Request.METHOD_POST,
            Request.METHOD_PUT,
            Request.METHOD_DELETE,
            Request.METHOD_OPTIONS,
            Request.METHOD_HEAD
        ]

    def _setIdentityOptions(self):
        '''
        Identity options.
        refs:
        '''
        self.options.identity.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"})

    def _setMiscOptions(self):
        '''
        Miscellaneous options.
        :return:
        '''
        self.options.misc.debug = True

    def _setIgnoredExtensions(self):
        '''
        File extensions excluded from crawling.
        '''
        self.ignored_extensions = IGNORED_EXTESIONS

    # def _setFocusExtensions(self):
    #     '''
    #     File extensions of interest.
    #     '''
    #     self.focus_extensions = []
    #     self.focus_extensions.extend(EXCEL_EXTENSIONS)
    #     self.focus_extensions.extend(PDF_EXTENSIONS)
    #     self.focus_extensions.extend(WORD_EXTENSIONS)
    #     print(self.focus_extensions)

    def _set_cb_crawler_before_start(self):
        global subdomain_url
        subdomain_url = self.subdomain_url

        def cb_crawler_before_start():
            print("\nTarget : " + subdomain_url)
            print("--" * 30)

        self.options.callbacks.crawler_before_start = cb_crawler_before_start  # Called before the crawler starts crawling. Default is a null route.

    def _set_cb_crawler_after_finish(self):
        def cb_crawler_after_finish(queue):
            print("Crawling finished.")

        self.options.callbacks.crawler_after_finish = cb_crawler_after_finish  # Called after the crawler finished crawling. Default is a null route.

    def _set_cb_request_before_start(self):
        global ignored_extensions, crawled_urls_to_check_dups
        crawled_urls_to_check_dups = self.crawled_urls_to_check_dups
        ignored_extensions = self.ignored_extensions

        def cb_request_before_start(queue, queue_item):
            if queue_item.request.url in crawled_urls_to_check_dups:  # To avoid duplicate links crawling
                return CrawlerActions.DO_SKIP_TO_NEXT
            if getExtension(queue_item.request.url) in ignored_extensions:  # Don't crawl gif, jpg, etc.
                return CrawlerActions.DO_SKIP_TO_NEXT
            return CrawlerActions.DO_CONTINUE_CRAWLING

        self.options.callbacks.request_before_start = cb_request_before_start  # Called before the crawler starts a new request. Default is a null route.

    def _set_cb_request_after_finish(self):
        global crawled_urls_to_check_dups, file_links, other_links
        crawled_urls_to_check_dups = self.crawled_urls_to_check_dups
        file_links = self.file_links
        other_links = self.other_links
        file_links_path = self.file_links_path

        def cb_request_after_finish(queue, queue_item, new_queue_items):
            crawled_urls_to_check_dups.append(queue_item.request.url)  # Record the finished URL so it is not crawled again
            url_extension = getExtension(queue_item.request.url).lower()
            if url_extension in EXCEL_EXTENSIONS:
                path = queue_item.request.url
                file_links['excel'].append(path)
                print("[*] Excel > {}".format(path))
            elif url_extension in WORD_EXTENSIONS:
                path = queue_item.request.url
                file_links['word'].append(path)
                print("[*] Word > {}".format(path))
            elif url_extension in PDF_EXTENSIONS:
                path = queue_item.request.url
                file_links['pdf'].append(path)
                print("[*] Pdf > {}".format(path))
            else:
                # if ("?" in queue_item.request.url):
                #     path = queue_item.request.url[:queue_item.request.url.find("?")]
                #     query = queue_item.request.url[queue_item.request.url.find("?"):]
                # else:
                #     path = queue_item.request.url
                #     query = ""
                # other_links[path].append(query)
                pass

            with open(file_links_path, "w") as fp:  # Persist results after each request
                fp.write(json.dumps(file_links))
            return CrawlerActions.DO_CONTINUE_CRAWLING

        self.options.callbacks.request_after_finish = cb_request_after_finish  # Called after the crawler finishes a request. Default is a null route.
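
    # Shape of the JSON written to file_links_path (keys mirror self.file_links;
    # the URLs below are purely illustrative):
    #   {"word": ["http://example.com/report.doc"],
    #    "excel": [],
    #    "pdf": ["http://example.com/manual.pdf"]}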

    def startCrawl(self):
        self._set_cb_crawler_before_start()
        self._set_cb_crawler_after_finish()
        self._set_cb_request_before_start()
        self._set_cb_request_after_finish()
        self.crawler = Crawler(self.options)
        self.crawler.start_with(Request(self.subdomain_url))


if __name__ == '__main__':
    subdomain = sys.argv[1]
    file_links_path = sys.argv[2]
    links_crawler = LinksCrawler(subdomain, file_links_path)
    links_crawler.prepare()
    links_crawler.setOptions()
    links_crawler.startCrawl()
    print(links_crawler.file_links)
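
# Example invocation (the target and output path are illustrative, not real):
#   python3 LinksCrawler.py www.example.com ./results/example.com_file_links.json
# argv[1] is the (sub)domain to crawl; argv[2] is the JSON file that the
# discovered Word/Excel/PDF links are written to.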