memo: Luigiを使っている話 #wfemeetup
リクルートマーケティングパートナーズ
- hadoop
spark
Luigi
- 触っている人が少ない
- 日本語の情報が少ない
アジェンダ
- スタディサプリ
- Luigiの基礎
- Luigiと私
スタディサプリ
- スタディサプリ
- 塾講師の動画を定額で見れるやつ
- アーキ
- rails/postgre/mongo→kinesis/embulk/lambda→td(hive/presto)→tableau
- embulkでマスタテーブル連携
- TDへのクエリ発行とTableauへの転送をLuigiでやっている
- Digdagでもクエリを実行
- 社内部門へのレポーティングはLuigiでやっている
Luigiの基礎
- 複数のバッチ処理を組み合わせたジョブ制御
- スケジューリングに特化
- Python
- プラットフォームに依存しないので処理を一元的に記述できる
- Hadoop/Hive/Pig/Spark MySQL/PostgreSQL/TD/BigQuery/Redshift SSH FTP
- やっぱり世界的に有名な緑の配管工が名前のモデル
Luigiの範囲外
- リアルタイム処理
- 長時間継続実行
- 処理の分散処理(数万)
- スケジュール起動 トリガー起動
用語
- Task
- 処理の実体
- Target
- 正常終了を示すフラグ
- Parameter
- Taskの引数としてあたえることができる変数
例
https://luigi.readthedocs.io/en/stable/example_top_artists.html
- TDへのクエリ実行はluigi_tdモジュール
- Embulk実行はpythonからコマンドライン組み立て
https://www.slideshare.net/beniyama/ss-72260669
インストールと実行
pip install luigi
luigi --module foo examples.Foo --local-scheduler
- コマンド実行ごとにスケジューラが起動→タスクの依存関係を解決
- スケジューラプロセスを独立して使うことも可能
Luigiと私
微妙なハマり
Taskの実行時間が知りたい
- コンソールログはそれなりにでる
- けど、実行時間はわからない
PROCESSING_TIME
をつかう- 実行時間が格納されているので、この変数を参照する
- 並列実行とコマンド戻り値
- Taskが失敗してもコマンドの戻り値はデフォルト0(Jenkins連携すると失敗しているのに成功扱いとなってつらい)
RETCODE
の値を/etc/luigi/luigi.cfgで修正
- RETCODE設定が効かない
- luigi.cmdline.luigi_run()はRETCODEを参照する。luigi.run()ではRETCODEを参照しない
- 並列実行すると戻り値がおかしくなる
- Taskは失敗したのにリトライされて成功→パッチを投げて解決
- 2.5から直っている
- シンプルなワークフローを記述するには重い
- GUIは期待しないで