Skip to content

Commit

Permalink
✨ feat: celery 异步框架测试运行。
Browse files Browse the repository at this point in the history
celery 不支持在 windows 上调用 async 函数。所有要部署在 linux 上,部署待测试。
  • Loading branch information
WhaleFell committed May 14, 2022
1 parent 21e0870 commit a8af96c
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 0 deletions.
7 changes: 7 additions & 0 deletions celery-client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from celery_server.tasks import asyncRun

# r = test.delay(1,2)
# r2 = test.delay(1,2)

r = asyncRun.delay("13809213237")
print(r.get())
8 changes: 8 additions & 0 deletions celery_server/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Celery 异步服务器后端模块

## 部署
```shell
pip install celery gevent -i https://pypi.doubanio.com/simple/
celery -A celery_server worker -l info --pool=eventlet
```
需要在 celery 5.0 中才能使用 async
Empty file added celery_server/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions celery_server/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from celery import Celery
from celery.utils.log import get_task_logger

app = Celery(
'celery_server',
include=[
'celery_server.tasks'
]
)
app.config_from_object(
'celery_server.config',
)

logger = get_task_logger(__name__)

if __name__ == '__main__':
app.start()
11 changes: 11 additions & 0 deletions celery_server/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#broker(消息中间件来接收和发送任务消息)
BROKER_URL = 'redis://localhost:6379/1'
#backend(存储worker执行的结果)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

#设置时间参照,不设置默认使用的UTC时间
CELERY_TIMEZONE = 'Asia/Shanghai'
#指定任务的序列化
CELERY_TASK_SERIALIZER='json'
#指定执行结果的序列化
CELERY_RESULT_SERIALIZER='json'
87 changes: 87 additions & 0 deletions celery_server/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# encoding=utf8
# 请求的方法
from smsboom import load_getapi, load_json
from utils.log import logger
from utils.models import API
from utils import default_header
import httpx
from httpx import Limits
from typing import Union, List
import asyncio

import sys
sys.path.append("E:\coding\SMSBoom")


def reqAPI(api: API, client: httpx.AsyncClient):
if isinstance(api.data, dict):
resp = client.request(method=api.method, json=api.data,
headers=api.header, url=api.url, timeout=10)
else:
resp = client.request(method=api.method, data=api.data,
headers=api.header, url=api.url, timeout=10)
return resp


async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore):
"""异步请求方法
:param:
:return:
"""
# 多手机号支持
if isinstance(phone, tuple):
phone_lst = [_ for _ in phone]
else:
phone_lst = [phone]
async with semaphore:
async with httpx.AsyncClient(
limits=Limits(max_connections=1000,
max_keepalive_connections=2000),
headers=default_header,
verify=False,
timeout=99999
) as c:

for ph in phone_lst:
try:
if isinstance(src, API):
src = src.handle_API(ph)
r = await reqAPI(src, c)
else:
# 利用元组传参安全因为元组不可修改
s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace(
"&", "").replace('\n', '').replace('\r', ''),)
r = await c.get(*s)
return r
except httpx.HTTPError as why:
# logger.error(f"异步请求失败{type(why)}")
pass
except TypeError:
# logger.error("类型错误")
pass
except Exception as wy:
# logger.exception(f"异步失败{wy}")
pass

def callback(result):
"""异步回调函数"""
log = result.result()
if log is not None:
# logger.info(f"请求结果:{log.text[:30]}")
print(log.text[:30])
pass


async def runAsync(apis: List[Union[API, str]], phone: Union[tuple, str]):

tasks = []

for api in apis:
semaphore = asyncio.Semaphore(999999)
task = asyncio.create_task(asyncReqs(api, phone, semaphore))
task.add_done_callback(callback)
tasks.append(task)

await asyncio.gather(
*tasks
)

0 comments on commit a8af96c

Please sign in to comment.