luigi使用 - multiple pipeline


一般地,通常使用luigi框架搭建流程都是只有一个pipeline(暂时没有通过google找到有多个Pipeline的教程)

由于工作需要,需要把之前写好的多个流程串联起来作为一个总的pipeline,并且各个pipeline之间有一定的依赖关系


假设有 pipeline_1, pipeline_2, pipeline_3 三个子流程(可单独运行),结构如下:

  class TaskSon(luigi.Task):
    def run(self):
      pass
    def output(self):
      return luigi.LocalTarget("tmp")

  class workflow(luigi.Task):
    def required(self):
      return TaskSon()

同时,有主流程main_pipeline,结构如下:

  class Pipeline1_Task(luigi.Task):
    def run(self):
      # 执行子流程 pipeline_1
      pass
    def output(self):
      # 返回子流程 pipeline_1 的输出
      pass

  class Pipeline2_Task(luigi.Task):
    def required(self):
      # 依赖于子流程 pipeline_1 的输出
      return Pipeline1_Task()
    def run(self):
      # 执行子流程 pipeline_2
      pass
    def output():
      # 返回子流程 pipeline_2 的输出
      pass

  class Pipeline3_Task(luigi.Task):
    def required(self):
      # 依赖于子流程 pipeline_2 的输出
      return Pipeline2_Task()
    def run(self):
      # 执行子流程 pipeline_3
      pass
    def output(self):
      # 返回子流程 pipeline_3 的输出
      pass

  class workflow(luigi.Task):
    def required(self):
      return Pipeline3_Task()
Screenshot.png

这里需要考虑一个问题

  • 如何将子流程的输入输出跟主流程中对应任务的输入输出对接

为了解决这个问题,首先需要考虑,如何将子流程中所有任务的输出反馈到主流程

  • 一般地,流程的结构设计都是有一个主入口(workflow),由主入口任务(在required方法中)初始化并启动其他任务

  • 那么,就需要在workflow任务中把整个流程中其他任务的输出作为一个整体输出:

  class workflow(luigi.Task):
    def required(self):
      return [otherTask()]
    def output(self):
      # *** 这样就可以将主入口所依赖的所有其他任务的输出返回 ***
      return self.input()

既然能够获取到子流程中所有任务的总输出,那么就需要考虑把输出反馈给主流程

  • 考虑到workflow任务获取其他任务的总输出的方法,可以直接将workflow的output方法跟对应主流程任务的output方法结合:
  class Pipeline1_Task()
    def output(self):
      # *** 这样子流程的output就会跟主流程任务的output对接 ***
      # 同时,这样处理在主流程启动时,luigi框架依旧是会检查子流程的输出是否已经完整
      from pipeline1 import workflow as pipeline1
      return pipeline1().output()
  • 至于主流程中的任务的依赖就比较容易处理了:
  class Pipeline2_Task(luigi.Task):
    def required(self):
      # 由于Pipeline1_Task的输出即为子流程pipeline_1的输出,所以这里luigi会检查到子流程pipeline_1的输出是否完整
      return Pipeline1_Task()

子流程的输入输出已经可以跟主流程的输入输出对应上了,那么就需要考虑如何怎么运行子流程

这里是没有办法通过pipeline1.workflow().run()直接执行,因为入口任务是没有重载run方法

  • 所以,这里把子流程作为一个黑箱执行:
  class Pipeline1_Task(luigi.Task):
    def run(self):
      from pipeline1 import workflow as pipeline1
      # *** 黑箱 ***
      luigi.Build([pipeline1()])

由于需要确保主流程中的任务“挂载”的是统一的一个子流程,则可以定义一个变量来储存子流程对象

  class Pipeline1_Task(luigi.Task):
    pipeline = None
    def run(self):
      luigi.Build([self.pipeline])
    def output(self):
      # 由于每个任务在流程中优先执行的是output方法(当任务被依赖的时候luigi会利用output方法检查输出的完整性),所以self.pipeline的初始化应该在output方法内执行
      from pipeline1 import workflow as pipeline1
      self.pipeline = pipeline1()
      return pipeline1.output()

这样,就可以完整地把子流程装载到主流程的任务中

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 策划活动需要有创意,需要不断创新。不要做常规的打折、促销、优惠券、买一送一,节假日促销、年终大促、老板跳楼价,太没...
    春天的昵图阅读 482评论 0 0
  • 昨天深夜,丁彦雨航在个人微博上宣布了经过一个礼拜的时间深思熟虑后的决定——即将到来的新赛季继续留在CBA为山东队冲...
    我的名字叫做坚韧阅读 292评论 0 0
  • 金句:我拼尽全力,只为换来一个不确定的奇迹! 把自己一整天锁在家里,目的就是静默思考。 完成了的事项: 1.60s...
    Ada彩英阅读 127评论 0 0
  • 今天去和弟弟吃了麻辣香锅,花了100多元,味道确实美,还喝到了很好喝的茶味满满的甜胚子奶茶,看见了童工,热情而让人...
    简单的J阅读 158评论 0 0