pip install function_scheduling_distributed_framework --upgrade
分布式函数调度框架,支持5种并发模式,20种消息中间件,20种任务控制功能。
threading (使用的是可变线程池,可以智能自动缩小和扩大线程数量)
gevent
eventlet
asyncio (框架可以直接支持async 定义的携程函数作为任务,celery不支持)
single_thread
除此之外支持 多进程 叠加 以上5种并发,多进程和以上细粒度并发是叠加的不是平行的。
框架支持 rabbitmq redis python自带的queue.Queue sqlite sqlachemy kafka pulsar mongodb 等作为消息中间件。
同时此框架也支持操作 kombu 库作为中间件,所以此框架能够支持的中间件类型只会比celery更多。
python通用分布式函数调度框架。适用场景范围广泛, 框架非常适合io密集型(框架支持对函数自动使用 thread gevent eventlet asyncio 并发) 框架非常适合cpu密集型(框架能够在线程 协程基础上 叠加 多进程 multi_process 并发 ,不仅能够多进程执行任务还能多机器执行任务)。 不管是函数需要消耗时io还是消耗cpu,用此框架都很合适,因为任务都是在中间件里面,可以自动分布式分发执行。 此框架是函数的辅助控制倍增器。 框架不适合的场景是 函数极其简单,例如函数只是一行简单的 print hello,函数只需要非常小的cpu和耗时,运行一次函数只消耗了几十hz或者几纳秒, 此时那就采用直接调用函数就好了,因为框架施加了很多控制功能,当框架的运行逻辑耗时耗cpu 远大于函数本身 时候,使用框架反而会使函数执行变慢。 (python框架从全局概念上影响程序的代码组织和运行,包和模块是局部的只影响1个代码文件的几行。) 可以一行代码分布式并发调度起一切任何老代码的旧函数和新项目的新函数,并提供数十种函数控制功能。 还是不懂框架能做什么是什么,就必须先去了解下celery rq。如果连celery rq类似这种的用途概念听都没听说, 那就不可能知道框架的概念和功能用途。
20种控制功能包括:
分布式: 支持数十种最负盛名的消息中间件.(除了常规mq,还包括用不同形式的如 数据库 磁盘文件 redis等来模拟消息队列) 并发: 支持threading gevent eventlet asyncio 四种并发模式 叠加 多进程。 多进程不是和前面四种模式平行的,是叠加的,例如可以是 多进程 + 协程,多进程 + 多线程。 控频限流: 例如十分精确的指定1秒钟运行30次函数(无论函数需要随机运行多久时间,都能精确控制到指定的消费频率; 分布式控频限流: 例如一个脚本反复启动多次或者多台机器多个容器在运行,如果要严格控制总的qps,能够支持分布式控频限流。 任务持久化: 消息队列中间件天然支持 断点接续运行: 无惧反复重启代码,造成任务丢失。消息队列的持久化 + 消费确认机制 做到不丢失一个消息 定时: 可以按时间间隔、按指定时间执行一次、按指定时间执行多次,使用的是apscheduler包的方式。 延时任务: 例如规定任务发布后,延迟60秒执行,或者规定18点执行。这个概念和定时任务有一些不同。 指定时间不运行: 例如,有些任务你不想在白天运行,可以只在晚上的时间段运行 消费确认: 这是最为重要的一项功能之一,有了这才能肆无忌惮的任性反复重启代码也不会丢失一个任务。 (常规的手写 redis.lpush + redis.blpop,然后并发的运行取出来的消息,随意关闭重启代码瞬间会丢失大量任务, 那种有限的 断点接续 完全不可靠,根本不敢随意重启代码) 立即重试指定次数: 当函数运行出错,会立即重试指定的次数,达到最大次重试数后就确认消费了 重新入队: 在消费函数内部主动抛出一个特定类型的异常ExceptionForRequeue后,消息重新返回消息队列 超时杀死: 例如在函数运行时间超过10秒时候,将此运行中的函数kill 计算消费次数速度: 实时计算单个进程1分钟的消费次数,在日志中显示;当开启函数状态持久化后可在web页面查看消费次数 预估消费时间: 根据前1分钟的消费次数,按照队列剩余的消息数量来估算剩余的所需时间 函数运行日志记录: 使用自己设计开发的 控制台五彩日志(根据日志严重级别显示成五种颜色;使用了可跳转点击日志模板) + 多进程安全切片的文件日志 + 可选的kafka elastic日志 任务过滤: 例如求和的add函数,已经计算了1 + 2,再次发布1 + 2的任务到消息中间件,可以让框架跳过执行此任务。 任务过滤的原理是使用的是函数入参判断是否是已近执行过来进行过滤。 任务过滤有效期缓存: 例如查询深圳明天的天气,可以设置任务过滤缓存30分钟,30分钟内查询过深圳的天气,则不再查询。 30分钟以外无论是否查询过深圳明天的天气,则执行查询。 任务过期丢弃: 例如消息是15秒之前发布的,可以让框架丢弃此消息不执行,防止消息堆积, 在消息可靠性要求不高但实时性要求高的高并发互联网接口中使用 函数状态和结果持久化: 可以分别选择函数状态和函数结果持久化到mongodb,使用的是短时间内的离散mongo任务自动聚合成批量 任务后批量插入,尽可能的减少了插入次数 消费状态实时可视化: 在页面上按时间倒序实时刷新函数消费状态,包括是否成功 出错的异常类型和异常提示 重试运行次数 执行函数的机器名字+进程id+python脚本名字 函数入参 函数结果 函数运行消耗时间等 消费次数和速度生成统计表可视化: 生成echarts统计图,主要是统计最近60秒每秒的消费次数、最近60分钟每分钟的消费次数 最近24小时每小时的消费次数、最近10天每天的消费次数 rpc: 生产端(或叫发布端)获取消费结果。各个发布端对消费结果进行不同步骤的后续处理更灵活,而不是让消费端对消息的处理一干到底。
关于稳定性和性能,一句话概括就是直面百万c端用户(包括app和小程序), 已经连续超过三个季度稳定高效运行无事故,从没有出现过假死、崩溃、内存泄漏等问题。 windows和linux行为100%一致,不会像celery一样,相同代码前提下,很多功能在win上不能运行或出错。
以下这只是简单求和例子,实际情况换成任意函数里面写任意逻辑,框架可没有规定只能用于 求和函数 的自动调度并发。
而是根据实际情况函数的参数个数、函数的内部逻辑功能,全部都由用户自定义,函数里面想写什么就写什么,想干什么就干什么,极端自由。
也就是框架很容易学和使用,把下面的task_fun函数的入参和内部逻辑换成你自己想写的函数功能就可以了,框架只需要学习task_deco这一个函数的参数就行。
有一点要说明的是框架的消息中间件的ip 端口 密码 等配置是在你第一次运行代码时候,在你当前项目的根目录下生成的 distributed_frame_config.py 按需设置。
import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum
@task_deco("task_queue_name1", qps=5, broker_kind=BrokerEnum.PERSISTQUEUE) # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(x, y):
print(f'{x} + {y} = {x + y}')
time.sleep(3) # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。
if __name__ == "__main__":
for i in range(100):
task_fun.push(i, y=i * 2) # 发布者发布任务
task_fun.consume() # 消费者启动循环调度并发消费任务
"""
对于消费函数,框架内部会生成发布者(生产者)和消费者。
1.推送。 task_fun.push(1,y=2) 会把 {"x":1,"y":2} (消息也自动包含一些其他辅助信息) 发送到中间件的 task_queue_name1 队列中。
2.消费。 task_fun.consume() 开始自动从中间件拉取消息,并发的调度运行函数,task_fun(**{"x":1,"y":2}),每秒运行5次
整个过程只有这两步,清晰明了,其他的控制方式需要看 task_deco 的中文入参解释,全都参数都很有用。
这个是单个脚本实现了发布和消费,一般都是分离成两个文件的,任务发布和任务消费无需在同一个进程的解释器内部,
因为是使用了中间件解耦消息和持久化消息,不要被例子误导成了,以为发布和消费必须放在同一个脚本里面
使用方式只需要这一个例子就行了,其他举得例子只是改了下broker_kind和其他参数而已,
而且装饰器的入参已近解释得非常详细了,框架浓缩到了一个装饰器,并没有用户需要从框架里面要继承什么组合什么的复杂写法。
"""
python比其他语言更需要分布式函数调度框架来执行函数,有两点原因
1 python有gil,
直接python xx.py启动没有包括multipricsessing的代码,在16核机器上,cpu最多只能达到100%,也就是最高使用率1/16,
别的语言直接启动代码最高cpu可以达到1600%。如果在python代码里面亲自写多进程将会十分麻烦,对代码需要改造需要很大
,多进程之间的通讯,多进程之间的任务共享、任务分配,将会需要耗费大量额外代码,
而分布式行函数调度框架天生使用中间件解耦的来存储任务,使得单进程的脚本和多进程在写法上
没有任何区别都不需要亲自导入multiprocessing包,也不需要手动分配任务给每个进程和搞进程间通信,
因为每个任务都是从中间件里面获取来的。
2 python性能很差,不光是gil问题,只要是动态语言无论是否有gil限制,都比静态语言慢很多。
那么就不光是需要跨进程执行任务了,例如跨pvm解释器启动脚本共享任务(即使是同一个机器,把python xx.py连续启动多次)、
跨docker容器、跨物理机共享任务。只有让python跑在更多进程的cpu核心 跑在更多的docker容器 跑在更多的物理机上,
python才能获得与其他语言只需要一台机器就实现的执行速度。分布式函数调度框架来驱动函数执行针对这些不同的场景,
用户代码不需要做任何变化。
所以比其他语言来说,python是更需要分布式函数调度框架来执行任务。
把1.3的求和例子,通过修改task_deco装饰器额参数和sleep大小反复测试两数求和,
从而体会框架的分布式 并发 控频。
这是最简单的框架,只有@task_deco 1行代码需要学习。说的是这是最简单框架,这不是最简单的python包。
如果连只有一个重要函数的框架都学不会,那就学不会学习得了更复杂的其他框架了,大部分框架都很复杂比学习一个包难很多。
大部分框架,都要深入使用里面的很多个类,还需要继承组合一顿。