Asakusa DSLとThunderGateの連携¶
この文書では、ThunderGateと連携してデータベースを操作するバッチアプリケーションを Asakusa DSLで記述する方法について紹介します。
ThunderGateについての情報は、 ThunderGateユーザーガイド を参照して下さい。
基本的な連携の方法¶
Asakusa DSLで定義するジョブフローは、 ThunderGateと連携することで入出力にデータベースのテーブルを利用できます。
Note
単一のジョブフローの中で、インポートとエクスポートの対象にできるデータベースは1つまでです。 ただし、 補助インポータ の機能を利用すると、制限つきで複数のデータベースからインポートできます。
データベースのテーブルからインポートする¶
ThunderGateと連携してデータベースのテーブルからデータをインポートする場合、
DbImporterDescription
[1] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
String getTargetName()
- インポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 インポータは実行時に $ASAKUSA_HOME/bulkloader/conf 配下に配置した[ターゲット名]-jdbc.properties に記述されたデータベース接続情報定義ファイルを使用してデータベースに対するアクセスを行います。 このファイルの内容については ThunderGateユーザーガイド を参照して下さい。
Class<?> getModelType()
- インポータが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。 インポータは実行時にデータモデルクラスを作成する元となったテーブル名に対してインポート処理を行います [2] 。
LockType getLockType()
- インポータの処理時に行われるロックの種類を戻り値に指定します。
指定可能なロック種別については
BulkLoadImporterDescription.LockType
のAPIリファレンスを参照して下さい [3] 。 String getWhere()
インポータが利用する抽出条件をSQLの条件式で指定します。 指定する文字列はMySQL形式の
WHERE
以降の文字列である必要があります。ここには
${変数名}
の形式で、バッチ起動時の引数やあらかじめ宣言された変数を利用できます。 利用可能な変数はコンテキストAPIで参照できるものと同様です。 詳しくは Asakusa DSLユーザーガイド を参照してください。DataSize getDataSize()
- インポートするデータのおおよそのサイズを指定します。 コンパイラはこのデータサイズをヒントに実行計画を作成します。 省略された場合にはデータサイズが不明であるという前提で実行計画を作成します。
以下の例では、テーブルから生成したデータモデルクラス Hoge
に対応するテーブルの全データをロードして、その際にテーブルロックを取得します。
public class HogeFromDb extends DbImporterDescription {
public String getTargetName() {
return "asakusa";
}
public Class<?> getModelType() {
return Hoge.class;
}
public LockType getLockType() {
return LockType.TABLE;
}
}
以下の例では、バッチ引数で指定した parameter
変数を利用して条件式を指定しています。
public class HogeFromDb extends DbImporterDescription {
public String getTargetName() {
return "asakusa";
}
public Class<?> getModelType() {
return Hoge.class;
}
public String getWhere() {
return "NAME = '${parameter}'";
}
public LockType getLockType() {
return LockType.ROW;
}
}
[1] | com.asakusafw.vocabulary.bulkloader.DbImporterDescription |
[2] | DMDLを直接記述してデータモデルクラスを作成している場合、 DbImporterDescription の代わりに BulkLoadImporterDescription [4] を利用して下さい |
[3] | com.asakusafw.vocabulary.bulkloader.BulkLoadImporterDescription.LockType |
[4] | com.asakusafw.vocabulary.bulkloader.BulkLoadImporterDescription |
データベースのテーブルにエクスポートする¶
ThunderGateと連携してジョブフローの処理結果をデータベースのテーブルに書き出すには、
DbExporterDescription
[5] クラスのサブクラスを作成して必要な情報を記述します。
このクラスでは、下記のメソッドをオーバーライドします。
String getTargetName()
- エクスポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。
利用方法はインポータの
getTargetName()
と同様です。 Class<?> getModelType()
- エクスポータが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。 エクスポータは実行時にデータモデルクラスを作成する元となったテーブル名に対してエクスポート処理を行います [6] 。
以下の例では、テーブルから生成したデータモデルクラス Hoge
に対応するテーブルに対して、ジョブフローの処理結果を書き戻します。
public class HogeIntoDb extends DbExporterDescription {
public Class<?> getModelType() {
return Hoge.class;
}
}
[5] | com.asakusafw.vocabulary.bulkloader.DbExporterDescription |
[6] | DMDLを直接記述してデータモデルクラスを作成している場合、 DbExporterDescription の代わりに BulkLoadExporterDescription [7] を利用して下さい |
[7] | com.asakusafw.vocabulary.bulkloader.BulkLoadExporterDescription |
補助インポータ¶
補助インポータは、1つのジョブフロー中に通常のインポートやエクスポート処理を行うデータベースとは別の、 データベースからデータをインポートする際に使用するインポータです。
通常のインポータはデータの更新を前提としてロック取得 (排他制御) の指定を行いますが、 補助インポータは指定したテーブルに対してデータを参照のみを行います。 つまり、補助インポータを利用すると、「他のデータベースからマスタデータなどの参照データを読み出せる」ということになります [8] 。
補助インポータを使用してインポート処理を行うには、
SecondaryImporterDescription
[9] を継承したクラス(インポート処理記述クラス)を作成し、必要なメソッドをオーバーライドします。
同クラスに指定するメソッドを以下に示します。
String getTargetName()
- 補助インポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 通常のインポータとは異なるターゲット名を指定します。補助インポータ実行時にはターゲット名に対応するデータベース接続情報定義ファイルを配置しておく必要があります。 データベース接続情報定義ファイルの定義方法は通常のインポータと同様です。
Class<?> getModelType()
- 補助インポータが処理対象とするデータモデルオブジェクトの型を表すクラスを戻り値に指定します。 利用方法は通常のインポータと同様です。
String getWhere()
- 補助インポータが利用する抽出条件をSQLの条件式で指定します。 利用方法は通常のインポータと同様です。
DataSize getDataSize()
- このインポータが取り込むデータサイズの分類を指定します。 利用方法は通常のインポータと同様です。
以下の例では、テーブルから生成したデータモデルクラス Foo
に対応するテーブルの全データをロードします。
また、その時に利用するデータベースは other
というターゲット名で指定しています。
以下の例では、テーブルから生成したデータモデルクラス Hoge
に対応するテーブルに対して、ジョブフローの処理結果を書き戻します。
/**
* 補助インポータの動作を定義する。
*/
public class SecondaryImporterExample extends SecondaryImporterDescription {
@Override
public String getTargetName() {
return "other";
}
@Override
public Class<?> getModelType() {
return Foo.class;
}
// 補助インポータはgetLockType()をオーバーライドできない。
}
以下は補助インポータを利用する場合の注意点です。
- 補助インポータでないインポータのターゲットは、ジョブフロー中で1種類までです
DbImporterDescription
を使う場合、getTargetName()
はジョブフロー中で全て同じものにしてくださいSecondaryImporterDescription
が、DbImporterDescription
と同じターゲット名を指定することは可能です
- エクスポータのターゲットは、通常のインポータと同じターゲットにしてください
- 通常のインポータでターゲットAを指定し、エクスポータと補助インポータにターゲットBを指定、のようなことはできません
- 通常のインポータを一つも利用しない場合、エクスポータのターゲット名は何を指定してもかまいません
[8] | これとは逆の「補助エクスポータ」のような仕組みは現在提供していません |
[9] | com.asakusafw.vocabulary.bulkloader.SecondaryImporterDescription |
重複チェック機能¶
エクスポータの拡張機能で、新しいレコードをテーブルに追加する際に、特定のカラムが同じデータが既にデータベース上にあるかどうかをチェックできます。 重複データがデータベース上に既に存在する場合には、そのデータを通常のテーブルには追加せずに、かわりにエラー情報のテーブルに追加します。 この機能は既存のレコードに対しては利用できず、 新しいレコードを追加する際にだけ利用できます 。
重複チェックを行う場合、まずは次のようなテーブルが必要です。
- 正常レコードを登録するテーブル
- 重複チェック用のカラムがテーブルに存在すること
- 重複したレコードを登録するエラーテーブル
- エラーコードを格納するカラム(CHAR/VARCHAR型)がテーブルに存在すること
この機能の利用方法を、2つのケースに分けて説明します。
正常テーブルよりもエラーテーブルの情報が少ない場合¶
正常テーブルよりもエラーテーブルの情報が少ない場合に、重複チェックを行う方法を紹介します。 このとき、正常テーブルとエラーテーブルは次のような関係であるとします。
- 正常テーブル
- 必要な業務情報やシステムカラムを含んでいる
- 重複チェック用のカラムを含んでいる
- エラーテーブル
- 正常テーブルの一部または全部のカラムが、同じ名前で存在する
- さらに、エラーコードを格納するカラムが存在する (正常テーブルに含まれていなくてよい)
つまり、正常テーブルにない情報をエラーテーブルに設定したい場合 [10] には、この方法は利用できません。 この場合には 正常テーブルとエラーテーブルの構造が大きく異なる場合 を参照して下さい。
重複チェックを行うには DbExporterDescription
の代わりに DupCheckDbExporterDescription
[11] を継承したエクスポータ記述を作成します。
/**
* 重複チェックつきエクスポータの動作を定義する (正常テーブル中心)。
*/
public class DupCheckExporterExample1 extends DupCheckDbExporterDescription {
@Override
public String getTargetName() {
return "asakusa";
}
@Override
public Class<?> getModelType() {
return Hoge.class;
}
@Override
protected Class<?> getNormalModelType() {
return Hoge.class;
}
@Override
protected Class<?> getErrorModelType() {
return HogeError.class;
}
@Override
protected List<String> getCheckColumnNames() {
return Arrays.asList("VALUE");
}
@Override
protected String getErrorCodeColumnName() {
return "ERR_CODE";
}
@Override
protected String getErrorCodeValue() {
return "999";
}
}
それぞれのオーバーライドしたメソッドでは、以下のように設定します。
getTargetName()
- エクスポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 利用方法は通常のエクスポータやインポータと同様です。
getModelType()
- 正常テーブルのテーブルモデルクラスを返します。
getNormalModelType()
- 正常テーブルのテーブルモデルクラスを返します。
getErrorModelType()
- エラーテーブルのテーブルモデルクラスを返します。
getCheckColumnNames()
- 重複チェックを行うカラム名の一覧を返します。 この値は、正常テーブルのテーブルモデルに存在するカラムを指定する必要があります。
getErrorCodeColumnName()
- エラーコードを格納するカラム名を返します。 この値は、エラーテーブルに実際に存在するカラム名である必要があります。
getErrorCodeValue()
- 重複チェックに失敗した場合に設定されるエラーコードです。 この値は重複チェックに失敗したレコードがエラーテーブルに格納される際に、上記「エラーコードを格納するカラム」に自動的に設定されます。
[10] | エラーコードを格納するカラムだけは、正常テーブルになくても大丈夫です |
[11] | com.asakusafw.vocabulary.bulkloader.DupCheckDbExporterDescription |
正常テーブルとエラーテーブルの構造が大きく異なる場合¶
正常テーブルよりもエラーテーブルの情報が多い場合や、正常テーブルとエラーテーブルの構造が大きく異なる場合には、 それらの両方のプロパティを持つデータモデルを予め作成する必要があります。 ここでは、そのようなデータモデルを「ユニオンモデル」と仮に呼ぶことにします。
なお、ここで想定する正常テーブルとエラーテーブルは次のような制約があるものとします。
- 正常テーブル
- 重複チェック用のカラムを含んでいる
- エラーテーブル
- エラーコードを格納するカラムが存在する
ユニオンモデルは、上記の2つのテーブルの全てのカラムを持つようなデータ構造である必要があります [12] 。
正常テーブルの名前が NORMAL_TABLE
, エラーテーブルの名前が ERROR_TABLE
である場合、
ユニオンモデルはDMDLで次のように記述できます [13] 。
union_model = normal_table + error_table;
上記の記述によって、 UnionModel
という名前のユニオンモデルを作成できます。
また、ジョブフローやフロー部品では、ユニオンテーブルのテーブルモデルを使って処理を行います。
ユニオンテーブルのテーブルモデルをエクスポートする際に、先ほどと同様に DupCheckDbExporterDescription
を指定して、次のように書きます。
/**
* 重複チェックつきエクスポータの動作を定義する (ユニオンモデル)。
*/
public class DupCheckExporterExample2 extends DupCheckDbExporterDescription {
@Override
public String getTargetName() {
return "asakusa";
}
@Override
public Class<?> getModelType() {
return UnionModel.class;
}
@Override
protected Class<?> getNormalModelType() {
return NormalTable.class;
}
@Override
protected Class<?> getErrorModelType() {
return ErrorTable.class;
}
@Override
protected List<String> getCheckColumnNames() {
return Arrays.asList("VALUE");
}
@Override
protected String getErrorCodeColumnName() {
return "ERR_CODE";
}
@Override
protected String getErrorCodeValue() {
return "999";
}
}
この構造は 正常テーブルよりもエラーテーブルの情報が少ない場合 とほとんど同じですが、メソッド getModelType()
の戻り値が異なっています。
ジョブフローの出力もここに指定する型(ユニオンモデル)でなくてはならないことに注意して下さい。
全体としては以下のように設定します。
getTargetName()
- エクスポータが使用するターゲット名(データソースを表す識別子)を戻り値に指定します。 利用方法は通常のエクスポータやインポータと同様です。
getModelType()
- ユニオンモデルクラスを返します。
getNormalModelType()
- 正常テーブルのテーブルモデルクラスを返します。
getErrorModelType()
- エラーテーブルのテーブルモデルクラスを返します。
getCheckColumnNames()
- 重複チェックを行うカラム名の一覧を返します。 この値は、正常テーブルのテーブルモデルに存在するカラムを指定する必要があります。
getErrorCodeColumnName()
- エラーコードを格納するカラム名を返します。 この値は、エラーテーブルに実際に存在するカラム名である必要があります。
getErrorCodeValue()
- 重複チェックに失敗した場合に設定されるエラーコードです。 この値は重複チェックに失敗したレコードがエラーテーブルに格納される際に、上記「エラーコードを格納するカラム」に自動的に設定されます。
この機能で想定するユースケースは、「別システムからの取り込みとクレンジング処理のバッチ」です。
- 取込みデータの形式をユニオンモデルで表す
- 正常テーブルは、業務に必要なカラムだけを含める
- エラーテーブルは、エラートラッキングに必要なカラムだけを含める
- 取込みデータをクレンジングして、エラーがあればエラーカラムに情報をセットして、エラーテーブルに情報を書き出す
- クレンジングしたデータは、重複チェック機能を使って正常テーブルに情報を書き出す
- 重複チェックに成功した場合には、必要なカラムだけを正常テーブルに書き出す
- 重複チェックに失敗した場合には、エラーカラムに「重複エラー」の情報を設定して、エラーテーブルに情報を書き出す
[12] | より厳密には、「エラーコードカラム」に対応するプロパティはユニオンモデルに不要です |
[13] | DMDLの利用方法は、 DMDLユーザーガイド を参照して下さい |
キャッシュ機能¶
ThunderGateでは、インポート時に前回インポートからの差分のみを転送する「キャッシュ機能」が提供されています。 ただし、キャッシュを利用するには主に以下のような制限があります。
- 条件式 (
getWhere()
)を利用できない - ロック (
getLockType()
)時に行単位のロックを指定できない - 同一のキャッシュを複数のジョブフローで同時に利用できない
詳しい利用方法や、利用時の注意などは Cache for ThunderGate を参照してください。