Luigi

データフロー（ワークフロー）制御フレームワーク
Spotifyが開発
タスク同士の依存関係を解決してくれ、データ処理のタスクをオブジェクト指向で書ける
Hadoopとの連携が可能

class Artists(luigi.Task):
    """Write the ten artists with the most streams over ``date_interval``.

    The upstream aggregation task is chosen by ``use_hadoop``. Its output is
    read as one ``<artist> <streams>`` record per line. The ten records with
    the highest stream counts are written as tab-separated rows of
    ``date_a, date_b, artist, streams``.
    """

    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        """Return the aggregation task this task depends on.

        Returns ``AggregateArtistsHadoop`` when ``use_hadoop`` is set and
        ``AggregateArtists`` otherwise. Both are built from ``date_interval``.
        """
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        return AggregateArtists(self.date_interval)

    def output(self):
        """Return the local TSV target whose name embeds ``date_interval``."""
        return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)

    def run(self):
        """Select the ten highest-stream artists and write them as TSV rows."""
        # Imported locally so ``run`` does not depend on a module-level
        # ``from heapq import nlargest`` being present.
        from heapq import nlargest

        # (streams, artist) tuples compare by stream count first, so nlargest
        # picks the most-streamed artists; ties fall back to the artist name.
        top_10 = nlargest(10, self._input_iterator())
        with self.output().open('w') as out_file:
            for streams, artist in top_10:
                out_line = '\t'.join([
                    str(self.date_interval.date_a),
                    str(self.date_interval.date_b),
                    artist,
                    str(streams),
                ])
                out_file.write(out_line + '\n')

    def _input_iterator(self):
        """Yield ``(streams, artist)`` pairs parsed from the upstream output.

        Each non-blank line is split on its last run of whitespace. The final
        field is the integer stream count, and everything before it is the
        artist, which may itself contain spaces. Blank lines are skipped.

        Raises:
            ValueError: if a non-blank line has no whitespace-separated count,
                or if the count is not an integer.
        """
        with self.input().open('r') as in_file:
            for line in in_file:
                stripped = line.strip()
                if not stripped:
                    # Tolerate blank or trailing empty lines instead of failing
                    # the two-field unpack below.
                    continue
                # Split only on the last whitespace run so multi-word artist
                # names stay intact.
                artist, streams = stripped.rsplit(None, 1)
                yield int(streams), artist