前提
- JJUGのみなさまならばJava8 Stream APIは知っているだろう
- Java8 StreamAPIとApache SparkとAsakusa FrameworkのコーディングはDAGの点から似ている
- Asakusa Frameworkの紹介
自己紹介
Scalaですっごいお世話になっています
年表
- 2006 Apache Hadoop
- 2010 Hadoopを知る
- 2010 Spark OSS化
- 2011/3 Asakusa Framework公開
- 2014/2 Apache Sparkトップレベル昇格
2014/3 Java8リリース
バッチ処理を早くしたいという同期
- 2,3倍の速度なんて無理だろうと思っていたらHadoopで簡単にできるよ!ってなったので勉強しはじめた
- 勉強会も超人気
- なるべく早くHadoopの情報を知りたいという理由からTwitterを始める
SparkをScalaをごにょごにょしている人がいたのでScalaも勉強しはじめるHadoop(HBase)のサンプルをScalaで書いている人がいたのでScalaの勉強を始めました。そして、Scalaで出来た分散フレームワークということでSparkを知りました。- Hadoopを勉強しているうちにAsakusa Frameworkなどを知る
- Asakusa Frameworkをやっていたら転職できる!
StreamAPI Spark AsakusaFW
- 省略!したいと思った
- 内部イテレーターで処理を行うためのAPI
Hadoopについて
- 分散処理フレームワーク
- HDFS(分散ファイルシステム)
- MapReduce(処理方式 アルゴリズム)
YARN(リソース管理)
データを複数のマシンに分散配置
- MapReduceアプリケーション(jarファイル)を各マシンに転送して分散処理
従来だと
- RDBにデータを置きバッチサーバを経由して転送する(分散の意味があまりない)
- (補足) バッチサーバーを分散してもDBが分散しないとDBに負荷が集中して意味がないということ。(DBを分散させるということでNoSQLが流行りました)
Hadoopバッチ
- Hadoopクラスターがあって、データとアプリケーションが分散配置されている(アプリケーション 数百メガバイトのjarファイルもデータのあるサーバに転送する)
- (補足)何GB・何TBのデータを転送するよりは、大きくても数百MBのjarファイルを転送する方が転送量が少ない、ということです。
自分が思うHadoopの功績
- ビッグデータはバズワード化したが、Hadoopは分散処理を身近にした
- MapReduceはシンプルだがアセンブラでコーディングするというイメージなので、コーディングしづらい
- 常にファイルを読み書きするシンプルな構成なのでメモリにデータをキャッシュして使い回すみたいなことはできない(機械学習はそういうことをしたいらしい)
→という問題点があったので、Apache Sparkが出てきた
Apache Spark
- RDDを使ったコーディング(Scala)
- インメモリでキャッシュして使いまわせる
- 分散ファイル入出力にはHDFSを利用
Asakusa FW
- 分散処理するバッチアプリケーションを作成するためのフレームワーク
- 実行基盤としてHadoopやSparkを使える
- ノーチラステクノロジーズが開発
類似点
- Java8 Stream API
- メソッド参照
これをDAGで表現する
グラフ理論
- 向きがある
- 閉路(循環)がない
といったフローのことを指す
Java8 Stream API
- 向きがあって循環のない流れ
- s0 filter map out1
cf. Scala
filterしてmapして…
ScalaのStreamもJava8 Stream APIと変わらない
Apache Spark
- DAGなのは変わらない
- RDDが使われている
- s0 filter map out1
Asakusa FW
- フローというところでDAGなのは変わらない
- 入出力はファイル
- s0 Branch(f) Update(m) out1
- inputがsourceで現れる
StreamAPIとかSparkなどと比較しても、DAGを使っているという点ではAsakusaFWと一緒
コンパイルするとMyOperatorFactoryが生成される アノテーションとしてBranchとかUpdateとかが生成される
- (補足)BranchやUpdateのアノテーションをOperatorのメソッドに指定すると、それに応じたフロー用のメソッドを持つFactoryクラスが生成されます。
類似点まとめ
- 処理がDAG(有向非循環グラフ)で表せる!
相違点
複数入力
- 合流(union)
- 結合(join)
- zip
- unionやjoinはSQLとイメージはいっしょ
合流 union
- Stream API concat
- Spark ++を使う(Scalaは記号をメソッドとして使える)
- AsakusaFW confluent(可変長引数ではないがある程度の数の引数なら面倒見れる)
結合 join 一対一
- Stream API ない
- Spark join
AsakusaFW masterJoin(join)
- (補足)多対一です。(少なくともAsakusaFWは)
マッチしないものは結合しない
結合 cogroup 多対多
- Stream API ない
- Spark cogroup
- AsakusaFW CoGroup(group)
複数入力 zip
順番でレコードのKeyValueをくっつける
Stream API ない
- Spark zip
AsakusaFW ない
結構難しい実装なので、多用は禁物
複製 duplicate
- Stream API ない
- Spark map
- AsakusaFW m1 m2
分岐 branch
複数出力
Stream API ない
- Spark ない
- AsakusaFW branch
根本的な相違点 使用目的
- Stream API 内部イテレーターのため ワンライナーで書きやすい
- Scalaのコレクション Scalaの書き方で、並列Streamでマルチスレッド処理が可能
- Spark 複数マシンで分散する処理を書ける マルチプロセス
AsakusaFW バッチアプリケーションを書く。テスト機構あり
マルチスレッド 複数マシン分散 扱うデータ量の違い
- マルチスレッド処理といっても、数十万件といったデータでないとメリットがない
- 数千万 数億とかだと、データ自体を分散し、それぞれ処理するほうが効率が良い Hadoop Spark(自分で頑張るよりは…)
- (補足)「自分で頑張るよりは」は、Stream APIでもファイル読み込み部分を自分で分散読み込みさせれば出来るけれども、そこを自分で頑張るよりは既にあるHadoopやSparkを使った方がいいんじゃないか、ということです。
Asakusa Frameworkの目的
分散処理するバッチアプリケーションを作成するためのフレームワーク
- 当初はHadoopが実行基盤
- 少量データの場合はオーバーヘッドが大きい
- マシンVMを立ち上げるので、普通のバッチであれば数秒で終わるのが、逆に時間がかかる
- 解決策としてのスモールジョブ実行エンジンを実装
- 単体テストの実行環境としても使える
- 最新版では実行基盤としてSparkを使用可能
Asakusa Frameworkで作ったアプリケーションはコンパイルしてjarファイルを生成
- Spark版実行バイナリはアプリケーションをリコンパイルするだけ!
- Javaなどもバージョンが変わってもコンパイルするだけでいいので、結構似ている
- Hadoop Spark以外も採用される可能性がある?
- Spark版実行バイナリはアプリケーションをリコンパイルするだけ!
バッチの実行時間
- Hadoop(180個のジョブを45-50分)
- Hadoop + スモールジョブ実行エンジン(180個のジョブを10-15分)
- Spark(180個のジョブを3-4分)
- 超速い Sparkは効率が良いらしい
- Hadoopだと開いているタスクができるらしい
やっぱりDAGでフローを書ける
- 関数合成で最適化
分散処理するバッチアプリケーションを作成するためのフレームワーク(再掲)
- そもそも分散処理が必要
- Sparkでも小さなデータが扱えるようになる。
- joinが必要な場合はAsakusaFWとか使えば良いと思われる
余談 ハードウェアとソフトウェアの関係
- ハードウェアが変わればそれに最適なソフトウェアも変わる
- HadoopはHDDを念頭においた作りになっている
- 大きなサイズのブロックアクセスに弱い(HDDはランダムアクセスに弱い)
- HDDはある程度大きなサイズのブロックの(シーケンシャルな)読み込みの方が効率が良く、ランダムアクセスは効率が悪いです。
余談 ハードウェアの技術動向
- ディスク HDD SSD
- CPU クロック数は頭打ち メニーコア化 100コア超 高性能になるともしかしたらディスクもいらなくなるのでは?
- メモリー 不揮発性メモリー(電源を切ってもメモリのデータが消えない) フラッシュメモリー MRAMなど
- DBの人はこれに注目してて、DBのログをハードディスクに書き込むなどしているが、メモリが不揮発性になるとこれが要らなくなるのでは
余談 メニーコアに対する今後の予想
- Java8 Stream API
- parallel()を使えばいい(ソース修正が必要)
- Apache Spark
- executorはSpark管理下なので、実行環境の指定を変えるだけでいいかも
- Asakusa Framework
- リコンパイルするだけで対応できると思われる(現在開発中)
まとめ
- DAGを使ってコーディングしているという点では似ている
- Asakusa FWは難しいという声を聞くが、考え方はStream APIと同じ
- 使用目的(扱うデータ量 実行基盤)の違い
- Asakusa FWはリコンパイルするだけで実行基盤を切り替えられる(実装を変える必要がないので、メンテナンスがすごく楽)
質疑
- AsakusaFW 設計思想的に例外で落ちることはなく、落ちる場合はバグってる
- 時系列集計など、やりにくい処理はある? range joinみたいな仕組みがあるので、ちょっとコーディングしたらできる