forked from Jaammerr/The-Dawn-Bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
126 lines (91 loc) · 4.1 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import asyncio
import random
import sys
from typing import Callable, Coroutine, Any, List, Set
from loguru import logger
from loader import config, semaphore, file_operations, single_semaphore
from core.bot import Bot
from models import Account
from utils import setup
from console import Console
from database import initialize_database
# Emails of accounts that have already served their one-time initial farming
# delay; filled in by run_module_safe() and emptied by reset_initial_delays().
accounts_with_initial_delay: Set[str] = set()
async def run_module_safe(
    account: Account, process_func: Callable[[Bot], Coroutine[Any, Any, Any]]
) -> Any:
    """Run ``process_func`` for one account under the concurrency limiter.

    A ``Bot`` is created for ``account``, an optional random start delay is
    applied, ``process_func(bot)`` is awaited, and the bot session is closed
    afterwards regardless of the outcome.

    Delay rules (only when ``config.delay_before_start.min > 0``):
      * farming: sleep once per account email; later farming runs for the same
        email skip the delay (tracked in ``accounts_with_initial_delay``);
      * any other module: sleep on every call.

    Args:
        account: Account the bot operates on.
        process_func: Coroutine function that receives the bot.

    Returns:
        Whatever ``process_func`` returns.
    """
    # Use the shared semaphore only when redirect mode is explicitly False;
    # any other value falls back to the single-slot semaphore.
    limiter = semaphore if config.redirect_settings.enabled is False else single_semaphore

    async with limiter:
        bot = Bot(account)
        try:
            # Kept inside the try so a failure here still reaches
            # bot.close_session() (assumes Bot allocates its session on
            # construction - TODO confirm against core.bot).
            await account.init_appid()

            delay_cfg = config.delay_before_start
            if delay_cfg.min > 0:
                is_farming = process_func is process_farming

                if is_farming and account.email not in accounts_with_initial_delay:
                    random_delay = random.randint(delay_cfg.min, delay_cfg.max)
                    logger.info(f"Account: {account.email} | Initial farming delay: {random_delay} sec")
                    await asyncio.sleep(random_delay)
                    # Recorded only after the sleep completed.
                    accounts_with_initial_delay.add(account.email)
                elif not is_farming:
                    random_delay = random.randint(delay_cfg.min, delay_cfg.max)
                    logger.info(f"Account: {account.email} | Sleep for {random_delay} sec")
                    await asyncio.sleep(random_delay)

            return await process_func(bot)
        finally:
            await bot.close_session()
async def process_registration(bot: Bot) -> None:
    """Await ``bot.process_registration()`` and export its result as "register"."""
    await file_operations.export_result(await bot.process_registration(), "register")
async def process_re_verify_accounts(bot: Bot) -> None:
    """Await ``bot.process_reverify_email()`` and export its result as "re-verify"."""
    await file_operations.export_result(await bot.process_reverify_email(), "re-verify")
async def process_farming(bot: Bot) -> None:
    """Await the bot's farming routine; its return value is discarded."""
    await bot.process_farming()
async def process_export_stats(bot: Bot) -> None:
    """Fetch the bot's user info and pass it to ``file_operations.export_stats``."""
    await file_operations.export_stats(await bot.process_get_user_info())
async def process_complete_tasks(bot: Bot) -> None:
    """Await ``bot.process_complete_tasks()`` and export its result as "tasks"."""
    await file_operations.export_result(await bot.process_complete_tasks(), "tasks")
async def run_module(
    accounts: List[Account], process_func: Callable[[Bot], Coroutine[Any, Any, Any]]
) -> List[Any]:
    """Run ``process_func`` for every account concurrently.

    Each account is wrapped in ``run_module_safe`` and all wrappers are awaited
    together with ``asyncio.gather``.

    Args:
        accounts: Accounts to process.
        process_func: Coroutine function that receives each account's bot.

    Returns:
        A list of per-account results, in the same order as ``accounts``
        (``asyncio.gather`` resolves to a list, not a tuple).

    Raises:
        Exception: the first exception raised by any account's run propagates,
            because ``gather`` is called without ``return_exceptions``.
    """
    pending = (run_module_safe(account, process_func) for account in accounts)
    return await asyncio.gather(*pending)
async def farm_continuously(accounts: List[Account]) -> None:
    """Farm all accounts in an endless loop, in a fresh random order each cycle.

    Every cycle runs ``process_farming`` for all accounts via ``run_module``,
    then pauses 10 seconds before the next cycle. The function never returns
    on its own.

    Args:
        accounts: Accounts to farm. The list is not modified.
    """
    while True:
        # Shuffle a copy so the caller's list keeps its original order.
        cycle_order = list(accounts)
        random.shuffle(cycle_order)

        await run_module(cycle_order, process_farming)
        await asyncio.sleep(10)
def reset_initial_delays() -> None:
    """Empty the set of account emails that already served their initial delay."""
    # The set is mutated in place, never rebound, so no `global` is needed.
    accounts_with_initial_delay.clear()
async def run() -> None:
    """Application loop: prepare storage, then dispatch the configured module.

    Initializes the database and result files and clears the initial-delay
    tracking once. Each loop iteration then builds the console, looks up
    ``config.module`` and runs it for that module's accounts. The loop stops
    when the module is unknown or has no accounts.
    """
    await initialize_database()
    await file_operations.setup_files()
    reset_initial_delays()

    # module name -> (accounts to process, handler). Built once, before the loop.
    module_map = {
        "register": (config.accounts_to_register, process_registration),
        "farm": (config.accounts_to_farm, farm_continuously),
        "complete_tasks": (config.accounts_to_farm, process_complete_tasks),
        "export_stats": (config.accounts_to_farm, process_export_stats),
        "re_verify_accounts": (config.accounts_to_reverify, process_re_verify_accounts),
    }

    while True:
        # presumably renders the menu and updates config.module - confirm in console.py
        Console().build()

        selected = config.module
        entry = module_map.get(selected)
        if entry is None:
            logger.error(f"Unknown module: {selected}")
            break

        accounts, process_func = entry
        if not accounts:
            logger.error(f"No accounts for {selected}")
            break

        if selected == "farm":
            # The farm handler takes the whole account list and loops by itself.
            await process_func(accounts)
        else:
            await run_module(accounts, process_func)

        input("\n\nPress Enter to continue...")
if __name__ == "__main__":
    # On Windows, switch asyncio to the selector event loop before any loop is
    # created (presumably needed by a dependency - confirm).
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # One-time setup from utils, then hand control to the async entry point.
    setup()
    asyncio.run(run())