Asakusa バッチ実行環境の構成 (前編)

この記事は Asakusa Framework Advent Calendar 2013 の 13 日目の記事として書いています.

今回の記事は以下のページを参考にしています. 用語などもここでの定義を使用しています. http://docs.asakusafw.com/0.5.2/release/ja/html/administration/deployment-with-windgate.html#id1

記事が長いので前編, 後編に分けています.

後編はこちら → Asakusa バッチ実行環境の構成 (後編)

Asakusa バッチを実行するためのコンポーネント

Asakusa Framework を使ったバッチを実行するとき, 様々なコンポーネントが関連し合って一連の処理を動かしています. Asakusa バッチの本体は Hadoop ジョブの集まりですが, それだけではバッチとしては機能不足です. 実際にバッチとして動かすためには, ジョブ間の依存関係の管理や, ジョブ間で受け渡すデータの管理, 外部システムとの入出力を行う機能が必要になります. さらに, それらが分散配置されたときに全体を統括する機能も必要です.

Asakusa にはそれぞれの機能を担うコンポーネントが用意されており, それらを適宜組み合わせ構成します. それぞれのコンポーネントと組み合わせの特徴を掴み, 目的に合わせて構成するための指針を書いていきます. (あくまで自分の経験に基くもので, 所属する組織の公式見解ではありません. 念の為)

まずは, Asakusa バッチに登場するコンポーネントを分類しつつ並べていきます.

Asakusa Framework コア

Asakusa Framework のランタイムモジュールです. ${ASAKUSA_HOME}/core 以下に配置されています. これについては特に説明は不要でしょう.

Asakusa バッチアプリケーション

mvn package すると出来上がる「バッチ名-batchapps-バージョン.jar」という名前の .jar ファイルのことです. 「バッチ名」と「バージョン」の部分はプロジェクトの設定によって変わります. 以後, Asakusa バッチアプリケーションの実体としての .jar ファイルのことを, “batchapps.jar” と書きます.

なお, プロジェクトの開発管理には Maven を使用している前提で書きます. Gradle を使った管理方法については別の記事で取り上げる予定です.

この .jar ファイルには, 以下のものが格納されています.

  1. Asakusa DSL で記述されたプログラムから変換 (Asakusa では「コンパイル」と呼んでいます) された Hadoop ジョブ
    • → これは .jar ファイルの形で batchapps.jar の中に格納されています.
  2. Hadoop ジョブの依存関係を記録したファイル
    • → これはバッチを実行する際に使われます
  3. コンパイル前の演算子やフローの依存関係グラフの .dot ファイル
  4. コンパイル後の Hadoop ジョブ (Asakusa では「ステージ」と呼んでいます) 群の依存関係グラフの .dot ファイル
  5. Assakusa によるコンパイルやビルドのログ

.dot ファイルは後で graphviz で処理すると画像に変換できます.

この .jar ファイルは展開され ${ASAKUSA_HOME}/batchapps 以下に配置されます.

ステージの依存関係情報はこの batchapps.jar の中にしか無いため, 分散配置する必要がある場合は全ての箇所で同一の batchapps.jar を使用しなければなりません. これについては「デプロイ」を扱う記事で書く予定です.

【追記 12/17】デプロイについて書きました. → リリースとデプロイ

外部連携系コンポーネント

バッチ処理では, あるところに貯めてあるデータを一括で取得し, 処理を行った結果を一括で戻します. この入出力処理を行うコンポーネントには, 大きく分けて以下の 3 つがあります.

  1. ThunderGate

    最古の入出力コンポーネント. 対象となる RDBMS が MySQL 限定だったり, テーブルのスキーマに制限があったりします.

    RDBMS の選定, ThunderGate 用テーブルスキーマの適用ができるプロジェクト以外では, まず使わないと思います.

    ref. スキーマの制限 http://docs.asakusafw.com/0.5.2/release/ja/html/thundergate/user-guide.html#id56

    ${ASAKUSA_HOME}/bulkloader 以下に配置されています.

  2. WindGate

    ローカルファイルシステムや JDBC 接続を通して RDBMS と入出力を行うためのコンポーネント.

    RDBMS と HDFS を直接接続するときはこれを使います. JDBC 接続さえできれば良いので, RDBMS のあるサーバ以外からもアクセスでき, 既存のシステムとの連携がし易いです.

    ローカルファイルシステムとやり取りする場合は, いくつかのファイル形式が選択でき, 外部連携をやり易くしています. ファイル形式には CSV, TSV があります. (TSV は正式にサポートしているわけではないそうです) ローカルファイルシステム上にファイルを作るため, ディスクの空き容量には注意が必要です.

    自前で Importer/Exporter クラスを実装することで, 任意のファイル形式に対する入出力も行えます. (通常は CSV, TSV だけで十分でしょう)

    ${ASAKUSA_HOME}/windgate 以下に配置されています.

  3. Direct I/O

    Asakusa バッチが HDFS と入出力を行うためのコンポーネント. (正確には,「Hadoop クラスタから直接参照できるデータとの入出力」を行うためのコンポーネント. HDFS 以外には S3 があります) 実体としては Asakusa Framework コアに含まれているが, ここではコンポーネントとして分けて提示しておきます.

    ref. S3 を使用する場合の設定 http://docs.asakusafw.com/0.5.2/release/ja/html/directio/user-guide.html#id40

    WindGate とは違って HDFS 上のファイルを読んだり, そこへ出力したりします. そのためローカルファイルシステムとやり取りする場合は, WindGate のときよりもう一手間必要となります.

    複数に分割されたファイルの入出力が行えるため, 外部連携の場面での工夫の余地が残ります. 例えば, 連番を振ったファイルを出力しておいて, それらを MySQL に付随する mysqlimport コマンドで取り込むことで, 複数ファイルを並列してデータベースに取り込ませることができます.

    Direct I/O も, いくつかのファイル形式が選択でき, 外部連携をやり易くしています. ファイル形式は CSV, SequenceFile, TSV がある. (これも TSV は正式にサポートしているわけではないそうです)

    ${ASAKUSA_HOME}/directio 以下に配置されています.

バッチ処理の特性上, どうしても一気に大量のデータを読んだり, 一気に大量のデータを書き出したりする必要があります. 何から読んだり, 何に対して書き出したりするのかを考慮しないと, データの入出力に時間が掛かってしまいます. さらに, 書き出した後のファイルを外部の連携先へ入力する箇所でも, データの読み込み方法を設計しておかないと速度面でつまづきます.

せっかく Hadoop で分散処理して計算処理が速くなっても, 入出力で足を引っ張られては意味がありません. できるだけ将来を想定し, 万が一の回避策を用意した上で入出力方法を決定しましょう. そうしないとデータサイズが1桁上がった (10倍になった) ときに, 計算処理と入出力処理に掛かる時間がほぼ同じとか, 恐しい実行時間になります. 恐しい実行時間になります. ローカルファイルシステム (xfs, ext4 など), RDBMS, HDFS それぞれの特性や, 大量データ読み書きの手法について勉強する必要があります.

ジョブ統括系コンポーネント

入出力を含め, Asakusa バッチアプリケーションが行う全ての処理の依存関係解決と, それらの実行を行うのが YAESS です. 今のところ選択肢は YAESS のみです. ${ASAKUSA_HOME}/yaess 以下に配置されています.

Hadoop 実行系コンポーネント

外部連携系コンポーネントやジョブ統括系コンポーネントが Hadoop (HDFS や MapReduce) にアクセスするときに使用します. Hadoop クライアントと同じマシンに配置されます. ThunderGate が使用する「ThunderGate Hadoop ブリッジ」, WindGate が使用する「WindGate Hadoop ブリッジ」, YAESS が使用する「YAESS Hadoop ブリッジ」があります.

ThunderGate Hadoop ブリッジは実体は ThunderGate と同じものです.

WindGate Hadoop ブリッジは ${ASAKUSA_HOME}/windgate-ssh 以下に配置されています.

YAESS Hadoop ブリッジは ${ASAKUSA_HOME}/yaess-hadoop 以下に配置されています.

WindGate のプロセスが Hadoop クライアントと同じマシンに配置されている場合は, WindGate Hadoop ブリッジは不要です.

外部連携の経路

外部連携系コンポーネントの種類がいくつかあるので, それぞれのデータが移動する経路をまとめておきます. ThunderGate はそもそも構成が限定されているので, ここでは説明しません. 先日書いた通り Hadoop クラスタの種類として, 社内環境, EC2, EMR の 3 つがあります. それぞれに特有な話がある場合は, 途中でコメントを入れていきます.

Hadoop クラスタにデータが届くまで

データと Hadoop クラスタが同一マシンにいる場合は, 以下の経路が考えられます.

ローカルファイルシステムから HDFS へ移動する場面では, hadoop コマンドやその他 HDFS アクセスを行えるツールを使用します. ここは Asakusa とは無関係な部分なので, 使い慣れたものを自由に使えます. また RDBMS 上のデータでも, いったんローカルファイルにダンプして WindGate や Direct I/O を使う方法もあります.

ローカル Asakusa クラスタへのインポート

図1. ローカル Asakusa クラスタへのインポート

データと Hadoop クラスタが別マシンにいる場合は, 以下の経路が考えられます.

図1 と比較して, WindGate Hadoop ブリッジを通る (RDBMS からと Local FS からの) 経路が 2 つ, S3 を経由 (して, WindGate もしくは Direct I/O を使用) する経路が 2 つ増えました.

S3 へのアップロードとダウンロードは好きなツールが使えます. 私は s3cmd をよく使ってましたが, 最近は awscli を使っています.

リモート Asakusa クラスタへのインポート

図2. リモート Asakusa クラスタへのインポート

これは EC2 クラスタや EMR 特有の話ですが, バッチ処理ごとにクラスタを立ち上げる運用になっている場合, Hadoop マスターの IP アドレスが一定しません. EC2 クラスタでは全てのノードの IP アドレスは決まっているのですが, 前回マスターとして起動したインスタンスが今回は起動せず仕方無く別のインスタンスをマスターとして動かす, という場面があります. EMR には停止状態 (stopped) が無いので, クラスタを止めたい場合は破棄 (terminate) するしかありません. なので, 起動するたびにインスタンスを作成するので, 当然プライベート IP アドレスは予測不能です.

これに対処するには, Hadoop マスターとなるインスタンスは常に立ち上げておくか, どの IP アドレスで起動してもいいように構成を考えます. 後者の構成では, どの場所からも同じように扱える場所が必要になります. AWS では S3, DynamoDB, RDS などの永続化のためのサービスを使うことになるでしょう. RDS を用意する間でも無い場合, 料金を節約したい場合や SLA が気になる場合は, 全てのデータのやり取りを S3 を介して行うのが良いです. 特に EMR に用意されている hadoop コマンドには, その EMR を立ち上げたアカウントの情報が設定されており, hadoop fs -get s3://...hadoop fs -put s3://... とするだけで S3 へのアクセスができます.

外部システムにデータが届くまで

Asakusa バッチアプリケーションの処理結果を外部システムに渡す経路は, ちょうど図1と図2の矢印を逆向きにしたものになります.

入力データのあったマシンとは別のところに処理結果を出力することも可能です. 詳しくは後の小節で取り上げます.

後編に続きます.