Welcome to function_scheduling_distributed_framework_docs’s documentation!

0. 此项目迁移到 funboost 框架

此项目迁移到新框架funboost地址:funboost框架地址链接

此项目改名为新的funboost,新旧框架的代码和功能完全一模一样,只是框架的名字做了变化。 以后停止更新此项目代码,使用funboost。

funboost框架取名说明:

funboost是function_scheduling_distributed_framework框架的新名字,把框架名字长度减小.
funboost名字是两个单词,fun是function指的是python函数,boost是加速的意思,合一起是加速函数并发运行.

两个框架的兼容性说明:

funboost 和 function_scheduling_distributed_framework 项目的代码一模一样,以后新代码只更新funboost项目。
from funboost import xx 和  from function_scheduling_distributed_framework import xx 是完全一模一样的.
boost是task_deco的别名,两个都可以使用。在消费函数上写@boost 和 @task_deco是一模一样的,两个都可以使用。
所以在有的文档或者截图中如果写 
from  function_scheduling_distributed_framework import task_deco , @task_deco
用户需要知道效果和from funboost import boost , @boost 是一模一样的。

此项目迁移到新框架funboost地址:funboost框架地址链接

1.分布式函数调度框架简介

1.0 github地址和文档地址

1.0.1 分布式函数调度框架文档地址

查看分布式函数调度框架文档

文档很长,但归根结底只需要学习 1.3 里面的这1个例子就行,主要是修改下@task_deco的各种参数,
通过不同的入参,实践测试下各种控制功能。

对比 celery 有20种改善,其中之一是无依赖文件夹层级和文件夹名字 文件名字。
首先能把  https://github.com/ydf0509/celery_demo
这个例子的已经写好的不规则目录层级和文件名字的函数用celery框架玩起来,才能说是了解celery,
否则如果项目文件夹层级和文件名字不规矩,后期再用celery,会把celery新手折磨得想死,
很多新手需要小心翼翼模仿网上说的项目目录结构,以为不按照那么规划目录和命名就玩不起来,本身说明celery很坑。

1.0.2 分布式函数调度框架github地址

查看分布式函数调度框架github项目

1.1 安装方式

pip install function_scheduling_distributed_framework –upgrade

1.2 框架功能介绍

分布式函数调度框架,支持5种并发模式,20+种消息中间件,20种任务控制功能。
用途概念就是常规经典的 生产者 + 消息队列中间件 + 消费者 编程思想。

有了这个框架,用户再也无需亲自手写操作进程、线程、协程的并发的代码了。

有了这个框架,用户再也无需亲自手写操作redis rabbitmq socket kafka 了。

https://z3.ax1x.com/2021/01/19/sgV2xP.pngsgV2xP.png

1.2.1 框架支持5种并发模式

threading (使用的是可变线程池,可以智能自动缩小和扩大线程数量)
gevent
eventlet
asyncio (框架可以直接支持async 定义的携程函数作为任务,celery不支持)
single_thread

除此之外,直接内置方便的支持 多进程multiprocess 叠加 以上5种并发,多进程和以上细粒度并发是叠加的而不是平行的二选一关系。


总结一下那就是此框架可以适应所有编程场景,无论是io密集 cpu密集 还是cpu io双密集场景,框架能非常简便的应对任意场景。
框架的 单线程  多线程  gevent eventlet  asyncio 多进程  这些并发模型,囊括了目前python界所有的并发方式。
框架能自动实现 单线程  ,多线程, gevent , eventlet ,asyncio ,多进程 并发 ,
多进程 + 单线程 ,多进程 + 多线程,多进程 + gevent,  多进程 + eventlet  ,多进程 + asyncio 的组合并发
这么多并发方式能够满足任意编程场景。

以下两种方式,都是10线程加python内存queue方式运行f函数,有了此框架,用户无需代码手写手动操作线程 协程 进程并发。 https://s1.ax1x.com/2021/12/10/o5lEEq.pngo5lEEq.png

1.2.2 框架支持20种中间件

框架支持 rabbitmq redis python自带的queue.Queue sqlite sqlachemy kafka pulsar mongodb 直接socket 等作为消息中间件。

同时此框架也支持操作 kombu 库作为中间件,所以此框架能够支持的中间件类型只会比celery更多。

框架支持的中间件种类大全和选型见文档3.1章节的介绍:

3.1 各种中间件选择的场景和优势

1.2.3 框架对任务支持20种控制功能。


python通用分布式函数调度框架。适用场景范围广泛, 框架非常适合io密集型(框架支持对函数自动使用 thread gevent eventlet asyncio 并发)
框架非常适合cpu密集型(框架能够在线程 协程基础上 叠加 多进程 multi_process 并发 ,不仅能够多进程执行任务还能多机器执行任务)。
不管是函数需要消耗时io还是消耗cpu,用此框架都很合适,因为任务都是在中间件里面,可以自动分布式分发执行。 此框架是函数的辅助控制倍增器。

框架不适合的场景是 函数极其简单,例如函数只是一行简单的 print hello,函数只需要非常小的cpu和耗时,运行一次函数只消耗了几十hz或者几纳秒,
此时那就采用直接调用函数就好了,因为框架施加了很多控制功能,当框架的运行逻辑耗时耗cpu 远大于函数本身 时候,使用框架反而会使函数执行变慢。

(python框架从全局概念上影响程序的代码组织和运行,包和模块是局部的只影响1个代码文件的几行。)

可以一行代码分布式并发调度起一切任何老代码的旧函数和新项目的新函数,并提供数十种函数控制功能。

还是不懂框架能做什么是什么,就必须先去了解下celery rq。如果连celery rq类似这种的用途概念听都没听说, 那就不可能知道框架的概念和功能用途。

20种控制功能包括:

     分布式:
        支持数十种最负盛名的消息中间件.(除了常规mq,还包括用不同形式的如 数据库 磁盘文件 redis等来模拟消息队列)

     并发:
        支持threading gevent eventlet asyncio 单线程 5种并发模式 叠加 多进程。
        多进程不是和前面四种模式平行的,是叠加的,例如可以是 多进程 + 协程,多进程 + 多线程。
     
     控频限流:
        例如十分精确的指定1秒钟运行30次函数或者0.02次函数(无论函数需要随机运行多久时间,都能精确控制到指定的消费频率;
       
     分布式控频限流:
        例如一个脚本反复启动多次或者多台机器多个容器在运行,如果要严格控制总的qps,能够支持分布式控频限流。
      
     任务持久化:
        消息队列中间件天然支持
     
     断点接续运行:
        无惧反复重启代码,造成任务丢失。消息队列的持久化 + 消费确认机制 做到不丢失一个消息
        (此框架很重视消息的万无一失,就是执行函数的机器支持在任何时候随时肆无忌惮反复粗暴拉电闸断电,或者强制硬关机,
        或者直接用锄头把执行函数代码的机器砸掉,只要不是暴力破坏安装了消息队列中间件的机器就行,消息就万无一失,
        现在很多人做的简单redis list消息队列,以为就叫做分布式断点接续,那是不正确的,因为这种如果把消息从reidis brpop取出来后,
        如果消息正在被执行,粗暴的kill -9脚本或者直接强制关机,那么正在运行的消息就丢失了,如果是多线程同时并发运行很多消息,粗暴重启
        会丢失几百个大量消息,这种简单的redis list根本就不能叫做安全的断点续传。
        分布式函数调度框架的消费确认机制,保证函数运行完了才确认消费,正在运行突然强制关闭进程不会丢失一个消息,
        下次启动还会消费或者被别的机器消费。
        此框架的消息万无一失特性,不仅支持rabbbitmq因为原生支持,也支持redis,框架对redis的实现机制是因为客户端加了一层保障)。
     
     定时:
        可以按时间间隔、按指定时间执行一次、按指定时间执行多次,使用的是apscheduler包的方式。
     
     延时任务:
         例如规定任务发布后,延迟60秒执行,或者规定18点执行。这个概念和定时任务有一些不同。
              
     指定时间不运行:
        例如,有些任务你不想在白天运行,可以只在晚上的时间段运行
     
     消费确认:
        这是最为重要的一项功能之一,有了这才能肆无忌惮的任性反复重启代码也不会丢失一个任务。
        (常规的手写 redis.lpush + redis.blpop,然后并发的运行取出来的消息,随意关闭重启代码瞬间会丢失大量任务,
        那种有限的 断点接续 完全不可靠,根本不敢随意重启代码)
     
     立即重试指定次数:
        当函数运行出错,会立即重试指定的次数,达到最大次重试数后就确认消费了
     
     重新入队:
        在消费函数内部主动抛出一个特定类型的异常ExceptionForRequeue后,消息重新返回消息队列
     
     超时杀死:
        例如在函数运行时间超过10秒时候,将此运行中的函数kill
     
     计算消费次数速度:
        实时计算单个进程1分钟的消费次数,在日志中显示;当开启函数状态持久化后可在web页面查看消费次数
     
     预估消费时间:
        根据前1分钟的消费次数,按照队列剩余的消息数量来估算剩余的所需时间
     
     函数运行日志记录:
        使用自己设计开发的 控制台五彩日志(根据日志严重级别显示成五种颜色;使用了可跳转点击日志模板)
        + 多进程安全切片的文件日志 + 可选的kafka elastic日志
                   
     任务过滤:
        例如求和的add函数,已经计算了1 + 2,再次发布1 + 2的任务到消息中间件,可以让框架跳过执行此任务。
        任务过滤的原理是使用的是函数入参判断是否是已近执行过来进行过滤。
     
     任务过滤有效期缓存:
        例如查询深圳明天的天气,可以设置任务过滤缓存30分钟,30分钟内查询过深圳的天气,则不再查询。
        30分钟以外无论是否查询过深圳明天的天气,则执行查询。
        
     任务过期丢弃:
        例如消息是15秒之前发布的,可以让框架丢弃此消息不执行,防止消息堆积,
        在消息可靠性要求不高但实时性要求高的高并发互联网接口中使用
                
     函数状态和结果持久化:
        可以分别选择函数状态和函数结果持久化到mongodb,使用的是短时间内的离散mongo任务自动聚合成批量
        任务后批量插入,尽可能的减少了插入次数
                      
     消费状态实时可视化:
        在页面上按时间倒序实时刷新函数消费状态,包括是否成功 出错的异常类型和异常提示 
        重试运行次数 执行函数的机器名字+进程id+python脚本名字 函数入参 函数结果 函数运行消耗时间等
                     
     消费次数和速度生成统计表可视化:
        生成echarts统计图,主要是统计最近60秒每秒的消费次数、最近60分钟每分钟的消费次数
        最近24小时每小时的消费次数、最近10天每天的消费次数
                                
     rpc:
        生产端(或叫发布端)获取消费结果。各个发布端对消费结果进行不同步骤的后续处理更灵活,而不是让消费端对消息的处理一干到底。

     远程服务器部署消费函数:
        代码里面 task_fun.fabric_deploy('192.168.6.133', 22, 'xiaomin', '123456', process_num=2) 只需要这样就可以自动将函数部署在远程机器运行,
        无需任何额外操作,不需要借助阿里云codepipeline发版工具 和 任何运维发版管理工具,就能轻松将函数运行在多台远程机器。task_fun指的是被@task_deco装饰的函数

关于稳定性和性能,一句话概括就是直面百万c端用户(包括app和小程序), 已经连续超过三个季度稳定高效运行无事故,从没有出现过假死、崩溃、内存泄漏等问题。 windows和linux行为100%一致,不会像celery一样,相同代码前提下,很多功能在win上不能运行或出错。

1.3 框架使用例子

以下这只是简单求和例子,实际情况换成任意函数里面写任意逻辑,框架可没有规定只能用于 求和函数 的自动调度并发。
而是根据实际情况函数的参数个数、函数的内部逻辑功能,全部都由用户自定义,函数里面想写什么就写什么,想干什么就干什么,极端自由。
也就是框架很容易学和使用,把下面的task_fun函数的入参和内部逻辑换成你自己想写的函数功能就可以了,框架只需要学习task_deco这一个函数的参数就行。
测试使用的时候函数里面加上sleep模拟阻塞,从而更好的了解框架的并发和各种控制功能。

有一点要说明的是框架的消息中间件的ip 端口 密码 等配置是在你第一次运行代码时候,在你当前项目的根目录下生成的 distributed_frame_config.py 按需设置。
import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco("task_queue_name1", qps=5, broker_kind=BrokerEnum.PERSISTQUEUE)  # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(x, y):
    print(f'{x} + {y} = {x + y}')
    time.sleep(3)  # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。


if __name__ == "__main__":
    for i in range(100):
        task_fun.push(i, y=i * 2)  # 发布者发布任务
    task_fun.consume()  # 消费者启动循环调度并发消费任务
"""
对于消费函数,框架内部会生成发布者(生产者)和消费者。
1.推送。 task_fun.push(1,y=2) 会把 {"x":1,"y":2} (消息也自动包含一些其他辅助信息) 发送到中间件的 task_queue_name1 队列中。
2.消费。 task_fun.consume() 开始自动从中间件拉取消息,并发的调度运行函数,task_fun(**{"x":1,"y":2}),每秒运行5次
整个过程只有这两步,清晰明了,其他的控制方式需要看 task_deco 的中文入参解释,全都参数都很有用。


这个是单个脚本实现了发布和消费,一般都是分离成两个文件的,任务发布和任务消费无需在同一个进程的解释器内部,
因为是使用了中间件解耦消息和持久化消息,不要被例子误导成了,以为发布和消费必须放在同一个脚本里面


使用方式只需要这一个例子就行了,其他举得例子只是改了下broker_kind和其他参数而已,
而且装饰器的入参已近解释得非常详细了,框架浓缩到了一个装饰器,并没有用户需要从框架里面要继承什么组合什么的复杂写法。
"""

1.4 python分布式函数执行为什么重要?

python比其他语言更需要分布式函数调度框架来执行函数,有两点原因

1 python有gil,
  直接python xx.py启动没有包括multipricsessing的代码,在16核机器上,cpu最多只能达到100%,也就是最高使用率1/16,
  别的语言直接启动代码最高cpu可以达到1600%。如果在python代码里面亲自写多进程将会十分麻烦,对代码需要改造需要很大
  ,多进程之间的通讯,多进程之间的任务共享、任务分配,将会需要耗费大量额外代码,
  而分布式行函数调度框架天生使用中间件解耦的来存储任务,使得单进程的脚本和多进程在写法上
  没有任何区别都不需要亲自导入multiprocessing包,也不需要手动分配任务给每个进程和搞进程间通信,
  因为每个任务都是从中间件里面获取来的。
  
2 python性能很差,不光是gil问题,只要是动态语言无论是否有gil限制,都比静态语言慢很多。
 那么就不光是需要跨进程执行任务了,例如跨pvm解释器启动脚本共享任务(即使是同一个机器,把python xx.py连续启动多次)、
 跨docker容器、跨物理机共享任务。只有让python跑在更多进程的cpu核心 跑在更多的docker容器 跑在更多的物理机上,
 python才能获得与其他语言只需要一台机器就实现的执行速度。分布式函数调度框架来驱动函数执行针对这些不同的场景,
 用户代码不需要做任何变化。
 
所以比其他语言来说,python是更需要分布式函数调度框架来执行任务。
  

1.5 框架学习方式

把1.3的求和例子,通过修改task_deco装饰器额参数和sleep大小反复测试两数求和,
从而体会框架的分布式 并发 控频。

这是最简单的框架,只有@task_deco 1行代码需要学习。说的是这是最简单框架,这不是最简单的python包。
如果连只有一个重要函数的框架都学不会,那就学不会学习得了更复杂的其他框架了,大部分框架都很复杂比学习一个包难很多。
大部分框架,都要深入使用里面的很多个类,还需要继承组合一顿。

https://visitor-badge.glitch.me/badge?page_id=distributed_framework

2. 对比celery框架

是骡子是马必须拿出来溜溜。

此章节对比celery和分布式函数调度框架,是采用最严格的控制变量法精准对比。
例如保持 中间件一致  控制参数一致 并发类型一致 并发数量一致等等,变化的永远只有采用什么框架。 

2.0,在比较之前,说明一下和celery的关系?

此框架和celery没有关系,没有收到celery启发,也不可能找出与celery连续3行一模一样的代码。
这个是从原来项目代码里面大量重复while 1:redis.blpop()  发散扩展的。

这个和celery唯一有相同点是,都是生产者 消费者 + 消息队列中间件的模式,这种生产消费的编程思想或者叫想法不是celery的专利。
包括我们现在java框架实时处理数据的,其实也就是生产者 消费者加kfaka中间件封装的,难道java人员也是需要模仿python celery源码吗。
任何人都有资格开发封装生产者消费者模式的框架,生产者 消费者模式不是celery专利。生产消费模式很容易想到,不是什么高深的架构思想,不需要受到celery的启发才能开发。

2.1 celery对目录层级文件名称格式要求很高

celery对目录层级文件名称格式要求太高,只适合规划新的项目,对不规则文件夹套用难度高。

所以新手使用celery很仔细的建立文件夹名字、文件夹层级、python文件名字

所以网上的celery博客教程虽然很多,但是并不能学会使用,因为要运行起来需要以下6个方面都掌握好,博客文字很难表达清楚或者没有写全面以下6个方面。 celery消费任务不执行或者报错NotRegistered,与很多方面有关系,如果要别人排错,至少要发以下6方面的截图,

1) 整个项目目录结构,celery的目录结构和任务函数位置,有很大影响
   
2) @task入参 ,用户有没有主动设置装饰器的入参 name,设置了和没设置有很大不同,建议主动设置这个名字对函数名字和所处位置依赖减小
   
3) celery的配置,task_queues(在3.xx叫 CELERY_QUEUES )和task_routes (在3.xx叫 task_routes)

4) celery的配置 include (在3.xx叫 CELERY_INCLUDE)或者 imports (3.xx CELERY_IMPORTS)  或者 app.autodiscover_tasks的入参

5) cmd命令行启动参数 --queues=  的值
   
6) 用户在启动cmd命令行时候,用户所在的文件夹。
   (如果不精通这个demo的,使用cmd命令行启动时候,用户必须cd切换到当前python项目的根目录,
   如果精通主动自己设置PYTHONPATH和精通此demo,可以在任何目录下启动celery命令行或者不使用celery命令行而是调用app.work_main()启动消费

在不规范的文件夹路径下,使用celery难度很高,一般教程都没教。 项目文件夹目录格式不规范下的celery使用演示

分布式函数调度框架天生没有这些方面的问题,因为此框架实现分布式消费的写法简单很多。

如你所见,使用此框架为什么没有配置中间件的 账号 密码 端口号呢。只有运行任何一个导入了框架的脚本文件一次,就会自动生成一个配置文件
然后在配置文件中按需修改需要用到的配置就行。

@task_deco 和celery的 @app.task 装饰器区别很大,导致写代码方便简化容易很多。没有需要先实例化一个 Celery对象一般叫app变量,
然后任何脚本的消费函数都再需要导入这个app,然后@app.task,一点小区别,但造成的两种框架写法难易程度区别很大。
使用此框架,不需要固定的项目文件夹目录,任意多层级深层级文件夹不规则python文件名字下写函数都行,
celery 实际也可以不规则文件夹和文件名字来写任务函数,但是很难掌握,如果这么写的话,那么在任务注册时候会非常难,
一般demo演示文档都不会和你演示这种不规则文件夹和文件名字下写celery消费函数情况,因为如果演示这种情况会非常容易的劝退绝大部分小白。
但是如果不精通celery的任务注册导入机制同时又没严格按照死板固定的目录格式来写celery任务,
一定会出现令人头疼的 Task of kind 'tasks.add' is not registered, please make sure it's imported. 类似这种错误。
主要原因是celery 需要严格Celery类的实例化对象app变量,然后消费函数所在脚本必须import这个app,这还没完,
你必须在settings配置文件写 include imports 等配置,否则cmd 启动celery 后台命令时候,celery并不知情哪些文件脚本导入了 app这个变量,
当celery框架取出到相关的队列任务时候,就会报错找不到应该用哪个脚本下的哪个函数去运行取出的消息了。
你可能会想,为什么celery app 变量的脚本为什么不可以写导入消费函数的import声明呢,比如from dir1.dir2.pyfilename imprt add 了,
这样celery运行时候就能找到函数了是不是?那要动脑子想想,如果celery app主文件用了 from dir1.dir2.pyfilename import add,
同时消费函数 add 所在的脚本 dir1/dir2/pyfilename.py 又从celery app的猪脚本中导入app,然后把@app.task加到add函数上面 ,
那这就是出现了互相导入,a导入b,b导入a的问题了,脚本一启动就报错,正是因为这个互相导入的问题,
celery才需要从配置中写好 include imports  autodiscover_tasks,从而实现一方延迟导入以解决互相导入。

此框架的装饰器不存在需要一个类似Celery app实例的东西,不会有这个变量将大大减少编程难度,消费函数写在任意深层级不规则文件下都行。

例如董伟明的 celery 教程例子的项目目录结构,然后很多练习者需要小心翼翼模仿文件夹层级和py文件名字。

_images/img_4.pngimg_4.png

可以看代码,当文件夹层级不规则和文件名称不规则时候,要使用celery绝非简单事情,如果你只看普通的celery入门文档,是绝对解决不了
这种情况下的celery如何正确使用。

_images/img.pngimg.png

2.2 性能远远超过celery

任意并发模式,任意中间件类型,发布和消费性能远远超过celery

测试分布式函数调度框架和celery的性能对比

_images/img_2.pngimg_2.png

2.3 celery的重要方法全部无法ide自动补全提示

函数调度框架为了代码在ide能自动补全做了额外优化,celery全部重要公有方法无法补全提示.

1、配置文件方式的代码补全,此框架使用固定的项目根目录下的 distributed_frame_config.py 补全,
   不会造成不知道有哪些配置项可以配置,celery的配置项有100多个,用户不知道能配置什么。
   
2、启动方式补全,celery采用celery -A celeryproj work + 一大串cmd命令行,很容易打错字母,或者不知道
   celery命令行可以接哪些参数。次框架使用 fun.consume()/fun.multi_process_consume()启动消费,
   运行直接 python xx.py方式启动
   
3、发布参数补全,对于简单的只发布函数入参,celery使用delay发布,此框架使用push发布,一般delay5个字母不会敲错。
   对于除了需要发布函数入参还要发布函数任务控制配置的发布,此框架使用publish不仅可以补全函数名本身还能补全函数入参。
   celery使用 add.apply_async 发布,不仅apply_async函数名本身无法补全,最主要是apply_async入参达到20种,不能补全
   的话造成完全无法知道发布任务时候可以传哪些任务控制配置,无法补全时候容易敲错入参字母,导致配置没生效。
   举个其他包的例子是例如 requests.get 函数,由于无法补全如果用户把headers写成header或者haeders,函数不能报错导致请求头设置无效。
   此框架的发布publish方法不仅函数名本身可以补全,发布任务控制的配置也都可以补全。
   

4、消费任务函数装饰器代码补全,celery使用@app.task,源码入参是 def task(self, *args, **opts),那么args和opts到底能传什么参数,
  从方法本身的注释来看无法找到,即使跳转到源码去也没有说明,task能传什么参数,实际上可以传递大约20种参数,主要是任务控制参数。
  此框架的@task_deco装饰器的 20个函数入参和入参类型全部可以自动补全提示,以及入参意义注释使用ctrl + shift + i 快捷键可以看得很清楚。
  
5、此框架能够在pycharm下自动补全的原因主要是适当的做了一些调整,以及主要的面向用户的公有方法宁愿重复声明入参,也不使用*args **kwargs这种。

  举个例子说明是 @task_deco这个装饰器(这里假设装饰到fun函数上),
  此装饰器的入参和get_consumer工厂函数一模一样,但是为了补全方便没有采用*args **kwargs来达到精简源码的目的,
  因为这个装饰器是真个框架最最最重要的,所以这个是重复吧所有入参都声明了一遍。
  
  对于被装饰的消费函数,此装饰器会自动动态的添加很多方法和属性,附着到被装饰的任务函数上面。
  这些动态运行时添加到 fun函数的,pycharm本来是无法自动补全提示的,但框架对_deco内部函数返回值添加了一个返回类型声明 -> IdeAutoCompleteHelper,
  IdeAutoCompleteHelper这个对象具有的方法方法和属性都使用# type: 语法注释了类型,而且所有属性和方法刚好是附着到fun上面的方法和属性,
  所以类似fun.clear fun.publish fun.consume  fun.multi_process_conusme 这些方法名本身和他的入参都能够很好的自动补全。
  
6、自动补全为什么重要?对于入参丰富动不动高达20种入参,且会被频繁使用的重要函数,如果不能自动补全,用户无法知道有哪些方法名 方法能传什么参数 或
  者敲了错误的方法名和入参。如果自动补全不重要,那为什么不用vim和txt写python代码,说不重要的人,那以后就别用pycharm vscode这些ide写代码。
  
  celery的复杂难用,主要是第一个要求的目录文件夹格式严格,对于新手文件夹层级 名字很严格,必须小心翼翼模仿。
  第二个是列举的1 2 3 4这4个关键节点的代码补全,分别是配置文件可以指定哪些参数、命令行启动方式不知道可以传哪些参数、apply_async可以传哪些参数、
  @app.task的入参代码补全,最重要的则4个流程节点的代码全都无法补全,虽然是框架很强大但是也很难用。
  

2.4 比celery强的方面的优势大全

 0)celer4 以后官方放弃对windwos的支持和测试,例如celery的默认多进程模式在windwos启动瞬间就会报错,
    虽然生产一般是linux,但开发机器一般是windwos。
 1) 如5.4所写,新增了python内置 queue队列和 基于本机的持久化消息队列。不需要安装中间件,即可使用。
     只要是celery能支持的中间件,这个全部能支持。因为此框架的 BrokerEnum.KOMBU 中间件模式一次性
     支持了celery所能支持的所有中间件。但celery不支持kafka、nsq、mqtt、zeromq、rocketmq等。
 2) 任意中间件和并发模式,发布和消费性能比celery框架远远的大幅度提高。
 4) 全部公有方法或函数都能在pycharm智能能提示补全参数名称和参数类型。
    一切为了调用时候方便而不是为了实现时候简略,例如get_consumer函数和AbstractConsumer的入参完全重复了,
    本来实现的时候可以使用*args **kwargs来省略入参,
    但这样会造成ide不能补全提示,此框架一切写法只为给调用者带来使用上的方便。不学celery让用户不知道传什么参数。
    如果拼错了参数,pycharm会显红,大大降低了用户调用出错概率。过多的元编程过于动态,不仅会降低性能,还会让ide无法补全提示,动态一时爽,重构火葬场不是没原因的。
 5)不使用命令行启动,在cmd打那么长的一串命令,容易打错字母。并且让用户不知道如何正确的使用celery命令,不友好。
    此框架是直接python xx.py 就启动了。
 6)框架不依赖任何固定的目录结构,无结构100%自由,想把使用框架写在哪里就写在哪里,写在10层级的深层文件夹下都可以。
    脚本可以四处移动改名。celery想要做到这样,要做额外的处理。
 7)使用此框架比celery更简单10倍,如例子所示。使用此框架代码绝对比使用celery少几十行。
 8)消息中间件里面存放的消息任务很小,简单任务 比celery的消息小了50倍。 消息中间件存放的只是函数的参数,
    辅助参数由consumer自己控制。消息越小,中间件性能压力越小。
 9)由于消息中间件里面没有存放其他与python 和项目配置有关的信息,这是真正的跨语言的函数调度框架。
    java人员也可以直接使用java的redis类rabbitmq类,发送json参数到中间件,由python消费。
    celery里面的那种参数,高达几十项,和项目配置混合了,java人员绝对拼凑不出来这种格式的消息结构。
 10)celery有应该中心化的celery app实例,函数注册成任务,添加装饰器时候先要导入app,然后@app.task,
    同时celery启动app时候,调度函数就需要知道函数在哪里,所以celery app所在的py文件也是需要导入消费函数的,否则会
    celery.exceptions.NotRegistered报错
    这样以来就发生了务必蛋疼的互相导入的情况,a要导入b,b要导入a,这问题太令人窘迫了,通常解决这种情况是让其中一个模块后导入,
    这样就能解决互相导入的问题了。celery的做法是,使用imports一个列表,列表的每一项是消费函数所在的模块的字符串表示形式,
    例如 如果消费函数f1在项目的a文件夹下的b文件夹下的c.py中,消费函数与f2在项目的d文件夹的e.py文件中,
    为了解决互相导入问题,celery app中需要配置 imports = ["a.b.c",'d.e'],这种import在pycharm下容易打错字,
    例如scrapy和django的中间件注册方式,也是使用的这种类似的字符串表示导入路径,每添加一个函数,只要不在之前的模块中,就要这么写,
     不然不写improt的话,那是调度不了消费函数的。此框架原先没有装饰器方式,来加的装饰器方式与celery的用法大不相同,
    因为没有一个叫做app类似概念的东西,不需要相互导入,启动也是任意文件夹下的任意脚本都可以,自然不需要写什么imports = ['a.b.c']
 11)简单利于团队推广,不需要看复杂的celry 那样的5000页英文文档,因为函数调度框架只需要学习@task_deco一个装饰器,只有一行代码。
 对于不规则文件夹项目的clery使用时如何的麻烦,可以参考 celery_demo项目 https://github.com/ydf0509/celery_demo。
  12)此框架原生支持 asyncio 原始函数,不用用户额外处理 asyncio loop相关麻烦的问题。celery不支持async定义的函数,celery不能把@app.task
     加到一个async def 的函数上面。
  13) 这是最重要的,光使用简单还不够,性能是非常重要的指标。
     此框架消息发布性能和消息消费性能远远超过celery数十倍。为此专门开了一个对比项目,发布和消费10万任务,
     对分布式函数调度框架和celery进行严格的控制变量法来benchmark,分别测试两个框架的发布和消费性能。 
     对比项目在此,可以直接拉取并分别运行两个项目的发布和消费一共4个脚本。
     https://github.com/ydf0509/distrubuted_framework_vs_celery_benchmark
  14) 此框架比celery对函数的辅助运行控制方式更多,支持celery的所有如 并发 控频 超时杀死 重试 消息过期
      确认消费 等一切所有功能,同时包括了celery没有支持的功能,例如原生对函数入参的任务过滤等。
  15) celery不支持分布式全局控频,celery的rate_limit 基于单work控频,如果把脚本在同一台机器启动好几次,
      或者在多个容器里面启动消费,那么总的qps会乘倍数增长。此框架能支持单个消费者控频,同时也支持分布式全局控频。
  16) 此框架比celery更简单开启 多进程 + 线程或协程。celery的多进程和多线程是互斥的并发模式,此框架是叠加的。
      很多任务都是需要 多进程并发利用多核 + 细粒度的线程/协程绕过io 叠加并发 ,才能使运行速度更快。
  17) 此框架精确控频率精确度达到99.9%,celery控频相当不准确,最多到达60%左右,两框架同样是做简单的加法然后sleep0.7秒,都设置500并发100qps。
      测试对比代码见qps测试章节,欢迎亲自测试证明。
   18) celery不支持分布式全局控频,只支持当前解释器的控频。
   19) 日志的颜色和格式,远超celery。此框架的日志使用nb_log,日志在windwos远超celery,在linux也超过celery很多。

3.框架详细介绍

3.1 各种中间件选择的场景和优势

RABBITMQ_AMQPSTORM = 0  # 使用 amqpstorm 包操作rabbitmq  作为 分布式消息队列,支持消费确认.推荐这个。

RABBITMQ_RABBITPY = 1  # 使用 rabbitpy 包操作rabbitmq  作为 分布式消息队列,支持消费确认。

REDIS = 2  # 使用 redis 的 list结构,brpop 作为分布式消息队列。随意重启和关闭会丢失大量消息任务,不支持消费确认。

LOCAL_PYTHON_QUEUE = 3  # 使用python queue.Queue实现的基于当前python进程的消息队列,不支持跨进程 跨脚本 跨机器共享任务,不支持持久化,适合一次性短期简单任务。

RABBITMQ_PIKA = 4  # 使用pika包操作rabbitmq  作为 分布式消息队列。

MONGOMQ = 5  # 使用mongo的表中的行模拟的 作为分布式消息队列,支持消费确认。

PERSISTQUEUE = 6  # 使用基于sqlute3模拟消息队列,支持消费确认和持久化,但不支持跨机器共享任务,可以基于本机单机跨脚本和跨进程共享任务,好处是不需要安装中间件。

NSQ = 7  # 基于nsq作为分布式消息队列,支持消费确认。

KAFKA = 8  # 基于kafka作为分布式消息队列,建议使用BrokerEnum.CONFLUENT_KAFKA。

REDIS_ACK_ABLE = 9  # 基于redis的 list + 临时unack的set队列,采用了 lua脚本操持了取任务和加到pengding为原子性,随意重启和掉线不会丢失任务。

SQLACHEMY = 10  # 基于SQLACHEMY 的连接作为分布式消息队列中间件支持持久化和消费确认。支持mysql oracle sqlserver等5种数据库。

ROCKETMQ = 11  # 基于 rocketmq 作为分布式消息队列,这个中间件必须在linux下运行,win不支持。

REDIS_STREAM = 12  # 基于redis 5.0 版本以后,使用 stream 数据结构作为分布式消息队列,支持消费确认和持久化和分组消费,是redis官方推荐的消息队列形式,比list结构更适合。

ZEROMQ = 13  # 基于zeromq作为分布式消息队列,不需要安装中间件,可以支持跨机器但不支持持久化。

RedisBrpopLpush = 14  # 基于redis的list结构但是采用brpoplpush 双队列形式,和 redis_ack_able的实现差不多,实现上采用了原生命令就不需要lua脚本来实现取出和加入unack了。

"""
操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),但没包括分布式函数调度框架的kafka nsq zeromq 等。
同时 kombu 包的性能非常差,可以用原生redis的lpush和kombu的publish测试发布,使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列  qpid pyro 队列),否则强烈建议使用此框架的操作中间件方式而不是使用kombu。
"""
KOMBU = 15

"""基于confluent-kafka包,包的性能比kafka-python提升10倍。同时应对反复随意重启部署消费代码的场景,此消费者实现至少消费一次,第8种BrokerEnum.KAFKA是最多消费一次。"""
CONFLUENT_KAFKA = 16

""" 基于emq作为中间件的。这个和上面的中间件有很大不同,服务端不存储消息。所以不能先发布几十万个消息,然后再启动消费。mqtt优点是web前后端能交互,
前端不能操作redis rabbitmq kafka,但很方便操作mqtt。这种使用场景是高实时的互联网接口。
"""
MQTT = 17

HTTPSQS = 18  # 基于httpsqs的

PULSAR = 20   # 下一代分布式消息系统。5年后会同时取代rabbitmq和kafka。

UDP = 21  # 基于socket udp 实现的。小规模使用不支持持久化,好处是不用安装软件。

TCP = 22  # 基于socket tcp 实现的。小规模使用不支持持久化,好处是不用安装软件。

HTTP = 23 # 基于http实现的,小规模使用不支持持久化,好处是不用安装软件。

NATS = 24  # 高性能中间件nats,中间件服务端性能很好,。

TXT_FILE = 25 # 磁盘txt文件作为消息队列,支持单机持久化,不支持多机分布式
你项目根目录下自动生成的 distributed_frame_config.py 文件中修改配置,会被自动读取到。

此文件按需修改,例如你使用redis中间件作为消息队列,可以不用管rabbitmq mongodb kafka啥的配置。
但有3个功能例外,如果你需要使用rpc模式或者分布式控频或者任务过滤功能,无论设置使用何种消息队列中间件都需要把redis连接配置好,
如果@task_deco装饰器设置is_using_rpc_mode为True或者 is_using_distributed_frequency_control为True或do_task_filtering=True则需要把redis连接配置好,默认是False。

3.2 框架支持的函数调度并发模式种类详细介绍

1、threading 多线程,使用自定义的可缩小、节制开启新线程的自定义线程池,不是直接用官方内置concurrent.futures.ThreadpoolExecutor
   此线程池非常智能,配合qps参数,任何场景可以无脑开500线程,真正的做到智能扩张,智能自动缩小。
   这线程池是智能线程池,由于非常好用,为这个线程池做了独立的pypi包,可以单独用于没有使用此框架的项目。

2、gevent    需要在运行起点的脚本首行打 gevent 猴子补丁。

3、eventlet  需要在运行起点的脚本首行打 eventlet 猴子补丁。

4、asyncio  async异步,主要是针对消费函数已经定义成了   async def fun(x)  这种情况,这种情况不能直接使用多线程,
   因为执行  fun(1)  后得到的并不是所想象的函数最终结果,而是得到的一个协程对象,所以针对已经定义成异步函数了的,需要使用此种并发模式。
   框架不鼓励用户定义异步函数,你就用同步的直观方式思维定义函数就行了,其余的并发调度交给框架就行了。

5、开启多进程启动多个consumer,此模式是 多进程  + 上面4种的其中一种并发方式,充分利用多核和充分利用io,用法如下。可以实现 多进程 叠加 协程并发。
# 这种是多进程方式,一次编写能够兼容win和linux的运行。

from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum
import os

@task_deco('test_multi_process_queue',broker_kind=BrokerEnum.REDIS_ACK_ABLE,
           concurrent_mode=ConcurrentModeEnum.THREADING,)
def fff(x):
    print(x * 10,os.getpid())

if __name__ == '__main__':
    fff.multi_process_consume(6)  # 一次性启动6进程叠加多线程。

3.3 框架最最重要的task_deco装饰器参数说明

    :param queue_name: 队列名字,只有这个参数是必填的,而且每个任务函数都要使用唯一的队列名字,其他参数都是可选的。
    :param function_timeout : 超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。
    :param concurrent_num:并发数量,
    :param specify_concurrent_pool:使用指定的线程池(协程池),可以多个消费者共使用一个线程池,不为None时候。threads_num失效
    :param concurrent_mode:并发模式,1线程(ConcurrentModeEnum.THREADING) 2gevent(ConcurrentModeEnum.GEVENT)
                              3eventlet(ConcurrentModeEnum.EVENTLET) 4 asyncio(ConcurrentModeEnum.ASYNC) 5单线程(ConcurrentModeEnum.SINGLE_THREAD)
    :param max_retry_times: 最大自动重试次数,当函数发生错误,立即自动重试运行n次,对一些特殊不稳定情况会有效果。
           可以在函数中主动抛出重试的异常ExceptionForRetry,框架也会立即自动重试。
           主动抛出ExceptionForRequeue异常,则当前消息会重返中间件。
    :param log_level:框架的日志级别,默认是debug级别,可以看到详细的执行信息,如果不想看到太多详细的日志,可以设置为logging.INFO常量(20) 或者数字20。
    :param is_print_detail_exception:是否打印详细的堆栈错误。为0则打印简略的错误占用控制台屏幕行数少。
    :param qps:指定1秒内的函数执行次数,可以是小数,例如0.1代表每10秒执行1次,10代表1秒执行10次函数。默认为0不进行控频
    :param msg_expire_senconds:消息过期时间,为0永不过期,为10则代表,10秒之前发布的任务如果现在才轮到消费则丢弃任务。
    :param is_using_distributed_frequency_control: 是否使用分布式空频(需要安装redis,每隔15秒自动获取分布式环境中的消费者数量,对于10000 qps控频对redis压力很小,
            因为不是采用redis的incr计数来控频,不是用频繁的操作redis incr命令来实现的,是使用的qps除以消费者数量),
            默认只对当前实例化的消费者空频有效。假如实例化了2个qps为10的使用同一队列名的消费者,
               并且都启动,则每秒运行次数会达到20。如果使用分布式空频则所有消费者加起来的总运行次数是10。
    :param is_send_consumer_hearbeat_to_redis   时候将发布者的心跳发送到redis,有些功能的实现需要统计活跃消费者。因为有的中间件不是真mq。
    :param logger_prefix: 日志前缀,可使不同的消费者生成不同的日志
    :param create_logger_file : 是否创建文件日志
    :param do_task_filtering :是否执行基于函数参数的任务过滤
    :param task_filtering_expire_seconds:任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 ,
           30分钟前发布过1 + 2 的任务,现在仍然执行,
           如果是30分钟以内发布过这个任务,则不执行1 + 2,现在把这个逻辑集成到框架,一般用于接口价格缓存。
    :param is_do_not_run_by_specify_time_effect :是否使不运行的时间段生效
    :param do_not_run_by_specify_time   :不运行的时间段
    :param schedule_tasks_on_main_thread :直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。
    :param function_result_status_persistance_conf   :配置。是否保存函数的入参,运行结果和运行状态到mongodb。
           这一步用于后续的参数追溯,任务统计和web展示,需要安装mongo。
    :param is_using_rpc_mode 是否使用rpc模式,可以在发布端获取消费端的结果回调,但消耗一定性能,使用async_result.result时候会等待阻塞住当前线程。。
    :param broker_kind:中间件种类,一共支持30多种中间件,一次性支持了kombu的所有中间件。 入参见 BrokerEnum枚举类的属性。

关于task_deco参数太多的说明:

有人会抱怨入参超多很复杂,是因为要实现一切控制方式,实现的运行控制手段非常丰富,所以参数就会多。

看这个里面的参数解释非常重要,几乎能想到的控制功能全部都有。比如有人说日志太多,不想看那么详细的提示日志
,早就通过参数提供实现了,自己抱怨参数多又以为没提供这个功能,简直是自相矛盾。

想入参参数少那就看新增的那个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理。“ 
这个例子的框架啥控制手段都没有,参数自然就很少。

乞丐版分布式函数调度框架的代码在 

function_scheduling_distributed_framework/beggar_version_implementation/beggar_redis_consumer.py

3.3.2 task_deco装饰器 的 concurrent_num 和 qps 之间的关系。

 concurrent_num:并发数量。
    qps qps是有个很有趣的参数,能精确控制函数每秒运行多少次。
    concurrent_num和qps存在着一定的关系。
    
    例如对于下面这个函数
    
    def func(x):
           time.sleep(2)
           print(x)

    1)如果设置 concurrent_num = 1000(或100万)  qps = 10
    那么一秒钟会执行10次func函数。如果不指定qps的值,则不进行控频,消费框架会平均每秒钟会执行50次函数func。

    如果设置concurrent_num = 1000  qps = 5   
    那么一秒钟会执行5次func函数。所以可以看到,当你不知道要开多少并发合适的时候,可以粗暴开1000个线程,但要设置一个qps。
   
    那为什么次框架,可以让你粗暴的设置1000设置100万线程呢,并不是做了数字截取,判断线程设置大于多少就自动调小了,此消费框架并没有这样去实现。
    而是次框架使用的非concurrent.tutures.ThreadpoolExecutor,是使用的自定义的  ThreadPoolExecutorShrinkAble 线程池,
    此线程池其中之一的功能就是节制开更多的线程,因为对于上面的休眠2秒的func函数,如果设置concurrent_num = 1000000  qps = 5,
    正常来说开10个线程足够实现每秒执行5次了,此框架在调节线程新增线程时候进行了更多的判断,所以原生线程池不可以设置100万大小,
    而ThreadPoolExecutorShrinkAble可以设置为100万大小。

    此外ThreadPoolExecutorShrinkAble 实现了线程池自动缩小的功能,这也是原生concurrent.tutures.ThreadpoolExecutor没有的功能。
    自动缩小是什么意思呢,比如一段时间任务非常密集1秒钟来了几百个任务,所以当时开启了很多线程来应付,但一段时间后每分钟只来了个把任务,
    此时 ThreadPoolExecutorShrinkAble 能够自动缩小线程池,
    ThreadPoolExecutorShrinkAble实现了java ThreadpoolExecutor的KeepAliveTime参数的功能,
    原生concurrent.tutures.ThreadpoolExecutor线程池即使以后永久不来新任务,之前开的线程数量一致保持这。

    关于 ThreadPoolExecutorShrinkAble 的厉害之处,可以参考 https://github.com/ydf0509/threadpool_executor_shrink_able
    
    最终关于 concurrent_num 大小设置为多少,看自己需求,上面说的100万是举个例子,
    实际这个参数还被用作为线程池的任务队列的有界队列的大小,所以一定要设置为1000以下,否则如果设置为100万,
    从消息中间件预取出的消息过多,造成python内存大、单个消费者掏空消息队列中间件造成别的新启动的消费者无任务可消费、
    对于不支持消费确认类型的中间件的随意重启会丢失大量正在运行的任务 等不利影响。

    2)上面的func函数,设置 concurrent_num = 1  qps = 100,那会如何呢?
       由于你设置的并发是1,对于一个需要2秒运行完成的函数,显然平均每2秒才能执行1次,就是框架真正的只能达到0.5个qps。
       所以 concurrent_num 和 qps,既有关系,也不是绝对的关系。
    
    在对一个随机消耗时间的函数进行并发控制时候,如果函数的运行时间是0.5到20秒任意时间不确定的徘徊,你可以设置 concurrent_num = 100,
    如果合作方要求了只能1秒钟能让你使用多少次,例如需要精确控频10次,可以设置qps =10,concurrent_num随便搞个 一两百 两三百就行了,
    因为是智能的克制的调节线程池大小的,所以不会真的达到concurrent_num的值。

    3)qps是个小数可以小于1,如果要设置10秒执行一次函数,则设置qps=0.1

    这主要是介绍了 concurrent_num 和qps的关系和设置值,qps是优先,但受到concurrent_num的约束。

3.4 框架的乞丐精简版实现方式

由于框架的功能十分多,如果没学习36种设计模式,就很难看懂源码,现在演示精简实现原理

此精简例子十分之简单明了,就是死循环从中间件取任务然后丢到线程池里面执行。

此代码在 function_scheduling_distributed_framework/beggar_version_implementation/beggar_redis_consumer.py

这样简单明了,演示了基本原理,但是这个缺少消费确认(随意重启代码会造成大量任务丢失) qps恒定等20种功能。

def start_consuming_message(queue_name, consume_function, threads_num=50):
    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis.brpop(queue_name, timeout=60)
            if redis_task:
                task_str = redis_task[1].decode()
                print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
                pool.submit(consume_function, **json.loads(task_str))
            else:
                print(f'redis的 {queue_name} 队列中没有任务')
        except redis.RedisError as e:
            print(e)


if __name__ == '__main__':
    import time


    def add(x, y):
        time.sleep(5)
        print(f'{x} + {y} 的结果是 {x + y}')

    # 推送任务
    for i in range(100):
        print(i)
        redis.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))


    start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)

3.5 框架的任务消费确认

此框架可以确保客户端任何时候 随意断电 粗暴重启代码 随意关机,任务万无一失。

3.4演示的精简版框架,实现redis的list的push和pop来模拟消息队列,很明显不靠谱,kill 9 重启代码或者重启电脑很容易会丢失大量任务。

分布式一致性消息传递、事件处理等场景中十分重要,分为3种情况:
At most Onece:最多一次,如果算子处理事件失败,算子将不再尝试该事件。
At Least Onece:至少一次,如果算子处理事件失败,算子会再次尝试该处理事件,直到有一次成功。
Exactly-Once:严格地,有且仅处理一次,通常有两种方法实现。

3.4实现的是最多一次,框架在多种中间件使用消费确认实现了万无一失 ,达到了Exactly-Once。
Exactly-Once是最好的也是实现难度最复杂的;At most Onece通常是最差的方式,也是最简单的实现方式。

框架在使用rabbitmq,内置默认了确认消费。

框架在使用redis作为中间件时候,有很多种实现方式,REDIS 是最不靠谱的会丢失消息。
REDIS_ACK_ABLE 、 REDIS_STREAM、 RedisBrpopLpush BrokerKind 这三种都是实现了确认消费。

3.6 框架的设计规范原则

源码实现思路基本90%遵守了oop的6个设计原则,很容易扩展中间件。
1、单一职责原则——SRP
2、开闭原则——OCP
3、里式替换原则——LSP
4、依赖倒置原则——DIP
5、接口隔离原则——ISP
6、迪米特原则——LOD

最主要是大量使用了模板模式、工厂模式、策略模式、鸭子类。
可以仿照源码中实现中间件的例子,只需要继承发布者、消费者基类后实现几个抽象方法即可添加新的中间件。

4.使用框架的各种代码示例

框架极其简单并且自由,只有一个task_deco装饰器的参数学习, 实际上这个章节所有的例子都是调整了一下task_deco的参数而已。

有一点要说明的是框架的消息中间件的ip 端口 密码 等配置是在你第一次随意运行代码时候,在你当前项目的根目录下生成的 distributed_frame_config.py 按需设置。

所有例子的发布和消费都没必须写在同一个py文件,(除了使用python 自带语言queue),因为是使用中间件解耦的消息,好多人误以为发布和消费必须在同一个python文件。

4.1 装饰器方式调度函数

from function_scheduling_distributed_framework import task_deco, BrokerEnum


# qps可以指定每秒运行多少次,可以设置0.001到10000随意。
# broker_kind 指定使用什么中间件,如果用redis,就需要在 distributed_frame_config.py 设置redis相关的配置。
@task_deco('queue_test_f01', qps=0.2, broker_kind=BrokerEnum.REDIS_ACK_ABLE)  # qps 0.2表示每5秒运行一次函数,broker_kind=2表示使用redis作中间件。
def add(a, b):
    print(a + b)

if __name__ == '__main__':
    for i in range(10, 20):
        add.pub(dict(a=i, b=i * 2))  # 使用add.pub 发布任务
        add.push(i, b=i * 2)  # 使用add.push 发布任务
    add.consume()  # 使用add.consume 消费任务
    # add.multi_process_consume(4)  # 这是开启4进程 叠加 细粒度(协程/线程)并发,速度更强。

4.2 非装饰器调度函数

from function_scheduling_distributed_framework import get_consumer, BrokerEnum


def add(a, b):
    print(a + b)


# 非装饰器方式,多了一个入参,需要手动指定consuming_function入参的值。
consumer = get_consumer('queue_test_f01', consuming_function=add, qps=0.2, broker_kind=BrokerEnum.REDIS_ACK_ABLE)

if __name__ == '__main__':
    for i in range(10, 20):
        consumer.publisher_of_same_queue.publish(dict(a=i, b=i * 2))  # consumer.publisher_of_same_queue.publish 发布任务
    consumer.start_consuming_message()  # 使用consumer.start_consuming_message 消费任务

4.3 演示如何解决多个步骤的消费函数

看这个例子,step1函数中不仅可以给step2发布任务,也可以给step1自身发布任务。

qps规定了step1每2秒执行一次,step2每秒执行3次。

import time

from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('queue_test_step1', qps=0.5, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def step1(x):
    print(f'x 的值是 {x}')
    if x == 0:
        for i in range(1, 300):
            step1.pub(dict(x=x + i))
    for j in range(10):
        step2.push(x * 100 + j)  # push是直接发送多个参数,pub是发布一个字典
    time.sleep(10)


@task_deco('queue_test_step2', qps=3, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def step2(y):
    print(f'y 的值是 {y}')
    time.sleep(10)


if __name__ == '__main__':
    # step1.clear()
    step1.push(0)  # 给step1的队列推送任务。

    step1.consume()  # 可以连续启动两个消费者,因为conusme是启动独立线程里面while 1调度的,不会阻塞主线程,所以可以连续运行多个启动消费。
    step2.consume()

4.4 演示如何定时运行。

# 定时运行消费演示,定时方式入参用法可以百度 apscheduler 定时包。
import datetime
from function_scheduling_distributed_framework import task_deco, BrokerEnum, fsdf_background_scheduler, timing_publish_deco


@task_deco('queue_test_666', broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def consume_func(x, y):
    print(f'{x} + {y} = {x + y}')


if __name__ == '__main__':
    fsdf_background_scheduler.add_job(timing_publish_deco(consume_func), 'interval', id='3_second_job', seconds=3, kwargs={"x": 5, "y": 6})  # 每隔3秒发布一次任务,自然就能每隔3秒消费一次任务了。
    fsdf_background_scheduler.add_job(timing_publish_deco(consume_func), 'date', run_date=datetime.datetime(2020, 7, 24, 13, 53, 6), args=(5, 6,))  # 定时,只执行一次
    fsdf_background_scheduler.add_timing_publish_job(consume_func, 'cron', day_of_week='*', hour=14, minute=51, second=20, args=(5, 6,))  # 定时,每天的11点32分20秒都执行一次。
    # 启动定时
    fsdf_background_scheduler.start()
    # 启动消费
    consume_func.consume()

4.5 多进程并发 + 多线程/协程,代码例子。

ff.multi_process_start(2) 就是代表启动2个独立进程并发 + 叠加 asyncio、gevent、eventlet、threding细粒度并发,
例如fun函数加上@task_deco(concurrent_num=200),fun.multi_process_start(16) ,这样16进程叠加每个进程内部开200线程/协程,运行性能炸裂。

多进程消费

import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum, IdeAutoCompleteHelper, PriorityConsumingControlConfig, run_consumer_with_multi_process

"""
演示多进程启动消费,多进程和 asyncio/threading/gevnt/evntlet是叠加关系,不是平行的关系。
"""

# qps=5,is_using_distributed_frequency_control=True 分布式控频每秒执行5次。
# 如果is_using_distributed_frequency_control不设置为True,默认每个进程都会每秒执行5次。
@task_deco('test_queue', broker_kind=BrokerEnum.REDIS, qps=5, is_using_distributed_frequency_control=True)
def ff(x, y):
    import os
    time.sleep(2)
    print(os.getpid(), x, y)


if __name__ == '__main__':
    ff.clear() # 清除
    # ff.publish()
    for i in range(1000):
        ff.push(i, y=i * 2)

        # 这个与push相比是复杂的发布,第一个参数是函数本身的入参字典,后面的参数为任务控制参数,例如可以设置task_id,设置延时任务,设置是否使用rpc模式等。
        ff.publish({'x': i * 10, 'y': i * 2}, priority_control_config=PriorityConsumingControlConfig(countdown=1, misfire_grace_time=15))

    ff(666, 888)  # 直接运行函数
    ff.start()  # 和 conusme()等效
    ff.consume()  # 和 start()等效
    run_consumer_with_multi_process(ff, 2)  # 启动两个进程
    ff.multi_process_start(2)  # 启动两个进程,和上面的run_consumer_with_multi_process等效,现在新增这个multi_process_start方法。
    IdeAutoCompleteHelper(ff).multi_process_start(3)  # IdeAutoCompleteHelper 可以补全提示,但现在装饰器加了类型注释,ff. 已近可以在pycharm下补全了。

4.6 演示rpc模式,即客户端调用远程函数并及时得到结果。

如果在发布端要获取消费端的执行结果,有两种方式
1、需要在@task_deco设置is_using_rpc_mode=True,默认是False不会得到结果。
2、如果@task_deco没有指定,也可以在发布任务的时候,用publish方法并写上
  priority_control_config=PriorityConsumingControlConfig(is_using_rpc_mode=True)

1)远程服务端脚本,执行求和逻辑。 test_frame\test_rpc\test_consume.py

import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('test_rpc_queue', is_using_rpc_mode=True, broker_kind=BrokerEnum.REDIS_ACK_ABLE,concurrent_num=200)
def add(a, b):
    time.sleep(3)
    return a + b


if __name__ == '__main__':
    add.consume()

2.1)客户端调用脚本,单线程发布阻塞获取两书之和的结果,执行求和过程是在服务端。 test_frame\test_rpc\test_publish.py

这种方式如果在主线程单线程for循环运行100次,因为为了获取结果,导致需要300秒才能完成100次求和。

from function_scheduling_distributed_framework import PriorityConsumingControlConfig
from test_frame.test_rpc.test_consume import add

for i in range(100):
    async_result = add.push(i, i * 2)
    print(async_result.result)   # 执行 .result是获取函数的运行结果,会阻塞当前发布消息的线程直到函数运行完成。

    # 如果add函数的@task_deco装饰器参数没有设置 is_using_rpc_mode=True,则在发布时候也可以指定使用rpc模式。
    async_result = add.publish(dict(a=i * 10, b=i * 20), priority_control_config=
    PriorityConsumingControlConfig(is_using_rpc_mode=True))
    print(async_result.status_and_result)

2.2) 设置rpc回调结果在线程池中处理

上面2.1)方式中是在单线程环境下阻塞的一个接一个打印结果。如果想快速并发处理结果,可以自己手动在多线程或线程池处理结果。 框架也提供一个设置回调函数,自动在线程池中处理回调结果,回调函数有且只有一个入参,表示函数运行结果及状态。

如下脚本则不需要300秒运行完成只需要3秒即可,会自动在并发池中处理结果。

from function_scheduling_distributed_framework import PriorityConsumingControlConfig
from test_frame.test_rpc.test_consume import add

def show_result(status_and_result:dict):
    """
    :param status_and_result: 一个字典包括了函数入参、函数结果、函数是否运行成功、函数运行异常类型
    """
    print(status_and_result)
    
    
for i in range(100):
    async_result = add.push(i, i * 2)
    # print(async_result.result)   # 执行 .result是获取函数的运行结果,会阻塞当前发布消息的线程直到函数运行完成。
    async_result.set_callback(show_result) # 使用回调函数在线程池中并发的运行函数结果

4.7 演示qps控频

演示框架的qps控频功能

此框架对函数耗时随机波动很大的控频精确度达到96%以上

此框架对耗时恒定的函数控频精确度达到99.9%

在指定rate_limit 超过 20/s 时候,celery对耗时恒定的函数控频精确度60%左右,下面的代码会演示这两个框架的控频精准度对比。

此框架针对不同指定不同qps频次大小的时候做了不同的三种处理方式。
框架的控频是直接基于代码计数,而非使用redis 的incr计数,因为python操作一次redis指令要消耗800行代码左右,
如果所有任务都高频率incr很消耗python脚本的cpu也对redis服务端产生灾难压力。
例如假设有50个不同的函数,分别都要做好几千qps的控频,如果采用incr计数,光是incr指令每秒就要操作10万次redis,
所以如果用redis的incr计数控频就是个灾难,redis incr的计数只适合 1到10大小的qps,不适合 0.01 qps 和 1000 qps这样的任务。

同时此框架也能很方便的达到 5万 qps的目的,装饰器设置qps=50000 和 is_using_distributed_frequency_control=True,
然后只需要部署很多个进程 + 多台机器,框架通过redis统计活跃消费者数量,来自动调节每台机器的qps,框架的分布式控频开销非常十分低,
因为分布式控频使用的仍然不是redis的incr计数,而是基于每个消费者的心跳来统计活跃消费者数量,然后给每个进程分配qps的,依然基于本地代码计数。

例如部署100个进程(如果机器是128核的,一台机器足以,或者20台8核机器也可以)
以20台8核机器为例子,如果把机器减少到15台或增加机器到30台,随便减少部署的机器数量或者随便增加机器的数量,
代码都不需要做任何改动和重新部署,框架能够自动调节来保持持续5万次每秒来执行函数,不用担心部署多了30台机器,实际运行qps就变成了10几万。
(前提是不要把机器减少到10台以下,因为这里假设这个函数是一个稍微耗cpu耗时的函数,要保证所有资源硬件加起来有实力支撑5万次每秒执行函数)

每台机器都运行 test_fun.multi_process_conusme(8),只要10台以上1000台以下随意随时随地增大减小运行机器数量,
代码不需要做任何修改变化,就能很方便的达到每秒运行5万次函数的目的。

4.7.1 演示qps控频和自适应扩大和减小并发数量。

通过不同的时间观察控制台,可以发现无论f2这个函数需要耗时多久(无论是函数耗时需要远小于1秒还是远大于1秒),框架都能精确控制每秒刚好运行2次。
当函数耗时很小的时候,只需要很少的线程就能自动控制函数每秒运行2次。
当函数突然需要耗时很大的时候,智能线程池会自动启动更多的线程来达到每秒运行2次的目的。
当函数耗时从需要耗时很大变成只需要耗时很小的时候,智能线程池会自动缩小线程数量。
总之是围绕qps恒定,会自动变幻线程数量,做到既不多开浪费cpu切换,也不少开造成执行速度慢。
import time
import threading
from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum

t_start = time.time()


@task_deco('queue_test2_qps', qps=2, broker_kind=BrokerEnum.PERSISTQUEUE, concurrent_mode=ConcurrentModeEnum.THREADING, concurrent_num=600)
def f2(a, b):
    """
    这个例子是测试函数耗时是动态变化的,这样就不可能通过提前设置参数预估函数固定耗时和搞鬼了。看看能不能实现qps稳定和线程池自动扩大自动缩小
    要说明的是打印的线程数量也包含了框架启动时候几个其他的线程,所以数量不是刚好和所需的线程计算一样的。
    """
    result = a + b
    sleep_time = 0.01
    if time.time() - t_start > 60:  # 先测试函数耗时慢慢变大了,框架能不能按需自动增大线程数量
        sleep_time = 7
    if time.time() - t_start > 120:
        sleep_time = 30
    if time.time() - t_start > 240:  # 最后把函数耗时又减小,看看框架能不能自动缩小线程数量。
        sleep_time = 0.8
    if time.time() - t_start > 300:
        sleep_time = None
    print(f'{time.strftime("%H:%M:%S")}  ,当前线程数量是 {threading.active_count()},   {a} + {b} 的结果是 {result}, sleep {sleep_time} 秒')
    if sleep_time is not None:
        time.sleep(sleep_time)  # 模拟做某事需要阻塞n秒种,必须用并发绕过此阻塞。
    return result


if __name__ == '__main__':
    f2.clear()
    for i in range(1000):
        f2.push(i, i * 2)
    f2.consume()

4.7.2 此框架对固定耗时的任务,持续控频精确度高于99.9%

4.7.2 此框架对固定耗时的任务,持续控频精确度高于99.9%,远超celery的rate_limit 60%控频的精确度。

对于耗时恒定的函数,此框架精确控频率精确度达到99.9%,celery控频相当不准确,最多到达60%左右,两框架同样是做简单的加法然后sleep0.7秒,都设置500并发100qps。
@task_deco('test_queue66', broker_kind=BrokerEnum.REDIS, qps=100)
def f(x, y):
    print(f''' {int(time.time())} 计算  {x} + {y} = {x + y}''')
    time.sleep(0.7)
    return x + y


@celery_app.task(name='求和啊', rate_limit='100/s')
def add(a, b):
    print(f'{int(time.time())} 计算 {a} + {b} 得到的结果是  {a + b}')
    time.sleep(0.7)
    return a + b
# 在pycahrm控制台搜索 某一秒的时间戳 + 计算 作为关键字查询,分布式函数调度框架启动5秒后,以后持续每一秒都是100次,未出现过99和101的现象。
在pycahrm控制台搜索 某一秒的时间戳 + 计算 作为关键字查询,celery框架,每一秒求和次数都是飘忽不定的,而且是六十几徘徊,
如果celery能控制到95至105次每秒徘徊波动还能接受,类似现象还有celery设置rate_limit=50/s,实际32次每秒徘徊,
设置rate_limit=30/s,实际18-22次每秒徘徊,可见celery的控频相当差。
设置rate_limit=10/s,实际7-10次每秒徘徊,大部分时候是9次,当rate_limit大于20时候就越来越相差大了,可见celery的控频相当差。

4.7.3 对函数耗时随机性大的控频功能证明

对函数耗时随机性大的控频功能证明,使用外网连接远程broker,持续qps控频。

设置函数的qps为100,来调度需要消耗任意随机时长的函数,能够做到持续精确控频,频率误差小。
如果设置每秒精确运行超过500000次以上的固定频率,前提是cpu够强机器数量多,
设置qps=50000,并指定is_using_distributed_frequency_control=True(只有这样才是分布式全局控频,默认是基于单个进程的控频),。

如果任务不是很重型很耗cpu,此框架单个消费进程可以控制每秒运行次数的qps 从0.01到1000很容易。
当设置qps为0.01时候,指定的是每100秒运行1次,qps为100指的是每一秒运行100次。
import time
import random
from function_scheduling_distributed_framework import task_deco, BrokerEnum, run_consumer_with_multi_process


@task_deco('test_rabbit_queue7', broker_kind=BrokerEnum.RABBITMQ_AMQPSTORM, qps=100, log_level=20)
def test_fun(x):
    # time.sleep(2.9)
    # sleep时间随机从0.1毫秒到5秒任意徘徊。传统的恒定并发数量的线程池对未知的耗时任务,持续100次每秒的精确控频无能为力。
    # 但此框架只要简单设置一个qps就自动达到了这个目的。
    random_sleep = random.randrange(1, 50000) / 10000
    time.sleep(random_sleep)
    print(x, random_sleep)


if __name__ == '__main__':
    test_fun.consume()
    # test_fun.multi_process_consume(3)

分布式函数调度框架对耗时波动大的函数持续控频曲线 _images/img_3.pngimg_3.png

4.7.4 分布式全局控频和单个消费者控频区别

@task_deco中指定 is_using_distributed_frequency_control=True 则启用分布式全局控频,是跨进程跨python解释器跨服务器的全局控频。
否则是基于当前消费者的控频。

例如 你设置的qps是100,如果你不设置全局控频,run_consume.py 脚本中启动 fun.consume() ,如果你反复启动5次这个 run_consume.py,
如果不设置分布式控频,那么5个独立的脚本运行,频率总共会达到 500次每秒,因为你部署了5个脚本。
同理你如果用 fun.multi_process_consume(4)启动了4个进程消费,那么就是4个消费者,总qps也会达到400次每秒。
这个控频方式是看你需求了。


如果设置了 is_using_distributed_frequency_control=True,那就会使用每个消费者发送到redis的心跳来统计总消费者个数。
如果你部署了2次,那么每个消费者会平分qps,每个消费者是变成50qps,总共100qps。
如果你部署了5次,那么每个消费者会平分qps,每个消费者是变成20qps,总共100qps。
如果你中途关闭2个消费者,变成了3个消费者,每个消费者是变成 33.33qps,总共100qps。(框架qps支持小数,0.1qps表示每10秒执行1次)

4.8 再次说明qps能做什么,qps为什么流弊?常规并发方式无法完成的需求是什么?

以模拟请求一个flask接口为例子,我要求每一秒都持续精确完成8次请求,即控制台每1秒都持续得到8次flask接口返回的hello world结果。

4.8.1 下面讲讲常规并发手段为什么对8qps控频需求无能为力?

不用框架也可以实现8并发, 例如Threadpool设置8线程很容易,但不用框架实现8qps不仅难而且烦

虽然
框架能自动实现 单线程  ,多线程, gevent , eventlet ,asyncio ,多进程 并发 ,
多进程 + 单线程 ,多进程 + 多线程,多进程 + gevent,  多进程 + eventlet  ,多进程 + asyncio 的组合并发
可以说囊括了python界的一切知名的并发模型,能做到这一点就很方便强大了
但是
此框架还额外能实现qps控频,能够实现函数每秒运行次数的精确控制,我觉得这个功能也很实用,甚至比上面的那些并发用起来还实用。
这个代码是模拟常规并发手段无法达到每秒持续精确运行8次函数(请求flask接口8次)的目的。

下面的代码使用8个线程并发运行 request_flask_api 函数,
当flask_veiw_mock 接口耗时0.1秒时候,在python输出控制台可以看到,10秒钟就运行结束了,控制台每秒打印了80次hello world,严重超频10倍了不符合需求
当flask_veiw_mock 接口耗时刚好精确等于1秒时候,在python输出控制台可以看到,100秒钟运行结束了,控制台每秒打印了8次hello world,只有当接口耗时刚好精确等于1秒时候,并发数量才符合qps需求
当flask_veiw_mock 接口耗时10秒时候,在python输出控制台可以看到,需要1000秒钟运行结束,控制台每隔10秒打印8次hello world,严重不符合持续每秒打印8次的目的。
由此可见,用设置并发数量来达到每秒请求8次flask的目的非常困难,99.99%的情况下服务端没那么巧刚好耗时1秒。

天真的人会说根据函数耗时大小,来设置并发数量,这可行吗?

有人会说,为了达到8qps目的,当函数里面sleep 0.1 时候他开1线程,那你这样仍然超频啊,你是每1秒钟打印10次了超过了8次。
当sleep 0.01 时候,为了达到8qps目的,就算你开1线程,那不是每1秒钟打印100次hello?你不会想到开0.08个线程个线程来实现控频吧?
当sleep 10秒时候,为了8qps目的,你开80线程,那这样是控制台每隔10秒打印80次hello,我要求的是控制台每一秒都打印8次hello,没告诉你是每隔10秒打印80次hello吧?还是没达到目的。
如果函数里面是sleep 0.005 0.07 0.09 1.3 2.7 7 11 13这些不规则无法整除的数字?请问你是如何一一计算精确开多少线程来达到8qps的?
如果flask网站接口昨天是3秒的响应时间,今天变成了0.1秒的响应时间,你的线程池数量不做变化,代码不进行重启,请问你如何做到自适应无视请求耗时,一直持续8qps的目的?
固定并发数量大小就是定速巡航不够智能前车减速会撞车,qps自适应控频那就是acc自适应巡航了,自动调整极端智能,压根无需提前测算预估函数需要耗时多久(接口响应耗时多久)。
所以综上所述,如果你有控频需求,你想用并发数量来达到控频目的,那是不可能的。

有些人到现在还没明白并发数量和qps(每秒执行多少次)之间的区别,并发数量只有在函数耗时刚好精确等于1秒时候才等于qps。


```python
import time
from concurrent.futures import ThreadPoolExecutor


def flask_veiw_mock(x):
    # time.sleep(0.1)  # 通过不同的sleep大小来模拟服务端响应需要消耗的时间
    # time.sleep(1)   # 通过不同的sleep大小来模拟服务端响应需要消耗的时间
    time.sleep(10)  # 通过不同的sleep大小来模拟服务端响应需要消耗的时间
    return f"hello world {x}"


def request_flask_api(x):
    response = flask_veiw_mock(x)
    print(time.strftime("%H:%M:%S"), '   ', response)


if __name__ == '__main__':
    with ThreadPoolExecutor(8) as pool:
        for i in range(800):
            pool.submit(request_flask_api,i)

截图是当 flask_veiw_mock 耗时为10秒时候,控制台是每隔10秒打印8次 hello world,没达到每一秒都打印8次的目的
当 flask_veiw_mock 耗时为0.1 秒时候,控制台是每隔1秒打印80次 hello world,没达到每一秒都打印8次的目的

_images/img_16.pngimg_16.png

4.8.2 使用分布式函数调度框架,无论接口耗时多少,轻松达到8qps的例子

这个代码是模拟常规并发手段无法达到每秒持续精确运行8次函数(请求flask接口8次)的目的。
但是使用分布式函数调度框架能轻松达到这个目的。

下面的代码使用分部署函数调度框架来调度运行 request_flask_api 函数,

flask_veiw_mock 接口耗时0.1秒时候,控制台每秒打印8次 hello world,非常精确的符合控频目标
flask_veiw_mock 接口耗时1秒时候,控制台每秒打印8次 hello world,非常精确的符合控频目标
flask_veiw_mock 接口耗时10秒时候控,控制台每秒打印8次 hello world,非常精确的符合控频目标
flask_veiw_mock 接口耗时0.001秒时候,控制台每秒打印8次 hello world,非常精确的符合控频目标
flask_veiw_mock 接口耗时50 秒时候,控制台每秒打印8次 hello world,非常精确的符合控频目标
可以发现分布式函数调度框架无视函数耗时大小,都能做到精确控频,常规的线程池 asyncio什么的,面对这种不确定的接口耗时,简直毫无办法。

有些人到现在还没明白并发数量和qps(每秒执行多少次)之间的区别,并发数量只有在函数耗时刚好精确等于1秒时候才等于qps。
import time
from function_scheduling_distributed_framework import task_deco,BrokerEnum


def flask_veiw_mock(x):
    time.sleep(0.1)  # 通过不同的sleep大小来模拟服务端响应需要消耗的时间
    # time.sleep(1)   # 通过不同的sleep大小来模拟服务端响应需要消耗的时间
    # time.sleep(10)  # 通过不同的sleep大小来模拟服务端响应需要消耗的时间
    return f"hello world {x}"

@task_deco("test_qps",broker_kind=BrokerEnum.MEMORY_QUEUE,qps=8)
def request_flask_api(x):
    response = flask_veiw_mock(x)
    print(time.strftime("%H:%M:%S"), '   ', response)


if __name__ == '__main__':
    for i in range(800):
        request_flask_api.push(i)
    request_flask_api.consume()

从截图可以看出,分布式函数调度框架,控频稳如狗,完全无视flask_veiw_mock耗时是多少。 _images/img_17.pngimg_17.png

有些人到现在还没明白并发数量和qps(每秒执行多少次)之间的区别,并发数量只有在函数耗时刚好精确等于1秒时候才等于qps。

4.9 演示延时运行任务

因为有很多人有这样的需求,希望发布后不是马上运行,而是延迟60秒或者现在发布晚上18点运行。
然来是希望用户自己亲自在消费函数内部写个sleep(60)秒再执行业务逻辑,来达到延时执行的目的,
但这样会被sleep占据大量的并发线程/协程,如果是用户消费函数内部写sleep7200秒这么长的时间,那
sleep等待会占据99.9%的并发工作线程/协程的时间,导致真正的执行函数的速度大幅度下降,所以框架
现在从框架层面新增这个延时任务的功能。

之前已做的功能是定时任务,现在新增延时任务,这两个概念有一些不同。

定时任务一般情况下是配置为周期重复性任务,延时任务是一次性任务。
1)框架实现定时任务原理是代码本身自动定时发布,自然而然就能达到定时消费的目的。
2)框架实现延时任务的原理是马上立即发布,当消费者取出消息后,并不是立刻去运行,
   而是使用定时运行一次的方式延迟这个任务的运行。

在需求开发过程中,我们经常会遇到一些类似下面的场景:
1)外卖订单超过15分钟未支付,自动取消
2)使用抢票软件订到车票后,1小时内未支付,自动取消
3)待处理申请超时1天,通知审核人员经理,超时2天通知审核人员总监
4)客户预定自如房子后,24小时内未支付,房源自动释放


分布式函数调度框架的延时任务概念类似celery的countdown和eta入参,  add.apply_async(args=(1, 2),countdown=20)  # 规定取出后20秒再运行
此框架的入参名称那就也叫 countdown和eta。
countdown 传一个数字,表示多少秒后运行。
eta传一个datetime对象表示,精确的运行时间运行一次。

消费,消费代码没有任何变化

from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('test_delay', broker_kind=BrokerEnum.REDIS_ACK_ABLE)
def f(x):
    print(x)


if __name__ == '__main__':
    f.consume()

发布延时任务

# 需要用publish,而不是push,这个前面已经说明了,如果要传函数入参本身以外的参数到中间件,需要用publish。
# 不然框架分不清哪些是函数入参,哪些是控制参数。如果无法理解就,就好好想想琢磨下celery的 apply_async 和 delay的关系。

from test_frame.test_delay_task.test_delay_consume import f
import datetime
import time
from function_scheduling_distributed_framework import PriorityConsumingControlConfig

"""
测试发布延时任务,不是发布后马上就执行函数。

countdown 和 eta 只能设置一个。
countdown 指的是 离发布多少秒后执行,
eta是指定的精确时间运行一次。

misfire_grace_time 是指定消息轮到被消费时候,如果已经超过了应该运行的时间多少秒之内,仍然执行。
misfire_grace_time 如果设置为None,则消息一定会被运行,不会由于大连消息积压导致消费时候已近太晚了而取消运行。
misfire_grace_time 如果不为None,必须是大于等于1的整数,此值表示消息轮到消费时候超过本应该运行的时间的多少秒内仍然执行。
此值的数字设置越小,如果由于消费慢的原因,就有越大概率导致消息被丢弃不运行。如果此值设置为1亿,则几乎不会导致放弃运行(1亿的作用接近于None了)
如果还是不懂这个值的作用,可以百度 apscheduler 包的 misfire_grace_time 概念

"""
for i in range(1, 20):
    time.sleep(1)

    # 消息发布10秒后再执行。如果消费慢导致任务积压,misfire_grace_time为None,即使轮到消息消费时候离发布超过10秒了仍然执行。
    f.publish({'x': i}, priority_control_config=PriorityConsumingControlConfig(countdown=10))

    # 规定消息在17点56分30秒运行,如果消费慢导致任务积压,misfire_grace_time为None,即使轮到消息消费时候已经过了17点56分30秒仍然执行。
    f.publish({'x': i * 10}, priority_control_config=PriorityConsumingControlConfig(
        eta=datetime.datetime(2021, 5, 19, 17, 56, 30) + datetime.timedelta(seconds=i)))

    # 消息发布10秒后再执行。如果消费慢导致任务积压,misfire_grace_time为30,如果轮到消息消费时候离发布超过40 (10+30) 秒了则放弃执行,
    # 如果轮到消息消费时候离发布时间是20秒,由于 20 < (10 + 30),则仍然执行
    f.publish({'x': i * 100}, priority_control_config=PriorityConsumingControlConfig(
        countdown=10, misfire_grace_time=30))

    # 规定消息在17点56分30秒运行,如果消费慢导致任务积压,如果轮到消息消费时候已经过了17点57分00秒,
    # misfire_grace_time为30,如果轮到消息消费时候超过了17点57分0秒 则放弃执行,
    # 如果如果轮到消息消费时候是17点56分50秒则执行。
    f.publish({'x': i * 1000}, priority_control_config=PriorityConsumingControlConfig(
        eta=datetime.datetime(2021, 5, 19, 17, 56, 30) + datetime.timedelta(seconds=i),
        misfire_grace_time=30))

    # 这个设置了消息由于推挤导致运行的时候比本应该运行的时间如果小于1亿秒,就仍然会被执行,所以几乎肯定不会被放弃运行
    f.publish({'x': i * 10000}, priority_control_config=PriorityConsumingControlConfig(
        eta=datetime.datetime(2021, 5, 19, 17, 56, 30) + datetime.timedelta(seconds=i),
        misfire_grace_time=100000000))

4.10 在web中如flask fastapi django 如何搭配使用消费框架的例子。

在web中推送任务,后台进程消费任务,很多人问怎么在web使用,用法和不与web框架搭配并没有什么不同之处。


因为发布和消费是使用中间件解耦的,一般可以分成web接口启动一次,后台消费启动一次,需要独立部署两次。

演示了flask 使用app应用上下文。

web接口中发布任务到消息队列,独立启动异步消费。

flask + 分布式函数调度框架演示例子在:

https://github.com/ydf0509/distributed_framework/blob/master/test_frame/use_in_flask_tonardo_fastapi

fastapi + 分布式函数调度框架演示例子在: https://github.com/ydf0509/fastapi_use_distributed_framework_demo

如果前端在乎任务的结果:

非常适合使用mqtt, 前端订阅唯一uuid的topic 然后表单中带上这个topic名字请求python接口 -> 接口中发布任务到rabbitmq或redis消息队列 ->
后台消费进程执行任务消费,并将结果发布到mqtt的那个唯一uuid的topic -> mqtt 把结果推送到前端。
使用ajax轮训或者后台导入websocket相关的包来做和前端的长耗时任务的交互 是伪命题。

4.11 开启消费状态、结果web页面

(1)需要安装mongodb,并且设置 MONGO_URL 的值

如果需要使用这个页面,那么无论选择何种中间件,即使不是使用mongo作为消息队列,也需要安装mongodb,因为因为是从这里读取数据的。
需要在 distributed_frame_config.py 中设置MONGO_URL的值,mongo url的格式如下,这是通用的可以百度mongo url连接形式。
有密码 MONGO_CONNECT_URL = f'mongodb://yourname:yourpassword@127.0.01:27017/admin'
没密码 MONGO_CONNECT_URL = f'mongodb://192.168.6.132:27017/'

(2) 装饰器上需要设置持久化的配置参数,代码例子

框架默认不会保存消息状态和结果到mongo的,因为大部分人并没有安装mongo,且这个web显示并不是框架代码运行的必须部分,还有会降低一丝丝运行性能。
框架插入mongo的原理是采用先进的自动批量聚合插入,例如1秒运行几千次函数,并不会操作几千次mongo,是2秒操作一次mongo


如果需要页面显示消费状态和结果,需要配置装饰器的 function_result_status_persistance_conf 的参数
FunctionResultStatusPersistanceConfig的如参是 (is_save_status: bool, is_save_result: bool, expire_seconds: int)
is_save_status 指的是是否保存消费状态,这个只有设置为True,才会保存消费状态到mongodb,从而使web页面能显示该队列任务的消费信息
is_save_result 指的是是否保存消费结果,如果函数的结果超大字符串或者对函数结果不关心或者函数没有返回值,可以设置为False。
expire_seconds 指的是多久以后,这些保存的数据自动从mongodb里面消失删除,避免爆磁盘。
from function_scheduling_distributed_framework import task_deco, FunctionResultStatusPersistanceConfig


@task_deco('queue_test_f01', qps=2,function_result_status_persistance_conf=FunctionResultStatusPersistanceConfig(True, True, 7 * 24 * 3600))
def f(a, b):
    return a + b


f(5, 6)  # 可以直接调用

for i in range(0, 200):
    f.push(i, b=i * 2)

f.consume()

(3)启动python分布式函数调度框架之函数运行结果状态web

web代码在function_scheduling_distributed_framework包里面,所以可以直接使用命令行运行起来,不需要用户现亲自下载代码就可以直接运行。


# 第一步 export PYTHONPATH=你的项目根目录 ,这么做是为了这个web可以读取到你项目根目录下的distributed_frame_config.py里面的配置
# 例如 export PYTHONPATH=/home/ydf/codes/ydfhome
  或者  export PYTHONPATH=./   (./是相对路径,前提是已近cd到你的项目根目录了)

第二步   
win上这么做 python3 -m function_scheduling_distributed_framework.function_result_web.app 

linux上可以这么做使用gunicorn启动性能好一些,当然也可以按win的做。
gunicorn -w 4 --threads=30 --bind 0.0.0.0:27018 function_scheduling_distributed_framework.function_result_web.app:app

(4)使用浏览器打开 127.0.0.1(启动web服务的机器ip):27018,输入默认用户名 密码 admin 123456,即可打开函数运行状态和结果页面。

4.12 框架 asyncio 方式运行协程

见7.8的demo介绍,

@task_deco(….,concurrent_mode=ConcurrentModeEnum.ASYNC)

这种方式是@task_deco装饰在async def定义的函数上面。

celery不支持直接调度执行async def定义的函数,但此框架是直接支持asyncio并发的。

7.8 asyncio方式运行函数

4.13 跨python项目怎么发布任务或者获取函数执行结果?

别的语言项目或者别的python项目手动发布消息到中间件,让分布式函数调度框架消费任务,
例如项目b中有add函数,项目a里面无法 import 导入这个add 函数。

1)第一种方式,使用能操作消息中间件的python包,手动发布任务到消息队列中间件
如果是别的语言发布任务,或者python项目a发布任务但是让python项目b的函数去执行,可以直接发布消息到中间件里面。
手动发布时候需要注意 中间件类型 中间件地址 队列名 @task_deco和distributed_frame_config.py指定的配置要保持一致。
需要发布的消息内容是 入参字典转成json字符串,然后发布到消息队列中间件。
以下以redis中间件为例子。演示手动发布任务到中间件。

@task_deco('test_queue668', broker_kind=BrokerEnum.REDIS)
def add(x, y):
    print(f'''  计算  {x} + {y} = {x + y}''')
    time.sleep(4)
    return x + y

if __name__ == '__main__':
    r = Redis(db=7,host='127.0.0.1')
    for i in range(10):
        add.push(i, i * 2)    # 正常同一个python项目是这么发布消息,使用函数.push或者publish方法
        r.lpush('test_queue668',json.dumps({'x':i,'y':i*2}))   # 不同的项目交互,可以直接手动发布消息到中间件
这个很容易黑盒测试出来,自己观察下中间件里面的内容格式就能很容易手动模仿构造出消息内容了。

需要说明的是 消息内容不仅包括 入参本身,也包括其他控制功能的辅助参数,可以用框架的自动发布功能发布消息,然后观察中间件里面的字段内容,模拟构造。

举个例子之一,例如如果要使用消息过期丢弃这个功能,那么就需要发布消息当时的时间戳了。
  1. 第二种方式,使用伪函数来作为任务,只写函数声明不写函数体。

此方式是一名网友的很机智的建议,我觉得可行。
例如还是以上面的求和函数任务为例,在项目a里面可以定义一个假函数声明,并且将b项目的求和add函数装饰器复制过去,但函数体不需要具体内容

@task_deco('test_queue668', broker_kind=BrokerEnum.REDIS)  # a项目里面的这行和b项目里面的add函数装饰器保持一致。
def add(x, y):   # 方法名可以一样,也可以不一样,但函数入参个数 位置 名称需要保持不变。
   pass          # 方法体,没有具体的求和逻辑代码,只需要写个pass就行了。

之后通过这个假的add函数就可以享受到与在同一个项目中如何正常发布和获取求和函数的执行结果 一模一样的写法方式了。
例如add.clear() 清空消息队列,add.push发布,add.publish发布,async_result.get获取结果,都可以正常使用, 但不要使用add.consume启动消费,因为这个是假的函数体,不能真正的执行求和.

4.14 [分布式函数调度框架qq群]

现在新建一个qq群 189603256

5.框架运行时截图

5.1 windwos pycharm 运行截图

https://s1.ax1x.com/2020/06/30/N5yZin.png

5.2 linux 运行率截图

https://i.niupic.com/images/2019/09/16/_222.png

5.3 函数执行结果及状态搜索查看

https://i.niupic.com/images/2019/11/05/_476.png

高并发 https://i.niupic.com/images/2019/09/20/_331.png

函数结果和运行次数和错误异常查看。使用的测试函数如下。

def add(a, b):
    logger.info(f'消费此消息 {a} + {b} 中。。。。。')
    time.sleep(random.randint(3, 5))  # 模拟做某事需要阻塞10秒种,必须用并发绕过此阻塞。
    if random.randint(4, 6) == 5:
        raise RandomError('演示随机出错')
    logger.info(f'计算 {a} + {b} 得到的结果是  {a + b}')
    return a + b

https://i.niupic.com/images/2019/11/05/_495.png

任务消费统计曲线。 https://i.niupic.com/images/2019/11/05/_496.png

6.常见问题回答

直接统一回复常见问题,例如是不是模仿celery

6.1 你干嘛要写这个框架?和celery 、rq有什么区别?

你干嘛要写这个框架?和celery 、rq有什么区别?是不是完全重复造轮子为了装x?

见第二章的解释,有接近20种优势。
celery 从性能、用户编码需要的代码量、用户使用难度 各方面都远远差于此框架。
可以使用例子中的场景代码进行了严格的控制变量法实际运行对比验证。

6.2 为什么包的名字这么长?

为什么包的名字这么长,为什么不学celery把包名取成 花菜 茄子什么的?

答: 为了直接表达框架的意思。现在代码在ide都能补全,名字长没关系。
生产消费模式不是celery专利,是通用常见的编程思想,不是必须用水果取名。

6.3 框架是使用什么序列化协议来序列化消息的。

   答:框架默认使用json。并且不提供序列化方式选择,有且只能用json序列化。json消息可读性很强,远超其他序列化方式。
   默认使用json来序列化和反序列化消息。所以推送的消息必须是简单的,不要把一个自定义类型的对象作为消费函数的入参,
   json键的值必须是简单类型,例如 数字 字符串 数组 字典这种。不可以是不可被json序列化的python自定义类型的对象。
   
   用json序列化已经满足所有场景了,picke序列化更强,但仍然有一些自定义类型的对象的实例属性由于是一个不可被序列化
   的东西,picke解决不了,这种东西例如self.r = Redis(),而redis对象又包括threding.Lock类型的属性 ,不可以被pike序列化

   就算能序列化的对象也是要用一串很长的东西来。
   用pike来序列化复杂嵌套属性类型的对象,不仅会导致中间件要存储很大的东西传输效率会降低,在编码和解码也会消耗更多的cpu。如果框架支持了pike序列化,会让使用者养成不好的粗暴习惯。
   想消费函数传redis对象作为入参,这种完全可以使用json来解决,例如指定ip 和端口,在消费函数内部来使用redis。所以用json一定可以满足一切传参场景。
   
   如果json不能满足你的消费任务的序列化,那不是框架的问题,一定是你代码设计的问题。所以没有预留不同种类序列化方式的扩展,
   也不打算准备加入其他序列化方式。

6.4 框架如何实现定时?

答:使用的是定时发布任务,那么就能定时消费任务了。导入fsdf_background_scheduler然后添加定时发布任务。

FsdfBackgroundScheduler继承自 apscheduler 的 BackgroundScheduler,定时方式可以百度 apscheduler

6.5 为什么强调是函数调度框架不是类调度框架,不是方法调度框架?

为什么强调是函数调度框架不是类调度框架,不是方法调度框架?你代码里面使用了类,是不是和此框架水火不容了?

问的是consuming_function的值能不能是一个类或者一个实例方法。

   答:一切对类的调用最后都是体现在对方法的调用。这个问题莫名其妙。
   celery rq huery 框架都是针对函数。
   调度函数而不是类是因为:
   1)类实例化时候构造方法要传参,类的公有方法也要传参,这样就不确定要把中间件里面的参数哪些传给构造方法哪些传给普通方法了。
      见5.8
   2) 这种分布式一般要求是幂等的,传啥参数有固定的结果,函数是无依赖状态的。类是封装的带有状态,方法依赖了对象的实例属性。
   3) 比如例子的add方法是一个是实例方法,看起来好像传个y的值就可以,实际是add要接受两个入参,一个是self,一个是y。如果把self推到消息队列,那就不好玩了。
      对象的序列化浪费磁盘空间,浪费网速传输大体积消息,浪费cpu 序列化和反序列化。所以此框架的入参已近说明了,
      仅仅支持能够被json序列化的东西,像普通的自定义类型的对象就不能被json序列化了。
       celery也是这样的,演示的例子也是用函数(也可以是静态方法),而不是类或者实例方法,
       这不是刻意要和celery一样,原因已经说了,自己好好体会好好想想原因吧。
   
   框架如何调用你代码里面的类。
   假设你的代码是:
   class A():
      def __init__(x):
          self.x = x
       
      def add(self,y):
          return self.x + y
   
   那么你不能 a =A(1) ; a.add.push(2),因为self也是入参之一,不能只发布y,要吧a对象(self)也发布进来。
   add(2)的结果是不确定的,他是受到a对象的x属性的影响的,如果x的属性是100,那么a.add(2)的结果是102.
   如果框架对实例方法,自动发布对象本身作为第一个入参到中间件,那么就需要采用pickle序列化,picke序列化对象,
   消耗的cpu很大,占用的消息体积也很大,而且相当一大部分的对象压根无法支持pickle序列化。
   无法支持序列化的对象我举个例子,
   
import pickle
import threading
import redis

class CannotPickleObject:
   def __init__(self):
       self._lock = threading.Lock()


class CannotPickleObject2:
   def __init__(self):
       self._redis = redis.Redis()

print(pickle.dumps(CannotPickleObject())) # 报错,因为lock对象无法pickle
print(pickle.dumps(CannotPickleObject2())) # 报错,因为redis客户端对象也有一个属性是lock对象。

以上这两个对象如果你想实例化,那就是天方夜谭,不可能绝对不可能。
真实场景下,一个类的对象包含了很多属性,而属性指向另一个对象,另一个对象的属性指向下一个对象,
只要其中某一个属性的对象不可pickle序列化,那么此对象就无法pickle序列化。
pickle序列化并不是全能的,所以经常才出现python在win下的多进程启动报错,
因为windows开多进程需要序列化入参,但复杂的入参,例如不是简单的数字 字母,而是一个自定义对象,
万一这个对象无法序列化,那么win上启动多进程就会直接报错。

        
所以如果为了调度上面的class A的add方法,你需要再写一个函数
def your_task(x,y):
   return  A(x).add(y)
然后把这个your_task函数传给框架就可以了。所以此框架和你在项目里面写类不是冲突的,
本人是100%推崇oop编程,非常鲜明的反对极端面向过程编程写代码,但是此框架鼓励你写函数而不是类+实例方法。
框架能支持@staticmethod装饰的静态方法,不支持实例方法,因为静态方法的第一个入参不是self。
   
   
如果对以上为什么不支持实例方法解释还是无法搞明白,主要是说明没静下心来仔细想想,
如果是你设计框架,你会怎么让框架支持实例方法?

statckflow上提问,celery为什么不支持实例方法加@task
https://stackoverflow.com/questions/39490052/how-to-make-any-method-from-view-model-as-celery-task

celery的作者的回答是:

You can create tasks out of methods. The bad thing about this is that the object itself gets passed around 
(because the state of the object in worker has to be same as the state of the caller) 
in order for it to be called, so you lose some flexibility. So your object has to be pickled every 
time, which is why I am against this solution. Of course this concerns only class methods, s
tatic methods have no such problem.

Another solution, which I like, is to create separate tasks.py or class based tasks and call the methods 
from within them. This way, you will have FULL control over Analytics object within your worker.

这段英文的意思和我上面解释的完全一样。所以主要是你没仔细思考想想为什么不支持实例方法。
 

6.6 是怎么调度一个函数的。

    答:基本原理如下
    
    def add(a,b):
        print(a + b)
        
    从消息中间件里面取出参数{"a":1,"b":2}
    然后使用  add(**{"a":1,"b":2}),就是这样运行函数的。

6.7 框架适用哪些场景?

     答:分布式 、并发、 控频、断点接续运行、定时、指定时间不运行、
         消费确认、重试指定次数、重新入队、超时杀死、计算消费次数速度、预估消费时间、
         函数运行日志记录、任务过滤、任务过期丢弃等数十种功能。
        
         只需要其中的某一种功能就可以使用这。即使不用分布式,也可以使用python内置queue对象。
         这就是给函数添加几十项控制的超级装饰器。是快速写代码的生产力保障。
         
         适合一切耗时的函数,不管是cpu密集型 还是io密集型。
         
       不适合的场景主要是:
          比如你的函数非常简单,仅仅只需要1微妙 几十纳秒就能完成运行,比如做两数之和,print一下hello,这种就不是分需要使用这种框架了,
          如果没有解耦的需求,直接调用这样的简单函数她不香吗,还加个消息队列在中间,那是多此一举。
          

6.8 怎么引入使用这个框架?门槛高不高?

 答:先写自己的函数(类)来实现业务逻辑需求,不需要思考怎么导入框架。
     写好函数后把 函数和队列名字绑定传给消费框架就可以了。一行代码就能启动分布式消费。
     在你的函数上面加@task_deco装饰器,执行 your_function.conusme() 就能自动消费。
     所以即使你不想用这个框架了,你写的your_function函数代码并没有作废。
     所以不管是引入这个框架 、废弃使用这个框架、 换成celery框架,你项目的99%行 的业务代码都还是有用的,并没有成为废物。
     别的框架如flask换django,scrapy换spider,代码形式就成了废物。

6.9 怎么写框架?

 答: 需要学习真oop和36种设计模式。唯有oop编程思想和设计模式,才能持续设计开发出新的好用的包甚至框架。
     如果有不信这句话的,你觉得可以使用纯函数编程,使用0个类来实现这样的框架。
     
     如果完全不理会设计模式,实现threding gevent evenlet 3种并发模式,加上10种中间件类型,实现分布式消费流程,
     需要反复复制粘贴扣字30次。代码绝对比你这个多。例如基于nsq消息队列实现任务队列框架,加空格只用了80行。
     如果完全反对oop,需要多复制好几千行来实现。

     例如没听说设计模式的人,在写完rabbitmq版本后写redis版本,肯定十有八九是在rabbitmq版本写完后,把整个所有文件夹,
     全盘复制粘贴,然后在里面扣字母修改,把有关rabbitmq操作的全部扣字眼修改成redis。如果某个逻辑需要修改,
     要在两个地方都修改,更别说这是10几种中间件,改一次逻辑需要修改10几次。
     我接手维护得老项目很多,这种写法的编程思维的是特别常见的,主要是从来没听说设计模式4个字造成的,
     在我没主动学习设计模式之前,我也肯定会是这么写代码的。
     
     
     只要按照36种设计模式里面的oop4步转化公式思维写代码三个月,光就代码层面而言,写代码的速度、流畅度、可维护性
     不会比三年经验的老程序员差,顶多是老程序员的数据库 中间件种类掌握的多一点而已,这个有机会接触只要花时间就能追赶上,
     但是编程思维层次,如果没觉悟到,可不是那么容易转变的,包括有些科班大学学过java的也没这种意识,
     非科班的只要牢牢抓住把设计模式 oop思维放在第一重要位置,写出来的代码就会比科班好,
     不能光学 if else 字典 列表 的基本语法,以前我看python pdf资料时候,资料里面经常会有两章以上讲到类,
     我非常头疼,一看到这里的章节,就直接跳过结束学习了,现在我也许只会特意去看这些章节,
     然后看资料里面有没有把最本质的特点讲述好,从而让用户知道为什么要使用oop,而不是讲下类的语法,这样导致用户还是不会去使用的。
     
     
     你来写完包括完成10种中间件和3种并发模式,并且预留消息中间件的扩展。
     然后我们来和此框架 比较 实现框架难度上、 实现框架的代码行数上、 用户调用的难度上 这些方面。

6.10 框架能做什么

答:你在你的函数里面写什么,框架就是自动并发做什么。
框架在你的函数上加了自动使用消息队列、分布式、自动多进程+多线程(协程)超高并发、qps控频、自动重试。
只是增加了稳定性、扩展性、并发,但做什么任务是你的函数里面的代码目的决定的。

只要是你代码涉及到了使用并发,涉及到了手动调用线程或线程池或asyncio,那么就可以使用此框架,
使你的代码本身里面就不需要亲自操作任何线程 协程 asyncio了。

不需要使用此框架的场景是函数不需要消耗cpu也不需要消耗io,例如print("hello"),如果1微秒就能完成的任务不需要使用此框架。

6.11 日志的颜色不好看或者觉得太绚丽刺瞎眼,想要调整。


一 、关于日志颜色是使用的 \033实现的,控制台日志颜色不光是颜色代码决定的,最主要还是和ide的自身配色主题有关系,
同一个颜色代码,在pycahrm的十几个控制台颜色主题中,表现的都不一样。
所以代码一运行时候就已经能提示用户怎么设置优化控制台颜色了,文这个问题说明完全没看控制台的提示。
"""
1)使用pycharm时候,建议重新自定义设置pycharm的console里面的主题颜色。
   设置方式为 打开pycharm的 file -> settings -> Editor -> Color Scheme -> Console Colors 选择monokai,
   并重新修改自定义6个颜色,设置Blue为1585FF,Cyan为06B8B8,Green 为 05A53F,Magenta为 ff1cd5,red为FF0207,yellow为FFB009。         
2)使用xshell或finashell工具连接linux也可以自定义主题颜色,默认使用shell连接工具的颜色也可以。

颜色效果如连接 https://i.niupic.com/images/2020/11/04/8WZf.png

在当前项目根目录的 nb_log_config.py 中可以修改当get_logger方法不传参时后的默认日志行为。
"""



二、关于日志太绚丽,你觉得不需要背景色块,在当前项目根目录的 nb_log_config.py 中可以设置
DISPLAY_BACKGROUD_COLOR_IN_CONSOLE = False  # 在控制台是否显示彩色块状的日志。为False则不使用大块的背景颜色。

6.12 是不是抄袭模仿 celery

答:有20种优势,例如celery不支持asyncio、celery的控频严重不精确,光抄袭解决不了。
我到现在也只能通过实际运行来达到了解推车celery的目的,并不能直接默读代码就搞懂。
celery的层层继承,特别是层层组合,又没多少类型提示,说能精通里面每一行源码的人,多数是高估自己自信过头了。

celery的代码太魔幻,不运行想默读就看懂是不可能的,不信的人可以把自己关在小黑屋不吃不喝把celery源码背诵3个月,
然后3个月后 试试默写能不能写出来实现里面的兼容 多种中间件 + 多种并发模式 + 几十种控制方式的框架。

这是从一个乞丐版精简框架衍生的,加上36种设计模式付诸实践。

此框架运行print hello函数, 性能强过celery 20倍以上(测试每秒消费次数,具体看我的性能对比项目)。
此框架支持的中间件比celery多
此框架引用方式和celery完全不一样,完全不依赖任何特定的项目结构,celery门槛很高。
此框架和celery没有关系,没有受到celery启发,也不可能找出与celery连续3行一模一样的代码。
这个是从原来项目代码里面大量重复while 1:redis.blpop()  发散扩展的。

这个和celery唯一有相同点是,都是生产者 消费者 + 消息队列中间件的模式,这种生产消费的编程思想或者叫想法不是celery的专利。
包括我们现在java框架实时处理数据的,其实也就是生产者 消费者加kfaka中间件封装的,难道java人员开发框架时候也是需要模仿一下python celery源码或者思想吗。
任何人都有资格开发封装生产者消费者模式的框架,生产者 消费者模式不是celery专利。生产消费模式很容易想到,不是什么高深的架构思想,不需要受到celery的启发才能开发。

6.13 使用此框架时候,在一个python项目中如何连接多个相同种类的消息队列中间件ip地址

这个问题是问一个项目中,有些脚本要连接 192.168.0.1的redis ,有些脚本要连接192.168.0.2的redis,但框架配置文件只有一个,如何解决?

例如目录结构是
your_proj/
      distributed_frame_config.py   (此文件是第一次启动任意消费脚本后自动生成的,用户按需修改配置)
      dira/a_consumer.py  (此脚本中启动funa函数消费)
      dirb/b_consumer.py   (此脚本中启动funb函数消费)
      
如果funa函数要连接 192.168.0.1的redis,funb函数要连接192.168.0.2的redis,有两种解决方式

第一种是在启动消费的脚本,脚本里面手动调用 patch_frame_config()函数来设置各种中间件的值

第二种是 把 distributed_frame_config.py  分别复制到dira和dirb文件夹.
这种就会自动优先使用 a_consumer.py和b_consumer.py同文件夹层级的配置了,
而非是自动优先读取python项目根目录的配置文件,这个是利用了python语言的import 模块导入优先级机制。

6.14 什么是确认消费?为什么框架总是强调确认消费?

发布端:

from scripxx  import fun

for i in range(10):
    fun.push(i)

消费端:

import time
from function_scheduling_distributed_framework import task_deco

@task_deco('test_confirm')
def fun(x):
    print(f'开始处理 {x}')
    time.sleep(120)
    print(f'处理完成 {x}')

fun.consume()
启动消费脚本后,任意时刻随意强制反复关闭重启消费代码,只要函数没有完整的执行完成,函数参数就不会丢失。达到了消息万无一失。
具体的那些中间件消费者支持消费确认,具体见 3.1 介绍。
实现了4种redis消息队列中间件,其中有3种是确认消费的。

确认消费很重要,如果你自己写个简单粗暴的 while 1:redis.blpop()的脚本,你以为是可以断点接续呢,
在多线程并发执行函数时候,大量的消息会丢的很惨。导致虽然是断点接续但你不敢随意重启。

6.15 如何等待队列中的消息全部消费完成

如果有这种需求需要等待消费完成,使用 wait_for_possible_has_finish_all_tasks()

f.consume()
f.wait_for_possible_has_finish_all_tasks(minutes=3)  # 框架提供阻塞方法,直至队列任务全部消费完成,才会运行到下一行。
print("over")   # 如果不加上面那一行,这个会迅速打印over

6.16 框架支不支持函数上加两个装饰器?

如图所示,消费函数上支持两个装饰器,加100个装饰器都可以。

注意事项:
1、task_deco需要放在最上面。
2、不要忘了给装饰器内部函数加上 wraps(f),否则如果使用 fun.multi_process_consume(2)多进程方式无法启动消费。
 wraps是将 被修饰的函数(wrapped) 的一些属性值赋值给 修饰器函数(wrapper)

_images/img_20.pngimg_20.png

6.17 嫌框架日志记录太详细?

日志是了解当前框架正在运行什么的好手段,不然用户懵逼不知道背后在发生执行什么。
@task_deco 装饰器设置 log_level=20 或logging.INFO,就不会再记录框架正在运行什么函数了。
如图再装饰器加上 log_level=20后,框架以后就再也不会记录框架正在运行什么函数入参结果是什么了。

_images/img_31.pngimg_31.png

6.18 框架与你项目依赖的三方包版本不一致冲突?

用户完全可以自由选择任何三方包版本。例如你的 sqlalchemy pymongo等等与框架需要的版本不一致,你完全可以自由选择任何版本。
我开发时候实现了很多种中间件,没有时间长期对每一种中间件三方包的每个发布版本都做兼容测试,所以我固定死了。
用户完全可以选择自己的三方包版本,大胆点,等报错了再说,不出错怎么进步,不要怕代码报错,请大胆点升级你想用的版本。

如果是用requirements.txt方式自动安装三方包,我建议你在文件中第一行写上 function_scheduling_distributed_framework,之后再写其它包
这样就能使用你喜欢的版本覆盖此框架依赖的版本了。
等用的时候报错了再说,提isuu我做兼容适配。一般不会报错的大胆点。

7.更新记录

7.1 新增第十种Consumer,以redis为中间件,但增加了消费确认,是RedisConsumerAckAble类。

支持运行过程中,随意关闭和启动python程序。无惧反复关闭python和 突然断电导致任务丢失几百个。

之前开100线程/协程的话,随意重启python和断电会导致极大概率丢失200个任务。

官方Threadpoolexecutor是无界队列。使用这个会导致丢失无数个任务,
因为他会迅速把redis的消息全部取出来,添加到自己的queue队列慢慢消费。
因为这个原因所以需要自定义写BoundedThreadpoolexecutor和CustomThreadpoolexecutor。       

改版的CustomThreadpoolexecutor修改成了queue最大长度是max_works,自己内部存储100个,
运行中100个,突然关闭python会丢失200个任务。如果queue设置大小为0,则只会丢失100个运行中的任务。

采用的是消费者去除消息时候,用lua脚本同时pop和添加到unacked的独立zset中,函数运行成功后会从set中删除该任务。
同时有一个一直每隔5秒发送心跳到redis服务中的线程,心跳标识中有消费者的唯一标识,绝对不会重复。
如果突然关闭消费者(例如突然断电或者点击关闭python),那么该消费者的心跳将会停止了。这时其他机器的同队列消费者或者当前机器重新启动代码后,在15秒后会
检到被关闭的消费者是非活跃消费者,那么自动将该消费者的unack里面任务全部重新取出返回到待消费队列中。

RedisConsumerAckAble类比RedisConsumer会有一丝丝性能损耗,但python玩redis大部分情况还是python代码本身有性能瓶颈,
而不是造成redis服务端有性能瓶颈,一般只要用在有意义的业务上,就算python很忙把cpu占光了,也不会造成redis服务端达到极限,
python是性能很差的语言,没玩垮redis,自身就把电脑玩死了,所以大部分情况下不要在意加入确认消费后产生额外的对redis服务端的性能压力。

redis要是能直接作为mq使用,redis早就一统天下了,哪里还不断有几十种mq出来。
所以直接基于redis list的如果要做到可靠就必须改进。

7.2 新增基于以redis为消息中间件时候的页面管理和消费速度显示。

基于redisboard,但对redis的list模拟mq功能,进行页面显示优化突出消息队列消费,
加黄显示正在运行中的队列和每10秒的消费速度。每隔10秒自动刷新统计。

由于实时发布和消费,例如10秒内发布20个,消费50个,页面只能显示大小降低了30个,
这个只有专业的mq才能分别显示出来,redis list只是简单数组。

rabbitmq nsq都有官方自带速率显示。

https://i.niupic.com/images/2019/08/26/_122.pngImage text

7.3 新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架.

新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理,不要亲自这么使用。

beggar_redis_consumer.py文件的 start_consuming_message函数。

def start_consuming_message(queue_name, consume_function, threads_num):
    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis_db_frame.brpop(queue_name, timeout=60)
            if redis_task:
                task_str = redis_task[1].decode()
                print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
                pool.submit(consume_function, **json.loads(task_str))
            else:
                print(f'redis的 {queue_name} 队列中没有任务')
        except redis.RedisError as e:
            print(e)


def add(x, y):
    time.sleep(5)
    print(f'{x} + {y} 的结果是 {x + y}')


# 推送任务
for i in range(100):
    redis_db_frame.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))

# 消费任务 
start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)

看完整版代码很长很多,是由于控制功能太多,中间件类型多,并发模式多, 所以加入一个最精简版,精简版的本质实现原理和完整版相同。

7.4 新增sqlachemy 支持的数据库作为消息中间件

新增sqlachemy 支持的数据库作为消息中间件,包括sqlserver mysql postgre oracle sqlite

每个队列是一张表模拟的。

https://i.niupic.com/images/2020/01/13/6hkO.pngImage text

每个任务是表里面的一行记录。

https://i.niupic.com/images/2020/01/11/6gZr.pngImage text

7.5 日志改为导入独立包nb_log,支持用户配置文件自定义日志配置。

例如设置默认需不需要彩色,需不需要大背景彩色色块,需不需要自动拦截转化python内置的print. 在用户当前项目根目录下生成的nb_log_config.py 可以自定义优先日志配置。

7.6 优化qps控频。

将qps按范围分段,采用不同的等待或计数方式。使当qps设置很高的时候,控频更精确。

增加了分布式控频,需要依赖redis中间件。
分布式环境中的控频指的是,假如xx.py文件中有一个consumer,设置func函数的qps为10。
如果在线上部署了三个容器服务,如果不使用分布式控频,则func函数的每秒运行总次数会是30。
即使只有1台机器,如果开多进程,Process运行3个进程,或者把xx.py反复运行启动3个,
也会造成func函数每秒运行总次数是30。
分布式控频主要是解决这种问题。默认不使用分布式控频,
当设置 is_using_distributed_frequency_control为True的时候,使用分布式控频。

7.7 增加rocketmq支持。 (2020-7)

from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('queue_test_f03', qps=2, broker_kind=BrokerEnum.ROCKETMQ)
def f(a, b):
    print(f'{a} + {b} = {a + b}')


if __name__ == '__main__':
    for i in range(100):
        f.push(i, i * 2)
    f.consume()

7.8 新增 async 并发模式 (2020-12)

之前一直都没支持这种并发模式,异步代码不仅消费函数本身与同步代码很多不同,例如函数的定义和调用以及三方库,
不同于gevent和eventlet打个猴子补丁就可以变并发方式并且代码保持100%原样,asyncio的方式代比同步码真的是要大改特改。
而且在框架层面要支持异步也要增加和修改很多,支持异步并不是很容易。这一点连celery5.0目前都还没支持到(据官方文档说5.0要加入支持,但目前的5.0.3还没加入。)

如果消费函数已经写成了async def这种,那么可以设置 concurrent_mode=ConcurrentModeEnum.ASYNC,
框架会在一个新的线程的loop里面自动运行协程,所有协程任务会自动在一个loop里面运行,不是每次临时都生成新的loop只运行一个当前任务方式。

from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum
import asyncio


# 此段代码使用的是语言级Queue队列,不需要安装中间件,可以直接复制运行测试。
@task_deco('test_async_queue2', concurrent_mode=ConcurrentModeEnum.ASYNC,
           broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, concurrent_num=500, qps=20)
async def async_f(x):
    # 测试异步阻塞并发, 此处不能写成time.sleep(1),否则无论设置多高的并发,1秒钟最多只能运行1次函数。
    # 同理asyncio 不能和 requests搭配,要和 aiohttp 搭配。
    await asyncio.sleep(1)
    print(id(asyncio.get_event_loop()))
    # 通过 id 可以看到每个并发函数使用的都是同一个loop,而不是采用了愚蠢的临时 asyncio.new_event_loop().run_until_complete(async_f(x)) 方式调度。
    print(x)


if __name__ == '__main__':
    async_f.clear()
    for i in range(100):
        async_f.push(i, )
    async_f.consume()

7.8.2 gevent/eventlet 和 asyncio 用法区别感受

比方说汽车的自动挡和手动挡,学了手动挡一定会开自动挡,只学自动挡很难开手动挡。
asyncio方式的代码比正常普通同步思维的代码写法也要难得多了,能玩asyncio的人一定会用threading gevent,
但只用过threading gevent,不去专门学习asyncio的用法,100%是玩不转的。

gevent就像自动挡汽车,自动换挡相当于自动切换阻塞。
asyncio就像手动挡,全要靠自己写 await / async def /loop / run_until_complete /run_forever/ 
run_coroutine_threadsafe /wait / wait_for /get_event_loop / new_event_loop / get_running_loop
,写法很麻烦很难。异步多了一个loop就像手动挡汽车多了一个离合器一样,十分之难懂。

手动挡玩的溜性能比自动挡高也更省油。asyncio玩的溜那么他的io并发执行速度和效率也会更好,cpu消耗更少。
如果你写一般的代码,那就用同步方式思维来写吧,让分布式函数调度框架来替你自动并发就可以啦。
如果追求更好的控制和性能,不在乎代码写法上的麻烦,并且asyncio技术掌握的很溜,那就用asyncio的方式吧。 

7.8.3 关于 async 并发模式,为什么框架还使用 pyredis pika pymongo,而没有使用aioredis aiomongo

异步鬓发模式里面,整个调用链路必须是一旦异步,必须处处异步,在base_consumer.py的AbstractConsumer中,
方法 _async_run_consuming_function_with_confirm_and_retry里面使用的还是操作中间件的同步库,

主要是因为框架目前支持15种中间件,一个一个的使用异步模式的库操作中间件来实现,比现在代码起码要增加80%,无异于重写一个项目了。
异步和同步真的写法语法相差很大的,不信可以比比aiomysql 和pymysql库,aiohttp和requests,如果非常简单能实现异步,
那aiohttp和aiomysql作者为什么要写几万行代码来重新实现,不在原来基础上改造个七八行来实现?



目前此库对 消息拉取和消息消费完全是属于在两个不同的线程里面,井水不犯河水,所以用同步库拉取消息对asyncio的消费函数没有任何影响,不存在同步库阻塞异步库的问题。
对于消息确认 消息重新入队 任务过滤  mongo插入,都是采用的同步库,但是使用了 run_in_executor,
把这些操作在异步链路中交给线程池来运行了,同事这个线程池不是官方内置线程池,是智能缩小扩大线程池 ThreadPoolExecutorShrinkAble。
run_in_executor 会把一个同步的操作,sumbit提交给线程池,线程池返回的是一个concurrent.futures包的Future对象,
run_in_executor包装转化了这个Future(此Future不是asyncio的,不是一个awaitable对象)成为了一个asyncio包的Future对象,asyncio的Future对象可以被await,
所以这是非常快捷的同步阻塞函数在异步链路中转同步转异步语法的最佳方式。官方也是这么推荐的。

除了框架内部的阻塞函数是run_in_executor快速转化成非阻塞事件循环的,但是主要的用户的消费函数,是使用的真async模式运行在一个loop循环中的,
也即是单线陈鬓发运行用户的异步函数。

其次框架的同步阻塞函数,都是操作中间件类型的库,异步就是 入队 确认消费 查询是否过滤,这些操作一般都会在1毫秒之内完成,不阻塞太长的事件,
即使不使用run_in_executor,直接在异步链路使用这些同步操作,也没太大问题。一旦异步必须处处异步,说的是不能调用耗时太长的同步阻塞函数,
1毫秒的无伤大雅,因为celery 1秒钟最多能调度300个 def f: print(hello) 这样的无cpu 无io的函数,此框架调度运行速度任然超过celery。

     

还有一种调度起 async def定义 的消费函数方式是继续开多线程并发,然后使用 临时loop = get_event_loop,loop.run_until_complete,这方式愚蠢了,
相当于只是为了运行起这个函数,但全流程丝毫没有丁点异步。

7.9 2021-04 新增以 redis 的 stream 数据结构 为中间件的消息队列。

这个是 redis 的 真消息队列,这次是 真mq,
stream 数据结构功能更加丰富接近 rabbitmq kafka这种真mq的消息队列协议,比 list 做消息队列更强。
需要redis的服务端5.0版本以上才能使用这个数据结构。
代码文件在 function_scheduling_distributed_framework/consumers/redis_stream_consumer.py

这个 REDIS_STREAM 中间件和 REDIS_ACK_ABLE 都支持消费确认,不管客户端怎么掉线关闭,都可以确保消息万无一失。
BrokerEnum.REDIS 中间件 不支持消费确认,随意重启或者断电断线会丢失一批任务。
from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('queue_test_f01', broker_kind=BrokerEnum.REDIS_STREAM, )
def f(a, b):
    print(f'{a} + {b} = {a + b}')


if __name__ == '__main__':
    for i in range(100):
        f.push(i, b=i * 2)
    f.consume()

7.10 2021-04 新增以 redis 的 list 为数据结构,但使用 brpoplpush 命令 双队列 作为中间件的消息队列。

此 brpoplpush 双队列方式 + 消费者唯一id标识的心跳检测,可以媲美 rabbitmq 的确认消费功能。

代码演示省略,设置broker_kind=BrokerEnum.RedisBrpopLpush就行了。 
@task_deco('queue_test_f01', broker_kind=BrokerEnum.RedisBrpopLpush,)

7.11 2021-04 新增以 zeromq 为中间件的消息队列。

zeromq 和rabbbitmq kafka redis都不同,这个不需要安装一个服务端软件,是纯代码的。
zeromq方式是启动一个端口,所以queue_name传一个大于20000小于65535的数字,不能传字母。

消费端代码,启动消费端时候会自动启动 broker 和 server。

import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('30778', broker_kind=BrokerEnum.ZEROMQ, qps=2)
def f(x):
    time.sleep(1)
    print(x)


if __name__ == '__main__':
    f.consume()

发布端代码

from test_frame.test_broker.test_consume import f

for i in range(100):
    f.push(i) 

7.12 2021-04 新增以 操作kombu包 为中间件的消息队列

一次性新增操作10种消息队列,.但比较知名的例如rabbitmq redis sqlite3 函数调度框架已经在之前实现了。
使用方式为设置 @task_deco 装饰器的 broker_kind 为 BrokerEnum.KOMBU
在你项目根目录下的 distributed_frame_config.py  文件中设置 
KOMBU_URL = 'redis://127.0.0.1:6379/7' 那么就是使用komb 操作redis。
KOMBU_URL = 'amqp://username:password@127.0.0.1:5672/',那么就是操纵rabbitmq
KOMBU_URL = 'sqla+sqlite:////dssf_sqlite.sqlite',那么就是在你的代码所在磁盘的根目录创建一个sqlite文件。四个////表示根目,三个///表示当前目录。
其余支持的中间件种类大概有10种,不是很常用,可以百度 google查询kombu或者celery的 broker_url 配置方式。

操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),
但没包括分布式函数调度框架能支持的kafka nsq zeromq 等。


但是 kombu 包的性能非常差,如何测试对比性能呢?
可以用原生redis的lpush和kombu的publish测试发布
使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列  qpid pyro 队列),
否则强烈建议使用此框架的操作中间件方式而不是使用kombu。

可以把@task_deco装饰器的broker_kind参数 设置为 BrokerEnum.REDIS_ACK_ABLE 和BrokerEnum.KOMBU(配置文件的KOMBU_URL配置为redis),
进行对比,REDIS_ACK_ABLE的消费速度远远超过 BrokerEnum.KOMBU,所以之前专门测试对比celery和此框架的性能,
差距很大,光一个 kombu 就拉了celery大腿很多,再加上celery的除了kombu的执行性能也很低,所以celery比此框架慢很多。
test_frame\test_celery 下面有celery的发布 消费例子,可以测试对比下速度,同样gevent 并发和redis中间件,
celery 执行 print hello 这样的最简单任务,单核单进程每秒执行次数过不了300,celery性能真的是太差了。

消费

import time
from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('test_kombu2', broker_kind=BrokerEnum.KOMBU, qps=5, )
def f(x):
    time.sleep(60)
    print(x)


if __name__ == '__main__':
    f.consume()

发布

from test_frame.test_broker.test_consume import f

for i in range(10000):
    f.push(i)

你项目根目录下的 distributed_frame_config.py

KOMBU_URL = 'redis://127.0.0.1:6379/7'
# KOMBU_URL = f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}'
# KOMBU_URL = 'sqla+sqlite:////celery_sqlite3.sqlite'  # 4个//// 代表磁盘根目录下生成一个文件。推荐绝对路径。3个///是相对路径。

7.14 2021-04 新增以mqtt emq 作为消息中间件

例子,设置 broker_kind=BrokerEnum.MQTT

from function_scheduling_distributed_framework import task_deco, BrokerEnum


@task_deco('mqtt_topic_test', broker_kind=BrokerEnum.MQTT)
def f(x, y):
    print(f''' {x} + {y} = {x + y}''')
    return x + y


for i in range(100):
    f.push(i, i * 2)

f.consume()
这个默认做成服务端不存储消息,mqtt中间件适合前后端实时交互的。可以直接绕开后端flask django 接口,不用写接口,
前端直接发任务到mqtt,后端订阅,后端完成后,发送结果到唯一任务的topic

当然也可以 前端订阅topic,前端发任务到python flask接口,flask接口中发布任务到rabbitmq redis等,
后台消费完成把函数结果发布到mqtt,mqtt推送给前端
此框架的消费做成了mqtt的共享订阅,例如启动多个重复的消费者脚本,不会所有消费脚本都去重复处理一个消息

7.15 2021-04 新增以 httpsqs 作为消息中间件

@task_deco('httpsqs_queue_test',broker_kind=BrokerEnum.HTTPSQS)

7.16 2021-04 新增支持下一代分布式消息系统 pulsar 。

@task_deco('httpsqs_queue_test',broker_kind=BrokerEnum.PULSAR)

使用此中间件,代码必须在linux mac上运行。

在开源的业界已经有这么多消息队列中间件了,pulsar作为一个新势力到底有什么优点呢?
pulsar自从出身就不断的再和其他的消息队列(kafka,rocketmq等等)做比较,但是Pulsar的设计思想和大多数的消息队列中间件都不同
,具备了高吞吐,低延迟,计算存储分离,多租户,异地复制等功能,所以pulsar也被誉为下一代消息队列中间件

pulsar 的消费者数量可以不受topic 分区数量的限制,比kafka和rabbitmq 强。5年后会替代kafka rabbitmq。
 

7.17 2021-04 新增延时运行任务,介绍见4.8

7.18 2021-09 新增 轻松远程服务器部署运行函数

框架叫分布式函数调度框架,可以在多台机器运行,因为消息队列任务是共享的。

我用的时候生产环境是使用 阿里云 codepipeline k8s部署的多个容器。还算方便。
在测试环境一般就是单机多进程运行的,用supervisor部署很方便。

所以之前没有涉及到多态机器的轻松自动部署。
如果要实现轻松的部署多台物理机,不借助除了python以外的其他手段的话,只能每台机器登录上然后下载代码,启动运行命令,机器多了还是有点烦的。
现在最新加入了 Python代码级的函数任务部署,不需要借助其他手段,python代码自动上传代码到远程服务器,并自动启动函数消费任务。
目前的自动化在远程机器启动函数消费,连celery都没有做到。

不依赖阿里云codepipeline 和任何运维发布管理工具,只需要在python代码层面就能实现多机器远程部署。

7.19 2021-09 新增 socket udp/tcp/http 消息队列,不需要安装消息中间件软件。

消费脚本,test_udp_consumer.py

必须先启动这个consume脚本消费然后才能发布,因为消费脚本里面启动了socket服务端,socket客户端才能连接。

from function_scheduling_distributed_framework import task_deco,BrokerEnum


@task_deco('127.0.0.1:5689',broker_kind=BrokerEnum.UDP)  #使用BrokerEnum.TCP就是tcp,BrokerEnum.HTTP就是http
def f(x):
    print(x)


if __name__ == '__main__':
    f.consume()
    f.push('hello')

发布脚本,test_udp_publisher.py

启动这个脚本之前必须先启动上面的消费脚本启动sockert服务端。

import time
from test_udp_consumer import f

for i in range(100000):
    time.sleep(2)
    f.push(i)

7.20 2021-09 新增 支持 nats 高性能消息队列

用法一如既往,只需要修改broker_kind的枚举,并在 distributed_frame_config.py 配置好 NATS_URL 的值就完了。

@task_deco('test_queue66c', broker_kind=BrokerEnum.NATS)
def f(x, y):
    pass

https://visitor-badge.glitch.me/badge?page_id=distributed_framework

8.用于爬虫

很多人问这个框架能不能爬虫?

答:此框架不仅可以对标celery框架,也可以取代scrapy框架。

无论是比 自由度、兼容常规代码程度、用户需要编写的实际代码行数、消息万无一失可靠度、对爬虫的qps频率/并发控制手段、超高速调度请求, 此框架从以上任何方面远远超过scrapy,一切都是只会是有过之而无不及。

应该还是主要是很浮躁,不仅没看详细文档,应该是连简介都没看。
分布式函数调度框架,定位于调度用户的任何函数,只要用户在函数里面写爬虫代码,就可以分布式调度爬虫,并且对爬虫函数施加20种控制功能,
例如 qps恒定 任何时候随意关机重启代码消息万无一失确认消费 非常简单的开启多进程叠加线程/协程,这些强大的功能绝大部分爬虫框架还做不到。

此框架如果用于爬虫,不管从任何方面比较可以领先scrapy20年,也比任意写的爬虫框架领先10年。
不是框架作者代码编写实力问题,主要是思维问题,爬虫框架一般就设计为url请求调度框架,url怎么请求都是被框内置架束缚死了,
所以有些奇葩独特的想法在那种框架里面难以实现,需要非常之精通框架本身然后改造框架才能达到随心所欲的驾驭的目的。
而此框架是函数调度框架,函数里面可以实现一切任意自由想法,天生不会有任何束缚。
主要还是思想问题,国内一般人设计的爬虫框架都是仿scrapy api,天生不自由受束缚。

此框架如果用于写爬虫,建议的写法是一种页面(或者接口)对应一个函数,例如列表页是一个函数,详情页是一个函数。 1个函数里面只包括一次请求(也可以两三次请求,但不要在函数本身里面去for循环遍历发十几个请求这种写法),

8.1 演示获取汽车之家资讯的 新闻 导购 和 评测 3个板块 的 文章。

页面连接 https://www.autohome.com.cn/all/

这是一个非常经典的列表页-详情页两层级爬虫调度。演示爬虫一定最少需要演示两个层级的调度,只要会了两层级爬虫,3层级就很简单。
此框架如果用于写爬虫,建议的写法是一种页面(或者接口)对应一个函数,例如列表页是一个函数,详情页是一个函数。
1个函数里面只包括一次请求(也可以两三次请求,但不要在函数本身里面去for循环遍历发十几个请求这种写法),
"""
演示分布式函数调度框架来驱动爬虫函数,使用此框架可以达使爬虫任务 自动调度、 分布式运行、确认消费万无一失、超高速自动并发、精确控频、
种子过滤(函数入参过滤实现的)、自动重试、定时爬取。可谓实现了一个爬虫框架应有的所有功能。

此框架是自动调度一个函数,而不是自动调度一个url请求,一般框架是yield Requet(),所以不兼容用户自己手写requests urllib的请求,
如果用户对请求有特殊的定制,要么就需要手写中间件添加到框架的钩子,复杂的需要高度自定义的特殊请求在这些框架中甚至无法实现,极端不自由。

此框架由于是调度一个函数,在函数里面写 请求 解析 入库,用户想怎么写就怎么写,极端自由,使用户编码思想放荡不羁但整体上有统一的调度。
还能直接复用用户的老函数,例如之前是裸写requests爬虫,没有规划成使用框架爬虫,那么只要在函数上面加一个@task_deco的装饰器就可以自动调度了。

而90%一般普通爬虫框架与用户手写requests 请求解析存储,在流程逻辑上是严重互斥的,要改造成使用这种框架改造会很大。
此框架如果用于爬虫和国内那些90%仿scrapy api的爬虫框架,在思想上完全不同,会使人眼界大开,思想之奔放与被scrapy api束缚死死的那种框架比起来有云泥之别。
因为国内的框架都是仿scrapy api,必须要继承框架的Spider基类,然后重写 def parse,然后在parse里面yield Request(url,callback=annother_parse),
请求逻辑实现被 Request 类束缚得死死的,没有方便自定义的空间,一般都是要写middware拦截http请求的各个流程,写一些逻辑,那种代码极端不自由,而且怎么写middware,
也是被框架束缚的死死的,很难学。分布式函数调度框架由于是自动调度函数而不是自动调度url请求,所以天生不存在这些问题。

用其他爬虫框架需要继承BaseSpider类,重写一大堆方法写一大堆中间件方法和配置文件,在很多个文件夹中来回切换写代码。
而用这个爬虫,只需要学习 @task_deco 一个装饰器就行,代码行数大幅度减少,随意重启代码任务万无一失,大幅度减少操心。

这个爬虫例子具有代表性,因为实现了演示从列表页到详情页的分布式自动调度。

"""

"""
除了以上解释的最重要的极端自由的自定义请求解析存储,比普通爬虫框架更强的方面还有:
2、此爬虫框架支持 redis_ack_able rabbitmq模式,在爬虫大规模并发请求中状态时候,能够支持随意重启代码,种子任务万无一失,
   普通人做的reids.blpop,任务取出来正在消费,但是突然关闭代码再启动,瞬间丢失大量任务,这种框架那就是个伪断点接续。
3、此框架不仅能够支持恒定并发数量爬虫,也能支持恒定qps爬虫。例如规定1秒钟爬7个页面,一般人以为是开7个线程并发,这是大错特错,
  服务端响应时间没说是永远都刚好精确1秒,只有能恒定qps运行的框架,才能保证每秒爬7个页面,恒定并发数量的框架差远了。
4、能支持 任务过滤有效期缓存,普通爬虫框架全部都只能支持永久过滤,例如一个页面可能每周都更新,那不能搞成永久都过滤任务。
因为此框架带有20多种控制功能,所以普通爬虫框架能实现的控制,这个全部都自带了。
"""

爬虫任务消费代码

代码在 https://github.com/ydf0509/distributed_framework/tree/master/test_frame/car_home_crawler_sample/test_frame/car_home_crawler_sample/car_home_consumer.py

import requests
from parsel import Selector
from function_scheduling_distributed_framework import task_deco, BrokerEnum, run_consumer_with_multi_process


@task_deco('car_home_list', broker_kind=BrokerEnum.REDIS_ACK_ABLE, max_retry_times=5, qps=10)
def crawl_list_page(news_type, page, do_page_turning=False):
    """ 函数这里面的代码是用户想写什么就写什么,函数里面的代码和框架没有任何绑定关系
    例如用户可以用 urllib3请求 用正则表达式解析,没有强迫你用requests请求和parsel包解析。
    """
    url = f'https://www.autohome.com.cn/{news_type}/{page}/#liststart'
    resp_text = requests.get(url).text
    sel = Selector(resp_text)
    for li in sel.css('ul.article > li'):
        if len(li.extract()) > 100:  # 有的是这样的去掉。 <li id="ad_tw_04" style="display: none;">
            url_detail = 'https:' + li.xpath('./a/@href').extract_first()
            title = li.xpath('./a/h3/text()').extract_first()
            crawl_detail_page.push(url_detail, title=title, news_type=news_type)  # 发布详情页任务
    if do_page_turning:
        last_page = int(sel.css('#channelPage > a:nth-child(12)::text').extract_first())
        for p in range(2, last_page + 1):
            crawl_list_page.push(news_type, p)  # 列表页翻页。


@task_deco('car_home_detail', broker_kind=BrokerEnum.REDIS_ACK_ABLE, qps=50,
           do_task_filtering=True, is_using_distributed_frequency_control=True)
def crawl_detail_page(url, title, news_type):
    resp_text = requests.get(url).text
    sel = Selector(resp_text)
    author = sel.css('#articlewrap > div.article-info > div > a::text').extract_first() or
    sel.css('#articlewrap > div.article-info > div::text').extract_first() or ''
    author = author.replace("\n", "").strip()
    print(f'保存数据  {news_type}   {title} {author} {url} 到 数据库')  # 用户自由发挥保存。

if __name__ == '__main__':
    # crawl_list_page('news',1)
    crawl_list_page.consume()  # 启动列表页消费
    crawl_detail_page.consume()
    # 这样速度更猛,叠加多进程
    crawl_detail_page.multi_process_consume(4)

爬虫任务发布代码

代码在 https://github.com/ydf0509/distributed_framework/blob/master/test_frame/car_home_crawler_sample/car_home_publisher.py

from function_scheduling_distributed_framework import fsdf_background_scheduler
from test_frame.car_home_crawler_sample.car_home_consumer import crawl_list_page, crawl_detail_page

crawl_list_page.clear()  # 清空列表页
crawl_detail_page.clear()  # 清空详情页

# # 推送列表页首页,同时设置翻页为True
crawl_list_page.push('news', 1, do_page_turning=True)  # 新闻
crawl_list_page.push('advice', page=1, do_page_turning=True)  # 导购
crawl_list_page.push(news_type='drive', page=1, do_page_turning=True)  # 驾驶评测

# 定时任务,语法入参是apscheduler包相同。每隔120秒查询一次首页更新,这部分可以不要。
for news_typex in ['news', 'advice', 'drive']:
    fsdf_background_scheduler.add_timing_publish_job(crawl_list_page, 'interval', seconds=120, kwargs={"news_type": news_typex, "page": 1, "do_page_turning": False})
fsdf_background_scheduler.start()  # 启动首页查询有没有新的新闻的定时发布任务
从消费代码可以看出,代码就是常规思维的平铺直叙主线程思维写代码,写函数代码时候无需考虑和框架怎么结合,写完后加个@task_deco装饰器就行了。
因为这不是类似国内的仿scrapy框架必须要求你必须继承个什么类,强迫你重写什么方法,然后yield Request(your_url,callback=my_parse)
此框架爬虫既能实现你无拘无束使用任意包来请求url和解析网页,又能很方便的使用到自动超高并发 超高可靠性的万无一失断点续传。

qps设置为很低时候,为了展示更多控制台日志流程细节,分布式函数调度框架驱动爬虫函数的慢速爬取运行截图。 _images/img_7.png

qps设置高时候的运行截图,分布式函数调度框架驱动爬虫函数的快速爬取运行截图。 _images/img_6.png

8.2 演示经典的豆瓣top250电影的爬虫

页面连接 https://movie.douban.com/top250

这是一个非常经典的列表页-详情页两层级爬虫调度。演示爬虫一定最少需要演示两个层级的调度,只要会了两层级爬虫,3层级就很简单。

此框架如果用于写爬虫,建议的写法是一种页面(或者接口)对应一个函数,例如列表页是一个函数,详情页是一个函数。

1个函数里面只包括一次请求(也可以两三次请求,但不要在函数本身里面去for循环遍历发十几个请求这种写法),
from function_scheduling_distributed_framework import task_deco, BrokerEnum
import requests
from parsel import Selector

HEADERS = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0)', }


@task_deco('douban_list_page_task_queue', broker_kind=BrokerEnum.PERSISTQUEUE, qps=0.1)  # qps 自由调节精确每秒爬多少次,远强于一般框架只能指定固定的并发线程数量。
def craw_list_page(page):
    """ 函数这里面的代码是用户想写什么就写什么,函数里面的代码和框架没有任何绑定关系,框架只对函数负责,不对请求负责。
    例如用户可以用 urllib3请求 用正则表达式解析,没有强迫你用requests请求和parsel包解析。
    """
    """ 豆瓣列表页,获取列表页电影链接"""
    url = f'https://movie.douban.com/top250?start={page * 25}&filter='
    resp = requests.get(url, headers=HEADERS)
    sel = Selector(resp.text)
    for li_item in sel.xpath('//*[@id="content"]/div/div[1]/ol/li'):
        movie_name = li_item.xpath('./div/div[2]/div[1]/a/span[1]/text()').extract_first()
        movei_detail_url = li_item.xpath('./div/div[2]/div[1]/a/@href').extract_first()
        craw_detail_page.push(movei_detail_url, movie_name)


@task_deco('douban_detail_page_task_queue', broker_kind=BrokerEnum.PERSISTQUEUE, qps=4)
def craw_detail_page(detail_url, movie_name):
    """豆瓣详情页,获取电影的详细剧情描述。"""
    resp = requests.get(detail_url, headers=HEADERS)
    sel = Selector(resp.text)
    description = sel.xpath('//*[@id="link-report"]/span[1]/text()[1]').extract_first().strip()
    print('保存到数据库:', movie_name, detail_url, description)


if __name__ == '__main__':
    # craw_list_page(0)
    # craw_detail_page('https://movie.douban.com/subject/6786002/','触不可及')
    for p in range(10):
        craw_list_page.push(p)
    craw_list_page.consume()
    craw_detail_page.consume()

8.3 演示3种最常见代码思维方式爬取汽车之家资讯

演示了三种方式爬汽车之家,是骡子是马拉出来溜溜,必须精确对比。

第一种是 无框架每次临时手写全流程代码,每次临时设计手写爬虫调度全流程
第二种是 使用scrapy框架来爬取,用户要写的代码行数很多,文件很多,对于特殊独特奇葩想法极端不自由
第三种是 使用分布式函数调度框架来自动调度常规函数

8.3.1 每次临时手写rquests + 多线程,使用low爬虫方式的缺点

这样搞有以下缺点:
1、不是分布式的,不能多个脚本启动共享任务
2、不能断点爬取,即使是内置Queue改成手写使用redis的普通pop,要实现确认消费每次写一大堆代码,很难。
3、如果要调试爬虫,要反复手动自己手写添加print或log调试信息
4、写得虽然自己认为没有用爬虫框架很简洁,但导致接盘侠不知道你的代码的设计布局和意思
5、自己每次临时灵机一动搞个临时的爬虫调度设计,没有固定套路,难维护,接盘侠一个个的看每个爬虫是怎么设计布局和调度的
6、需要每次临时手写操作queue任务队列
7、需要临时手写并发
8、每次需要临时手写如何判断和添加过滤任务
9、需要临时手写怎么提取错误重试。
10、需要临时动脑筋设计怎么调度,浪费自己的时间来思考,每次都重复思考重复设计重复写爬虫全流程。

8.3.2 scrapy 爬虫框架来实现缺点

scrapy 爬虫框架来实现,(本质是Request 对象自动调度框架)

scrapy_proj_carhome 是 scrapy_redis 弄得项目,写项目需要背诵scrapy 命令行,
并且要反复在spiderxx.py  settings.py items.py pipeliens.py 
middwares.py push_start_urls.py run.py 7个文件里面来回切换写代码,
如果一年只临时要写一次爬虫效率很低,比low爬虫还写的慢。

需要500行,实际要手写或者修改的行数为150行,如果是写多个爬虫,平均每次实际手写的代码函数会降低一些。

8.3.3 scrapy 框架 和 分布式函数调度框架爬虫对比

不是分布式函数调度框架比scrapy爬虫框架代码质量好,主要是理念问题,
Request对象自动调度框架永远没法和函数自动调度框架的灵活自由性相比。
scrapy 自动调度 全靠 yield Request( url, callback=None, method='GET', headers=None, body=None,
                 cookies=None, meta=None, encoding='utf-8', priority=0,
                 dont_filter=False, errback=None, flags=None)
本质就是自动框架自动调度 Request对象,虽然入参比较丰富,大部分想法都能通过传参来搞定,但如果是一些自定义的想法,
要加到scrapy项目中就非常难。写任何一行代码都要考虑与框架的集成,
不能随便用 requests ,urllib3 ,selenium ,独立的每两个页面间的cookie自定义关联 等 乱自己来写请求,
包括换proxies要写中间件然后把类加到settings里面也是烦得要死。

比如一个很愚蠢的想法写法,在详情页解析回调这么写代码,这样瞎写完全没有考虑scrapy框架的感受。

    def parse_detail_page(self, response):
        driver = Chrome()
        driver.get(response.url)
        text = driver.page_source
        
scrapy框架能自动并发调度运行Request请求,但不能自动并发运行parse方法。
第一,selenium会阻塞框架。
第二,reponse本来就是一个响应结果了,他是已经被scrapy的urllib请求了,只要解析结果就好了,但这么一写有用浏览器打开一次url,
等于是请求了两次页面,这样请求两次 是嫌电脑cpu太好 还是 流量太便宜了呢。

总之使用了scrapy后就是写任何代码不能乱写,要多考虑框架的感受。

只有celery这样的函数和函数入参自动调度才能很香,
scrapy这样的固化的 Request对象入参 + 自定义中间件类添加到 settings里面的 自动调度很不自由。
分布式函数调度框架是通过把函数的入参推到队列(支持15中队列,包括语言级Queue队列  sqlite队列 redis队列 各种mq队列),
然后框架会自动从对应的队列取出任务,自动并发的运行对应的函数。函数里面怎么写那就非常自由了,你想随便有什么想法怎么写都可以,
这种方式是极端的自由和灵活,只需要按同步的思维写常规思维的函数,最后加个装饰器就能自动并发了,写函数的时候完全不用考虑框架的束缚。
任何函数都能被自动并发调度。

以下这些功能对爬虫的各种控制例如 精确的每秒爬几次  分布式中间件支持种类  消费确认 对爬虫的辅助控制远强于scrapy。

分布式:
    支持数十种最负盛名的消息中间件.(除了常规mq,还包括用不同形式的如 数据库 磁盘文件 redis等来模拟消息队列)

 并发:
    支持threading gevent eventlet asyncio 四种并发模式 + 多进程
 
 控频限流:
    例如十分精确的指定1秒钟运行30次函数(无论函数需要随机运行多久时间,都能精确控制到指定的消费频率;
   
 分布式控频限流:
    例如一个脚本反复启动多次或者多台机器多个容器在运行,如果要严格控制总的qps,能够支持分布式控频限流。
  
 任务持久化:
    消息队列中间件天然支持
 
 断点接续运行:
    无惧反复重启代码,造成任务丢失。消息队列的持久化 + 消费确认机制 做到不丢失一个消息
 
 定时:
    可以按时间间隔、按指定时间执行一次、按指定时间执行多次,使用的是apscheduler包的方式。
 
 指定时间不运行:
    例如,有些任务你不想在白天运行,可以只在晚上的时间段运行
 
 消费确认:
    这是最为重要的一项功能之一,有了这才能肆无忌惮的任性反复重启代码也不会丢失一个任务
 
 立即重试指定次数:
    当函数运行出错,会立即重试指定的次数,达到最大次重试数后就确认消费了
 
 重新入队:
    在消费函数内部主动抛出一个特定类型的异常ExceptionForRequeue后,消息重新返回消息队列
 
 超时杀死:
    例如在函数运行时间超过10秒时候,将此运行中的函数kill
 
 计算消费次数速度:
    实时计算单个进程1分钟的消费次数,在日志中显示;当开启函数状态持久化后可在web页面查看消费次数
 
 预估消费时间:
    根据前1分钟的消费次数,按照队列剩余的消息数量来估算剩余的所需时间
 
 函数运行日志记录:
    使用自己设计开发的 控制台五彩日志(根据日志严重级别显示成五种颜色;使用了可跳转点击日志模板)
    + 多进程安全切片的文件日志 + 可选的kafka elastic日志
               
 任务过滤:
    例如求和的add函数,已经计算了1 + 2,再次发布1 + 2的任务到消息中间件,可以让框架跳过执行此任务。
    任务过滤的原理是使用的是函数入参判断是否是已近执行过来进行过滤。
 
 任务过滤有效期缓存:
    例如查询深圳明天的天气,可以设置任务过滤缓存30分钟,30分钟内查询过深圳的天气,则不再查询。
    30分钟以外无论是否查询过深圳明天的天气,则执行查询。
    
 任务过期丢弃:
    例如消息是15秒之前发布的,可以让框架丢弃此消息不执行,防止消息堆积,
    在消息可靠性要求不高但实时性要求高的高并发互联网接口中使用
            
 函数状态和结果持久化:
    可以分别选择函数状态和函数结果持久化到mongodb,使用的是短时间内的离散mongo任务自动聚合成批量
    任务后批量插入,尽可能的减少了插入次数
                  
 消费状态实时可视化:
    在页面上按时间倒序实时刷新函数消费状态,包括是否成功 出错的异常类型和异常提示 
    重试运行次数 执行函数的机器名字+进程id+python脚本名字 函数入参 函数结果 函数运行消耗时间等
                 
 消费次数和速度生成统计表可视化:
    生成echarts统计图,主要是统计最近60秒每秒的消费次数、最近60分钟每分钟的消费次数
    最近24小时每小时的消费次数、最近10天每天的消费次数
                            
 rpc:
    生产端(或叫发布端)获取消费结果。各个发布端对消费结果进行不同步骤的后续处理更灵活,而不是让消费端对消息的处理一干到底。

8.3.4 临时手写爬虫全流程的代码

这样搞有以下缺点:
1、不是分布式的,不能多个脚本启动共享任务
2、不能断点爬取
3、如果要调试爬虫,要反复手动自己手写添加print或log调试
4、写得虽然自己认为没有用爬虫框架很简洁,但导致接盘侠不知道你的代码的设计布局和意思
5、自己每次临时灵机一动搞个临时的爬虫调度设计,没有固定套路,难维护,接盘侠一个个的看每个爬虫是怎么设计布局和调度的
6、需要每次临时手写操作queue任务队列
7、需要临时手写并发
8、每次需要临时手写如何判断和添加过滤任务
9 需要临时手写怎么提取错误重试。
10、需要临时动脑筋设计怎么调度,浪费自己的时间来思考

from queue import Queue, Empty
import time
import requests
from urllib3 import disable_warnings
from parsel import Selector
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import redis

disable_warnings()

queue_list_page = Queue(1000)
queue_detail_page = Queue(1000)

pool_list_page = ThreadPoolExecutor(30)
pool_detail_page = ThreadPoolExecutor(100)

# detail_task_filter_set = set()
r = redis.Redis()


def crawl_list_page(news_type, page):
    def _run_list_page_retry(current_retry_times):
        try:
            url = f'https://www.autohome.com.cn/{news_type}/{page}/#liststart'
            print(f'请求的列表页url是 {url}')
            resp = requests.request('get', url, timeout=5)
            if resp.status_code != 200:
                raise ValueError
            resp_text = resp.content.decode('gbk')
            sel = Selector(resp_text)
            for li in sel.css('#Ul1 > li'):
                url = 'https:' + li.xpath('./a/@href').extract_first()
                title = li.xpath('./a/h3/text()').extract_first()
                task = (url, title)
                print('向详情页队列添加任务:', task)
                queue_detail_page.put(task)
            if page == 1:
                last_page = int(sel.css('#channelPage > a:nth-child(12)::text').extract_first())
                for p in range(2, last_page + 1):
                    task = (news_type, p)
                    print('向列表页页队列添加任务:', task)
                    queue_list_page.put(task)
        except Exception as e:
            print(f'第{current_retry_times}次爬取列表页出错', e.__traceback__, e)
            if current_retry_times < 5:
                _run_list_page_retry(current_retry_times + 1)
            else:
                print('重试了5次仍然错误')

    _run_list_page_retry(1)


def crawl_detail_page(url, title):
    def _run_detail_page_retry(current_retry_times):
        if r.sismember('filter_carhome_detail_page', url):
            print(f'此入参已经爬取过了 {url} {title}')
            return
        else:
            try:
                print(f'请求的详情页url是 {url}')
                resp = requests.request('get', url, timeout=5)
                if resp.status_code != 200:
                    raise ValueError
                resp_text = resp.content.decode('gbk')
                sel = Selector(resp_text)
                author = sel.css('#articlewrap > div.article-info > div > a::text').extract_first() or
                         sel.css('#articlewrap > div.article-info > div::text').extract_first() or ''
                author = author.replace("\n", "").strip()
                print(f'{time.strftime("%H:%M:%S")} 保存到数据库 {url} {title} {author} ')
                r.sadd('filter_carhome_detail_page', url)  # 运行成功了,放入过滤中
            except Exception as e:
                print(f'第{current_retry_times}次爬取详情页页出错', e.__traceback__, e)
                if current_retry_times < 3:
                    _run_detail_page_retry(current_retry_times + 1)
                else:
                    print('重试了3次仍然错误')
                    r.sadd('filter_carhome_detail_page', url)  # 运行最大次数了,放入过滤中

    _run_detail_page_retry(1)


def start_list_page():
    while True:
        try:
            task = queue_list_page.get(block=True, timeout=600)
            print(f'取出的列表页爬取任务是 {task}')
            pool_list_page.submit(crawl_list_page, *task)
        except Empty:
            print('列表页超过600秒没有任务,列表页爬完了')
            break


def start_detail_page():
    while True:
        try:
            task = queue_detail_page.get(block=True, timeout=600)
            print(f'取出的详情页爬取任务是 {task}')
            pool_detail_page.submit(crawl_detail_page, *task)
        except Empty:
            print('详情页超过600秒没有任务,详情页爬完了')
            break


if __name__ == '__main__':
    # 单独的测试函数功能
    # crawl_list_page('advice',1)  #
    # crawl_detail_page('https://www.autohome.com.cn/news/202008/1022380.html#pvareaid=102624','xxx')

    t1 = Thread(target=start_list_page)
    t2 = Thread(target=start_detail_page)
    t1.start()
    t2.start()

    queue_list_page.put(('news', 1))  # 新闻
    queue_list_page.put(('advice', 1))  # 导购
    queue_list_page.put(('drive', 1))  # 评测

举个网上下载 mzitu 网站图片的代码截图,就是采用的无框架爬虫,任务调度靠直接for循环调用函数,任务并发全靠手写操作threads,

这样的代码看起来很多很混乱,写一个还行,要是爬虫项目多了,没有统一化的逻辑思维,接盘侠每次都要阅读很长的代码才知道运行逻辑,那就非常悲催。

_images/img_13.pngimg_13.png

8.3.5 scrapy爬虫代码

这是scrapy爬虫代码的基本结构,用户写代码需要非常频繁的在 spider items middware pipeline settings cmd命令行 来回切换写代码测试代码,很吓人。

需要在不同的地方写middleware类 pipeline类并把类名添加到settings里面。

scrapy目录结构,代码文件数量很多

_images/img_5.pngimg_5.png

这是 scrapy carhome_spider.py的代码

# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.
import json

from scrapy.http import Request
from scrapy_redis.spiders import RedisSpider

from scrapy_proj_carhome.items import ScrapyProjCarhomeItem

import nb_log


class carHomeSpider(RedisSpider):
    name = "carhome"
    allowed_domains = ["www.autohome.com.cn"]

    redis_key = "carhome:start_urls"

    def make_requests_from_url(self, data: str):
        '''
        data就是放入 carhome:start_urls 中的任务,因为最初的种子信息还需要携带其他信息,例如新闻类型的中文种类,不是单纯的url,所以需要重写此方法
        :param data:
        :return:
        '''
        start_task = json.loads(data)
        url = start_task['url']

        # 此处也可以改为post请求
        return Request(
            url,
            meta=start_task
        )

    def parse(self, response):
        # https://www.autohome.com.cn/news/2/#liststart
        # print(response.body)
        for li in response.css('#Ul1 > li'):
            url = 'https:' + li.xpath('./a/@href').extract_first()
            title = li.xpath('./a/h3/text()').extract_first()
            yield Request(url, callback=self.parse_detail_page, meta={'url': url, 'title': title, 'news_type': response.meta['news_type']},
                          dont_filter=False, priority=10)
        page = response.url.split('/')[-2]
        if page == '1':
            last_page = int(response.css('#channelPage > a:nth-child(12)::text').extract_first())
            for p in range(2, last_page + 1):
                url_new = response.url.replace('/1/', f'/{p}/')
                self.logger.debug(url_new)
                yield Request(url_new, callback=self.parse, dont_filter=True, meta=response.meta)

    def parse_detail_page(self, response):
        author = response.css('#articlewrap > div.article-info > div > a::text').extract_first() or
                 response.css('#articlewrap > div.article-info > div::text').extract_first() or ''
        author = author.replace("\n", "").strip()
        item = ScrapyProjCarhomeItem()
        item['author'] = author
        item['url'] = response.meta['url']
        item['title'] = response.meta['title']
        item['news_type'] = response.meta['news_type']
        yield item


if __name__ == '__main__':
    pass

这是 items.py 的代码

# -*- coding: utf-8 -*-

# Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html

import scrapy


class ScrapyProjCarhomeItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    author = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    news_type = scrapy.Field()

这个文件的代码是最坑的,任何自定义想法需要写一个类,继承middware类,重写process_request process_request方法,然后把类名添加到settings里面。

这是 middlewares.py的代码
# -*- coding: utf-8 -*-

# Define here the models for your spider middleware
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/spider-middleware.html

from scrapy import signals


class ScrapyProjCarhomeSpiderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.

        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.

        # Must return an iterable of Request, dict or Item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.

        # Should return either None or an iterable of Response, dict
        # or Item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn’t have a response associated.

        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)


class ScrapyProjCarhomeDownloaderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader
        # middleware.

        # Must either:
        # - return None: continue processing this request
        # - or return a Response object
        # - or return a Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called
        return None

    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.

        # Must either;
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.

        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

这是pipelines.py 的代码,保存数据。

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
from scrapy_proj_carhome.items import ScrapyProjCarhomeItem


class ScrapyProjCarhomePipeline(object):
    def process_item(self, item, spider):
        print(type(item))
        if isinstance(item, ScrapyProjCarhomeItem):
            print(f'保存到数据库 {item["news_type"]}  {item["url"]} {item["title"]} {item["author"]} ')
        return item

这是settings.py的代码

# -*- coding: utf-8 -*-

# Scrapy settings for scrapy_proj_carhome project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://doc.scrapy.org/en/latest/topics/settings.html
#     https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://doc.scrapy.org/en/latest/topics/spider-middleware.html

BOT_NAME = 'scrapy_proj_carhome'

SPIDER_MODULES = ['scrapy_proj_carhome.spiders']
NEWSPIDER_MODULE = 'scrapy_proj_carhome.spiders'

# Crawl responsibly by identifying yourself (and your website) on the user-agent
# USER_AGENT = 'scrapy_proj_carhome (+http://www.yourdomain.com)'

# Obey robots.txt rules
ROBOTSTXT_OBEY = True

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://doc.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
# DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
# COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
# TELNETCONSOLE_ENABLED = False

# Override the default request headers:
# DEFAULT_REQUEST_HEADERS = {
#   'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
#   'Accept-Language': 'en',
# }

# Enable or disable spider middlewares
# See https://doc.scrapy.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
#    'scrapy_proj_carhome.middlewares.ScrapyProjCarhomeSpiderMiddleware': 543,
# }

# Enable or disable downloader middlewares
# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html
DOWNLOADER_MIDDLEWARES = {
    'scrapy_proj_carhome.middlewares.ScrapyProjCarhomeDownloaderMiddleware': 543,
}

# Enable or disable extensions
# See https://doc.scrapy.org/en/latest/topics/extensions.html
# EXTENSIONS = {
#    'scrapy.extensions.telnet.TelnetConsole': None,
# }

# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'scrapy_proj_carhome.pipelines.ScrapyProjCarhomePipeline': 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://doc.scrapy.org/en/latest/topics/autothrottle.html
# AUTOTHROTTLE_ENABLED = True
# The initial download delay
# AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
# AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://doc.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
# HTTPCACHE_ENABLED = True
# HTTPCACHE_EXPIRATION_SECS = 0
# HTTPCACHE_DIR = 'httpcache'
# HTTPCACHE_IGNORE_HTTP_CODES = []
# HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'


REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_PARAMS = {'db': 2, 'password': ''}
REDIS_ENCODING = "utf-8"

SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
SCHEDULER_PERSIST = True
# DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

这是 push_start_urls.py 的代码

from redis import Redis
import json
from scrapy_proj_carhome import settings

r = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, **settings.REDIS_PARAMS)

r.flushdb()

# 因为要让初始种子就携带其他信息,初始种子发布的不是url本身,所以需要继承重写spider的make_requests_from_url方法。
r.lpush('carhome:start_urls', json.dumps({'url': 'https://www.autohome.com.cn/news/1/#liststart', 'news_type': '新闻'}, ensure_ascii=False))
r.lpush('carhome:start_urls', json.dumps({'url': 'https://www.autohome.com.cn/advice/1/#liststart', 'news_type': '导购'}, ensure_ascii=False))
r.lpush('carhome:start_urls', json.dumps({'url': 'https://www.autohome.com.cn/drive/1/#liststart', 'news_type': '驾驶评测'}, ensure_ascii=False))

这是run.py的代码


from scrapy import cmdline

cmdline.execute(['scrapy', 'crawl', 'carhome'])
从上面的代码可以看到scrapy要在8个文件频繁的来回切换写代码,非常的烦躁。
即使是除去scrapy 建项目自动生产的固定代码行数,此scrapy项目的代码行数仍然远远高于分布式函数调度框架的代码行数

8.3.6 分布式函数调度框架的代码

只需要单个文件(当然也可以拆解成发布和消费独立成两个文件)

所需代码行数远小于无框架每次临时手写爬虫全流程和使用scrapy的方式。

此框架不仅可以对标celery框架,也可以取代scrapy框架。



from function_scheduling_distributed_framework import task_deco, BrokerEnum
import requests
from parsel import Selector


@task_deco('car_home_list', broker_kind=BrokerEnum.REDIS_ACK_ABLE, max_retry_times=5, qps=10)
def crawl_list_page(news_type, page):
    url = f'https://www.autohome.com.cn/{news_type}/{page}/#liststart'
    resp_text = requests.get(url).text
    sel = Selector(resp_text)
    for li in sel.css('#Ul1 > li'):
        url_detail = 'https:' + li.xpath('./a/@href').extract_first()
        title = li.xpath('./a/h3/text()').extract_first()
        crawl_detail_page.push(url_detail, title=title, news_type=news_type)
    if page == 1:
        last_page = int(sel.css('#channelPage > a:nth-child(12)::text').extract_first())
        for p in range(2, last_page + 1):
            crawl_list_page.push(news_type, p)


@task_deco('car_home_detail', broker_kind=BrokerEnum.REDIS_ACK_ABLE, concurrent_num=100, qps=30, do_task_filtering=False)
def crawl_detail_page(url, title, news_type):
    resp_text = requests.get(url).text
    sel = Selector(resp_text)
    author = sel.css('#articlewrap > div.article-info > div > a::text').extract_first() or
             sel.css('#articlewrap > div.article-info > div::text').extract_first() or ''
    author = author.replace("\n", "").strip()
    print(f'使用print模拟保存到数据库  {news_type}   {title} {author} {url}')  # ,实际为调用数据库插入函数,压根不需要return item出来在另外文件的地方进行保存。


if __name__ == '__main__':
    # 单独的测试函数功能
    # crawl_list_page('advice',1)  #
    # crawl_detail_page('https://www.autohome.com.cn/news/202008/1022380.html#pvareaid=102624','xxx')

    # 清空消息队列
    crawl_list_page.clear()
    crawl_detail_page.clear()
    #
    # # 推送列表页首页
    crawl_list_page.push('news', 1)  # 新闻
    crawl_list_page.push('advice', page=1)  # 导购
    crawl_list_page.push(news_type='drive', page=1)  # 驾驶评测

    # 启动列表页消费和详情页消费,上面的清空和推送可以卸载另外的脚本里面,因为是使用的中间件解耦,所以可以推送和消费独立运行。
    crawl_list_page.consume()
    crawl_detail_page.consume()

使用分布式函数调度框架运行的爬虫,自动并发,自动控频,是指定了列表页qps=2,详情页qps=3的情况下运行的控制台日志

https://z3.ax1x.com/2021/09/24/4BquHf.png4BquHf.png

可以得出结论,控频效果精确度达到了99%以上,目前世界所有爬虫框架只能指定并发请求数量,但不能指定每秒爬多少次页面,此框架才能做到。

9 轻松远程服务器部署运行函数

别的机器不需要先安装git,也不需要先手动上传代码到该机器上,就能自动部署运行,前提python基本环境是要搞好的。
celery不支持这种自动运行在别的机器上的方式。

9.1 远程服务器部署函数的意义

框架叫分布式函数调度框架,可以在多台机器运行,因为消息队列任务是共享的。
我用的时候生产环境是使用 阿里云 codepipeline k8s部署的多个容器。还算方便。
在测试环境一般就是单机多进程运行的,用supervisor部署很方便。
所以之前没有涉及到多态机器的轻松自动部署。
如果要实现轻松的部署多台物理机,不借助除了python以外的其他手段的话,只能每台机器登录上然后下载代码,启动运行命令,机器多了还是有点烦的。
现在最新加入了 Python代码级的函数任务部署,不需要借助其他手段,python代码自动上传代码到远程服务器,并自动启动函数消费任务。
目前的自动化在远程机器启动函数消费,连celery都没有做到。

不依赖阿里云codepipeline 和任何运维发布管理工具,只需要在python代码层面就能实现多机器远程部署。
 这实现了函数级别的精确部署,而非是部署一个 .py的代码,远程部署一个函数实现难度比远程部署一个脚本更高一点,部署更灵活。
之前有人问怎么方便的部署在多台机器,一般用阿里云codepipeline  k8s自动部署。被部署的远程机器必须是linux,不能是windwos。
但是有的人是直接操作多台物理机,有些不方便,现在直接加一个利用python代码本身实现的跨机器自动部署并运行函数任务。

自动根据任务函数所在文件,转化成python模块路径,实现函数级别的精确部署,比脚本级别的部署更精确到函数。
例如 test_frame/test_fabric_deploy/test_deploy1.py的fun2函数 自动转化成 from test_frame.test_fabric_deploy.test_deploy1 import f2
从而自动生成部署语句
export PYTHONPATH=/home/ydf/codes/distributed_framework:$PYTHONPATH ;cd /home/ydf/codes/distributed_framework;
python3 -c "from test_frame.test_fabric_deploy.test_deploy1 import f2;f2.multi_process_consume(2)"  -fsdfmark fsdf_fabric_mark_queue_test30

这个是可以直接在远程机器上运行函数任务。无需用户亲自部署代码和启动代码。自动上传代码,自动设置环境变量,自动导入函数,自动运行。
这个原理是使用python -c 实现的精确到函数级别的部署,不是python脚本级别的部署。
可以很灵活的指定在哪台机器运行什么函数,开几个进程。这个比celery更为强大,celery需要登录到每台机器,手动下载代码并部署在多台机器,celery不支持代码自动运行在别的机器上

9.2 远程服务器部署函数的入参介绍。


:param host: 需要部署的远程linux机器的 ip
:param port:需要部署的远程linux机器的 port
:param user: 需要部署的远程linux机器的用户名
:param password:需要部署的远程linux机器的密码
:param path_pattern_exluded_tuple:排除的文件夹或文件路径
:param file_suffix_tuple_exluded:排除的后缀
:param only_upload_within_the_last_modify_time:只上传多少秒以内的文件,如果完整运行上传过一次后,之后可以把值改小,避免每次全量上传。
:param file_volume_limit:大于这个体积的不上传,因为python代码文件很少超过1M
:param extra_shell_str :自动部署前额外执行的命令,例如可以设置环境变量什么的
:param invoke_runner_kwargs : 
         invoke包的runner.py 模块的 run()方法的所有一切入参,例子只写了几个入参,实际可以传入十几个入参,大家可以自己琢磨fabric包的run方法,按需传入。
         hide 是否隐藏远程机器的输出,值可以为 False不隐藏远程主机的输出  “out”为只隐藏远程机器的正常输出,“err”为只隐藏远程机器的错误输出,True,隐藏远程主机的一切输出
         pty 的意思是,远程机器的部署的代码进程是否随着当前脚本的结束而结束。如果为True,本机代码结束远程进程就会结束。如果为False,即使本机代码被关闭结束,远程机器还在运行代码。
         warn 的意思是如果远程机器控制台返回了异常码本机代码是否立即退出。warn为True这只是警告一下,warn为False,远程机器返回异常code码则本机代码直接终止退出。
    
:param process_num:启动几个进程
:return:

9.3 远程服务器部署消费函数的代码示例。

定义了两个函数任务,和f1和f2.

_images/img_11.pngimg_11.png

运行的控制台图片,说明部署级别精确到了函数而非脚本级别,可以灵活的指定哪台机器跑哪些函数。

_images/img_10.pngimg_10.png