WindGateユーザーガイド¶
この文書では、WindGateの利用方法について紹介します。
WindGateプロファイル¶
WindGateは「Hadoopクラスターとデータベース」または「Hadoopクラスターとローカルファイルシステム」などの二点間で様々なデータをやり取りするツールです。 主に、Asakusa Frameworkでバッチアプリケーションを作成する際に、データフローの入力データや出力データを取り扱うために利用します。
WindGateでは、「Hadoopクラスター」や「データベース」、「ローカルファイルシステム」などのそれぞれを「リソース」という考え方で同じように取り扱います。 WindGateの「プロファイル」にそれぞれのリソースとアクセス方法を記述することで、リソース間で自由にデータをやり取りできるようにします。
また、このプロファイルはWindGateに複数登録することもでき、複数のデータベースやローカルファイルシステム上のパス、またはアクセスするHadoopクラスターへの通信方法など、複数の組み合わせを個別に管理できます。
WindGateのプロファイルは、 $ASAKUSA_HOME/windgate/profile/<プロファイル名>.properties
(以降、構成ファイル)内に記述します。
<プロファイル名>
の部分には、特定のプロファイルごとに名前を付けてその名前を指定します。
それぞれの構成ファイルには、Javaの一般的なプロパティファイルの文法で、主に下記のセクションを記述します。
セクション名 | 内容 |
---|---|
core |
WindGate本体の設定 |
session |
セッションの設定 |
process.basic |
プロセスの設定 |
resource.hadoop |
Hadoopクラスターの設定 |
resource.local |
ローカルファイルシステムの設定 |
resource.jdbc |
データベースの設定 |
ここでの「セクション」とは、該当するセクション名またはそのサブセクションとなるプロパティの項目のことをいいます。
たとえば、構成ファイル内の core
, core.maxProcesses
などはいずれも core
セクション内の要素です。
環境変数の利用¶
WindGateプロファイルのいくつかの項目では、項目値の文字列に ${変数名}
という形式を指定することで、WindGateを起動した環境の環境変数を含められます。
また、 ${変数名-デフォルト値}
のように、 変数名の後に -
を指定して、その後にデフォルト値を記述することで、
WindGateを起動した環境の環境変数が存在しない場合に、ここで指定したデフォルト値を設定することができます。
WindGate本体の設定¶
WindGate本体の設定は、構成ファイル内の core
セクション内に記述します。
同時に実行するデータ転送の設定¶
WindGateは、一度の処理で複数のリソース間でのデータ転送を行います。 それぞれのデータ転送を同時に実行するには、次の設定を追加します。
名前 | 値 |
---|---|
core.maxProcesses |
同時に実行可能なデータ転送数の最大 |
Attention
この設定はプロファイルごとに個別のものです。 ここで同時実行数を1に設定しても、複数のプロファイルに対する処理を同時に行った場合、データ転送は二つ以上が同時に実行する場合があります。
Hint
この設定は、主にデータベースを利用するリソースなどでコネクション数に制限がある場合などを想定しています。 特に制限がない場合には、WindGateを実行するコンピューターのコア数などと同じ数値にするのが良いでしょう。
セッションの設定¶
WindGateと連携したバッチアプリケーションを作成する場合、通常はそれぞれのジョブフローの入力と出力時にそれぞれWindGateの処理が実行されます。 たとえば、ジョブフローの処理の開始時にWindGateを利用して、データベースの内容をHadoopクラスター上に展開し(インポート)、処理の終了時に計算結果をデータベースに書き戻します(エクスポート)。
それぞれのジョブフローは、一連のトランザクション処理とみなされているため、このWindGateを利用したインポートとエクスポートは一連の処理として関連付けられていなければなりません。 この、処理を関連付けるための仕組みを「セッション」と呼びます。
WindGateのセッションはジョブフローの先頭でインポートを行う際に作成され、エクスポートが完了した後に破棄されます。 セッションの作成から破棄までの間にジョブフローが失敗した場合、このセッションの情報を元に何らかの復旧作業を行います。
また、同じセッションが同時に複数作成されようとした場合、WindGateを間違えて多重に起動してしまっている可能性があります。 このような間違いを防ぐためにも、セッションは利用されます。
セッションの設定は、構成ファイル内の session
セクション内に記述します。
Note
現在のところ、WindGateには ローカルファイルシステムを利用したセッション のみが提供されています。
ローカルファイルシステムを利用したセッション¶
セッション情報をローカルファイルシステム上に保持するには、 session
セクションに下記の内容を指定します。
名前 | 値 |
---|---|
session |
com.asakusafw.windgate.file.session.FileSessionProvider |
session.directory |
セッション情報を保持させるディレクトリ |
ここで、 session.directory
で指定したディレクトリ以下にセッションの情報に関するファイルが作成されます。
このプロパティには、 ${環境変数名}
の形式で環境変数を利用できます。
特に、 ${WINDGATE_PROFILE}
という値に利用中のプロファイル名が格納されているので、その値を利用するとプロファイルごとにディレクトリを分けられて便利です。
Attention
複数のプロファイルを利用する場合、かならずそれぞれのプロファイルで利用するディレクトリを分けてください。 同じディレクトリにした場合、それぞれのプロファイルを参照するWindGateを同時に実行した際に、正しく動作しない可能性があります。
Note
この機能は、OSのファイルロックを利用して実現しています。 ファイルロックが正しく動作しないOSやファイルシステム上では予想外の動作をするかもしれません。
プロセスの設定¶
WindGateは二つのリソースの間でデータを転送するツールです。 この転送時に二つのリソースを仲立ちするのが「プロセス」で、入力元からデータを取り出して、出力先にそのデータを書き出す処理を行います。
また、WindGateは一度の処理内で、複数のリソース間のデータ転送を行います。 入力と出力の対になるリソース間ごとにプロセスが作成され、同時に実行するプロセスの個数は 同時に実行するデータ転送の設定 で指定できます。
プロセスの設定は、構成ファイル内の process.basic
セクション内に記述します。
Note
ここでのセクション名が process.basic
となっているのは、このプロセスが「通常の方法でデータ転送を行う」という役割を持っているためです。
将来、キャッシュの機能などがサポートされる際には、 process
セクションも増える予定です。
Note
ここでの「プロセス」はUNIXのプロセスとは別物です。 実際、WindGateのプロセスは、同一JavaVM上のそれぞれのスレッドで実行されます。
通常のデータ転送プロセス¶
標準的なデータ転送プロセスを利用するには、 process.basic
セクションに以下のように記述します。
名前 | 値 |
---|---|
process.basic |
com.asakusafw.windgate.core.process.BasicProcessProvider |
この項目には、特に追加の設定はありません。
再試行可能なデータ転送プロセス¶
再試行可能なデータ転送プロセスを利用するには、 process.basic
セクションに以下のように記述します。
名前 | 値 |
---|---|
process.basic |
com.asakusafw.windgate.retryable.RetryableProcessProvider |
process.basic.component |
com.asakusafw.windgate.core.process.BasicProcessProvider |
process.basic.retryCount |
リトライ回数 |
process.basic.retryInterval |
再試行までの待機時間 (秒) |
process.basic.component
は実際に利用するデータ転送プロセスを設定します。
現在利用可能なプロセスは 通常のデータ転送プロセス のみであるため、ここには com.asakusafw.windgate.core.process.BasicProcessProvider
を指定します。
再試行可能なデータ転送プロセスでは、 process.basic.component
に指定したデータ転送プロセスを利用し、通常の方法でデータ転送を行います。
データ転送に失敗した場合、 process.basic.retryCount
に設定された回数を上限として、成功するまで上記プロセスを再実行します。
また、 process.basic.retryInterval
が指定されている場合、その時間だけ待機後にプロセスが再実行されます。
process.basic.retryInterval
が指定されていない場合は即座に再実行します。
なお、このプロセスを利用するには、プラグインライブラリに asakusa-windgate-retryable
の追加が必要です。
詳しくは プラグインライブラリの管理 を参照してください。
Hadoopクラスターの設定¶
Asakusa Frameworkで作成したバッチからWindGateを利用する場合、リソースの片方にはHadoopクラスターを利用します。
Hadoopクラスターとの通信方法は、構成ファイル内の resource.hadoop
セクション内に記述します。
同一環境上のHadoopを利用する¶
WindGateを起動したコンピュータ上のHadoopを利用するには、 resource.hadoop
セクションに以下のように記述します。
名前 | 値 |
---|---|
resource.hadoop |
com.asakusafw.windgate.hadoopfs.HadoopFsProvider |
resource.hadoop.basePath |
転送先のベースパス (省略可) |
resource.hadoop.basePath
は転送先のベースパスで、省略時はHadoopのデフォルト設定を利用します。
URI形式で、 hdfs://<host>:8080/user/asakusa
等のHadoopファイルシステム上のパスを指定できます。
上記の設定のうち、先頭の resource.hadoop
を除くすべての項目の値の中に ${環境変数名}
という形式で環境変数を含められます。
なお、このリソースを利用するには、プラグインライブラリに asakusa-windgate-hadoopfs
の追加が必要です。
詳しくは プラグインライブラリの管理 を参照してください。
Attention
Asakusa Framework 0.7.0
より、設定 resource.hadoop.compression
は利用できなくなりました。
転送時の圧縮はフレームワークが規定する内部の形式を利用するようになります。
Hint
通常の利用方法では、 resource.hadoop.basePath
を設定する必要はありません。
既定値以外のファイルシステムを利用する場合などに利用することを想定しています。
Hadoopを利用する際の環境変数¶
Hadoopクラスターと通信するリソースを利用するには、WindGateの起動時にHadoopの設定がすべて利用可能である必要があります。 WindGate起動時のHadoopの設定と、バッチで利用するHadoopの設定が異なる場合、正しく動作しない可能性があります。
環境変数の設定方法は WindGateの環境変数設定 を参照してください。
SSH経由でリモートのHadoopを利用する¶
WindGateからリモートコンピュータにSSHで接続し、そこにインストールされたHadoopを利用するには、 resource.hadoop
セクションに以下のように記述します。
また、 Hadoopブリッジ をリモートコンピュータ上にインストールしておく必要があります。
名前 | 値 |
---|---|
resource.hadoop |
com.asakusafw.windgate.hadoopfs.jsch.JschHadoopFsProvider |
resource.hadoop.user |
ログイン先のユーザー名 |
resource.hadoop.host |
SSHのリモートホスト名 |
resource.hadoop.port |
SSHのリモートポート番号 |
resource.hadoop.privateKey |
ローカルの秘密鍵の位置 |
resource.hadoop.passPhrase |
秘密鍵のパスフレーズ |
resource.hadoop.env.ASAKUSA_HOME |
ログイン先の Asakusa Framework のインストール先 |
resource.hadoop.env.<name> |
ログイン先の環境変数 <name> の値 |
上記の設定のうち、先頭の resource.hadoop
を除くすべての項目の値の中に ${環境変数名}
という形式で環境変数を含められます。
なお、このリソースを利用するには、プラグインライブラリに asakusa-windgate-hadoopfs
、および $ASAKUSA_HOME/windgate/lib
ディレクトリに JSch [1] の追加が必要です。
詳しくは プラグインライブラリの管理 を参照してください。
リモートと通信する際に、SSHで接続する元でもHadoopの設定が必要です。 必要な環境変数については Hadoopを利用する際の環境変数 を参照してください。
Attention
Asakusa Framework 0.7.0
より、設定 resource.hadoop.compression
は利用できなくなりました。
転送時の圧縮はフレームワークが規定する内部の形式を利用するようになります。
[1] | http://www.jcraft.com/jsch/ |
Hadoopブリッジ¶
WindGateからSSHを経由してHadoopにアクセスする際に、HadoopブリッジとよぶAsakusa Frameworkのツールを経由します。
このツールは通常 $ASAKUSA_HOME/windgate-ssh
というディレクトリにインストールされていて、リモートコンピューターのAsakusa Frameworkにも同様のディレクトリが必要です。
また、プロファイルの resource.hadoop.env.ASAKUSA_HOME
には、リモートコンピューターのAsakusa Frameworkのインストール先をフルパスで指定してください。
このツールの内部では、以下の順序でリモートコンピューターの hadoop コマンドを検索し、そのコマンドでHadoopクラスターの操作を行います。
- リモートコンピューターのAsakusa Frameworkに組み込みのHadoop [2] がインストールされている場合、そのHadoop環境を利用します。
- 環境変数
HADOOP_CMD
が設定されている場合、$HADOOP_CMD
を hadoop コマンドとみなして利用します。 - 環境変数
HADOOP_HOME
が設定されている場合、 $HADOOP_HOME/bin/hadoop コマンドを利用します。 - hadoop コマンドのパスが通っている場合、それを利用します。
上記のうち、必要な環境変数をプロファイル内の resource.hadoop.env.<name>
や、リモート環境上の $ASAKUSA_HOME/windgate-ssh/conf/env.sh
ファイル内で設定してください。
結果としてコマンドが見つからなかった場合にはエラーになります。
また、ログの設定は $ASAKUSA_HOME/windgate-ssh/conf/logback.xml
で行えます。
WindGate本体と同様に、SLF4JとLogbackを利用しています [3] 。
Attention
HadoopブリッジはSSH経由で実行され、標準入出力を利用してWindGateとデータのやり取りを行います。 ブリッジのJavaプログラム内で標準出力を利用しようとした場合、標準エラー出力にリダイレクトされるようになっています。 そのため、ログの設定を行う際には、ログメッセージの出力先に注意してください。 通常はログ出力先に標準出力を設定しないようにしてください。
また、 $ASAKUSA_HOME/windgate-ssh/conf/env.sh
に指定した
HADOOP_USER_CLASSPATH_FIRST
の設定は、ログの設定を有効にするためにも必要です。
特別な理由でHadoopのクラスパスを優先したい時を除き、 HADOOP_USER_CLASSPATH_FIRST
の設定を変更しないようにしてください。
[2] | 組み込みのHadoopについては、 WindGateスタートガイド - Hadoopの設定 を参照してください。 |
[3] | WindGateのログ設定 を参照 |
ローカルファイルシステムの設定¶
WindGateのリソースとして、WindGateを起動したコンピュータのファイルシステムを指定できます [4] 。
構成ファイル内の resource.local
セクション内に以下の設定を記述します。
名前 | 値 |
---|---|
resource.local |
com.asakusafw.windgate.stream.file.FileResourceProvider |
resource.local.basePath |
ベースパス |
resource.local.basePath
は絶対パスで指定し、WindGateはそのパス以下のみを利用します。
また、 resource.local.basePath
には ${環境変数名}
の形式で環境変数を指定できます。
上記の設定のうち、先頭の resource.local
を除くすべての項目の値の中に ${環境変数名}
という形式で環境変数を含められます。
なお、このリソースを利用するには、プラグインライブラリに asakusa-windgate-stream
の追加が必要です。
詳しくは プラグインライブラリの管理 を参照してください。
Warning
開発環境では、ベースパスに壊れてもよいディレクトリを指定してください。 ここで指定したパスはテスト実行時などにテストドライバーが削除したり変更したりします。
[4] | WindGateを起動したコンピュータから、OSのファイルシステムを利用するというだけですので、ネットワークファイルシステム等でもファイルシステム上にマウントしてあれば利用可能です。 なお、「ローカル」と書いているのは、Hadoopのファイルシステムと区別するためです。 |
データベースの設定¶
WindGateのリソースとして、JDBCをサポートするデータベースを指定できます。
現在の構成では、WindGateから直接JDBCドライバーを利用して対象のデータベースにアクセスします。
また、データの取得にはテーブルを SELECT
文で取得し、データの書き戻しにはテーブルを TRUNCATE
した後にバッチモードで INSERT
文を発行します。
Warning
この構成では、データの書き出し前に対象のテーブルの内容を完全に削除します。 そのため、書き出し先のテーブルには通常利用するテーブルとは別のテーブルを指定し、WindGateの外側でマージ処理等を行ってください。
Attention
この構成では、データの取得時にアプリケーション側でのページネーション等を行いません。
そのため、MySQLなどのカーソル機能が十分でないデータベースでは、巨大なデータを取得する際に十分なパフォーマンスが得られません。
特に、MySQLの場合には設定に resource.jdbc.batchGetUnit=1000
, resource.jdbc.properties.useCursorFetch=true
等を指定し、カーソルを利用するようにしてください。
構成ファイル内の resource.jdbc
セクション内に以下の設定を記述します。
名前 | 値 |
---|---|
resource.jdbc |
com.asakusafw.windgate.jdbc.JdbcResourceProvider |
resource.jdbc.driver |
JDBCドライバーのクラス名 |
resource.jdbc.url |
接続先データベースのJDBC URL |
resource.jdbc.user |
データベースのユーザー名 |
resource.jdbc.password |
データベースのパスワード |
resource.jdbc.batchGetUnit |
一度に取得するデータの件数 (読み出し時) [5] |
resource.jdbc.batchPutUnit |
一度に挿入するデータの件数 (書き込み時) [6] |
resource.jdbc.connect.retryCount |
接続時のリトライ回数 (省略時にはリトライなし) |
resource.jdbc.connect.retryInterval |
接続リトライまでの間隔 (秒、省略時には10秒) |
resource.jdbc.statement.truncate |
テーブルの内容を削除する際の文形式 [7] (省略時には TRUNCATE 文) |
resource.jdbc.properties.<キー名> |
コネクションプロパティの値 |
resource.jdbc.optimizations |
データベースとの接続時に利用する最適化オプション |
上記の設定のうち、先頭の resource.jdbc
を除くすべての項目の値の中に ${環境変数名}
という形式で環境変数を含められます。
なお、このリソースを利用するには、プラグインライブラリに asakusa-windgate-jdbc
とJDBCドライバーライブラリの追加が必要です。
詳しくは プラグインライブラリの管理 を参照してください。
[5] | この値は Statement.setFetchSize() に設定します。
PostgreSQL等ではこの設定によってカーソルを利用するモードになります。
この値が未設定の場合や 0 を設定した場合、 Statement.getFetchSize() は既定値が利用されます。 |
[6] | 大きすぎる値を指定するとメモリ不足で正しく動作しません。 1000から10000程度での動作を確認しています。 |
[7] | この設定は java.text.MessageFormat の形式で指定し、削除対象のテーブル名は {0} で指定してください。
省略時には TRUNCATE TABLE {0} が利用され、代わりに DELETE FROM {0} などを指定できます。
なお、 MessageFormat ではシングルクウォート ( ' ) が特殊文字として取り扱われることに注意が必要です。 |
データベース接続時の最適化オプション¶
resource.jdbc.optimizations
にはWindGateで利用可能なデータベース固有の最適化オプションを指定します。
本バージョンでは、以下のオプションが利用可能です。
オプション | 内容 |
---|---|
ORACLE_DIRPATH |
Oracleのダイレクト・パス・インサートを有効にする。 実行するINSERT文に対して、APPEND_VALUESヒントを指定する。 WindGateの実行環境でダイレクト・パス・インサートを利用にするには、このオプション指定に加えて、 データベースを利用するエクスポーター記述 でオプションを指定する必要がある。 |
その他のWindGateの設定¶
WindGateプロファイルのほかに、WindGate全体の設定に関するものがいくつか用意されています。
WindGateの環境変数設定¶
WindGateの実行に特別な環境変数を利用する場合、 $ASAKUSA_HOME/windgate/conf/env.sh
内でエクスポートして定義できます。
WindGateをAsakusa Frameworkのバッチから利用する場合、以下の環境変数が必要です。
名前 | 備考 |
---|---|
ASAKUSA_HOME |
Asakusaのインストール先パス。 |
HADOOP_USER_CLASSPATH_FIRST |
WindGateのログ設定 時にHadoopのログ機構を利用しないための設定。 true を指定する。 |
特別な理由がない限り、 ASAKUSA_HOME
はWindGateを実行する前にあらかじめ定義しておいてください。
$ASAKUSA_HOME/windgate/conf/env.sh
では、その他必要な環境変数を定義するようにしてください。
Hint
YAESS を経由してWindGateを実行する場合、WindGateで利用する環境変数 ASAKUSA_HOME
はYAESS側の設定で行えます。
詳しくは YAESSユーザーガイド を参照してください。
その他、以下の環境変数を利用可能です。
名前 | 備考 |
---|---|
HADOOP_CMD |
利用する hadoop コマンドのパス。 |
HADOOP_HOME |
Hadoopのインストール先パス。 |
WINDGATE_OPTS |
WindGateを実行するJava VMの追加オプション。 |
なお、WindGateの本体は、以下の規約に従って起動します (上にあるものほど優先度が高いです)。
- Asakusa Frameworkに組み込みのHadoop [8] がインストールされている場合、そのHadoopライブラリーを利用します。
- 環境変数に
HADOOP_CMD
が設定されている場合、$HADOOP_CMD
コマンドを経由して起動します。 - 環境変数に
HADOOP_HOME
が設定されている場合、$HADOOP_HOME/bin/hadoop
コマンドを経由して起動します。 - hadoop コマンドのパスが通っている場合、 hadoop コマンドを経由して起動します。
このため、 HADOOP_CMD
と HADOOP_HOME
の両方を指定した場合、 HADOOP_CMD
の設定を優先します。
Attention
Asakusa Framework バージョン 0.9系以前では hadoop コマンドが見つからない場合、WindGateは代わりに java コマンドを利用してアプリケーションを起動する機能が含まれていましたが、バージョン 0.10 よりこの機能は廃止されました。
[8] | 組み込みのHadoopについては、 WindGateスタートガイド - Hadoopの設定 を参照してください。 |
WindGateのログ設定¶
WindGateは内部のログ表示に SLF4J
[9] 、およびバックエンドに Logback
[10] を利用しています。
ログの設定を変更するには、 $ASAKUSA_HOME/windgate/conf/logback.xml
を編集してください。
また、WindGateの実行時には以下の値がシステムプロパティとして設定されます。
名前 | 値 |
---|---|
com.asakusafw.windgate.log.batchId |
バッチID |
com.asakusafw.windgate.log.flowId |
フローID |
com.asakusafw.windgate.log.executionId |
実行ID |
Logback以外のログの仕組みを利用する場合、 $ASAKUSA_HOME/windgate/lib
にあるLogback関連のライブラリを置換した上で、設定ファイルを $ASAKUSA_HOME/windgate/conf
以下に配置します (ここは実行時にクラスパスとして設定されます)。
[9] | http://www.slf4j.org/ |
[10] | http://logback.qos.ch/ |
プラグインライブラリの管理¶
WindGateの様々な機能は、プラグイン機構を利用して実現しています。
それぞれのプラグイン、およびプラグインが利用する依存ライブラリは、 $ASAKUSA_HOME/windgate/plugin
ディレクトリ直下に配置してください。
たとえば、WindGateはHadoopクラスターにアクセスする際にもプラグインが必要です。 標準的なものはWindGate導入時に自動的にプラグインが追加されますが、その他のプラグインは拡張モジュールとして提供されるため、必要に応じて拡張モジュールを導入してください。
See also
拡張モジュールの一覧やその導入方法については、 Asakusa Gradle Plugin ユーザーガイド や Asakusa Framework デプロイメントガイド を参照してください。
ローカルファイルシステムの入出力¶
Asakusa FrameworkのバッチアプリケーションからWindGateを利用してローカルファイルシステムの入出力を行うには、対象のプロファイルに ローカルファイルシステムの設定 を追加します。
また、データモデルとバイトストリームをマッピングする DataModelStreamSupport
[11] の実装クラスを作成します。
この実装クラスは、DMDLコンパイラの拡張を利用して自動的に生成できます。
なお、以降の機能を利用するには次のライブラリやプラグインが必要です [12] 。
ライブラリ | 概要 |
---|---|
asakusa-windgate-vocabulary |
DSL用のクラス群 |
asakusa-windgate-plugin |
DSLコンパイラプラグイン |
asakusa-windgate-test-moderator |
テストドライバープラグイン |
asakusa-windgate-dmdl |
DMDLコンパイラプラグイン |
Hint
Asakusa Gradle Plugin ユーザーガイド の手順に従ってプロジェクトテンプレートから作成したプロジェクトは、これらのライブラリやプラグインがGradle Pluginによってデフォルトで利用可能になっています。
[11] | com.asakusafw.runtime.directio.DataFormat |
[12] | com.asakusafw.windgate.core.vocabulary.DataModelStreamSupport |
CSV形式のDataModelStreamSupportの作成¶
CSV形式 [13] に対応した DataModelStreamSupport
の実装クラスを自動的に生成するには、対象のデータモデルに @windgate.csv
を指定します。
@windgate.csv
document = {
"the name of this document"
name : TEXT;
"the content of this document"
content : TEXT;
};
上記のように記述してデータモデルクラスを生成すると、 <出力先パッケージ>.csv.<データモデル名>CsvSupport
というクラスが自動生成されます。
このクラスは DataModelStreamSupport
を実装し、データモデル内のプロパティが順番に並んでいるCSVを取り扱えます。
また、 単純な ローカルファイルシステムを利用するインポーター記述 と ローカルファイルシステムを利用するエクスポーター記述 の骨格も自動生成します。
前者は <出力先パッケージ>.csv.Abstract<データモデル名>CsvImporterDescription
、後者は <出力先パッケージ>.csv.Abstract<データモデル名>CsvExporterDescription
というクラス名で生成します。必要に応じて継承して利用してください。
[13] | ここでのCSV形式は、 RFC 4180 で提唱されている形式を拡張したものです。 文字セットをASCIIの範囲外にも拡張したり、CRLF以外にもCRのみやLFのみも改行と見なしたり、ダブルクウォート文字の取り扱いを緩くしたりなどの拡張を加えています。 CSV形式の注意点 も参照してください。 |
CSV形式の設定¶
@windgate.csv
属性には、次のような要素を指定できます。
要素 | 型 | 既定値 | 内容 |
---|---|---|---|
charset |
文字列 | "UTF-8" |
ファイルの文字エンコーディング |
has_header |
論理値 | FALSE |
TRUE でヘッダの利用を許可。 FALSE で不許可 |
force_header |
論理値 | FALSE |
TRUE でヘッダの利用を許可し、ヘッダの形式チェックを行わない。 FALSE で不許可 |
true |
文字列 | "true" |
BOOLEAN 型の TRUE 値の表現形式 |
false |
文字列 | "false" |
BOOLEAN 型の FALSE 値の表現形式 |
date |
文字列 | "yyyy-MM-dd" |
DATE 型の表現形式 |
datetime |
文字列 | "yyyy-MM-dd HH:mm:ss" |
DATETIME 型の表現形式 |
なお、 date
および datetime
には SimpleDateFormat
[14] の形式で日付や時刻を指定します。
以下は記述例です。
@windgate.csv(
charset = "ISO-2022-JP",
has_header = TRUE,
true = "1",
false = "0",
date = "yyyy/MM/dd",
datetime = "yyyy/MM/dd HH:mm:ss",
)
model = {
...
};
[14] | java.text.SimpleDateFormat |
CSVフィールドの設定¶
CSVのフィールドに関する設定は、DMDLスクリプトのデータモデルに含まれるそれぞれのプロパティに @windgate.csv.field
属性を指定します。
@windgate.csv.field
属性には、次のような要素を指定できます。
要素 | 型 | 既定値 | 内容 |
---|---|---|---|
name |
文字列 | プロパティ名 | CSV形式の設定 でヘッダを有効にしている場合に使用するCSVのヘッダのフィールド名 |
quote |
文字列 | default |
各フィールド値のクォートに関する動作の指定。default , needed , always のいずれかを指定 |
name
要素に指定するフィールド名は、入力データの読み込み時のヘッダの形式チェックやデータ出力時のヘッダ文字列として使用されます。
quote
要素に関する動作の指定は以下の通りです。
needed
: フィールド値にクォート処理が必要な値が含まれている場合にクォート処理を行うalways
: フィールド値の内容に関わらず、常にクォート処理を行うdefault
:needed
を指定した場合と同じ動作を行う
以下はCSVフィールドの設定を付加したDMDLスクリプトの記述例です。
@windgate.csv
document = {
"the name of this document"
@windgate.csv.field(name = "題名" , quote = "always")
name : TEXT;
"the content of this document"
@windgate.csv.field(name = "内容" , quote = "needed")
content : TEXT;
};
ファイル情報の取得¶
解析中のCSVファイルに関する属性を取得する場合、それぞれ以下の属性をプロパティに指定します。
属性 | 型 | 内容 |
---|---|---|
@windgate.csv.file_name |
TEXT |
ファイル名 |
@windgate.csv.line_number |
INT , LONG |
テキスト行番号 (1起算) |
@windgate.csv.record_number |
INT , LONG |
レコード番号 (1起算) |
上記の属性が指定されたプロパティは、CSVのフィールドから除外されます。
Attention
これらの属性はCSVの解析時のみ有効です。 CSVを書き出す際には無視されます。
CSVから除外するプロパティ¶
特定のプロパティをCSVのフィールドとして取り扱いたくない場合、プロパティに @windgate.csv.ignore
を指定します。
CSV形式の注意点¶
自動生成でサポートするCSV形式を利用するうえで、いくつかの注意点があります。
- CSVに空の文字列を書き出しても、読み出し時に
null
として取り扱われます - 論理値は復元時に、値が
true
で指定した文字列の場合にはtrue
, 空の場合にはnull
, それ以外の場合にはfalse
となります - ヘッダが一文字でも異なる場合、解析時にヘッダとして取り扱われません
- 1レコードが10MBを超える場合、正しく解析できません
ローカルファイルシステムを利用するインポーター記述¶
WindGateと連携してファイルからデータをインポートする場合、 FsImporterDescription
[15] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
String getProfileName()
- インポーターが使用するプロファイル名を戻り値に指定します。
String getPath()
インポート対象のファイルパスを
resource.local.basePath
からの相対パスで指定します。ここには
${変数名}
の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。Class<?> getModelType()
インポーターが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。
このメソッドは、自動生成される骨格ではすでに宣言されています。
Class<? extends DataModelStreamSupport<?>> getStreamSupport()
DataModelStreamSupport
の実装クラスを戻り値に指定します。このメソッドは、自動生成される骨格ではすでに宣言されています。
以下は実装例です。
public class DocumentFromFile extends FsImporterDescription {
@Override
public Class<?> getModelType() {
return Document.class;
}
@Override
public String getProfileName() {
return "example";
}
@Override
public String getPath() {
return "example/input.csv";
}
@Override
public Class<? extends DataModelStreamSupport<?>> getStreamSupport() {
return DocumentCsvSupport.class;
}
}
[15] | com.asakusafw.vocabulary.windgate.FsImporterDescription |
ローカルファイルシステムを利用するエクスポーター記述¶
WindGateと連携してジョブフローの処理結果をローカルのファイルに書き出すには、 FsExporterDescription
[16] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
String getProfileName()
- エクスポーターが使用するプロファイル名を戻り値に指定します。
String getPath()
エクスポート対象のファイルパスを
resource.local.basePath
からの相対パスで指定します。ここには
${変数名}
の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。Class<?> getModelType()
エクスポーターが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。
このメソッドは、自動生成される骨格ではすでに宣言されています。
Class<? extends DataModelStreamSupport<?>> getStreamSupport()
DataModelStreamSupport
の実装クラスを戻り値に指定します。このメソッドは、自動生成される骨格ではすでに宣言されています。
Attention
getPath()
で指定した出力先に既にファイルが存在する場合、エクスポート時に上書きされます。
以下は実装例です。
public class WordIntoFile extends FsExporterDescription {
@Override
public Class<?> getModelType() {
return Word.class;
}
@Override
public String getProfileName() {
return "example";
}
@Override
public String getPath() {
return "example/output.csv";
}
@Override
public Class<? extends DataModelStreamSupport<?>> getStreamSupport() {
return WordCsvSupport.class;
}
}
[16] | com.asakusafw.vocabulary.windgate.FsExporterDescription |
データベースの入出力¶
Asakusa FrameworkのバッチアプリケーションからWindGateを利用してデータベースの入出力を行うには、 対象のプロファイルに データベースの設定 を追加します。
また、データモデルと PreparedStatement
, ResultSet
をマッピングする DataModelJdbcSupport
[17] の実装クラスを作成します。
この実装クラスは、DMDLコンパイラの拡張を利用して自動的に生成できます。
なお、以降の機能を利用するには次のライブラリやプラグインが必要です。
ライブラリ | 概要 |
---|---|
asakusa-windgate-vocabulary |
DSL用のクラス群 |
asakusa-windgate-plugin |
DSLコンパイラプラグイン |
asakusa-windgate-test-moderator |
テストドライバープラグイン |
asakusa-windgate-dmdl |
DMDLコンパイラプラグイン |
Hint
Asakusa Gradle Plugin ユーザーガイド の手順に従ってプロジェクトテンプレートから作成したプロジェクトは、これらのライブラリやプラグインがGradle Pluginによってデフォルトで利用可能になっています。
[17] | com.asakusafw.windgate.core.vocabulary.DataModelJdbcSupport |
DataModelJdbcSupportの自動生成¶
データモデルから DataModelJdbcSupport
の実装クラスを自動的に生成するには、それぞれのプロパティに @windgate.jdbc.column
を指定してさらに name
要素で対応するカラム名を記述します。
また、テーブル名を指定するにはデータモデルに @windgate.jdbc.table
を指定して name
要素内に記述します [18] 。
@windgate.jdbc.table(name = "DOCUMENT")
document = {
"the name of this document"
@windgate.jdbc.column(name = "NAME")
name : TEXT;
"the content of this document"
@windgate.jdbc.column(name = "CONTENT")
content : TEXT;
};
上記のように記述してデータモデルクラスを生成すると、
<出力先パッケージ>.jdbc.<データモデル名>JdbcSupport
というクラスが自動生成されます。
このクラスは DataModelJdbcSupport
を実装し、 @windgate.jdbc.column
で指定したカラムが利用可能です。
また、 @windgate.jdbc.table
を指定した場合、単純な データベースを利用するインポーター記述 と データベースを利用するエクスポーター記述 の骨格も自動生成します。
前者は <出力先パッケージ>.jdbc.Abstract<データモデル名>JdbcImporterDescription
、後者は <出力先パッケージ>.jdbc.Abstract<データモデル名>JdbcExporterDescription
というクラス名で生成します。
この自動生成されたインポーター/エクスポーター記述の骨格は指定されたテーブルのすべてのカラムを利用します。 必要に応じて継承して利用してください。
[18] | @windgate.jdbc.table の指定は必須ではありません。 |
DMDLとJDBCの型の対応¶
DMDLとJDBCの型の対応は以下の通りです。
説明 | DMDL | Javaクラス | JDBC |
32bit符号付き整数 | INT | int (IntOption) | int |
64bit符号付き整数 | LONG | long (LongOption) | long |
単精度浮動小数点 | FLOAT | float (FloatOption) | float |
倍精度浮動小数点 | DOUBLE | double (DoubleOption) | double |
文字列 | TEXT | Text (StringOption) | String |
10進数 | DECIMAL | BigDecimal (DecimalOption) | BigDecimal |
日付 | DATE | Date (DateOption) | java.sql.Date |
日時 | DATETIME | DateTime (DateTime) | java.sql.Timestamp |
論理値 | BOOLEAN | boolean (BooleanOption) | boolean |
8bit符号付き整数 | BYTE | byte (ByteOption) | byte |
16bit符号付き整数 | SHORT | short (ShortOption) | short |
データベースを利用するインポーター記述¶
WindGateと連携してデータベースのテーブルからデータをインポートする場合、 JdbcImporterDescription
[19] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
String getProfileName()
- インポーターが使用するプロファイル名を戻り値に指定します。
Class<?> getModelType()
インポーターが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。
このメソッドは、自動生成される骨格ではすでに宣言されています。
String getTableName()
インポート対象のテーブル名を戻り値に指定します。
このメソッドは、自動生成される骨格ではすでに宣言されています。
List<String> getColumnNames()
インポート対象のカラム名を戻り値に指定します。 ここで指定したカラム名のみインポートを行います。
このメソッドは、自動生成される骨格ではすでに宣言されています。
Class<? extends DataModelJdbcSupport<?>> getJdbcSupport()
DataModelJdbcSupport
の実装クラスを戻り値に指定します。このメソッドは、自動生成される骨格ではすでに宣言されています。
String getCondition()
インポーターが利用する抽出条件をSQLの条件式で指定します(省略可能)。
指定する文字列はSQL文の
WHERE
句以降の文字列(WHERE
の部分は不要)である必要があります。 省略時にはテーブル全体を入力の対象にとります。ここには
${変数名}
の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。 変数がそのまま文字列として展開されるため、文字列リテラルを利用する場合などには注意が必要です。Collection<? extends JdbcAttribute> getOptions()
インポート処理時に指定するオプションを指定します。
指定可能なオプションは、列挙型
JdbcImporterDescription.Option
[20] に定義されています。 本バージョンでは、以下のオプションが利用可能です。¶ オプション 内容 ORACLE_PARTITION
Oracleのパーティションテーブルに対する並列読み込みを有効にする。
この機能を利用する場合、WindGateの実行環境は WindGate JDBC ダイレクト・モード を利用する必要がある。 また WindGate JDBC ダイレクト・モード の実行環境では、このオプション指定に加えて JDBCドライバーの配置 で
com.asakusafw.dag.jdbc.<profile-name>.optimizations
を設定する必要がある。
以下は実装例です。
public class DocumentFromDb extends JdbcImporterDescription {
@Override
public Class<?> getModelType() {
return Document.class;
}
@Override
public String getProfileName() {
return "example";
}
@Override
public String getTableName() {
return "DOCUMENT";
}
@Override
public List<String> getColumnNames() {
return Arrays.asList("NAME", "CONTENT");
}
@Override
public Class<? extends DataModelJdbcSupport<?>> getJdbcSupport() {
return DocumentJdbcSupport.class;
}
@Override
public Collection<JdbcAttribute> getOptions() {
return Arrays.asList(Option.ORACLE_PARTITION);
}
}
[19] | com.asakusafw.vocabulary.windgate.JdbcImporterDescription |
[20] | com.asakusafw.vocabulary.windgate.JdbcImporterDescription.Option |
データベースを利用するエクスポーター記述¶
WindGateと連携してジョブフローの処理結果をデータベースのテーブルに書き出すには、 JdbcExporterDescription
[21] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
String getProfileName()
- エクスポーターが使用するプロファイル名を戻り値に指定します。
Class<?> getModelType()
エクスポーターが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。
このメソッドは、自動生成される骨格ではすでに宣言されています。
String getTableName()
エクスポート対象のテーブル名を戻り値に指定します。
このメソッドは、自動生成される骨格ではすでに宣言されています。
List<String> getColumnNames()
エクスポート対象のカラム名を戻り値に指定します。 ここで指定したカラム名のみエクスポートを行います。
このメソッドは、自動生成される骨格ではすでに宣言されています。
String getCustomTruncate()
テーブルの内容を削除する際のクエリー文を指定します。 省略時には データベースの設定 に従ったクエリーが実行されます。
ここには
${変数名}
の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。 変数がそのまま文字列として展開されるため、文字列リテラルを利用する場合などには注意が必要です。Class<? extends DataModelJdbcSupport<?>> getJdbcSupport()
DataModelJdbcSupport
の実装クラスを戻り値に指定します。このメソッドは、自動生成される骨格ではすでに宣言されています。
Collection<? extends JdbcAttribute> getOptions()
エクスポート処理時に指定するオプションを指定します。
指定可能なオプションは、列挙型
JdbcExporterDescription.Option
[22] に定義されています。 本バージョンでは、以下のオプションが利用可能です。¶ オプション 内容 ORACLE_DIRPATH
Oracleのダイレクト・パス・インサートを有効にする。 実行するINSERT文に対して、APPEND_VALUESヒントを指定する。
WindGateの実行環境でダイレクト・パス・インサートを利用にするには、このオプション指定に加えて データベースの設定 で
resource.jdbc.optimizations
を設定する必要がある。
以下は実装例です。
public class WordIntoDb extends JdbcExporterDescription {
@Override
public Class<?> getModelType() {
return Word.class;
}
@Override
public String getProfileName() {
return "example";
}
@Override
public String getTableName() {
return "WORD";
}
@Override
public List<String> getColumnNames() {
return Arrays.asList("STRING", "FREQUENCY");
}
@Override
public Class<? extends DataModelJdbcSupport<?>> getJdbcSupport() {
return WordJdbcSupport.class;
}
@Override
public Collection<Option> getOptions() {
return Arrays.asList(Option.ORACLE_DIRPATH);
}
}
[21] | com.asakusafw.vocabulary.windgate.JdbcExporterDescription |
[22] | com.asakusafw.vocabulary.windgate.JdbcExporterDescription.Option |
WindGateと連携したテスト¶
WindGateを利用したジョブフローやバッチのテストは、Asakusa Frameworkの通常のテスト方法で行えます。 通常のテストについては アプリケーションのテスト - TestDriver を参照してください。
Attention
テストドライバーは、テストのたびにWindGateのプラグイン用のClassLoaderを作成し、プラグインライブラリをクラスパスに通します。
クラスロードに関する問題が発生した場合には、テストを実行する際のクラスパスにそれらのライブラリを含めてください。