Direct I/O ユーザーガイド

この文書では、Direct I/Oの利用方法について紹介します。

データソース

Direct I/Oはファイルシステム上に配置したジョブフローの入出力データを、実行エンジンから直接読み書きするための機構です。ここでのファイルシステムにはHadoop File System APIを経由しますので、ローカルファイルシステムだけでなくHDFSなどのHadoopから参照可能なファイルを利用できます。

Direct I/Oでは、読み書きする対象のことを「データソース」として抽象化しています。 それぞれのデータソースはいずれかの「論理パス」上に配置します。 このパスは唯一のルートを持つツリー状の構造で、ルートを含む任意のノードに対してデータソースを配置できます。 これらの配置設定はAsakusa Framework側の設定ファイル上で行います。

DSLではデータソースそのものを指定せずに、入出力を行う先の論理パスのみを指定します。 ジョブフローの実行時には論理パスからデータソースを引き当て、そのデータソースに対する入出力が行われます。

入力のアーキテクチャ

Direct I/Oの入力には次のような特徴があります。

  • 大きなデータを分割して読みだす

以降では上記の特徴に関するアーキテクチャを紹介します。

入力データの分割

Direct I/Oでデータを読み出す際、以下の条件をすべて満たした場合にデータを小さな断片に分割し、並列に読み出すことが可能です。

  • データソースが分割をサポートしている
  • データの形式が分割をサポートしている
  • データが十分に大きい

データを分割できない場合にも、ファイルごとに並列に読み出すことは可能です。

出力のアーキテクチャ

Direct I/Oの出力には次のような特徴があります。

  • 出力ファイルを内容によって分割したりソートしたりできる
  • Direct I/O全体の出力を簡易的なトランザクションで行う

以降では上記の特徴に関するアーキテクチャを紹介します。

出力ファイルの分割と内容のソート

Direct I/Oでファイルを出力する際、出力内容を元にファイルの分割と内容のソートを行うことができます。

Direct I/Oの出力では、出力先のファイルごとに並列にファイルを作成します。 このとき、単一の出力先に対して並列に書き出すことはできないため、出力ファイルが1つである場合や、ファイルの大きさに極端な偏りが見られる場合には全体の性能が低下します。

Direct I/Oでは出力ファイル名の生成パターンを指定して出力ファイルを分割する、もしくは出力ファイル名の生成パターンを細かく指定せず、実行エンジン側で一意な名前を決めるワイルドカードを使用することで高速化が望めます。

Hint

ワイルドカードを指定した場合はソートが使えなくなるなど、出力ファイルの指定にはいくつかのトレードオフが存在します。 詳細は後述の 出力ファイル名のパターン を参照してください。

簡易的な出力のトランザクション

Direct I/Oで複数のデータソースに対して出力を行う際、内部では簡易的なトランザクション処理を行っています。 これにより、いずれかのデータソースに出力する際にエラーとなってしまった場合でも、最終的な出力先のデータを全く破壊しないか、または整合性のとれた出力結果を得られるようになっています。

Direct I/Oのトランザクション処理は主に以下の流れで行います。

  1. システムディレクトリ [1] に「トランザクション情報ファイル」を作成する

  2. 出力ファイルごとに「試行領域 (attempt area)」にファイルを作成する

  3. すべてのファイルを試行領域へ作成したら、それぞれのデータをデータソースの「ステージング領域 (staging area)」に移動する

    • 失敗した場合は試行領域のデータをクリアする
  4. すべての出力についてステージング領域へのファイル移動が成功した場合、システムディレクトリに「コミットマークファイル」をアトミックに作成する

    • いずれかが失敗した場合はコミットマークファイルを作成しない
    1. で作成したコミットマークファイルが存在する場合、それぞれのステージング領域の内容を最終的な出力先に移動し、コミットマークファイルを削除する
    • コミットマークファイルが存在しない場合は、ステージング領域の内容をクリアする
  5. トランザクション情報ファイルを削除する

Attention

データを最終的な出力先に移動する際に、標準的なデータソースではすでに移動先にデータが存在する場合に上書きします。

なお、試行領域はタスク試行ごとに、ステージング領域はデータソースごとにそれぞれ作成されます。

上記の仕組み上、Direct I/Oによる出力には次のような制約があります。

  • 試行領域 > ステージング領域 > 最終的な出力先 とデータを移動させるため、データの移動に時間がかかるデータソースでは速度が出ない [2]

  • コミットマークファイル作成から削除までの間、データソースは一時的に整合性が失われる

  • コミットマークファイル作成から削除までの間に処理が失敗した場合、トランザクションの修復が行われるまで整合性が失われる

    Warning

    バージョン 0.10.4 ではトランザクションの修復を自動的には行いません。 トランザクションのメンテナンス を参考に、手動で修復を行ってください。

  • コミットマークファイル自体が障害によって失われた場合、データソースの整合性が失われる

  • 同一の出力先に対して複数のジョブフローから出力を行った場合、結果が不安定になる (競合に対するロック等は行わない)

[1]詳しくは システムディレクトリの設定 を参照してください。
[2]例えばHadoopファイルシステムを経由して Amazon Simple Storage Service ( Amazon S3 )を利用する場合、データの移動に時間がかかるようです。後述の Amazon S3での設定例 も参考にしてください。

データソースの設定

Direct I/Oの機構を利用するには、入出力の仲介を行う「データソース」の設定が必要です。 主に以下のような設定を行います。

  • データソースの実装
  • データソースを配置する論理パス
  • データソースが実際に利用するファイルシステム上のパス

これらの設定は、 $ASAKUSA_HOME で指定したディレクトリ以下の core/conf/asakusa-resources.xml (以下「設定ファイル」)内に、以下の形式でそれぞれ記述していきます。

asakusa-resources.xml
<property>
    <name>プロパティ名</name>
    <value></value>
</property>

Attention

このファイルはAsakusa FrameworkがHadoopのジョブを実行する際に利用する共通の設定ファイル [3] です。 Hadoop本体の core-site.xml 等と同様の形式 [4] ですが、 ${...} 形式でのシステムプロパティの展開をサポートしていません。

[3]実行時プラグイン の設定にも利用しています。
[4]https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/conf/Configuration.html

データソースの追加

データソースを追加するには設定ファイルに次の項目を追加します。

データソースを追加する際の設定
名前
com.asakusafw.directio.<DSID> データソースの実装クラス名
com.asakusafw.directio.<DSID>.path データソースを配置する論理パス

設定の名前に含まれる <DSID> はそれぞれのデータソースを表す識別子です。 <DSID> には半角アルファベットの大文字小文字、半角数字、半角アンダースコア ( _ ) の組み合わせを指定できます。 複数のデータソースを利用する場合にはデータソースごとに識別子を変えて指定してください。

データソースの実装は、現在のところ Hadoopのファイルシステムを利用したデータソース のみを提供しています。 詳しくは対象の項を参照してください。

論理パスとはDirect I/Oのそれぞれのデータソースを配置する仮想的なパスで、DSLからこのパスを指定してデータソースを利用します。 このパスは alpha/beta/gamma のように名前をスラッシュ ( / ) で区切って書きます。

特別なパスとして、ルートパスは / 一文字で指定します。

論理パスの解決

DSLで指定した論理パスから実行時にデータソースを引き当てる際、次のような方法でデータソースの検索が行われます。

  1. 論理パスに対してデータソースが配置されている場合、そのデータソースを利用する
  2. 論理パスに対してデータソースが配置されていない場合、現在の論理パスの親パスに対して再帰的にデータソースの検索を行う
  3. ただし、現在の論理パスがルートである場合、データソースの検索は失敗する

つまり、DSLで指定した論理パスに対して、親方向に最も近いデータソースを検索して利用しています。

また、データソースを配置した論理パスよりもDSLで指定した論理パスの方が長い (つまり、サブパスが指定された) 場合、データソースを配置した論理パスからの相対パスをファイルパスの先頭に利用します。

たとえば、データソースを a/b に配置し、DSLでは論理パスに a/b/c/d と指定した場合、データソースからの相対パスは c/d となります。 さらにDSLでファイルパスに e/f と指定すると、結果のファイルパスは c/d/e/f となります。

Note

この論理パスの機構は、Unixのファイルシステムのマウントを参考に設計しています。

Hadoopのファイルシステムを利用したデータソース

データソースの実装として、HadoopのファイルシステムAPI ( FileSystem [5] ) を利用したものを提供しています。

本データソースを利用する場合、実装クラス名 ( com.asakusafw.directio.<DSID> ) には com.asakusafw.runtime.directio.hadoop.HadoopDataSource を指定します。 また、利用するファイルシステムについては、Hadoopの本体側であらかじめ設定を行っておく必要があります。

Direct I/Oの設定ファイルには、対象のデータソースに対してさらに論理パスに対するファイルシステム上のパスを表す「ファイルシステムパス」の設定が必要です。

Hadoopのファイルシステムを利用したデータソース
名前 形式
com.asakusafw.directio.<DSID>.fs.path URI ファイルシステム上のパス
[5]org.apache.hadoop.fs.FileSystem

ファイルシステムパスの形式

ファイルシステムパスには次の3種類の形式を指定できます。

相対パス

Hadoopのデフォルトファイルシステム [6] のワーキングディレクトリ [7] からの相対パスを利用します。

なお、デフォルトファイルシステムにローカルファイルシステムを指定している場合、 ワーキングディレクトリは必ずユーザーのホームディレクトリになります。

絶対パス

Hadoopのデフォルトファイルシステム上の絶対パスを利用します。

たとえば /var/log/tmp/directio などです。

完全URI

URIに対応するファイルシステム、ホスト、パスを利用します。

たとえば file:///home/asakusahdfs://localhost:8020/user/asakusa などです。

[6]Hadoopの設定ファイル core-site.xml 内の fs.defaultFS (fs.default.name) に指定したファイルシステムです。
[7]多くのHadoopディストリビューションでは、デフォルトのワーキングディレクトリはアプリケーション実行ユーザーのホームディレクトリです。

Warning

ファイルシステムパス以下はテストドライバー実行時に全て削除されます。 特にスタンドアロンモードのHadoopを利用時に相対パスを指定した場合、ホームディレクトリを起点としたパスと解釈されるため注意が必要です。

Hint

ファイルシステムパスの形式は環境や構成に応じて使い分けるべきです。

例えば開発環境ではOSやHadoopの設定に依存しない相対パスの設定が便利でしょう。 運用環境ではワーキングディレクトリに依存しない絶対パスの設定が安定するかもしれません。

また、複数種類のデータソースを使用し、Hadoopのデフォルトファイルシステム以外のファイルシステムを利用する場合は完全URIを使用する必要がありますが、この場合すべてのファイルシステムパスを完全URIで記述したほうが可読性が向上するかもしれません。

絶対パスや完全URIはHadoop側の設定を変更した場合に、その設定に追従する必要があるかもしれないので注意が必要です。

論理パスとファイルシステムパスの対応付け

Hadoopのファイルシステムを利用したデータソースでは、指定したファイルシステム上のパス ( com.asakusafw.directio.<DSID>.fs.path ) を起点に論理パスとファイルを対応付けます。 具体的には、次のような手順で対応付けます。

  1. DSLで指定した論理パスとファイル名から、 論理パスの解決 にある方法で実際のファイルパスを計算する
  2. 計算したファイルパスを、指定したファイルシステム上のパスからの相対パスとみなす

たとえば、データソースを hadoop に配置し、DSLでは論理パスに hadoop/asakusa , ファイル名に data.csv と指定した場合、実際に利用するファイルパスは asakusa/data.csv となります。 さらに起点となるファイルシステム上のパスが hdfs://localhost/user であった場合、対応付けられる最終的なファイルシステム上のパスは hdfs://localhost/user/asakusa/data.csv となります。

ファイルの分割読み出しの設定

Hadoopのファイルシステムを利用したデータソース において、 入力データの分割 は次のように設定します。 いずれのプロパティも必須ではありません。

ファイルの分割読み出しに関する設定
名前 形式
com.asakusafw.directio.<DSID>.fragment.min long 断片の最小バイト数
com.asakusafw.directio.<DSID>.fragment.pref long 断片の推奨バイト数

...fragment.min に0未満の値を指定した場合、入力データの分割は行われません。 未指定の場合は 16MB 程度に設定されます。

...fragment.pref が未指定の場合、 64MB程度に設定されます。 また、 ...fragment.min 未満の値は設定できません。

分割の最小バイト数や推奨バイト数は、次のようにデータの形式で上書きされることがあります。

  • データの形式が入力データの分割を許可しない場合、ファイルは分割されない
  • データの形式で指定した最小バイト数がデータソースで指定したものより大きな場合、データの形式で指定したものを優先する
  • データの形式で推奨バイト数が指定されている場合、データの形式で指定したものを優先する
  • 推奨バイト数が最小バイト数未満になる場合、推奨バイト数は最小バイト数の値を利用する

入力データの分割を許可している場合、このデータソースにおいてそれぞれの断片は次の制約をすべて満たします。

  • それぞれの断片は最小バイト数未満にならない
  • それぞれの断片は推奨バイト数の2倍以上にならない

Hint

Hadoop本体に指定したスプリットの設定はここでは使用しません。 通常の場合は既定の設定値で問題なく動作するはずですが、ファイルの途中からデータを読み出すような操作に多大なコストがかかるようなファイルシステムにおいては、ファイルの分割を行わないなどの設定が必要になります。

トランザクションの設定

Hadoopのファイルシステムを利用したデータソース において、 簡易的な出力のトランザクション は次のように設定します。 いずれのプロパティも必須ではありません。

トランザクションに関する設定
名前 形式
com.asakusafw.directio.<DSID>.fs.tempdir URI トランザクション用に利用するファイルシステム上のパス
com.asakusafw.directio.<DSID>.output.staging boolean ステージング領域を利用するかどうか
com.asakusafw.directio.<DSID>.output.streaming boolean 試行領域に直接出力するかどうか
com.asakusafw.directio.<DSID>.threads.commit int ステージング領域から最終的な出力先にデータを移動させる処理に使用するスレッド数

...fs.tempdir を省略した場合、このパスは com.asakusafw.directio.<DSID>.fs.path 下の _directio_temp というディレクトリになります。 明示的に設定を行う場合、この値は ...fs.path と同一のファイルシステムでなければなりません [8]

トランザクションが修復不可能な状態になった場合や、処理中に実行エンジンが異常終了した場合、 ...fs.tempdir 以下に処理の途中結果が残されている場合があります。

...output.staging を省略した場合、この値は true (ステージング領域を利用する) となります。 ステージング領域を利用しない場合、試行領域から最終的な出力先へ結果のデータを直接移動します。

...output.streaming を省略した場合、この値は true (試行領域に直接出力する) となります。 試行領域に直接出力しない場合、ローカルテンポラリ領域にファイルを作成したのち、ファイルの作成が成功した際にステージング領域にファイルを移動します。 この時利用するローカルテンポラリ領域は ローカルテンポラリ領域の設定 があらかじめ必要です [9]

...threads.commit を省略した場合、この値は 1 (シングルスレッドで処理する)となります。 出力ファイル数が多い場合や、ファイルの移動に時間がかかるデータソースの実装を利用する場合、この値を増やすことでトランザクション処理にかかる時間を短縮できる可能性があります。 なお ...output.stagingfalse の場合、この設定は無効です。

[8]具体的には、 ...fs.tempdir 以下のファイルを ...fs.path 以下のディレクトリに FileSystem.rename() で移動できる必要があります。
[9]試行領域に直接出力をしない場合にローカルテンポラリ領域が設定されていないと実行時にエラーとなります。

Keep Aliveの設定

Hadoopの一部のファイルシステム実装では、データを大きなブロックで転送するような実装になっています。 大きなブロックを転送する際にアプリケーションの応答がなくなり、実行エンジンによってはタイムアウトとなって処理が強制終了されてしまいます。

Hadoopのファイルシステムを利用したデータソース において、Keep Aliveの設定を行うことで上記の問題を回避できます。

Keep Aliveの設定
名前 形式
com.asakusafw.directio.<DSID>.keepalive.interval long ハートビート信号を送る間隔 (ミリ秒)

...keepalive.interval を省略した場合、Direct I/OでのKeep Aliveの設定は無効になります。

Attention

Keep Aliveの設定は注意深く行ってください。 アプリケーションがフリーズしてしまうなど、ファイルの転送とは別の原因で応答がなくなっても、本機能のせいで正常にタイムアウト処理が行われなくなる可能性があります。

データソースの設定例

ここではいくつかのデータソースの設定例を示します。

HDFSでの設定例

以下はHDFSの入出力を行う場合の設定例です。

asakusa-resources.xml
<property>
    <name>com.asakusafw.directio.hdfs</name>
    <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
    <name>com.asakusafw.directio.hdfs.path</name>
    <value>hdfs/var</value>
</property>
<property>
    <name>com.asakusafw.directio.hdfs.fs.path</name>
    <value>hdfs://localhost:8020/var/asakusa</value>
</property>

HDFSは直接の出力やファイルの移動を低コストで行えるようになっています。 そのため、特別な設定を行わなくてもそれなりに動作します。

Amazon S3での設定例

Amazon Simple Storage Service ( Amazon S3 )の入出力を行う場合の設定例です。

asakusa-resources.xml
<property>
    <name>com.asakusafw.directio.s3</name>
    <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
    <name>com.asakusafw.directio.s3.path</name>
    <value>s3/spool</value>
</property>
<property>
    <name>com.asakusafw.directio.s3.fs.path</name>
    <value>s3://example/var/spool</value>
</property>
<property>
    <name>com.asakusafw.directio.s3.output.staging</name>
    <value>false</value>
</property>
<property>
    <name>com.asakusafw.output.system.dir</name>
    <value>s3://example/var/system</value>
</property>

本ドキュメントの作成時点では、Hadoopのファイルシステムを経由してS3を利用する場合、出力ファイルの移動にコストがかかるようです。 このため、上記の設定では次のようなことを行っています。

  • ステージ領域をスキップする ( ...output.staging = false )
    • ステージ領域を利用する場合、タスクが全て成功した後にファイルの名前変更を行います。S3上でのファイル名変更はHDFS上のそれより時間がかかります。

Attention

上記の例はステージ領域をスキップするよう設定していますが、この設定によりトランザクション処理が行えなくなる点に注意してください。

複数のデータソースを利用する設定例

複数のデータソースを組み合わせて利用する場合、設定ファイルのデータソース( com.asakusafw.directio.<DSID> ) のうち、 <DSID> の部分を別々のものに設定します。

asakusa-resources.xml
<property>
    <name>com.asakusafw.directio.data</name>
    <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
    <name>com.asakusafw.directio.data.path</name>
    <value>data</value>
</property>
<property>
    <name>com.asakusafw.directio.data.fs.path</name>
    <value>hdfs://localhost:8020/user/directio/var</value>
</property>
<property>
    <name>com.asakusafw.directio.master</name>
    <value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
    <name>com.asakusafw.directio.master.path</name>
    <value>data/master</value>
</property>
<property>
    <name>com.asakusafw.directio.master.fs.path</name>
    <value>hdfs://localhost:8020/user/directio/master</value>
</property>

上記の例は論理パス datadata/master に対してそれぞれ data , master というDSIDのデータソースを指定する例です。 論理パスとファイルシステムパスをそれぞれ次のように対応づけています。

論理パスとファイルシステムパスの対応付け
ID 論理パス ファイルシステムパス
data data hdfs://localhost:8020/user/directio/var
master data/master hdfs://localhost:8020/user/directio/master

上記の設定では、DSLから data というパスが指定された場合に data というデータソースを利用し、 data/master というパスが指定された場合に master というデータソースを利用します。

それ以外に、 data/transactiondata/2012 など、 data 以下でなおかつ data/master と無関係なパスが指定された場合にも data というデータソースを利用します。 master というデータソースも同様に、 data/master/item など、 data/master のサブパスを指定した場合にも利用されます。

DSLで論理パスより長いパスを指定した場合、論理パスにマッチした残りの部分はそのままファイルシステム上のパスに利用します。 上記の設定でDSLから data/2012/01 と指定した場合、実行時には hdfs://localhost:8020/user/directio/var/2012/01 というパスとして処理が行われます。

なお、 data とは関係ないパス(たとえば var/log など)が指定された場合には、対応するデータソースが見つからないためエラーとなります。 これを避けるにはデフォルト設定のように、ルートパス ( / ) に対してデータソースを配置します。

Hint

データソースの識別子(<DSID>)は実行時のログメッセージにも利用されるため、わかりやすいものにしてください。

その他の設定

データソースの設定以外に、Direct I/Oの全体を通した設定を行えます。

システムディレクトリの設定

システムディレクトリはDirect I/Oの管理情報を保持するためのディレクトリで、以下の形式で設定します。 この内容はHadoop本体の設定ファイルに書いても、Direct I/Oの設定ファイルに書いてもどちらでも有効です [10]

システムディレクトリの設定
名前 形式
com.asakusafw.output.system.dir URI Hadoopファイルシステム上のシステムディレクトリ

システムディレクトリの設定が省略された場合、Hadoopが利用するデフォルトファイルシステム上の、 <ワーキングディレクトリ>/_directio を利用します。 またプロパティの値の中に、Javaのシステムプロパティを ${システムプロパティ名} という形式で利用できます。

Hint

システムディレクトリはトランザクションの管理情報など、Direct I/Oを利用するうえで重要な情報が記録されます。 そのため、信頼性の高いデータストア上か、Direct I/Oを利用するうえで重要性の高いデータストアと同じ領域内に配置することを推奨します。

[10]正確に言えば、データソースの設定もHadoop本体の設定ファイル内に記載できます。 ただし、データソースの設定はDirect I/O独自の設定ファイルに記載することを推奨します。

ローカルテンポラリ領域の設定

ローカルテンポラリ領域は、Direct I/Oの処理を実際に行っているコンピューターのローカルファイルシステム上のディレクトリです。 トランザクションの設定 において、「ステージング領域を利用しない」という設定を行った際に、出力ファイルを一時的に作成します。

この内容は以下の形式で設定します。 なお、Hadoop本体の設定ファイルに書いても、Direct I/Oの設定ファイルに書いてもどちらでも有効です。

ローカルテンポラリ領域の設定
名前 形式
com.asakusafw.output.local.tempdir ファイルパス ローカルファイルシステム上のテンポラリディレクトリ

ローカルテンポラリ領域はローカルファイルシステム上の絶対パスを指定します。 この設定が省略された場合、ローカルテンポラリ領域は利用できなくなります。

設定に対するディレクトリが存在しない場合、ローカルテンポラリ領域の利用時に自動的にディレクトリを作成します。

ログの設定

Direct I/Oに関するログは実行エンジンのログの設定を利用して行います。 それぞれの実行エンジンに関連するドキュメントを参照してください。

ファイルの入出力

Direct I/Oを利用してファイルを入出力するには、 Hadoopのファイルシステムを利用したデータソース などの設定をしておきます。

また、データモデルと対象のファイル形式をマッピングする DataFormat [11] の作成が必要です。 DataFormat のサブタイプとして、任意のストリームを取り扱う BinaryStreamFormat [12] や、Hadoopのファイルを取り扱う HadoopFileFormat [13] を現在利用できます( DataFormat は直接実装できません ) 。

なお、以降の機能を利用するには次のライブラリやプラグインが必要です。

Direct I/Oで利用するライブラリ等
ライブラリ 概要
asakusa-directio-vocabulary DSL用のクラス群
asakusa-directio-plugin DSLコンパイラプラグイン
asakusa-directio-test-moderator テストドライバープラグイン
asakusa-directio-dmdl DMDLコンパイラプラグイン

Hint

Asakusa Gradle Plugin ユーザーガイド の手順に従ってプロジェクトテンプレートから作成したプロジェクトは、これらのライブラリやプラグインがGradle Pluginによってデフォルトで利用可能になっています。

[11]com.asakusafw.runtime.directio.DataFormat
[12]com.asakusafw.runtime.directio.BinaryStreamFormat
[13]com.asakusafw.runtime.directio.hadoop.HadoopFileFormat

データフォーマットの作成

Direct I/Oはいくつかのファイルフォーマットにおいて、 DataFormat の実装クラスをDMDLコンパイラの拡張を利用して自動的に生成する機能を提供したり、実装用の基底クラスを提供しています。

Direct I/Oが提供する各ファイルフォーマットの利用方法については、以下のドキュメントを参照してください。

また、各ファイルフォーマットの定義から DataFormat の実装クラスを生成する方法については、 Direct I/O スタートガイド - データモデルクラスの生成 などを参照してください。

ファイルを入力に利用するDSL

Direct I/Oを利用してファイルからデータを読み出す場合、 DirectFileInputDescription [14] クラスのサブクラスを作成して必要な情報を記述します。

このクラスでは、下記のメソッドをオーバーライドします。

String getBasePath()

入力に利用する論理パスを戻り値に指定します。

ここには ${変数名} の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。

String getResourcePattern()

入力に利用するファイル名のパターンを戻り値に指定します。 getBasePath() で指定したパスを起点に、このパターンの名前を持つファイルを検索します。

形式については 入力ファイル名のパターン を参照してください。

Class<?> getModelType()

処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。

このメソッドは、自動生成される骨格ではすでに宣言されています。

Class<? extends DataFormat<?>> getFormat()

DataFormat の実装クラスを戻り値に指定します。

このメソッドは、自動生成される骨格ではすでに宣言されています。

Class<? extends DataFilter<?>> getFilter()

DataFilter [15] のサブクラスを戻り値に指定します。

詳細については 入力データのフィルター を参照してください。

boolean isOptional()

入力にとるファイルが存在しない場合に、バッチ全体を異常終了させるには false を、空の入力として処理を続行する場合には true を、それぞれ戻り値に指定します。

省略した場合、ファイルが存在しない場合にバッチ全体を異常終了させます ( false )。

DataSize getDataSize()

入力の推定データサイズを返します。

省略した場合、データサイズは不明 ( DataSize.UNKNOWN ) となります。

以下は実装例です。

public class DocumentFromFile extends DirectFileInputDescription {

    @Override
    public String getBasePath() {
        return "example";
    }

    @Override
    public String getResourcePattern() {
        return "**/data-*.csv";
    }

    @Override
    public Class<?> getModelType() {
        return Document.class;
    }

    @Override
    public Class<? extends DataFormat<?>> getFormat() {
        return DocumentCsvFormat.class;
    }

    @Override
    public Class<? extends DataFilter<?>> getFilter() {
        return DocumentFilter.class;
    }

    @Override
    public boolean isOptional() {
        return true;
    }

    @Override
    public DataSize getDataSize() {
        return DataSize.LARGE;
    }
}
[14]com.asakusafw.vocabulary.directio.DirectFileInputDescription
[15]com.asakusafw.runtime.directio.DataFilter

入力ファイルのベースパス

getBasePath() に指定した論理パスは「ベースパス」と呼ばれます。

実行時にはこのベースパスのみを利用して入力元のデータソースを探します。 そのため、以下の2つでは異なる結果になる場合があります。

  • basePath = "data/asakusa" , resourcePattern = "file.csv"
  • basePath = "data" , resourcePattern = "asakusa/file.csv"

上記の場合、 data/asakusa という論理パスにデータソースが配置されている場合、それぞれが参照するデータソースは異なるものになります。 この規則について詳しくは、 論理パスの解決 を参照してください。

また、ベースパスには ${変数名} の形式でバッチ引数を利用できます。

ベースパスには * を使ったワイルドカード指定はできません。 また、 |, [, ], {, } といった文字は利用できません。

入力ファイル名のパターン

getResourcePattern() にはファイル名だけでなくワイルドカードなどのパターン用の文字列も利用できます。

ここに利用できるパターンは以下の通りです。

利用できるパターン
文字列 名前 概要
名前文字 リテラル そのままファイル名として利用します。 対象のデータソースが利用できるファイル名のうち、 / , \ , $ , * , ? , # , | , { , } , [ , ] 以外の文字を利用できます。
/ 名前区切り パスに含まれる名前の区切り文字です。
${バッチ引数名} 変数 実行時にバッチ引数と置き換えます。 対象のバッチ引数は、変数を含まない任意のパターンの組み合わせである必要があります。
* ワイルドカード 0個以上の任意の名前文字とマッチします。
{..|..|..} 選択 | で区切られたいずれかの名前にマッチします。 .. の部分には名前文字と名前区切りの組み合わせのみを指定できます。

上記のほかに、特別なディレクトリやファイル名として ** を利用できます。 これは、検索対象以下のすべてのサブディレクトリ(自身のディレクトリも含む)とそれに含まれるファイルにマッチします。

ただし、 ** はディレクトリやファイル名の一部としては利用できません。 たとえば、 **.csv というパターンは利用できず、代わりに **/*.csv と書きます。

Hint

「変数」に関する挙動は、パターンの解釈の前に一度変数をすべて展開し、展開後の文字列をパターンとして解釈して利用しています。

入力データのフィルター

getFilter() で入力フィルタークラスを指定すると、Direct I/O を利用してファイルからデータを読み出す際に、ファイル単位やレコード単位で読み出すデータを制限できます。

Experimental

Asakusa Framework バージョン 0.10.4 では、Direct I/O の入力フィルターは試験的機能として提供しています。

この入力フィルタークラスは、 DataFilter [16] を継承したクラスを、アプリケーションの入力データに合わせて個別に実装する必要があります。 DataFilter を継承し、下記のメソッドを必要に応じてオーバーライドしてください。

void initialize(Context context)

入力フィルターを初期化する際にフレームワークから呼び出されます。

context からバッチ引数を取得できます。 入力フィルターの中では Framework API を利用できませんので、バッチ引数が必要な場合はここで取得する必要があります。

boolean acceptsPath(String path)

入力データのうち、ファイルを丸ごとフィルターする際に利用します。 このメソッドが true を返す場合には対象のファイルを入力として利用し、 false を返す場合には対象のファイルを入力から除外します。

path には、入力データのフルパスが格納されています。 ベースパスやファイル名のパターンとは異なる可能性がある点に注意が必要です。

boolean acceptsData(T data)
入力データのうち、個々のレコードをフィルターする際に利用します。 このメソッドが true を返す場合には対象のレコードを入力として利用し、 false を返す場合には対象のレコードを入力から除外します。

入力フィルターを利用する際には、以下の点に注意してください。

入力フィルター内でフレームワークAPIを利用できない
入力フィルタークラスの各メソッド内では、フレームワークAPIを利用できません とくにバッチ引数などを利用する際には、 initialize() メソッドの Context オブジェクトから取得してください
テストドライバーからの実行で入力フィルターを利用できない
テストドライバーから入力フィルターを含むジョブフローやバッチをテストする場合、入力フィルターは自動的に無効化されます テストデータを用意する際には「フィルター適用後」のデータを指定し、入力フィルターのテストは個別に行ってください

以下は入力フィルタークラスの実装例です。

public class RegexInputFilter extends DataFilter<Object> {

    private Pattern accept;

    @Override
    public void initialize(Context context) {
        // バッチ引数から正規表現パターンを取り出す
        String s = context.getBatchArguments().get("pattern");
        accept = Pattern.compile(s);
    }

    @Override
    public boolean acceptsPath(String path) {
        // 正規表現パターンにマッチするファイルのみを入力に利用
        return accept.matcher(path).matches();
    }
}
[16]com.asakusafw.runtime.directio.DataFilter

ファイルを出力に利用するDSL

Direct I/Oを利用してデータをファイルに書き出す場合、 DirectFileOutputDescription [17] クラスのサブクラスを作成して必要な情報を記述します。

このクラスでは、下記のメソッドをオーバーライドします。

String getBasePath()

出力に利用する論理パスを戻り値に指定します。

ここには ${変数名} の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。

String getResourcePattern()

出力に利用するファイル名のパターンを戻り値に指定します。 getBasePath() で指定したパスを起点に、このパターンが表すパスにそれぞれのファイルを出力します。

パターンには {property_name:format} (プレースホルダ) などを利用できます。 これは指定したプロパティの内容を、指定のフォーマットでファイル名に埋め込みます。

詳しくは 出力ファイル名のパターン を参照してください。

List<String> getOrder()

それぞれの出力ファイルの内容をソートするプロパティを指定します。

それぞれのプロパティは +property_name で昇順、 -property_name で降順を表します。 プロパティ名はDMDLのプロパティ名と同様、すべて小文字で単語をアンダースコア ( _ ) で区切ってください。

省略した場合、出力ファイルのソートを行いません。

List<String> getDeletePatterns()

出力を行う前に削除するファイル名パターンの一覧を戻り値に指定します。 getBasePath() で指定したパスを起点に、これらのパターンが表すパスを消去した後に、ファイルの出力を行います。

パターンには * (ワイルドカード) など、 入力ファイル名のパターン と同様のものを利用できます。

省略した場合、ファイルの削除を行いません。

Class<?> getModelType()

処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。

このメソッドは、自動生成される骨格ではすでに宣言されています。

Class<? extends DataFormat<?>> getFormat()

DataFormat の実装クラスを戻り値に指定します。

このメソッドは、自動生成される骨格ではすでに宣言されています。

以下は実装例です。

public class DocumentToFile extends DirectFileOutputDescription {

    @Override
    public String getBasePath() {
        return "example";
    }

    @Override
    public String getResourcePattern() {
        return "{date:yyyy/MM}/data.csv";
    }

    @Override
    public List<String> getOrder() {
        return Arrays.asList("+id");
    }

    @Override
    public List<String> getDeletePatterns() {
        return Arrays.asList("${oldyear}/*/data.csv");
    }

    @Override
    public Class<?> getModelType() {
        return Document.class;
    }

    @Override
    public Class<? extends DataFormat<?>> getFormat() {
        return DocumentCsvFormat.class;
    }
}

Hint

出力先のファイルがすでに存在する場合、古いファイルを削除してからこの出力で上書きします。 ただし、ファイルに対するレコードがひとつも存在しない場合にはファイル自体が作成されず、古いファイルが残ってしまう場合があります。

このため、出力先にワイルドカードやランダムな値を利用する場合には、 getDeletePatterns() を利用してファイルを削除しておいたほうが良い場合があります。

[17]com.asakusafw.vocabulary.directio.DirectFileOutputDescription

出力ファイルのベースパス

getBasePath() に指定した論理パスは「ベースパス」と呼ばれます。

実行時にはこのベースパスのみを利用して出力先のデータソースを探します。 そのため、以下の2つでは異なる結果になる場合があります。

  • basePath = "data/asakusa" , resourcePattern = "file.csv"
  • basePath = "data" , resourcePattern = "asakusa/file.csv"

上記の場合、 data/asakusa という論理パスにデータソースが配置されている場合、それぞれが参照するデータソースは異なるものになります。 この規則について詳しくは、 論理パスの解決 を参照してください。

また、ベースパスには ${変数名} の形式でバッチ引数を利用できます。

ベースパスには * を使ったワイルドカード指定はできません。 また、 |, [, ], {, } といった文字は利用できません。

出力ファイルのベースパスは、次のような制約があります。

  • 同一ジョブフローの入力が、ある出力のベースパスと同じまたはそのサブパスであってはならない
  • 同一ジョブフローの出力が、ある出力のベースパスと同じまたはそのサブパスであってはならない

Note

上記の制約はトランザクションの制御やテストのために導入した制約です。 出力に対してはこのような制約がありますが、2つの入力が同じベースパスを利用することは可能です。

出力ファイル名のパターン

getResourcePattern() にはファイル名だけでなくプロパティの内容からファイル名を計算するための、プレースホルダも利用できます。

ここに利用できるパターンは以下の通りです。

出力ファイル名に利用できるパターン
文字列 名前 概要
名前文字 リテラル そのままファイル名として利用します。 対象のデータソースが利用できるファイル名のうち、 / , \ , $ , * , ? , # , | , { , } , [ , ] 以外の文字を利用できます。
/ 名前区切り パスに含まれる名前の区切り文字です。
${バッチ引数名} 変数 実行時にバッチ引数と置き換えます。 対象のバッチ引数は、名前文字または名前区切りの組み合わせである必要があります。
{property:format} プレースホルダ プロパティの内容を指定のフォーマットで文字列化して利用します。 プロパティはDMDLと同様に snake_case の形式でプロパティ名を指定します。
[開始番号..終了番号] ランダムな値 開始番号以上、終了番号以下のランダムな数値に置き換えます。 それぞれの番号は0以上かつ2の31乗未満で、開始番号より終了番号のほうが大きな数値である必要があります。
* ワイルドカード 分散環境上での出力に都合のよい任意の文字列を利用します。 ただし、この出力にはプレースホルダとランダムな値、およびファイル内のソート機能 [18] を利用できなくなります。

Attention

出力するレコード数が「ランダムな値」の範囲よりも十分に大きくない場合、ランダムな値のすべての範囲に対するファイルが生成されない場合があります。

Attention

出力ファイル名のパターンでは、変数の展開後の文字列にプレースホルダ、ランダムな値、ワイルドカードを表す文字列を含められません。 この制約は将来緩和されるかもしれません。

Hint

出力ファイルが1つになってしまう場合や、出力ファイルのサイズに大きな偏りができてしまう場合、「ランダムな値」を利用することでパフォーマンスを向上させられる場合があります。

Hint

「ランダムな値」をゼロ埋めしたい場合、 [0..9][0..9] のように書けます。

Hint

「ワイルドカード」は制約が多い代わりに高速に動作する可能性があります。 ワイルドカード文字を置換する際に、実行エンジンごとに異なる形式の文字列を利用します。

プレースホルダ ( {property:format} ) には次のようなフォーマットを利用できます。

プレースホルダに使用できるフォーマット
形式 データ型 概要
: とそれ以降を省略 すべて toString() によって文字列化
:<日付> DATE : 以降を SimpleDateFormat.format() によって文字列化
:<日時> DATETIME : 以降を SimpleDateFormat.format() によって文字列化
:<数値書式> BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL : 以降を DecimalFormat.format() によって文字列化

<日付><日時> には SimpleDateFormat [19] 形式のパターンを指定します。 たとえば、パターンに data/{date:yyyy/MM}.csv と指定すると、プロパティ date の内容を元に data/<年>/<月>.csv のようなファイルを年と月の情報からそれぞれ作成します。さらに内容をソートするプロパティにも date を指定すると、ファイルを年と月で分割した後に日にちでソートして出力できます。

<数値書式> には DecimalFormat [20] 形式のパターンを指定します。

出力ファイル名については 出力ファイルの分割と内容のソート も参照してください。

Attention

出力するデータが存在しない場合、ファイルは一つも作成されません。 これは、ファイル名にプレースホルダを指定していない場合でも同様です。

[18]DirectFileOutputDescription.getOrder() ( ファイルを出力に利用するDSL を参照 )
[19]java.text.SimpleDateFormat
[20]java.text.DecimalFormat

アプリケーションのテスト

Direct I/Oを利用したジョブフローやバッチのテストは、Asakusa Frameworkの通常のテスト方法で行えます。 テスト方法については アプリケーションのテスト - TestDriver を参照してください。

ここでは、Direct I/Oを使ってテストドライバーを実行する際の動作や注意点を説明します。

テスト実行時の動作

以下はテスト実行時のテストドライバーの挙動です。

テスト実行時の設定

テストドライバーを使ったテスト実行時には、Direct I/Oの データソースの設定 は通常の実行環境と同様に、 開発環境にインストールしたAsakusa Frameworkの設定ファイル $ASAKUSA_HOME/core/conf/asakusa-resources.xml が使用されます。 必要に応じてこのファイルを編集し、適切な設定を行ってください。

入出力のクリア

テストドライバーの入出力が指定された場合、テストの実施前に入出力の対象がすべて削除されます。 このとき、DSLの getBasePath() で指定した論理パス以下のすべての内容を削除します。

Warning

上記のような挙動のため、データソースの入出力対象はできるだけ制限するようにしてください。

入力データの作成

入力データの作成時、指定された入力ファイルのパターンに対して一つだけファイルを作成します。 この時、下記のルールをもとに作成するファイルパスを計算します。

テスト時の入力ファイル名の変換ルール
文字列 名前 変換後
名前文字 リテラル そのまま利用します
/ 名前区切り そのまま利用します
${バッチ引数名} 変数 テストに指定したバッチ引数で置き換えます
* ワイルドカード __testing__ という文字列に置き換えます
{..|..|..} 選択 最左の文字列をそのまま利用します

Note

この規則は暫定的なもので、将来変更されるかもしれません。

出力データの取得

出力された結果データの取得時、テストドライバーはDSLの getBasePath() で指定した論理パス以下のすべての内容を取得します。 このため、バッチのテストで複数のジョブフローが同一のベースパスに出力を行う場合、正しく動作しません。

Note

この規則は暫定的なもので、将来変更されるかもしれません。

Attention

現在、ジョブフローの出力に対する初期データの作成 ( .prepare() ) はサポートしていません。

データフォーマットに対応したファイルをテストデータに指定

Direct I/Oを利用したアプリケーションのテストでは、Direct I/Oのデータフォーマットに対応するファイルをテストデータとして指定することも可能です。

例えば Direct I/O CSV を利用するアプリケーションでは、このフォーマット定義に対応するCSVファイルをテストの入力データや期待データとして指定することができます。

演算子のテスト

演算子のテストでは、DataLoader を利用して演算子の入力となるデータモデルオブジェクトをDirect I/Oのデータフォーマットに対応するファイルから生成することができます。

詳しくは、テストドライバーユーザーガイド - 演算子のテスト - DataLoader を参照してください。

データフローのテスト

データフローのテストでは、入力データと期待データをDirect I/Oのデータフォーマットに対応するファイルから生成することができます。

詳しくは、テストドライバーユーザーガイド - データフローのテストデータ作成 - Direct I/Oファイル形式 を参照してください。

入出力データのカウント情報

Direct I/Oはバッチアプリケーションの実行終了時に、入出力データのレコード件数やバイト数などの情報を集計してログに出力します。

以下、入出力データのカウント情報の出力例です。

...
Direct I/O file input: 3 entries
  itemInfo:
    number of input records: 9
    input file size in bytes: 1,017
  salesDetail:
    number of input records: 34
    input file size in bytes: 739
  storeInfo:
    number of input records: 5
    input file size in bytes: 275
  (TOTAL):
    number of input records: 48
    input file size in bytes: 2,031
Direct I/O file output: 2 entries
  categorySummary:
    number of output records: 3
    output file size in bytes: 1,326
  errorRecord:
    number of output records: 3
    output file size in bytes: 1,617
  (TOTAL):
    number of output records: 6
    output file size in bytes: 2,943

カウント情報に表示される項目は、アプリケーションで利用する実行エンジン毎に異なります。

Attention

Direct I/O Hive を利用した入出力データのカウント情報では、入力データファイルのバイト数 ( input file size in bytes ) は実際のファイルサイズとは多少異なる値が表示されます。

トランザクションのメンテナンス

Direct I/Oのファイル出力時には、 簡易的な出力のトランザクション を行っています。 出力を開始する前にシステムディレクトリに対してトランザクションの情報を作成し、トランザクション処理の完了後にこれらの情報をクリアしています。

以降では、トランザクションが中断された際にこれらを手動で修復する方法について紹介します。 なお、いずれのメンテナンス用コマンドについても、コマンドを起動した環境のHadoopのログ設定 [21] を利用してログを出力します。

[21]詳しくは ログの設定 を参照してください。

トランザクション情報の一覧を表示

残っているトランザクション情報の一覧を表示するには、 $ASAKUSA_HOME/directio/bin/list-transaction.sh コマンドを引数なしで実行します。 このコマンドを実行すると、以下の情報を表示します。

表示されるトランザクションの情報
セクション 内容
Date トランザクションを開始した日時
Execution ID 対象のジョブフローの実行ID
Status トランザクションの状態
Comments 補助的な情報

上記のうち、 Status を調べることで対象のトランザクションの状態が分かります。 特に重要な状態は Committed (コミット済み) で、この場合には最終的な出力先が不整合な状態になっている場合があります。

また、以降のコマンドでは Execution ID (実行ID) の情報を元にトランザクションの修復操作を行います。

コミットの適用

コミット済みのトランザクションを最終的な出力先に反映させるには、 $ASAKUSA_HOME/directio/bin/apply-transaction.sh コマンドを実行します。 コマンドの引数にはトランザクションに対応する実行IDを指定してしてください。

このコマンドが対象とするトランザクション処理は、 Committed (コミット済み) でなければなりません。 それ以外のトランザクション処理に対してこのコマンドを実行しても何も行いません。

このコマンドの実行が成功した場合、トランザクション情報の一覧にコマンドの対象が出現しなくなります。

このコマンドの実行に失敗した場合、出力先のデータソースに何らかの異常が発生している可能性があります。 データソースを正常な状態に戻した後に再度コミットを適用するか、または トランザクションの破棄 を実行して出力に不整合があるままトランザクションを破棄できます。

Warning

コミットを適用する順序には注意が必要です。 先に適用した出力は、後に適用した出力で上書きされてしまいます。

Note

このコマンドでは、ベストエフォートでのコミットの適用を行っています。 複数のデータソースが存在し、そのうち一つが常にコミットの適用に失敗してしまう場合、即座に適用処理を停止せずにほかのデータソースに対してコミットを適用したのち、エラーとしています。

トランザクションの破棄

任意のトランザクション処理を破棄するには、 $ASAKUSA_HOME/directio/bin/abort-transaction.sh コマンドを実行します。 コマンドの引数にはトランザクションに対応する実行IDを指定してしてください。

Warning

このコマンドはトランザクションのロールバックを行う のではなく 、単にトランザクションを破棄します。 Committed (コミット済み) のトランザクションに対してこの処理を実行すると、最終的な出力は不整合な状態になる場合があります。

このコマンドの実行が成功した場合、トランザクション情報の一覧にコマンドの対象が出現しなくなります。 また、それぞれのデータソース上でステージング領域や試行領域の中間データを削除します。

Attention

ただし、ローカルテンポラリ領域 [22] 内の試行領域については削除されません。 これらは別の手段で削除する必要があります。

このコマンドの実行に失敗した場合、出力先のデータソースに何らかの異常が発生している可能性があります。 データソースを正常な状態に戻した後に再度実行するか、またはコマンド実行時のログを参考に、トランザクション情報自体を削除してください。

[22]ローカルテンポラリ領域の設定 を参照してください。