-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathtasks.py
135 lines (104 loc) · 3.73 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding:utf-8 -*-
#!/user/bin python
#Description: celery task
#Author: Bing
#Email: [email protected]
#DateTime: 2017-05-10 23:08:39
import time
from celery import Celery, platforms
from datetime import timedelta
from celery.schedules import crontab
import smtplib
from email.mime.text import MIMEText
from core.settings import *
from libraries.github import run as github
from libraries.web import Work as Web
from libraries.pscan import Work as Pscan
from libraries.brute import Work as Brute
from libraries.awvs import Work as Awvs
from libraries.nessus import Work as Nessus
#from multiprocessing import Pool,Process
#import gevent
app = Celery()
platforms.C_FORCE_ROOT = True
app.conf.update(
CELERY_IMPORTS = ("tasks", ),
BROKER_URL = 'redis://127.0.0.1:6379/0',
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2',
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT = ['json'],
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERY_REDIS_MAX_CONNECTIONS=5000,
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600},
# BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True},
# CELERYBEAT_SCHEDULE = {
# 'add-every-30-seconds': {
# 'task': 'tasks.mail_notify',
# 'schedule': timedelta(seconds=10),
# 'args': ("test","Talscan Notice !!! ")
# },
# # 'add-every-monday-morning': {
# # 'task': 'tasks.add',
# # 'schedule': crontab(hour=1, minute=16, day_of_week=4),
# # 'args': (40, 30),
# # },
# }
)
@app.task
def custom_poc_scan(taskid = "" , host = "" , args = ""):
'''
args 为扫描POC的类型;参数可以是数组,也可以是字符 如:args = "subdomain";args = ["subdomain","dnszone"]
'''
test = []
if type(args) == type(test) and len(args) > 0 :
for scan_type in args :
t = Web( scan_id = taskid ,scan_target = host,scan_type = scan_type)
t.run()
else :
t = Web( scan_id = taskid ,scan_target = host,scan_type = args)
t.run()
@app.task
def custom_brute_scan(taskid = "" , host = "" , args = ""):
test = []
if type(args) == type(test) and len(args) > 0 :
for scan_type in args :
t = Brute( scan_id = taskid ,scan_target = host,scan_type = scan_type)
t.run()
else :
t = Brute( scan_id = taskid ,scan_target = host,scan_type = args)
t.run()
@app.task
def custom_nmap_scan(taskid = "" , host = "" , args = ""):
t = Pscan( scan_id = taskid ,scan_target = host)
t.run()
@app.task
def awvs_scan(taskid = "" , host = "" , args = ""):
t = Awvs( scan_id = taskid ,scan_target = host)
t.run()
@app.task
def nessus_scan(taskid = "" , host = "" , args = ""):
t = Nessus( scan_id = taskid ,scan_target = host)
t.run()
@app.task
def rsas_scan(taskid = "" , host = "" , args = ""):
pass
@app.task
def github_scan(taskid = "" ,host = "" ,args = "") :
github( scan_id = taskid ,scan_target = host ,scan_args = args)
@app.task
def custom_spider_scan(taskid = "" , host = "" , args = {}):
pass
@app.task
def custom_rule_scan(taskid = "" , host = "" , args = {}):
pass
#celery -A tasks beat --loglevel=info
#celery beat -A tasks work --loglevel=info
#celery -A tasks work --loglevel=info
# custom_poc_scan(taskid = "taskid-296" ,host = "100.xueersi.com",args = "subdomain")
# custom_brute_scan(taskid = "taskid-296" ,host = "100.xueersi.com",args = "subdomain")
# custom_nmap_scan(taskid = "taskid-296" ,host = "www.baidu.com",args = "subdomain")
# awvs_scan(taskid = "taskid-296" ,host = "100.xueersi.com")
# nessus_scan(taskid = "taskid-296" ,host = "100.xueersi.com")
# github_scan(taskid = "taskid-296" ,host = "100.xueersi.com" ,args = ["email"])