- 導入は楽(pip install luigiするだけでよい)
- 複雑なことをしようとするとPythonの作り込みが必要で、Pythonのコードが複雑になりがち ちゃんと運用しようとするとすごいパイソニスタが必要
- データのメール転送はdigdagほど簡単じゃない
- ログ出力が邪魔な時がある
- 確かにUIはそんなに期待できない
memo: Airflowの紹介 #wfemeetup
- SIer
- Bigtop/Yetusコミッタ
アジェンダ
- 背景
- 紹介
- デモ
- 要望
背景
- 以前はOozieを使っていた
- 設定はXMLで定義するのでだるい
- fork-joinを使うためにはすべての経路が成功する必要がある
- どれかがOKだったら全部OKみたいな運用ができない
- 任意のDAGとして定義できない
- coordinatorを使う場合はデータが到着してることが前提
- UIが古い
- Airflowに可能性を感じた
- ワークフロー Python
- Trigger Rules
- フローに制約がない
紹介
- 2016/03にAirbnbがApacheに寄贈したやつ
- 1.7(AIRFLOW-921配下のissueが全部解決されたらリリースされると思う)
pip install airflow
- 導入の敷居は高くないかも
中心的な概念
- DAG
- 全体のタスク
- Operator
- 定義
- Task
- 処理の各ジョブ
- インスタンス
- Task Instance
- 実行結果
- ステータスの保持
- ID以外にタイムスタンプを持っている
Operatorの例
- コマンド発行
- BashOperator DokcerOperator
- SQL発行
- HiveOperator
- JdbcOperator
- MySQLOperator
- PigOperator
- データ転送
- S3
- 通知
- HTTP
- Slack
- Operatorの独自定義も可能
その他の概念/機能
- connection: データストアの接続管理(パスワードなど)
- hook: データのload/dump
- pool: タスクの並列数
- queue: celery 外部のキューイングシステムをジョブキューとしてつかえる
- branching: DAG中の条件分岐を実現
- SLA: 一定時間内に成功しなかったやつを通知
デモ
- ペットショップの売上データの整形集計分析
- bigtopのサンプル https://github.com/apache/bigtop/tree/master/bigtop-bigpetstore/bigpetstore-spark
- 売上データの生成→クレンジング→売上集計→顧客ごとの商品レコメンド
- スケジューリングはcronの記法 or Python記述 @onceで一度きり
- generator/etl/stats/recommend
不足している機能
- HA(high availability)構成のサポート
- 運用性の工場、DAGの登録/更新/削除をweb uiから実施できるようにしたい
- タイムゾーンがUTCでしかサポートしていない
任意の場所で処理を停止/再開したい
Airflow Meetup Tokyoをやろう
- 西海岸では数カ月に1回くらいやってるぽい
- パッチを投げてほしい
memo: Azkaban #wfemeetup
memo: Azkaban #wfemeetup
- ジョブ管理ツール
- Hadoopのジョブ依存関係を解決するために作られた
- あんまりモダンじゃないJavaで実装されていること
- WebServerがGUIを提供
- ExecuterServerでジョブを実行
- 基本的なことはできるが、高度なことはできない
- 依存関係を貼る
- 実行時間のチェック
- スケジューリング
- 失敗通知
- リトライ
- ファイルができたら実行みたいなのはできない
- 通知はメールだけ
- バイナリがないのでソースをビルドする必要がある
- 開発もあんまりアクティブじゃない
メーリスもそんなに動いていない
jobファイルで記述する
- Azkaban用語
- project
- job
- flow
- ジョブの履歴を見れる
- スケジューリング可
- failure options
- finish all possible
- 依存関係のないジョブは継続して実行される
- デフォルトじゃない
- finish all possible
- concurrent execution options
- skip execution
- run concurrently
- pipeline
SLA notification
- サービスレベル
- ジョブ実行が一定時間をオーバーしたら通知
後続確認の定義○
- スケジューリング△
- 実行処理の切り替え ☓
- 後続の処理の実行△
- 失敗や遅延の通知○
- GUIからの確認○
ユーズケース
- hadoopジョブの管理
- APIを使っている
- 1フロー別に1ファイルを書いている
- 内製フレームワークを使ってAzkabanを実行している
流れ
GitHub→Jenkins→Azkaban
- バージョン管理とデプロイはできる
DB/Azkaban/Hadoop/InfiniDB Netezza Presto/Saiku Pentaho Cognos Prestogres
状況
- 120のフローがスケジューリング
- ほぼ日次バッチ
- Hive関係のもの
- バッチサーバにAzkabanでやっていて、webもexecuteも同じサーバにおいてる
- テンプレートを作り、変数や名前を渡すようにした
- SLAオプションは使っていない
- プルリクエストが通ったらつかう
- HTTPジョブコールバックは使っていない
- hipchatを使っている
所感
- シンプルに使える
- API/UIは便利
- 開発を活発にするためにはジョブの数をメーリスに投げてみるといいかもしれない
- 開発者へのモチベーションアップ
memo: Luigiを使っている話 #wfemeetup
memo: Luigiを使っている話 #wfemeetup
リクルートマーケティングパートナーズ
- hadoop
spark
Luigi
- 触っている人が少ない
- 日本語の情報が少ない
アジェンダ
- スタディサプリ
- Luigiの基礎
- Luigiと私
スタディサプリ
- スタディサプリ
- 塾講師の動画を定額で見れるやつ
- アーキ
- rails/postgre/mongo→kinesis/embulk/lambda→td(hive/presto)→tableau
- embulkでマスタテーブル連携
- TDへのクエリ発行とTableauへの転送をLuigiでやっている
- Digdagでもクエリを実行
- 社内部門へのレポーティングはLuigiでやっている
Luigiの基礎
- 複数のバッチ処理を組み合わせたジョブ制御
- スケジューリングに特化
- Python
- プラットフォームに依存しないので処理を一元的に記述できる
- Hadoop/Hive/Pig/Spark MySQL/PostgreSQL/TD/BigQuery/Redshift SSH FTP
- やっぱり世界的に有名な緑の配管工が名前のモデル
Luigiの範囲外
- リアルタイム処理
- 長時間継続実行
- 処理の分散処理(数万)
- スケジュール起動 トリガー起動
用語
- Task
- 処理の実体
- Target
- 正常終了を示すフラグ
- Parameter
- Taskの引数としてあたえることができる変数
例
https://luigi.readthedocs.io/en/stable/example_top_artists.html
- TDへのクエリ実行はluigi_tdモジュール
- Embulk実行はpythonからコマンドライン組み立て
https://www.slideshare.net/beniyama/ss-72260669
インストールと実行
pip install luigi
luigi --module foo examples.Foo --local-scheduler
- コマンド実行ごとにスケジューラが起動→タスクの依存関係を解決
- スケジューラプロセスを独立して使うことも可能
Luigiと私
微妙なハマり
Taskの実行時間が知りたい
- コンソールログはそれなりにでる
- けど、実行時間はわからない
PROCESSING_TIME
をつかう- 実行時間が格納されているので、この変数を参照する
- 並列実行とコマンド戻り値
- Taskが失敗してもコマンドの戻り値はデフォルト0(Jenkins連携すると失敗しているのに成功扱いとなってつらい)
RETCODE
の値を/etc/luigi/luigi.cfgで修正
- RETCODE設定が効かない
- luigi.cmdline.luigi_run()はRETCODEを参照する。luigi.run()ではRETCODEを参照しない
- 並列実行すると戻り値がおかしくなる
- Taskは失敗したのにリトライされて成功→パッチを投げて解決
- 2.5から直っている
- シンプルなワークフローを記述するには重い
- GUIは期待しないで
memo: Jenkins 2.0 Pipeline & Blue Ocean #wfemeetup
- ベアメタルクラウド
アジェンダ
- おさらい
- Pipeline / Blue Ocean(Jenkins2.0から)
- デモ
- 活用事例
おさらい
- Java製のCI/CDツール
- Hudsonからフォークされた(2011)
- ジョブはshell script / Groovyで記述
豊富なプラグイン
ビルド/テスト/デプロイの自動化
- 開発サイクルの高速化
Jenkinsおじさん
- 柔軟性がありすぎてなんでもできる
- ジョブの運用作成の属人化
- 依存関係が複雑になりがち
- 実行環境が再現しにくい
- バージョンアップするとプラグインが動かない
解決策
- Pipelieで簡略化
- BlueOceanで可視化
- Jenkinsでバージョン管理
Pipeline / Blue Ocean
Pipeline Plugin
- DSLで定義
- 条件分岐/例外処理/並列実行を書ける
- Jenkinsfileでリポジトリに含めておく
- ブランチごとに自動でジョブを作ってくれる
各種DSL文法
- ステップ step
- 環境変数設定 environment
- タイムアウトやリトライを設定 option
- when
try/finally
シェル芸やGroovyに頼らなくても良くなる
Jenkinsfileを作る
Blue Ocean Plugin
- JenkinsのUI/UXをおしゃれにする仕組み
- Pipelineのすぐあとに出た
- Pipeline
- 手順や状態の可視化
- 個人用ダッシュボードの作成
- Design Languageで統一性のあるUI
- bootstrapぽい React.jsで実装されている
よいこと
- デプロイ処理の可視化
- 失敗処理の可視化
デモ
https://github.com/hico-horiuchi/jedi-ansible
- GitHubからクローン
- Dockerインストール
- WebサーバとLBのコンテナを起動
- 監視と可視化ツールのコンテナを起動
Serverspecで正常性を確認
並列化と直列実行の組み合わせ
Jenkinsfileで書く
デモが動かなかった
進捗や結果を可視化できちゃう
- PipelineエディタでワークフローをGUI上で作れる
チームでの活用事例
- いろいろなところでJenkinsを使える
- NTTなのにEnterpriseではないGitHubを使っている
- デプロイ自動化
- 死活監視/外形監視とアラートの通知
Consulで監視、Slackで通知
インターフェイスを統一すること
Jenkins2.0に移行したい
- Multijob
- ジョブの校正や依存関係が複雑になりやすい
- GUIの使い勝手が悪い(リンクの繰り返し)
- Job Builder
- YAMLで書いたものをJenkinsに流し込む
- テンプレート展開の嵐 可読性よくない
- ジョブのリポジトリとコードのリポジトリが分かれている→コードとジョブを一緒に管理したい
memo: Digdagの特徴とQuick Start #wfemeetup
Digdag :Digdagの特徴とQuick Start #wfemeetup
TreasureData
- fluentd
- digdag
- messagepack
workload automation
- 手作業の自動化
- バッチデータ解析
- アクセスログ
- ETL 集計処理 レポート生成 通知
- データロード
- メール送信
- システム間のデータ連携
- プロビジョニング自動化
- サーバ/DB/ネットワーク機器管理
- CI
- バッチデータ解析
求められる機能
- タスクを実行
- 定期実行
- ファイルを作成されたら実行
- 過去分の一括ジック
- エラーハンドリング
- 失敗通知 -リトライ
- 状態監視
- 失敗してないけど実行時間が長いやつ
- タスクの実行時間
- 実行ログの収集と保存
- 高速化
- タスクの並列
- 直列実行だと時間がかかる
- 同時実行数制限
- ワークフローのバージョン管理
- GUIによるワークフロー開発
- ここがまだ
- ローカルで再現できる
- Docker
- タスクの並列
ワークフローエンジン
OSS
- Makefile
- Jenkins
- Luigi
商用
- CA Workload
プログラミング言語型
- Pythonなどコーディングで実装する
- Luigi
- Airflow
- なんでもかける
- 読み書きが少しむずかしい
- Gitで管理できる
- なんでも書ける
- 全体の俯瞰が難しい
GUI型
- GUIでポチポチやると動く
- Rundeck
- Jenkins
- どのサーバで何を実行するか
- 誰でも開発できる
- 組み上げが簡単
- 複雑なループ処理が難しい
- バージョン管理が難しい
- 再現性が低い ローカルで試しづらい
定義ファイル + スクリプト型
- 定義ファイルに設定を書く
- Azkaban
- Gitでバージョン管理できる
- それなりに書きやすい
- スクリプトの理解が必要で煩雑になる
- プログラミング言語型と比較して成約が多い
Digdag
- 定義ファイル
- オペレータ
グループ化
書きやすい 読みやすい
- よくやる処理ならプログラミングしなくていい
- 特殊な処理ならスクリプトを書ける
- 実行状態をチェックする管理UIがある
- Gitでバージョン管理できる
- DSLの理解が必要
グループ化
- ワークフロー
- Ingest
- Enrich
- Model
- Load
- Utilize
- タスクは直列と並列を織り交ぜて実行される
- グループ化することで処理の俯瞰が楽になる
デモ
- TDへクエリを実行して、実行結果をTDのテーブルに突っ込む
- プラスでグループ化
- TDとの連携は、楽
- 書いたdigをdigdagpushすることですることでGUI上で実行できるようになる
- GUIで進捗を確認できる
- RETRY FAILEDを押すことで失敗したジョブから先のジョブを実行できる
- RETRY ALLを押すことで全部再実行できる
- backfillを使って過去データを再集計できる
- (過去の時点でジョブを実行できる)
オペレーター
- wait_for_arrival
- s3
- load_table
- redshift
- td/mail
- パラレル実行
ループ
オペレーターのリストはdigdagのドキュメントにある
- http
- shell
- py
- rb
まとめ
- architecuteのドキュメント
- operatorsのドキュメント
- ruby
- re:Invent digdag
- Qiitaのまとめ
今後の展望
- GUI上での編集機能
- プラグインのRubyGems化
memo: intro #wfemeetup
workflow engineとは
- 処理同士の先行・後続関係を workflow として定義できる
- Workflow を定期的に、あるいはファイルの到着など何らかの条件に基づいて実行できる
- 先行する処理の実行結果に基づいて、次に実行する処理を切り替えることができる
- 複数の処理を並列に実行したり、それらが全て(失敗も含めて)完了するのを待って後続の処理を実行したりすることができる
- Workflow が途中で失敗したり、許容範囲を超えて遅延したりした場合に、メールなどの手段で管理者に通知できる
- コマンドラインやGUIから、workflow の定義内容を参照したり、進行度合いや実行結果を監視できる
「全部できる、一部できる」がワークフローエンジンの条件
- 開発者利用者に発表
- ユーザー同士のネットワーク
- プロダクト選定に役立てる
- 導入事例
おしながき
- digdag
- luigi
- azkaban
- jenkins
- airflow
ack-grep installation
sudo apt-get install ack-grep
Ubuntuならこれでもいいのですが、まあドキュメントを読んで使いやすいものを選ぶとよろしいかと。
ansible file stat
ファイルの存在確認とかのドキュメントはこちら。
stat - retrieve file or file system status — Ansible Documentation
install ruby documentation
公式を、改めて読んで、わかること
- どのツールを使ってインストールしたらいいかは明言されていない
- apt/yumなどからは古いrubyしか入らないので非推奨
- ソースコードからのインストールからでもいいけど、パッケージ管理されてないよ
memo: rbenv ubuntu
rbenvとかruby-buildは本家のりどみから取ってくるとして、別途apt-getでlibssl-devとlibreadline-devのインストールが必要なようだ。
sudo apt-get install -y libssl-dev libreadline-dev
peco v0.5.0
この前のやつが本ちゃんでリリースされたっぽい。
なお、fishだとexecでパイプラインが使えないようなので、exec抜きでもいける
ack $args . | peco --exec 'awk -F : '"'"'{print "+" $2 " " $1}'"'"' | xargs less '
fish shell documentation
exec replaces the currently running shell with a new command. On successful completion, exec never returns. exec cannot be used inside a pipeline.
どうしようか。
peco install
最終的にGOPATHはghqのパスと同じにした
[ghq] root = ~/projects/src
からの、glide installとgo build cmd/peco/peco.go。そして、ソースのディレクトリにできるpecoファイルにaliasを貼っておく