by shigemk2

当面は技術的なことしか書かない

memo: Luigiを使っている話 #wfemeetup

memo: Luigiを使っている話 #wfemeetup

リクルートマーケティングパートナーズ

  • hadoop
  • spark

  • Luigi

    • 触っている人が少ない
    • 日本語の情報が少ない

アジェンダ

  • スタディサプリ
  • Luigiの基礎
  • Luigiと私

スタディサプリ

  • スタディサプリ
  • 塾講師の動画を定額で見れるやつ
  • アーキ
    • rails/postgre/mongo→kinesis/embulk/lambda→td(hive/presto)→tableau
    • embulkでマスタテーブル連携
    • TDへのクエリ発行とTableauへの転送をLuigiでやっている
    • Digdagでもクエリを実行
    • 社内部門へのレポーティングはLuigiでやっている

Luigiの基礎

  • 複数のバッチ処理を組み合わせたジョブ制御
  • スケジューリングに特化
  • Python
  • プラットフォームに依存しないので処理を一元的に記述できる
    • Hadoop/Hive/Pig/Spark MySQL/PostgreSQL/TD/BigQuery/Redshift SSH FTP
  • やっぱり世界的に有名な緑の配管工が名前のモデル

Luigiの範囲外

  • リアルタイム処理
  • 長時間継続実行
  • 処理の分散処理(数万)
  • スケジュール起動 トリガー起動

用語

  • Task
    • 処理の実体
  • Target
    • 正常終了を示すフラグ
  • Parameter
    • Taskの引数としてあたえることができる変数

https://luigi.readthedocs.io/en/stable/example_top_artists.html

  • TDへのクエリ実行はluigi_tdモジュール
  • Embulk実行はpythonからコマンドライン組み立て

https://www.slideshare.net/beniyama/ss-72260669

インストールと実行

pip install luigi luigi --module foo examples.Foo --local-scheduler

  • コマンド実行ごとにスケジューラが起動→タスクの依存関係を解決
  • スケジューラプロセスを独立して使うことも可能

Luigiと私

  • 微妙なハマり

  • Taskの実行時間が知りたい

    • コンソールログはそれなりにでる
    • けど、実行時間はわからない
    • PROCESSING_TIME をつかう
    • 実行時間が格納されているので、この変数を参照する
  • 並列実行とコマンド戻り値
    • Taskが失敗してもコマンドの戻り値はデフォルト0(Jenkins連携すると失敗しているのに成功扱いとなってつらい)
    • RETCODE の値を/etc/luigi/luigi.cfgで修正
  • RETCODE設定が効かない
    • luigi.cmdline.luigi_run()はRETCODEを参照する。luigi.run()ではRETCODEを参照しない
  • 並列実行すると戻り値がおかしくなる
    • Taskは失敗したのにリトライされて成功→パッチを投げて解決
    • 2.5から直っている
  • シンプルなワークフローを記述するには重い
  • GUIは期待しないで