forked from OpenEthan/SMSBoom
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
celery 不支持在 windows 上调用 async 函数。所有要部署在 linux 上,部署待测试。
- Loading branch information
Showing
6 changed files
with
130 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from celery_server.tasks import asyncRun | ||
|
||
# r = test.delay(1,2) | ||
# r2 = test.delay(1,2) | ||
|
||
r = asyncRun.delay("13809213237") | ||
print(r.get()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Celery 异步服务器后端模块 | ||
|
||
## 部署 | ||
```shell | ||
pip install celery gevent -i https://pypi.doubanio.com/simple/ | ||
celery -A celery_server worker -l info --pool=eventlet | ||
``` | ||
需要在 celery 5.0 中才能使用 async |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from celery import Celery | ||
from celery.utils.log import get_task_logger | ||
|
||
app = Celery( | ||
'celery_server', | ||
include=[ | ||
'celery_server.tasks' | ||
] | ||
) | ||
app.config_from_object( | ||
'celery_server.config', | ||
) | ||
|
||
logger = get_task_logger(__name__) | ||
|
||
if __name__ == '__main__': | ||
app.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
#broker(消息中间件来接收和发送任务消息) | ||
BROKER_URL = 'redis://localhost:6379/1' | ||
#backend(存储worker执行的结果) | ||
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2' | ||
|
||
#设置时间参照,不设置默认使用的UTC时间 | ||
CELERY_TIMEZONE = 'Asia/Shanghai' | ||
#指定任务的序列化 | ||
CELERY_TASK_SERIALIZER='json' | ||
#指定执行结果的序列化 | ||
CELERY_RESULT_SERIALIZER='json' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# encoding=utf8 | ||
# 请求的方法 | ||
from smsboom import load_getapi, load_json | ||
from utils.log import logger | ||
from utils.models import API | ||
from utils import default_header | ||
import httpx | ||
from httpx import Limits | ||
from typing import Union, List | ||
import asyncio | ||
|
||
import sys | ||
sys.path.append("E:\coding\SMSBoom") | ||
|
||
|
||
def reqAPI(api: API, client: httpx.AsyncClient): | ||
if isinstance(api.data, dict): | ||
resp = client.request(method=api.method, json=api.data, | ||
headers=api.header, url=api.url, timeout=10) | ||
else: | ||
resp = client.request(method=api.method, data=api.data, | ||
headers=api.header, url=api.url, timeout=10) | ||
return resp | ||
|
||
|
||
async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore): | ||
"""异步请求方法 | ||
:param: | ||
:return: | ||
""" | ||
# 多手机号支持 | ||
if isinstance(phone, tuple): | ||
phone_lst = [_ for _ in phone] | ||
else: | ||
phone_lst = [phone] | ||
async with semaphore: | ||
async with httpx.AsyncClient( | ||
limits=Limits(max_connections=1000, | ||
max_keepalive_connections=2000), | ||
headers=default_header, | ||
verify=False, | ||
timeout=99999 | ||
) as c: | ||
|
||
for ph in phone_lst: | ||
try: | ||
if isinstance(src, API): | ||
src = src.handle_API(ph) | ||
r = await reqAPI(src, c) | ||
else: | ||
# 利用元组传参安全因为元组不可修改 | ||
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace( | ||
"&", "").replace('\n', '').replace('\r', ''),) | ||
r = await c.get(*s) | ||
return r | ||
except httpx.HTTPError as why: | ||
# logger.error(f"异步请求失败{type(why)}") | ||
pass | ||
except TypeError: | ||
# logger.error("类型错误") | ||
pass | ||
except Exception as wy: | ||
# logger.exception(f"异步失败{wy}") | ||
pass | ||
|
||
def callback(result): | ||
"""异步回调函数""" | ||
log = result.result() | ||
if log is not None: | ||
# logger.info(f"请求结果:{log.text[:30]}") | ||
print(log.text[:30]) | ||
pass | ||
|
||
|
||
async def runAsync(apis: List[Union[API, str]], phone: Union[tuple, str]): | ||
|
||
tasks = [] | ||
|
||
for api in apis: | ||
semaphore = asyncio.Semaphore(999999) | ||
task = asyncio.create_task(asyncReqs(api, phone, semaphore)) | ||
task.add_done_callback(callback) | ||
tasks.append(task) | ||
|
||
await asyncio.gather( | ||
*tasks | ||
) |