データフロー制御フレームワーク
Spotifyが開発
タスク同士の依存関係を解決し、オブジェクト指向でデータ処理のタスクが書ける
Hadoopとの連携
class Artists(luigi.Task):
    """Write the ten highest-ranked (streams, artist) rows for a date interval.

    The task reads its single upstream output line by line, keeps the ten
    largest ``(streams, artist)`` pairs, and writes them as tab-separated
    rows to a local TSV file whose name embeds ``date_interval``.
    """

    # Interval being reported on; also forwarded to the upstream task.
    date_interval = luigi.DateIntervalParameter()
    # Selects which upstream aggregation task ``requires`` returns.
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        """Return the upstream aggregation task for ``date_interval``.

        ``AggregateArtistsHadoop`` is used when ``use_hadoop`` is truthy,
        ``AggregateArtists`` otherwise.
        """
        upstream_cls = (
            AggregateArtistsHadoop if self.use_hadoop else AggregateArtists
        )
        return upstream_cls(self.date_interval)

    def output(self):
        """Return the local target this task writes its rows to."""
        target_path = "data/top_artists_%s.tsv" % self.date_interval
        return luigi.LocalTarget(target_path)

    def run(self):
        """Write the ten largest (streams, artist) pairs as TSV rows.

        Each row holds: interval start, interval end, artist, stream count.
        """
        # NOTE(review): ``nlargest`` is presumably ``heapq.nlargest``; the
        # import is not visible in this snippet -- confirm.
        ranked = nlargest(10, self._input_iterator())
        with self.output().open('w') as sink:
            for play_count, name in ranked:
                columns = (
                    str(self.date_interval.date_a),
                    str(self.date_interval.date_b),
                    name,
                    str(play_count),
                )
                row = '\t'.join(columns)
                sink.write(row + '\n')

    def _input_iterator(self):
        """Yield ``(streams, artist)`` tuples parsed from the upstream output.

        Every input line is expected to contain exactly two
        whitespace-separated fields, artist first and stream count second.
        The count is converted to ``int`` and placed first in the tuple.
        """
        with self.input().open('r') as source:
            for raw_line in source:
                name, count_text = raw_line.strip().split()
                yield int(count_text), name