by shigemk2

当面は技術的なことしか書かない

オープンソースアプリケーションのアーキテクチャ読書会(4) #read_aosa

内容

PDF

http://m-takagi.github.io/aosa-ja/aosa.pdf

GitHub

Tex

  • 第9章 The Hadoop Distributed File System
  • 9.3 ファイルI/O操作およびレプリカの管理
  • 9.4 Yahoo!における実例
  • 9.5 教訓
  • 9.6 謝辞
  • 第15章 RiakとErlang/OTP

9章

前回のラブライブ!(Hadoop 9.2)

  • NameNode
  • DataNode

  • 最新情報はメモリの中に持っている

  • 情報はメモリとディスク両方に持っていて、最新情報はメモリにある

どこかのタイミングでメモリの情報をディスクに持っていかないといけない →ジャーナル情報があって差分をそこに残している(データのロストを防ぐ) →ジャーナル+ハードディスクの情報という隙を生じぬ二段構え

ジャーナル情報は重い

ジャーナルを減らさない

ある段階でのジャーナル情報=チェックポイントノード

スナップショット=あるファイルの中身を残した状態

例: 12時の時点の情報を残しておいて、Hadoopがクラッシュしても情報を残せる

9.3 ファイルI/O操作およびレプリカの管理

ファイルの読み書き

ファイルシステムはファイルのデータを格納できてナンボのものだ。それをHDFSでどう実現しているのかを理解するには、読み書きをどのように行っているのか、そしてブロックをどのように管理しているのかを知る必要がある

  • 一旦書き込んだデータは削除できなくって、追記モードで再オープンする
  • シングルライターマルチリーダーモデル(ひとつの書き込み みんなで読み込み)
  • ハートビートを定期的にNameNodeに送信してリースを更新

(ハードリミットとソフトリミットの違い) http://x68000.q-e-d.net/~68user/unix/pickup?limit

  • ハードリミット(1時間)を超えてからクライアントがファイルのクローズやリースの更新に失敗すると、HDFSはそのクライアントが終了したものとみなす
  • クライアント側でバッファリング→バッファがいっぱいになるとデータをパイプラインへ送る
  • 受領通知を受け取る前にパイプラインにデータを送ることが出来る

  • 他のクライアントからのファイルの読み込みは可能である。そのため、ひとつのファイルを同時に複数のリーダーが読むこともあり得る

  • ソフトリミット=リースではない

  • パイプライン=一連の動作
  • 一旦NameNodeに問い合わせてる
  • 内部的にキューを持ってる
  • hflush命令

  • 何千ものノードからなるクラスタでは、どれかひとつのノードに障害が発生する(たいていはストレージ障害)なんてことは日常茶飯事

  • HDFSでは、HDFSファイルの各データブロックのチェックサムを計算して保存

http://ja.wikipedia.org/wiki/%E3%83%81%E3%82%A7%E3%83%83%E3%82%AF%E3%82%B5%E3%83%A0

HDFSのI/Oの設計はMapReduceのようなバッチ処理システムに最適化されており、シーケンシャルリード/ライトについては高いスループットを要求

ブロックの配置

  • よくある方法は、複数のラックにまたがってノードを展開する方法である
  • 多くの場合、同一ラック内のノード間のほうが異なるラックの場合よりもネットワーク帯域が広い
  • デフォルトのHDFSブロック配置ポリシーは、書き込みのコストを最小化することとデータの信頼性や可用性の最大化や読み込み帯域の集約とのトレードオフ
  • すべてのターゲットノードが選べたら、ノード群をパイプライン化する。その順番は、最初のレプリカに近い順となる。データはこの順でノードに送られていく
  • このポリシーに従えば、ラック間やノード間の書き込みトラフィックを軽減でき、一般に書き込みのパフォーマンスが向上する。ラックに障害が発生する可能性はノードの障害よりもはるかに低いので、このポリシーはデータの信頼性や可用性にはあまり影響を及ぼさない
  • デフォルトだと3つのレプリカが出来る
  • ラックの選び方は書いていない(ランダムかもしれないし、何かしらのルールがあるかもしれない)

レプリケーションの管理

  • NameNodeは、各ブロックが常に十分な数のレプリカを確保できているように努める
  • あるブロックのレプリカが少なすぎたり多すぎたりしないかどうかを、NameNodeはDataNodeからのブロックレポートで判断する
  • あるブロックのレプリカが多すぎる場合、NameNodeはどのレプリカを削除するかを選択する
  • 可用性と使用量のバランス
  • 新たなレプリカの作成コストを削減しつつ、ルールに基づいてレプリカを作成していく。
  • レプリカを作ったら、古いレプリカは削除する
  • レプリカが多すぎると対応出来ないから

(使用料→使用量)

バランサー

  • DataNodeのディスク利用状況は考慮しない
    • 新しいデータつまり、より参照されやすいデータを大量の空き領域があるDataNodeのごく一部に配置されてしまわないようにするため
  • バランサーは、HDFSクラスタ上のディスク使用量のバランスをとるためのツールである
    • このツールはアプリケーションプログラムとして配布されており、クラスタの管理者が実行することができる
    • バランサーは、その処理を最適化するために、ラックをまたがるデータコピーを最小限に抑える
  • 帯域 バランシング操作で使う帯域

ブロックスキャナー

  • 各DataNodeではブロックスキャナーが動作
    • 定期的にブロックのレプリカをスキャンし、保存されているチェックサムがブロックのデータと一致するかを調べる
    • スキャン時には読み込みの帯域を調節し、設定した期間で検証を終えられるようにする
    • レプリカの検証が目的
    • ブロックの破損が検知されたらそのブロックはすぐには削除しないで、別のところからレプリカを生成してから削除する
    • チェックサムは化けないという仮定
    • データをできるだけ長く永続させることが狙い

廃止措置

  • 「廃止をする」というマークをつけてから削除する
    • 廃止予定のノードにあるブロックのレプリカを他のDataNodeに移し始める
    • すべてのブロックが他のDataNodeにレプリケートできたことを確認した時点で、そのノードは廃止状態となる
    • 廃止状態のノードは、データの可用性に一切影響を及ぼさず安全に削除できる
    • どういうタイミングでマークをつけるかは知らない

クラスタ間データコピー

  • DistCp
    • HDFSクラスタへのデータのコピーが出来るツール
    • MapReduceフレームワークが、並列タスクスケジューリングやエラーの検出そしてリカバリーを自動的に処理

9.4 Yahoo!における実例 - 9.5 教訓 - 9.6 謝辞

  • Yahoo!の大規模HDFSクラスタは、約4000ノードで構成されている(著者調べ)
    • ひとつのラックに40ノードが格納
    • 大規模クラスタ(4000ノード)で扱っているのは、約6500万のファイルと8000万のブロック
    • データの安定性と永続性、コストパフォーマンス、ユーザーコミュニティのメンバー間でのリソース共有の仕組み、システム運用者による管理のしやすさなどが重要
(/ (* 80000000 3) 4000)
60000

データの永続性

  • 障害対応
    • データを三重にレプリケーションすることによる相関性のないノード障害によるデータのロストに対する強力な守り
    • ラックあるいはコアスイッチの障害
    • ラスタへの電源の停電→電源オンからの起動でうまく立ち上がらないノードがある
    • ノードに格納されているデータも破壊されたり失ったりする
    • 停電以外は多分大丈夫なんだろう

HDFSの共有機能

  • HDFSの利用が増えるにつれて、ファイルシステム自体にもリソースの共有手段を導入する必要が出てきた
    • Unixのパーミッションを模倣(違いは、HDFSにおける通常のファイルには実行権限やスティッキービットが存在しないという点)
    • ユーザーの身元確認が弱点→クライアントアプリケーションは信頼できるソースから取得した認証情報を使う必要
    • データストレージとして使える容量の総計は、データノードの数と各ノードに用意されたストレージで決まる
    • HDFSのアーキテクチャは大半のアプリケーションが入力として大規模なデータセットを流し込むということを前提 vs MapReduceプログラミングフレームワークには小さめのファイルを大量に(Reduceタスクの数と同じだけ)生成するという傾向がある

スケーリングおよびHDFS Federation

  • NameNodeのスケーラビリティは、これまでずっと課題
    • NameNodeのヒープサイズのせいで扱えるファイル数に制約が出てしまい、アドレス可能なブロック数にも制約が出てしまう
    • 新機能として、複数の独立した名前空間(およびNameNodes)でクラスタ内の物理ストレージを共有できるようにした
      • 異なるアプリケーションで使う名前空間を分離
      • クラスタの全体的な可用性を向上

(ここでいう名前空間はディレクトリの意)

  • アプリケーションは、単一の名前空間を使いたがる
  • 一番シンプルな手法は、クラスタ全体で名前空間を共有することだ

教訓

  • このファイルシステムは非常に頑健で、NameNodeの障害はめったに発生しない
  • 実際、ダウンタイムの大半はソフトウェアのアップグレードによるものである
  • フェイルオーバーのソリューション(手動だけどね)が登場したのも、つい最近のことだった

メタデータサーバー(=ねーむのーど)

謝辞

(ry

15章 RiakとErlang/OTP

前置き

Riakはオープンソースのデータベースで、Erlangがサポートされている。

Riakは Amazon Dynamo の論文に基づいて実装されている NoSQL データベース。

http://ja.wikipedia.org/wiki/Riak

Riakは分散型の耐障害性を備えたオープンソースのデータベースで、Erlang/OTPを使って大規模システムを作るよい例にもなっている。大規模な分散システムをErlangがサポートしてくれているおかげで、Riakは高可用性や、容量とスループットの両面における線形スケーラビリティといった、データベースではあまり見られない機能を備えている

http://ja.wikipedia.org/wiki/Erlang

15.1 Erlangの簡単な紹介

どうでもいいErlang入門

とりあえず入れるだけならパッケージインストールが出来る

Fedora(20)でやる例

$ sudo yum install -y erlang

なお、Riakをyumで入れると自動でErlangも入る

$ sudo yum install -y riak

Hello, World

http://blog.overlasting.net/2007-05-09-1.html

なぞるととりあえず出来る

ハローワールド

-module(sample).
-export([hello_world/0]).

hello_world() -> io:fwrite("Hello, World!\n").

階乗

-module(factorial).
-export([fac/1]).
fac(0) -> 1;
fac(N) when N>0 ->
Prev = fac(N-1),
N*Prev.

fac() -> io:fwrite(fac(10)).
Eshell V5.10.4  (abort with ^G)
1> c(sample).
{ok,sample}
2> sample:hello_world().
Hello, World!
ok
------
2> c(factorial).
factorial.erl:8: Warning: function fac/0 is unused
{ok,factorial}
3> factorial:fac(3).
6

http://www.ibm.com/developerworks/jp/opensource/library/os-erlang1/

このあたりはHaskellなどと似ている

Erlangではパターン・マッチングを多用 Erlangはタプル(積型とも呼ばれる)とリストをサポート [X|Xs]という表記はヘッドがXでテールがXsの空でないリストにマッチする。小文字で始まる識別子はアトム(それ自身を表わすもの)を示す。たとえば、タプル{ok,37}okはアトム

プログラムの単位で、psコマンドなどで表示されるプロセスではない

Erlangシステムにおけるプロセスはそれぞれ独立したメモリ空間で並行に動作し、メッセージ・パッシングを使って通信する

JVMみたいな理解でよいでしょう

プロセスは仮想マシン(VM)内にしか存在しないため、1つのVMは数百万ものプロセスを同時に実行できる

で、数百万くらいのプロセスに対して相互に読み込みや書き込みや削除を行うことができる旨が書かれている

プロセス間通信

プロセスにメッセージを送るプログラム

http://erlangworld.web.fc2.com/concurrent_programming/message.html

プロセスを生成するにはspawn(Module, Function, Arguments)組み込み関数(BIF)を使う

-module(hello_server).
-export([loop/0]).

loop() ->
    receive
        {hello, X} ->
            io:format("hello:~p~n",[X]),
            loop();
            
        {erlang, X, Y} ->
            io:format("erlang:~p  :~p~n",[X,Y]),
            loop();
            
        Other ->
            io:format("not supported:~p~n",[Other]),
            loop()
    end.
1> Pid = spawn(hello_server, loop, []).
<0.34.0>

=ERROR REPORT==== 8-Nov-2014::14:11:35 ===
Error in process <0.34.0> with exit value: {undef,[{hello_server,loop,[],[]}]}

2> Pid ! {hello, yuichi}.
{hello,yuichi}
3> Pid ! {erlang, hoge, fuga}.
{erlang,hoge,fuga}
4> Pid ! {にっこにっこにー, hoge, fuga}.
* 1: illegal character
4> Pid ! {lovelive, niko, maki}.        
{lovelive,niko,maki}
5> Pid ! {niko, maki}.          
{niko,maki}
6> Pid ! {niko, maki, hoge, hoge}.
{niko,maki,hoge,hoge}
7> Pid ! hoge.                    
hoge
8> Pid ! [hogehoge].
[hogehoge]

15.2 プロセスの骨格

いろんな役割のプロセスを作ることが出来て、それはライブラリでごにょごにょ出来る。

プロセスは生成する必要があり、必要ならばその別名を登録しておくこともできる。新しく生成されたプロセスの最初の動作は、プロセスのループ・データの初期化だ。プロセスの初期化時にspawn組み込み関数に渡された引数の結果をループ・データとして使うことが多い

  • spawnBIFに渡された引数はプロセスごとに異なる
  • プロセスを別名で登録するかどうか、登録するならばどのような名前にするかを決める必要がある
  • プロセス状態を初期化する関数は、プロセスが実行する作業に応じて異なる動作を行なう
  • システムの状態はどの場合でもループ・データで表わされるが、ループ・データの内容はプロセスによって異なる
  • 受信/評価ループの本体部分では、プロセスは異なるメッセージを受信して異なる処理を行なう
  • 最後に、終了時の後始末はプロセスによって異なる

15.3 OTPビヘイビア

物理的にも開発者がバラバラに散らばっている状況ではテンプレートがないとオレオレ実装が蔓延して収拾がつかなくなる

システムの設計や構築で利用できる、標準化されたブロックの集合

  • Open Telecom Platform
    • OTPはErlangのライブラリ群といくつかの基本原理で構成
    • 頑健なシステムを開発するための既製ツールとして利用
    • OTPのパターンやライブラリの多くはbehaviorsとして用意

OTPビヘイビアは、Erlang/OTP配布物に含まれているstdlibアプリケーションのライブラリ・モジュールとして提供されている

OTPビヘイビアの構成

  • 実際の処理を行なうワーカ・プロセス
    • サーバ
    • イベント・ハンドラ
    • 有限状態マシンがある。
  • ワーカや他のスーパバイザを監視するためのスーパバイザ(監視ツリーを構成する子プロセス群(ワーカや他のスーパバイザ)を監視する)

監視ツリーはアプリケーションと呼ばれるビヘイビアにまとめられる

ビヘイビアのモジュールには、そのビヘイビア・タイプに必要な汎用コードがすべて入っている

ビヘイビアモジュールが用意する汎用の機能

  • プロセスの生成と、場合によっては登録
  • 同期または非同期の呼び出しによるクライアント・メッセージの送受信。内部メッセージ・プロトコルの定義も含む
  • ループ・データの格納とプロセス・ループの管理
  • プロセスの停止

汎用サーバ

クライアント/サーバ型のビヘイビアを実装する汎用サーバ

gen_serverビヘイビアを使った一例

-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
services/1,nodes/1,avsn/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).
-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
bcast_mod={gen_server, abcast}}).

サーバの起動

  • gen_serverビヘイビアでは、spawnBIFとspawn_linkBIFを使う代わりに、gen_server:start関数とgen_server:start_link関数を使う
  • spawnとstartの主な違いは、呼び出しが同期型かどうか
  • ワーカ・プロセスのPIDが初期化されるまで呼び出しが戻らなくなるため、ワーカ・プロセスの起動をより決定的にすることができ、予期しない競合条件による問題を予防

メッセージの受け渡し

  • 同期呼び出し gen_server:call/2関数
  • 非同期呼び出し gen_server:cast/2関数