任务调度

介绍

在本方案中,使用celery作为任务分发平台。对于存入celery的大量任务,能达到以下的要求:

  1. 任务逻辑相互独立
  2. 横向扩展任务处理能力
  3. 抽象(抽象的意义在于,化繁为简) 除业务逻辑以外的 处理过程,将后续代码编写的关注点主要放在业务逻辑的实现上
  4. 链式任务触发

结构

flow.png

设计方案

任务逻辑封装

我们将任务逻辑按照约定的格式封装,并以设置name属性的方式为任务打上标记,而后通过current_node/default_node确定app 发送的节点。这些准备工作完成后,将任务名称和任务数据交由celery app进行发送。如此能够保持发送端的轻量级,使任务更快、更稳定、更无压力的发送到执行端。

以下是任务的封装功能伪代码。


class BaseLogic(object):

    logic_name = None
    nodes = []

    def __init__(self):
        if not self.logic_name or not self.nodes:
            raise attributeError

    def next_node(current_node=None): # 逻辑具有了修改下一个执行节点的能力。

    def send_other(self,  other_logic_name):

    def send_others(self, other_logic_names):

    def set(self,  key,  value):

    def get(self,  key,  default=None):

    def die(self):

    def __repr__(self):  

class SpiderLogic(BaseLogic):

    def crawl(self):

    def publish(self):

    def store(self):

      

发送器封装

class App():

    def __init__(self, app->celery.app):
        self.app = app

    def send(self, node_name, data=None): # 发送初始任务。发送任务名称和传递的数据(LogicClass.name, data)

任务处理流程简述

不同节点的worker会收到属于本节点的任务。worker提供任务的执行流程。

worker是无状态的,worker的每次运行会传入此次运行所需的数据,多次任务运行之间相互不会产生影响。对于无状态的系统,可以避免考虑数据同步等额外的交互问题。同时根据任务数量级和任务执行所需要的资源的不同,可以对worker进行横向扩展。

基础task处理逻辑的封装功能伪代码:

class ProcessTask(celery.Task):
    """
    worker中实际运行的任务流程封装
    """
    name = None

    def __init__(self):

    def run(self, data):
        logic_ins = logic_factory(data)
        self._run(logic_ins)
        self.send_next(logic_ins)
        self.send_others(logic_ins)

    def _run(self, logic_ins):
        raise NotImplementedError

    def send_next(self, logic_ins):
        node_name = logic_ins.next_nodes.pop()
        self.send_to_node(node_name)

    def send_others(self, logic_ins):

        for node_name in logic_ins.other_nodes:
            self.send_to_node(node_name)

    def send_to_node(self, node_name):

class CrawlProcessTask(ProcessTask):

    name = 'crawl'

    def _run(self, logic_ins):
        logic_ins.crawl()

收到任务的名称以后,worker会通过 工厂方法 根据任务名称实例化对应的类,并且按照worker既定的执行流程,执行对应的业务逻辑。

执行完成后,按照logic class 既定的顺序,自动触发下一个流程。当然如果需要将数据进行链式处理,那么在逻辑类中,通过定义other_logics,数据也会发送到对应节点开始新的流程。

总结

任务逻辑相互独立的意义在于,当一个任务需要调整逻辑时,会自然而然的将修改锁定在独立的代码块中,也就是最小化此次修改的影响范围。所以我们将任务相互独立的抽象成不同的逻辑类。而当任务相互独立以后,我们需要一个统一的任务运行机制,并且此机制希望对于任务毫无干预,也就是机制不关注运行任务的内容是什么,而是关注运行任务的流程。所以我们对于任务运行设计了一套流程。

在流程中,我们将任务封装成为一个个的类,在类中定义好业务逻辑以及处理节点的顺序。类通过一个统一的入口进入流程处理。同时,通过这个顺序,被封装后的celery app 可以找到首个接收的worker 节点,然后通过celery的分布式任务分发能力,进行任务的分发。

同时在类中设置链式处理逻辑,解除单个任务之间的壁垒,将任务链条串起来,解决任务之间数据交互的问题。

故此,我们将业务代码抽离出任务分发流程,任务相互独立,同时提供数据传递的方案,保证任务流程的正常执行。同时通过worker节点的无状态,以及celery节点的扩容能力,使得当有大量任务产生的时候,能够对任意任务节点数量进行横向扩展。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容