3.框架详细介绍

3.1 各种中间件选择的场景和优势

RABBITMQ_AMQPSTORM = 0  # 使用 amqpstorm 包操作rabbitmq  作为 分布式消息队列,支持消费确认.推荐这个。

RABBITMQ_RABBITPY = 1  # 使用 rabbitpy 包操作rabbitmq  作为 分布式消息队列,支持消费确认。

REDIS = 2  # 使用 redis 的 list结构,brpop 作为分布式消息队列。随意重启和关闭会丢失大量消息任务,不支持消费确认。

LOCAL_PYTHON_QUEUE = 3  # 使用python queue.Queue实现的基于当前python进程的消息队列,不支持跨进程 跨脚本 跨机器共享任务,不支持持久化,适合一次性短期简单任务。

RABBITMQ_PIKA = 4  # 使用pika包操作rabbitmq  作为 分布式消息队列。

MONGOMQ = 5  # 使用mongo的表中的行模拟的 作为分布式消息队列,支持消费确认。

PERSISTQUEUE = 6  # 使用基于sqlute3模拟消息队列,支持消费确认和持久化,但不支持跨机器共享任务,可以基于本机单机跨脚本和跨进程共享任务,好处是不需要安装中间件。

NSQ = 7  # 基于nsq作为分布式消息队列,支持消费确认。

KAFKA = 8  # 基于kafka作为分布式消息队列,建议使用BrokerEnum.CONFLUENT_KAFKA。

REDIS_ACK_ABLE = 9  # 基于redis的 list + 临时unack的set队列,采用了 lua脚本操持了取任务和加到pengding为原子性,随意重启和掉线不会丢失任务。

SQLACHEMY = 10  # 基于SQLACHEMY 的连接作为分布式消息队列中间件支持持久化和消费确认。支持mysql oracle sqlserver等5种数据库。

ROCKETMQ = 11  # 基于 rocketmq 作为分布式消息队列,这个中间件必须在linux下运行,win不支持。

REDIS_STREAM = 12  # 基于redis 5.0 版本以后,使用 stream 数据结构作为分布式消息队列,支持消费确认和持久化和分组消费,是redis官方推荐的消息队列形式,比list结构更适合。

ZEROMQ = 13  # 基于zeromq作为分布式消息队列,不需要安装中间件,可以支持跨机器但不支持持久化。

RedisBrpopLpush = 14  # 基于redis的list结构但是采用brpoplpush 双队列形式,和 redis_ack_able的实现差不多,实现上采用了原生命令就不需要lua脚本来实现取出和加入unack了。

"""
操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),但没包括分布式函数调度框架的kafka nsq zeromq 等。
同时 kombu 包的性能非常差,可以用原生redis的lpush和kombu的publish测试发布,使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。
由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列  qpid pyro 队列),否则强烈建议使用此框架的操作中间件方式而不是使用kombu。
"""
KOMBU = 15

"""基于confluent-kafka包,包的性能比kafka-python提升10倍。同时应对反复随意重启部署消费代码的场景,此消费者实现至少消费一次,第8种BrokerEnum.KAFKA是最多消费一次。"""
CONFLUENT_KAFKA = 16

""" 基于emq作为中间件的。这个和上面的中间件有很大不同,服务端不存储消息。所以不能先发布几十万个消息,然后再启动消费。mqtt优点是web前后端能交互,
前端不能操作redis rabbitmq kafka,但很方便操作mqtt。这种使用场景是高实时的互联网接口。
"""
MQTT = 17

HTTPSQS = 18  # 基于httpsqs的

PULSAR = 20   # 下一代分布式消息系统。5年后会同时取代rabbitmq和kafka。

UDP = 21  # 基于socket udp 实现的。小规模使用不支持持久化,好处是不用安装软件。

TCP = 22  # 基于socket tcp 实现的。小规模使用不支持持久化,好处是不用安装软件。

HTTP = 23 # 基于http实现的,小规模使用不支持持久化,好处是不用安装软件。

NATS = 24  # 高性能中间件nats,中间件服务端性能很好,。

TXT_FILE = 25 # 磁盘txt文件作为消息队列,支持单机持久化,不支持多机分布式
你项目根目录下自动生成的 distributed_frame_config.py 文件中修改配置,会被自动读取到。

此文件按需修改,例如你使用redis中间件作为消息队列,可以不用管rabbitmq mongodb kafka啥的配置。
但有3个功能例外,如果你需要使用rpc模式或者分布式控频或者任务过滤功能,无论设置使用何种消息队列中间件都需要把redis连接配置好,
如果@task_deco装饰器设置is_using_rpc_mode为True或者 is_using_distributed_frequency_control为True或do_task_filtering=True则需要把redis连接配置好,默认是False。

3.2 框架支持的函数调度并发模式种类详细介绍

1、threading 多线程,使用自定义的可缩小、节制开启新线程的自定义线程池,不是直接用官方内置concurrent.futures.ThreadpoolExecutor
   此线程池非常智能,配合qps参数,任何场景可以无脑开500线程,真正的做到智能扩张,智能自动缩小。
   这线程池是智能线程池,由于非常好用,为这个线程池做了独立的pypi包,可以单独用于没有使用此框架的项目。

2、gevent    需要在运行起点的脚本首行打 gevent 猴子补丁。

3、eventlet  需要在运行起点的脚本首行打 eventlet 猴子补丁。

4、asyncio  async异步,主要是针对消费函数已经定义成了   async def fun(x)  这种情况,这种情况不能直接使用多线程,
   因为执行  fun(1)  后得到的并不是所想象的函数最终结果,而是得到的一个协程对象,所以针对已经定义成异步函数了的,需要使用此种并发模式。
   框架不鼓励用户定义异步函数,你就用同步的直观方式思维定义函数就行了,其余的并发调度交给框架就行了。

5、开启多进程启动多个consumer,此模式是 多进程  + 上面4种的其中一种并发方式,充分利用多核和充分利用io,用法如下。可以实现 多进程 叠加 协程并发。
# 这种是多进程方式,一次编写能够兼容win和linux的运行。

from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum
import os

@task_deco('test_multi_process_queue',broker_kind=BrokerEnum.REDIS_ACK_ABLE,
           concurrent_mode=ConcurrentModeEnum.THREADING,)
def fff(x):
    print(x * 10,os.getpid())

if __name__ == '__main__':
    fff.multi_process_consume(6)  # 一次性启动6进程叠加多线程。

3.3 框架最最重要的task_deco装饰器参数说明

    :param queue_name: 队列名字,只有这个参数是必填的,而且每个任务函数都要使用唯一的队列名字,其他参数都是可选的。
    :param function_timeout : 超时秒数,函数运行超过这个时间,则自动杀死函数。为0是不限制。
    :param concurrent_num:并发数量,
    :param specify_concurrent_pool:使用指定的线程池(协程池),可以多个消费者共使用一个线程池,不为None时候。threads_num失效
    :param concurrent_mode:并发模式,1线程(ConcurrentModeEnum.THREADING) 2gevent(ConcurrentModeEnum.GEVENT)
                              3eventlet(ConcurrentModeEnum.EVENTLET) 4 asyncio(ConcurrentModeEnum.ASYNC) 5单线程(ConcurrentModeEnum.SINGLE_THREAD)
    :param max_retry_times: 最大自动重试次数,当函数发生错误,立即自动重试运行n次,对一些特殊不稳定情况会有效果。
           可以在函数中主动抛出重试的异常ExceptionForRetry,框架也会立即自动重试。
           主动抛出ExceptionForRequeue异常,则当前消息会重返中间件。
    :param log_level:框架的日志级别,默认是debug级别,可以看到详细的执行信息,如果不想看到太多详细的日志,可以设置为logging.INFO常量(20) 或者数字20。
    :param is_print_detail_exception:是否打印详细的堆栈错误。为0则打印简略的错误占用控制台屏幕行数少。
    :param qps:指定1秒内的函数执行次数,可以是小数,例如0.1代表每10秒执行1次,10代表1秒执行10次函数。默认为0不进行控频
    :param msg_expire_senconds:消息过期时间,为0永不过期,为10则代表,10秒之前发布的任务如果现在才轮到消费则丢弃任务。
    :param is_using_distributed_frequency_control: 是否使用分布式空频(需要安装redis,每隔15秒自动获取分布式环境中的消费者数量,对于10000 qps控频对redis压力很小,
            因为不是采用redis的incr计数来控频,不是用频繁的操作redis incr命令来实现的,是使用的qps除以消费者数量),
            默认只对当前实例化的消费者空频有效。假如实例化了2个qps为10的使用同一队列名的消费者,
               并且都启动,则每秒运行次数会达到20。如果使用分布式空频则所有消费者加起来的总运行次数是10。
    :param is_send_consumer_hearbeat_to_redis   时候将发布者的心跳发送到redis,有些功能的实现需要统计活跃消费者。因为有的中间件不是真mq。
    :param logger_prefix: 日志前缀,可使不同的消费者生成不同的日志
    :param create_logger_file : 是否创建文件日志
    :param do_task_filtering :是否执行基于函数参数的任务过滤
    :param task_filtering_expire_seconds:任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 ,
           30分钟前发布过1 + 2 的任务,现在仍然执行,
           如果是30分钟以内发布过这个任务,则不执行1 + 2,现在把这个逻辑集成到框架,一般用于接口价格缓存。
    :param is_do_not_run_by_specify_time_effect :是否使不运行的时间段生效
    :param do_not_run_by_specify_time   :不运行的时间段
    :param schedule_tasks_on_main_thread :直接在主线程调度任务,意味着不能直接在当前主线程同时开启两个消费者。
    :param function_result_status_persistance_conf   :配置。是否保存函数的入参,运行结果和运行状态到mongodb。
           这一步用于后续的参数追溯,任务统计和web展示,需要安装mongo。
    :param is_using_rpc_mode 是否使用rpc模式,可以在发布端获取消费端的结果回调,但消耗一定性能,使用async_result.result时候会等待阻塞住当前线程。。
    :param broker_kind:中间件种类,一共支持30多种中间件,一次性支持了kombu的所有中间件。 入参见 BrokerEnum枚举类的属性。

关于task_deco参数太多的说明:

有人会抱怨入参超多很复杂,是因为要实现一切控制方式,实现的运行控制手段非常丰富,所以参数就会多。

看这个里面的参数解释非常重要,几乎能想到的控制功能全部都有。比如有人说日志太多,不想看那么详细的提示日志
,早就通过参数提供实现了,自己抱怨参数多又以为没提供这个功能,简直是自相矛盾。

想入参参数少那就看新增的那个10行代码的函数的最精简乞丐版实现的分布式函数执行框架,演示最本质实现原理。“ 
这个例子的框架啥控制手段都没有,参数自然就很少。

乞丐版分布式函数调度框架的代码在 

function_scheduling_distributed_framework/beggar_version_implementation/beggar_redis_consumer.py

3.3.2 task_deco装饰器 的 concurrent_num 和 qps 之间的关系。

 concurrent_num:并发数量。
    qps qps是有个很有趣的参数,能精确控制函数每秒运行多少次。
    concurrent_num和qps存在着一定的关系。
    
    例如对于下面这个函数
    
    def func(x):
           time.sleep(2)
           print(x)

    1)如果设置 concurrent_num = 1000(或100万)  qps = 10
    那么一秒钟会执行10次func函数。如果不指定qps的值,则不进行控频,消费框架会平均每秒钟会执行50次函数func。

    如果设置concurrent_num = 1000  qps = 5   
    那么一秒钟会执行5次func函数。所以可以看到,当你不知道要开多少并发合适的时候,可以粗暴开1000个线程,但要设置一个qps。
   
    那为什么次框架,可以让你粗暴的设置1000设置100万线程呢,并不是做了数字截取,判断线程设置大于多少就自动调小了,此消费框架并没有这样去实现。
    而是次框架使用的非concurrent.tutures.ThreadpoolExecutor,是使用的自定义的  ThreadPoolExecutorShrinkAble 线程池,
    此线程池其中之一的功能就是节制开更多的线程,因为对于上面的休眠2秒的func函数,如果设置concurrent_num = 1000000  qps = 5,
    正常来说开10个线程足够实现每秒执行5次了,此框架在调节线程新增线程时候进行了更多的判断,所以原生线程池不可以设置100万大小,
    而ThreadPoolExecutorShrinkAble可以设置为100万大小。

    此外ThreadPoolExecutorShrinkAble 实现了线程池自动缩小的功能,这也是原生concurrent.tutures.ThreadpoolExecutor没有的功能。
    自动缩小是什么意思呢,比如一段时间任务非常密集1秒钟来了几百个任务,所以当时开启了很多线程来应付,但一段时间后每分钟只来了个把任务,
    此时 ThreadPoolExecutorShrinkAble 能够自动缩小线程池,
    ThreadPoolExecutorShrinkAble实现了java ThreadpoolExecutor的KeepAliveTime参数的功能,
    原生concurrent.tutures.ThreadpoolExecutor线程池即使以后永久不来新任务,之前开的线程数量一致保持这。

    关于 ThreadPoolExecutorShrinkAble 的厉害之处,可以参考 https://github.com/ydf0509/threadpool_executor_shrink_able
    
    最终关于 concurrent_num 大小设置为多少,看自己需求,上面说的100万是举个例子,
    实际这个参数还被用作为线程池的任务队列的有界队列的大小,所以一定要设置为1000以下,否则如果设置为100万,
    从消息中间件预取出的消息过多,造成python内存大、单个消费者掏空消息队列中间件造成别的新启动的消费者无任务可消费、
    对于不支持消费确认类型的中间件的随意重启会丢失大量正在运行的任务 等不利影响。

    2)上面的func函数,设置 concurrent_num = 1  qps = 100,那会如何呢?
       由于你设置的并发是1,对于一个需要2秒运行完成的函数,显然平均每2秒才能执行1次,就是框架真正的只能达到0.5个qps。
       所以 concurrent_num 和 qps,既有关系,也不是绝对的关系。
    
    在对一个随机消耗时间的函数进行并发控制时候,如果函数的运行时间是0.5到20秒任意时间不确定的徘徊,你可以设置 concurrent_num = 100,
    如果合作方要求了只能1秒钟能让你使用多少次,例如需要精确控频10次,可以设置qps =10,concurrent_num随便搞个 一两百 两三百就行了,
    因为是智能的克制的调节线程池大小的,所以不会真的达到concurrent_num的值。

    3)qps是个小数可以小于1,如果要设置10秒执行一次函数,则设置qps=0.1

    这主要是介绍了 concurrent_num 和qps的关系和设置值,qps是优先,但受到concurrent_num的约束。

3.4 框架的乞丐精简版实现方式

由于框架的功能十分多,如果没学习36种设计模式,就很难看懂源码,现在演示精简实现原理

此精简例子十分之简单明了,就是死循环从中间件取任务然后丢到线程池里面执行。

此代码在 function_scheduling_distributed_framework/beggar_version_implementation/beggar_redis_consumer.py

这样简单明了,演示了基本原理,但是这个缺少消费确认(随意重启代码会造成大量任务丢失) qps恒定等20种功能。

def start_consuming_message(queue_name, consume_function, threads_num=50):
    pool = ThreadPoolExecutor(threads_num)
    while True:
        try:
            redis_task = redis.brpop(queue_name, timeout=60)
            if redis_task:
                task_str = redis_task[1].decode()
                print(f'从redis的 {queue_name} 队列中 取出的消息是: {task_str}')
                pool.submit(consume_function, **json.loads(task_str))
            else:
                print(f'redis的 {queue_name} 队列中没有任务')
        except redis.RedisError as e:
            print(e)


if __name__ == '__main__':
    import time


    def add(x, y):
        time.sleep(5)
        print(f'{x} + {y} 的结果是 {x + y}')

    # 推送任务
    for i in range(100):
        print(i)
        redis.lpush('test_beggar_redis_consumer_queue', json.dumps(dict(x=i, y=i * 2)))


    start_consuming_message('test_beggar_redis_consumer_queue', consume_function=add, threads_num=10)

3.5 框架的任务消费确认

此框架可以确保客户端任何时候 随意断电 粗暴重启代码 随意关机,任务万无一失。

3.4演示的精简版框架,实现redis的list的push和pop来模拟消息队列,很明显不靠谱,kill 9 重启代码或者重启电脑很容易会丢失大量任务。

分布式一致性消息传递、事件处理等场景中十分重要,分为3种情况:
At most Onece:最多一次,如果算子处理事件失败,算子将不再尝试该事件。
At Least Onece:至少一次,如果算子处理事件失败,算子会再次尝试该处理事件,直到有一次成功。
Exactly-Once:严格地,有且仅处理一次,通常有两种方法实现。

3.4实现的是最多一次,框架在多种中间件使用消费确认实现了万无一失 ,达到了Exactly-Once。
Exactly-Once是最好的也是实现难度最复杂的;At most Onece通常是最差的方式,也是最简单的实现方式。

框架在使用rabbitmq,内置默认了确认消费。

框架在使用redis作为中间件时候,有很多种实现方式,REDIS 是最不靠谱的会丢失消息。
REDIS_ACK_ABLE 、 REDIS_STREAM、 RedisBrpopLpush BrokerKind 这三种都是实现了确认消费。

3.6 框架的设计规范原则

源码实现思路基本90%遵守了oop的6个设计原则,很容易扩展中间件。
1、单一职责原则——SRP
2、开闭原则——OCP
3、里式替换原则——LSP
4、依赖倒置原则——DIP
5、接口隔离原则——ISP
6、迪米特原则——LOD

最主要是大量使用了模板模式、工厂模式、策略模式、鸭子类。
可以仿照源码中实现中间件的例子,只需要继承发布者、消费者基类后实现几个抽象方法即可添加新的中间件。