# 7.更新记录 ## 7.1 新增第十种Consumer,以redis为中间件,但增加了消费确认,是RedisConsumerAckAble类。 ``` 支持运行过程中,随意关闭和启动python程序。无惧反复关闭python和 突然断电导致任务丢失几百个。 之前开100线程/协程的话,随意重启python和断电会导致极大概率丢失200个任务。 官方Threadpoolexecutor是无界队列。使用这个会导致丢失无数个任务, 因为他会迅速把redis的消息全部取出来,添加到自己的queue队列慢慢消费。 因为这个原因所以需要自定义写BoundedThreadpoolexecutor和CustomThreadpoolexecutor。 改版的CustomThreadpoolexecutor修改成了queue最大长度是max_works,自己内部存储100个, 运行中100个,突然关闭python会丢失200个任务。如果queue设置大小为0,则只会丢失100个运行中的任务。 采用的是消费者去除消息时候,用lua脚本同时pop和添加到unacked的独立zset中,函数运行成功后会从set中删除该任务。 同时有一个一直每隔5秒发送心跳到redis服务中的线程,心跳标识中有消费者的唯一标识,绝对不会重复。 如果突然关闭消费者(例如突然断电或者点击关闭python),那么该消费者的心跳将会停止了。这时其他机器的同队列消费者或者当前机器重新启动代码后,在15秒后会 检到被关闭的消费者是非活跃消费者,那么自动将该消费者的unack里面任务全部重新取出返回到待消费队列中。 RedisConsumerAckAble类比RedisConsumer会有一丝丝性能损耗,但python玩redis大部分情况还是python代码本身有性能瓶颈, 而不是造成redis服务端有性能瓶颈,一般只要用在有意义的业务上,就算python很忙把cpu占光了,也不会造成redis服务端达到极限, python是性能很差的语言,没玩垮redis,自身就把电脑玩死了,所以大部分情况下不要在意加入确认消费后产生额外的对redis服务端的性能压力。 redis要是能直接作为mq使用,redis早就一统天下了,哪里还不断有几十种mq出来。 所以直接基于redis list的如果要做到可靠就必须改进。 ``` ## 7.2 新增基于以redis为消息中间件时候的页面管理和消费速度显示。 ``` 基于redisboard,但对redis的list模拟mq功能,进行页面显示优化突出消息队列消费, 加黄显示正在运行中的队列和每10秒的消费速度。每隔10秒自动刷新统计。 由于实时发布和消费,例如10秒内发布20个,消费50个,页面只能显示大小降低了30个, 这个只有专业的mq才能分别显示出来,redis list只是简单数组。 rabbitmq nsq都有官方自带速率显示。 ``` ![Image text](https://i.niupic.com/images/2019/08/26/_122.png) ## 7.3 新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架. 新增一个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理,不要亲自这么使用。 beggar_redis_consumer.py文件的 start_consuming_message函数。 ```python def start_consuming_message(queue_name, consume_function, threads_num): pool = ThreadPoolExecutor(threads_num) while True: try: redis_task = redis_db_frame.brpop(queue_name, timeout=60) if redis_task: task_str = redis_task[1].decode() print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}') pool.submit(consume_function, **json.loads(task_str)) else: print(f'redis的 {queue_name} 队列中没有任务') except redis.RedisError as e: print(e) def add(x, y): time.sleep(5) print(f'{x} + {y} 的结果是 {x + y}') # 推送任务 for i in range(100): redis_db_frame.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2))) # 消费任务 start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10) ``` 看完整版代码很长很多,是由于控制功能太多,中间件类型多,并发模式多, 所以加入一个最精简版,精简版的本质实现原理和完整版相同。 ## 7.4 新增sqlachemy 支持的数据库作为消息中间件 新增sqlachemy 支持的数据库作为消息中间件,包括sqlserver mysql postgre oracle sqlite 每个队列是一张表模拟的。 ![Image text](https://i.niupic.com/images/2020/01/13/6hkO.png) 每个任务是表里面的一行记录。 ![Image text](https://i.niupic.com/images/2020/01/11/6gZr.png) ## 7.5 日志改为导入独立包nb_log,支持用户配置文件自定义日志配置。 例如设置默认需不需要彩色,需不需要大背景彩色色块,需不需要自动拦截转化python内置的print. 在用户当前项目根目录下生成的nb_log_config.py 可以自定义优先日志配置。 ## 7.6 优化qps控频。 ``` 将qps按范围分段,采用不同的等待或计数方式。使当qps设置很高的时候,控频更精确。 增加了分布式控频,需要依赖redis中间件。 分布式环境中的控频指的是,假如xx.py文件中有一个consumer,设置func函数的qps为10。 如果在线上部署了三个容器服务,如果不使用分布式控频,则func函数的每秒运行总次数会是30。 即使只有1台机器,如果开多进程,Process运行3个进程,或者把xx.py反复运行启动3个, 也会造成func函数每秒运行总次数是30。 分布式控频主要是解决这种问题。默认不使用分布式控频, 当设置 is_using_distributed_frequency_control为True的时候,使用分布式控频。 ``` ## 7.7 增加rocketmq支持。 (2020-7) ```python from function_scheduling_distributed_framework import task_deco, BrokerEnum @task_deco('queue_test_f03', qps=2, broker_kind=BrokerEnum.ROCKETMQ) def f(a, b): print(f'{a} + {b} = {a + b}') if __name__ == '__main__': for i in range(100): f.push(i, i * 2) f.consume() ``` ## 7.8 新增 async 并发模式 (2020-12) ``` 之前一直都没支持这种并发模式,异步代码不仅消费函数本身与同步代码很多不同,例如函数的定义和调用以及三方库, 不同于gevent和eventlet打个猴子补丁就可以变并发方式并且代码保持100%原样,asyncio的方式代比同步码真的是要大改特改。 而且在框架层面要支持异步也要增加和修改很多,支持异步并不是很容易。这一点连celery5.0目前都还没支持到(据官方文档说5.0要加入支持,但目前的5.0.3还没加入。) 如果消费函数已经写成了async def这种,那么可以设置 concurrent_mode=ConcurrentModeEnum.ASYNC, 框架会在一个新的线程的loop里面自动运行协程,所有协程任务会自动在一个loop里面运行,不是每次临时都生成新的loop只运行一个当前任务方式。 ``` ```python from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum import asyncio # 此段代码使用的是语言级Queue队列,不需要安装中间件,可以直接复制运行测试。 @task_deco('test_async_queue2', concurrent_mode=ConcurrentModeEnum.ASYNC, broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, concurrent_num=500, qps=20) async def async_f(x): # 测试异步阻塞并发, 此处不能写成time.sleep(1),否则无论设置多高的并发,1秒钟最多只能运行1次函数。 # 同理asyncio 不能和 requests搭配,要和 aiohttp 搭配。 await asyncio.sleep(1) print(id(asyncio.get_event_loop())) # 通过 id 可以看到每个并发函数使用的都是同一个loop,而不是采用了愚蠢的临时 asyncio.new_event_loop().run_until_complete(async_f(x)) 方式调度。 print(x) if __name__ == '__main__': async_f.clear() for i in range(100): async_f.push(i, ) async_f.consume() ``` ### 7.8.2 gevent/eventlet 和 asyncio 用法区别感受 ``` 比方说汽车的自动挡和手动挡,学了手动挡一定会开自动挡,只学自动挡很难开手动挡。 asyncio方式的代码比正常普通同步思维的代码写法也要难得多了,能玩asyncio的人一定会用threading gevent, 但只用过threading gevent,不去专门学习asyncio的用法,100%是玩不转的。 gevent就像自动挡汽车,自动换挡相当于自动切换阻塞。 asyncio就像手动挡,全要靠自己写 await / async def /loop / run_until_complete /run_forever/ run_coroutine_threadsafe /wait / wait_for /get_event_loop / new_event_loop / get_running_loop ,写法很麻烦很难。异步多了一个loop就像手动挡汽车多了一个离合器一样,十分之难懂。 手动挡玩的溜性能比自动挡高也更省油。asyncio玩的溜那么他的io并发执行速度和效率也会更好,cpu消耗更少。 如果你写一般的代码,那就用同步方式思维来写吧,让分布式函数调度框架来替你自动并发就可以啦。 如果追求更好的控制和性能,不在乎代码写法上的麻烦,并且asyncio技术掌握的很溜,那就用asyncio的方式吧。 ``` ### 7.8.3 关于 async 并发模式,为什么框架还使用 pyredis pika pymongo,而没有使用aioredis aiomongo ``` 异步鬓发模式里面,整个调用链路必须是一旦异步,必须处处异步,在base_consumer.py的AbstractConsumer中, 方法 _async_run_consuming_function_with_confirm_and_retry里面使用的还是操作中间件的同步库, 主要是因为框架目前支持15种中间件,一个一个的使用异步模式的库操作中间件来实现,比现在代码起码要增加80%,无异于重写一个项目了。 异步和同步真的写法语法相差很大的,不信可以比比aiomysql 和pymysql库,aiohttp和requests,如果非常简单能实现异步, 那aiohttp和aiomysql作者为什么要写几万行代码来重新实现,不在原来基础上改造个七八行来实现? 目前此库对 消息拉取和消息消费完全是属于在两个不同的线程里面,井水不犯河水,所以用同步库拉取消息对asyncio的消费函数没有任何影响,不存在同步库阻塞异步库的问题。 对于消息确认 消息重新入队 任务过滤 mongo插入,都是采用的同步库,但是使用了 run_in_executor, 把这些操作在异步链路中交给线程池来运行了,同事这个线程池不是官方内置线程池,是智能缩小扩大线程池 ThreadPoolExecutorShrinkAble。 run_in_executor 会把一个同步的操作,sumbit提交给线程池,线程池返回的是一个concurrent.futures包的Future对象, run_in_executor包装转化了这个Future(此Future不是asyncio的,不是一个awaitable对象)成为了一个asyncio包的Future对象,asyncio的Future对象可以被await, 所以这是非常快捷的同步阻塞函数在异步链路中转同步转异步语法的最佳方式。官方也是这么推荐的。 除了框架内部的阻塞函数是run_in_executor快速转化成非阻塞事件循环的,但是主要的用户的消费函数,是使用的真async模式运行在一个loop循环中的, 也即是单线陈鬓发运行用户的异步函数。 其次框架的同步阻塞函数,都是操作中间件类型的库,异步就是 入队 确认消费 查询是否过滤,这些操作一般都会在1毫秒之内完成,不阻塞太长的事件, 即使不使用run_in_executor,直接在异步链路使用这些同步操作,也没太大问题。一旦异步必须处处异步,说的是不能调用耗时太长的同步阻塞函数, 1毫秒的无伤大雅,因为celery 1秒钟最多能调度300个 def f: print(hello) 这样的无cpu 无io的函数,此框架调度运行速度任然超过celery。 还有一种调度起 async def定义 的消费函数方式是继续开多线程并发,然后使用 临时loop = get_event_loop,loop.run_until_complete,这方式愚蠢了, 相当于只是为了运行起这个函数,但全流程丝毫没有丁点异步。 ``` ## 7.9 2021-04 新增以 redis 的 stream 数据结构 为中间件的消息队列。 ``` 这个是 redis 的 真消息队列,这次是 真mq, stream 数据结构功能更加丰富接近 rabbitmq kafka这种真mq的消息队列协议,比 list 做消息队列更强。 需要redis的服务端5.0版本以上才能使用这个数据结构。 代码文件在 function_scheduling_distributed_framework/consumers/redis_stream_consumer.py 这个 REDIS_STREAM 中间件和 REDIS_ACK_ABLE 都支持消费确认,不管客户端怎么掉线关闭,都可以确保消息万无一失。 BrokerEnum.REDIS 中间件 不支持消费确认,随意重启或者断电断线会丢失一批任务。 ``` ```python from function_scheduling_distributed_framework import task_deco, BrokerEnum @task_deco('queue_test_f01', broker_kind=BrokerEnum.REDIS_STREAM, ) def f(a, b): print(f'{a} + {b} = {a + b}') if __name__ == '__main__': for i in range(100): f.push(i, b=i * 2) f.consume() ``` ## 7.10 2021-04 新增以 redis 的 list 为数据结构,但使用 brpoplpush 命令 双队列 作为中间件的消息队列。 此 brpoplpush 双队列方式 + 消费者唯一id标识的心跳检测,可以媲美 rabbitmq 的确认消费功能。 ``` 代码演示省略,设置broker_kind=BrokerEnum.RedisBrpopLpush就行了。 @task_deco('queue_test_f01', broker_kind=BrokerEnum.RedisBrpopLpush,) ``` ## 7.11 2021-04 新增以 zeromq 为中间件的消息队列。 ``` zeromq 和rabbbitmq kafka redis都不同,这个不需要安装一个服务端软件,是纯代码的。 zeromq方式是启动一个端口,所以queue_name传一个大于20000小于65535的数字,不能传字母。 ``` 消费端代码,启动消费端时候会自动启动 broker 和 server。 ```python import time from function_scheduling_distributed_framework import task_deco, BrokerEnum @task_deco('30778', broker_kind=BrokerEnum.ZEROMQ, qps=2) def f(x): time.sleep(1) print(x) if __name__ == '__main__': f.consume() ``` 发布端代码 ```python from test_frame.test_broker.test_consume import f for i in range(100): f.push(i) ``` ## 7.12 2021-04 新增以 操作kombu包 为中间件的消息队列 ``` 一次性新增操作10种消息队列,.但比较知名的例如rabbitmq redis sqlite3 函数调度框架已经在之前实现了。 使用方式为设置 @task_deco 装饰器的 broker_kind 为 BrokerEnum.KOMBU 在你项目根目录下的 distributed_frame_config.py 文件中设置 KOMBU_URL = 'redis://127.0.0.1:6379/7' 那么就是使用komb 操作redis。 KOMBU_URL = 'amqp://username:password@127.0.0.1:5672/',那么就是操纵rabbitmq KOMBU_URL = 'sqla+sqlite:////dssf_sqlite.sqlite',那么就是在你的代码所在磁盘的根目录创建一个sqlite文件。四个////表示根目,三个///表示当前目录。 其余支持的中间件种类大概有10种,不是很常用,可以百度 google查询kombu或者celery的 broker_url 配置方式。 操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis), 但没包括分布式函数调度框架能支持的kafka nsq zeromq 等。 但是 kombu 包的性能非常差,如何测试对比性能呢? 可以用原生redis的lpush和kombu的publish测试发布 使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。 由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列 qpid pyro 队列), 否则强烈建议使用此框架的操作中间件方式而不是使用kombu。 可以把@task_deco装饰器的broker_kind参数 设置为 BrokerEnum.REDIS_ACK_ABLE 和BrokerEnum.KOMBU(配置文件的KOMBU_URL配置为redis), 进行对比,REDIS_ACK_ABLE的消费速度远远超过 BrokerEnum.KOMBU,所以之前专门测试对比celery和此框架的性能, 差距很大,光一个 kombu 就拉了celery大腿很多,再加上celery的除了kombu的执行性能也很低,所以celery比此框架慢很多。 test_frame\test_celery 下面有celery的发布 消费例子,可以测试对比下速度,同样gevent 并发和redis中间件, celery 执行 print hello 这样的最简单任务,单核单进程每秒执行次数过不了300,celery性能真的是太差了。 ``` 消费 ```python import time from function_scheduling_distributed_framework import task_deco, BrokerEnum @task_deco('test_kombu2', broker_kind=BrokerEnum.KOMBU, qps=5, ) def f(x): time.sleep(60) print(x) if __name__ == '__main__': f.consume() ``` 发布 ```python from test_frame.test_broker.test_consume import f for i in range(10000): f.push(i) ``` 你项目根目录下的 distributed_frame_config.py ```python KOMBU_URL = 'redis://127.0.0.1:6379/7' # KOMBU_URL = f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/{RABBITMQ_VIRTUAL_HOST}' # KOMBU_URL = 'sqla+sqlite:////celery_sqlite3.sqlite' # 4个//// 代表磁盘根目录下生成一个文件。推荐绝对路径。3个///是相对路径。 ``` ## 7.14 2021-04 新增以mqtt emq 作为消息中间件 例子,设置 broker_kind=BrokerEnum.MQTT ```python from function_scheduling_distributed_framework import task_deco, BrokerEnum @task_deco('mqtt_topic_test', broker_kind=BrokerEnum.MQTT) def f(x, y): print(f''' {x} + {y} = {x + y}''') return x + y for i in range(100): f.push(i, i * 2) f.consume() ``` ``` 这个默认做成服务端不存储消息,mqtt中间件适合前后端实时交互的。可以直接绕开后端flask django 接口,不用写接口, 前端直接发任务到mqtt,后端订阅,后端完成后,发送结果到唯一任务的topic 当然也可以 前端订阅topic,前端发任务到python flask接口,flask接口中发布任务到rabbitmq redis等, 后台消费完成把函数结果发布到mqtt,mqtt推送给前端 ``` ``` 此框架的消费做成了mqtt的共享订阅,例如启动多个重复的消费者脚本,不会所有消费脚本都去重复处理一个消息 ``` ## 7.15 2021-04 新增以 httpsqs 作为消息中间件 ``` @task_deco('httpsqs_queue_test',broker_kind=BrokerEnum.HTTPSQS) ``` ## 7.16 2021-04 新增支持下一代分布式消息系统 pulsar 。 ``` @task_deco('httpsqs_queue_test',broker_kind=BrokerEnum.PULSAR) 使用此中间件,代码必须在linux mac上运行。 在开源的业界已经有这么多消息队列中间件了,pulsar作为一个新势力到底有什么优点呢? pulsar自从出身就不断的再和其他的消息队列(kafka,rocketmq等等)做比较,但是Pulsar的设计思想和大多数的消息队列中间件都不同 ,具备了高吞吐,低延迟,计算存储分离,多租户,异地复制等功能,所以pulsar也被誉为下一代消息队列中间件 pulsar 的消费者数量可以不受topic 分区数量的限制,比kafka和rabbitmq 强。5年后会替代kafka rabbitmq。 ``` ## 7.17 2021-04 新增延时运行任务,介绍见4.8 ## 7.18 2021-09 新增 轻松远程服务器部署运行函数 ``` 框架叫分布式函数调度框架,可以在多台机器运行,因为消息队列任务是共享的。 我用的时候生产环境是使用 阿里云 codepipeline k8s部署的多个容器。还算方便。 在测试环境一般就是单机多进程运行的,用supervisor部署很方便。 所以之前没有涉及到多态机器的轻松自动部署。 如果要实现轻松的部署多台物理机,不借助除了python以外的其他手段的话,只能每台机器登录上然后下载代码,启动运行命令,机器多了还是有点烦的。 现在最新加入了 Python代码级的函数任务部署,不需要借助其他手段,python代码自动上传代码到远程服务器,并自动启动函数消费任务。 目前的自动化在远程机器启动函数消费,连celery都没有做到。 不依赖阿里云codepipeline 和任何运维发布管理工具,只需要在python代码层面就能实现多机器远程部署。 ``` ## 7.19 2021-09 新增 socket udp/tcp/http 消息队列,不需要安装消息中间件软件。 消费脚本,test_udp_consumer.py 必须先启动这个consume脚本消费然后才能发布,因为消费脚本里面启动了socket服务端,socket客户端才能连接。 ```python from function_scheduling_distributed_framework import task_deco,BrokerEnum @task_deco('127.0.0.1:5689',broker_kind=BrokerEnum.UDP) #使用BrokerEnum.TCP就是tcp,BrokerEnum.HTTP就是http def f(x): print(x) if __name__ == '__main__': f.consume() f.push('hello') ``` 发布脚本,test_udp_publisher.py 启动这个脚本之前必须先启动上面的消费脚本启动sockert服务端。 ```python import time from test_udp_consumer import f for i in range(100000): time.sleep(2) f.push(i) ``` ## 7.20 2021-09 新增 支持 nats 高性能消息队列 用法一如既往,只需要修改broker_kind的枚举,并在 distributed_frame_config.py 配置好 NATS_URL 的值就完了。 ```python @task_deco('test_queue66c', broker_kind=BrokerEnum.NATS) def f(x, y): pass ``` ![](https://visitor-badge.glitch.me/badge?page_id=distributed_framework)