18. ジョブフローの作成¶
このチュートリアルでは、バッチアプリケーションのデータフローを定義する ジョブフロー を作成する方法を説明していきます。
18.1. ジョブフロー¶
ジョブフローは外部システムからデータを読み出し、データを加工して、外部システムにデータを書き戻す、という一連のデータフローを記述します。
フロー部品の作成 の繰り返しになりますが、ジョブフローでは以下の3つの定義を行います。
- 外部入力の定義
- バッチアプリケーションの入力データをどのようにして外部システムから取り込むかを定義します。 データソースの種類(ファイルやデータベース)、データモデルの種類や想定されるデータサイズ、データの配置場所やフィルタルールなどの指定を行います。
- 外部出力の定義
- バッチアプリケーションの出力データをどのようにして外部システムに書き戻すかを定義します。 データソースの種類(ファイルやデータベース)、データモデルの種類、データの配置場所や配置形式(ファイル名やファイル分割のルール)などの指定を行います。
- データフローの定義
- 演算子を組み合わせて一連のデータ処理の流れを定義します。 ジョブフローではこれに加えて、外部入力と演算子の流れ、および演算子と外部出力の流れも記述します。
これらの実装のために、ジョブフローでは以下のコンポーネントを作成します。
- 外部入力の定義 - インポータ記述を作成する
- 外部出力の定義 - エクスポータ記述を作成する
- データフローの定義 - ジョブフロークラスを作成する
なお、外部入出力については サンプルアプリケーションの説明 - ファイル配置と編成 の仕様に従って定義していきます。
18.2. インポータ記述を作成する¶
インポータ記述 は外部システムから入力データをどのように取り込むかをデータモデルごとに指定します。
インポータ記述はJavaのインターフェース ImporterDescription [1] を実装したクラスとして作成します。
インポータ記述は DMDLのコンパイル で生成した ジョブフロー入出力基底クラス に含まれる、インポータ記述の基底クラスを継承して作成すると便利です。
このクラスは ImporterDescription を実装した抽象クラスで、各データモデルを扱うための標準的な実装を提供します。
ここではジョブフローの入力となる売上明細、店舗マスタ、商品マスタに対してそれぞれインポータ記述を定義します。
| [1] | com.asakusafw.vocabulary.external.ImporterDescription |
18.2.1. 売上明細のインポータ記述¶
ではまず、売上明細のインポータ記述を作成します。
インポータ記述はプロジェクトのソースフォールダ src/main/java 配下に任意のJavaパッケージ名とクラス名を持つクラスとして作成できます。
ここでは、以下のようにインポータ記述を作成します。
| パッケージ名 | com.example.jobflow |
| クラス名 | SalesDetailFromCsv |
| 基底クラス | com.example.modelgen.dmdl.csv.AbstractSalesDetailCsvInputDescription |
基底クラス AbstractSalesDetailCsvInputDescription は DMDLのコンパイル によって生成されたインポータ記述用の基底クラスです。
この基底クラスにはいくつかの抽象メソッドが含まれるので、このクラスで実装していきます。
抽象メソッドを仮にメソッド定義のみを行った状態は、以下のようになります。
package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractSalesDetailCsvInputDescription;
public class SalesDetailFromCsv extends AbstractSalesDetailCsvInputDescription {
@Override
public String getBasePath() {
}
@Override
public String getResourcePattern() {
}
}
18.2.1.1. 入力ファイルのベースパス¶
インポータ記述のメソッド getBasePath には、このインポータ記述が取り扱う入力ファイルの基底となる論理的なパスを戻り値として指定します。
@Override
public String getBasePath() {
return "sales";
}
このパスはあくまで論理的なもので、バッチアプリケーション実行時にファイルを読み込む際には、 Direct I/Oの設定に基づいてベースパスからファイルシステム上のパスに解決されます。
たとえば、 ベースパス sales はバッチアプリケーションの実行時にHadoopファイルシステム上のパス /user/asakusa/target/testing/directio/sales にマッピングされ、
このディレクトリ配下のファイルを読み込む、といったように動作します。
18.2.1.2. 入力ファイル名のパターン¶
インポータ記述のメソッド getResourcePattern には、このインポータ記述が取り扱う入力ファイル名のパターンを戻り値として指定します。
@Override
public String getResourcePattern() {
return "**/${date}.csv";
}
ここにはファイル名を表す文字列のほか、バッチ実行時の引数 ${arg} やワイルドカード * などのパターン用の文字列も利用できます。
ここでの設定 **/{date}.csv は、ベースパスに対してすべてのサブディレクトリ配下の、バッチ引数 ${date} で指定された日付文字列を含む、拡張子 .csv のファイルにマッチします。
例えば バッチ引数に date=2011-04-01 と指定した場合、 2011-04-01.csv , 2011/2011-04-01.csv などのファイルにマッチします。
18.2.1.3. 入力ファイルの推定データサイズ¶
オプションの設定項目として、インポータ記述が扱うデータに対する推定データサイズを指定することで、データサイズに応じた最適化が行われます。
インポータ記述のメソッド getDataSize には、入力データの推定データサイズを列挙型 DataSize [2] を使って指定します。
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
ここでは売上明細を大きめな入力データと推定するため DataSize.LARGE を指定します。
適切なデータサイズを指定することで、特に結合処理におけるバッチアプリケーションのパフォーマンスが向上する可能性があるため、 この項目はなるべく設定するとよいでしょう。
| [2] | com.asakusafw.vocabulary.external.ImporterDescription.DataSize |
18.2.2. 店舗マスタのインポータ記述¶
売上明細のインポータ記述 と同様の手順で、店舗マスタのインポータ記述を作成しましょう。
ここでは、以下の値を参考に、店舗マスタのインポータ記述を作成してみてください。
| クラス名 | StoreInfoFromCsv |
| 基底クラス | com.example.modelgen.dmdl.csv.AbstractStoreInfoCsvInputDescription |
| ベースパス | master |
| リソースパターン | store_info.csv |
| データサイズ | DataSize.TINY (とても小さい) |
18.2.3. 商品マスタのインポータ記述¶
同様にして、以下の値を参考に商品マスタのインポータ記述を作成してみてください。
| クラス名 | ItemInfoFromCsv |
| 基底クラス | com.example.modelgen.dmdl.csv.AbstractItemInfoCsvInputDescription |
| ベースパス | master |
| リソースパターン | store_info.csv |
| データサイズ | DataSize.LARGE (大きい) |
18.3. エクスポータ記述を作成する¶
エクスポータ記述 は外部システムに対して出力データをどのように書き戻すかをデータモデルごとに指定します。
エクスポータ記述はJavaのインターフェース ExporterDescription [3] を実装したクラスとして作成します。
エクスポータ記述は DMDLのコンパイル で生成した ジョブフロー入出力基底クラス に含まれる、エクスポータ記述の基底クラスを継承して作成すると便利です。
このクラスは ExporterDescription を実装した抽象クラスで、各データモデルを扱うための標準的な実装を提供します。
ここではジョブフローの出力となるカテゴリ別売上集計、エラー情報に対してそれぞれエクスポータ記述を定義します。
| [3] | com.asakusafw.vocabulary.external.ExporterDescription |
18.3.1. カテゴリ別売上集計のエクスポータ記述¶
カテゴリ別売上集計のエクスポータ記述を作成します。
エクスポータ記述はプロジェクトのソースフォルダ src/main/java 配下に任意のJavaパッケージ名とクラス名を持つクラスとして作成できます。
ここでは、以下のようにエクスポータ記述を作成します。
| パッケージ名 | com.example.jobflow |
| クラス名 | CategorySummaryToCsv |
| 基底クラス | com.example.modelgen.dmdl.csv.AbstractCategorySummaryCsvOutputDescription |
基底クラス AbstractCategorySummaryCsvOutputDescription は DMDLのコンパイル によって生成されたエクスポータ記述用の基底クラスです。
この基底クラスにはいくつかの抽象メソッドが含まれるので、このクラスで実装していきます。
抽象メソッドを仮にメソッド定義のみを行った状態は、以下のようになります。
package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractCategorySummaryCsvOutputDescription;
public class CategorySummaryToCsv extends AbstractCategorySummaryCsvOutputDescription {
@Override
public String getBasePath() {
}
@Override
public String getResourcePattern() {
}
}
18.3.1.1. 出力ファイルのベースパス¶
エクスポータ記述のメソッド getBasePath には、このエクスポータ記述が取り扱う出力ファイルの基底となる論理的なパスを戻り値として指定します。
ベースパスの仕組みは 入力ファイルのベースパス と同様です。
@Override
public String getBasePath() {
return "result/category";
}
18.3.1.2. 出力ファイル名のパターン¶
エクスポータ記述のメソッド getResourcePattern には、このエクスポータ記述が取り扱う出力ファイル名のパターンを戻り値として指定します。
@Override
public String getResourcePattern() {
return "result.csv";
}
ここにはファイル名を表す文字列のほか、バッチ実行時の引数 ${arg} やワイルドカード * などのパターン用の文字列も利用できます。
Hint
出力ファイル名にパターンを利用することは、パフォーマンスの観点でも重要になることがあります。 例えば大容量のファイルを出力する際には、1つのファイルとして出力するよりもワイルドカードやその他のパターンを利用して適切にファイルを分割することで、処理時間が短縮できることが多いでしょう。
18.3.1.3. 出力データのソート¶
オプションの設定項目として、出力データのソートに関する指定を行うことができます。
エクスポータ記述のメソッド getOrder では、出力ファイルをソートするためのキーとなるプロパティ名を戻り値として指定します。
@Override
public List<String> getOrder() {
return Arrays.asList("-selling_price_total");
}
それぞれのプロパティには接頭辞 + ( +property_name ) を付与することで昇順、接頭辞 - ( -property_name ) を指定することで降順を表します。
戻り値はリスト形式で、複数のソートキーを指定することもできます。
ここでは、売上合計プロパティ selling_price_total の降順でソートしています。
18.3.1.4. 出力前の削除ファイルのパターン¶
エクスポータ記述のメソッド getDeletePatterns では、出力を行う前に削除するファイル名パターンの一覧を戻り値に指定します。
出力ファイルのベースパス で指定したパスを起点に、これらのパターンが表すパスに含まれるファイルを消去した後に、実行結果の出力を行います。
パターンには * (ワイルドカード) なども利用することができます。
@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
この設定はバッチを複数回実行する際に、前回の出力結果が混ざらないようにするために利用することを意図しています。 特に 出力ファイル名のパターン を利用する場合、定義の内容によってはファイル名が出力の都度変わるため、 前回実行したバッチの出力をクリーニングしないと意図せずに前回実行したバッチの出力と結果が混ざってしまう場合があります。
この項目はオプションの設定項目ですが、特別な理由がない限りは設定しておくことを推奨します。
18.3.2. エラー情報のエクスポータ記述¶
カテゴリ別売上集計のエクスポータ記述 と同様の手順で、エラー情報のエクスポータ記述を作成しましょう。
ここでは、以下の値を参考に、エラー情報のエクスポータ記述を作成してみてください。
| クラス名 | ErrorRecordToCsv |
| 基底クラス | com.example.modelgen.dmdl.csv.AbstractErrorRecordCsvOutputDescription |
| ベースパス | result/error |
| リソースパターン | ${date}.csv |
| ソート順 | ファイル名の昇順 |
| 削除パターン | ベースパス配下のすべてのファイル |
18.4. ジョブフロークラスを作成する¶
ジョブフローを構築するためのクラスを ジョブフロークラス と呼びます。
ジョブフロークラスはプロジェクトのソースフォルダ src/main/java 配下に任意のJavaパッケージ名とクラス名を持つクラスとして作成できます。
ここでは、以下のようにジョブフロークラスを作成します。
| パッケージ名 | com.example.jobflow |
| クラス名 | CategorySummaryJob |
ジョブフロークラスは、以下のように宣言します。
publicスコープを指定したクラスとして作成する- Flow DSL用の親クラス
FlowDescription[4] を継承する - ジョブフロークラスであることを示す注釈
JobFlow[5] を指定し、要素nameにこのジョブフローの名前を指定する
ジョブフロークラスはフロー部品クラスと似たような構成ですが、
ジョブフロークラスであることを示す注釈 JobFlow を指定し、要素 name にこのジョブフローの名前を指定する点が異なります。
要素 name は任意の名前を使用できますが、バッチアプリケーションごとにユニークな値を指定する必要があります。
この値は、テストドライバーによるジョブフローのテスト時に利用するほか、
バッチアプリケーション実行時に フローID ( flow_id )という名前でログなどに出力されます。
FlowDescription を継承したクラスは抽象メソッド describe を実装する必要があります。
describe メソッドの実装については後述の フロー記述メソッドを作成する で説明しますが、ここではとりあえず空のメソッドを定義しておきます。
作成したジョブフロークラスは、以下のようになります。
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.JobFlow;
@JobFlow(name = "byCategory")
public class CategorySummaryJob extends FlowDescription {
@Override
protected void describe() {
}
}
| [4] | com.asakusafw.vocabulary.flow.FlowDescription |
| [5] | com.asakusafw.vocabulary.flow.JobFlow |
18.5. ジョブフローコンスタクタを作成する¶
ジョブフロークラスのコンストラクタには、このジョブフローの入出力を表すインポータ記述とエクスポータ記述の指定を行います。
このジョブフローでは、 このチュートリアルで作成した以下のインポータ記述とエクスポータ記述を指定します。
- インポータ記述
- エクスポータ記述
これに基づいて作成したジョブフローコンストラクタは、以下のようになります。
...
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ErrorRecord;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.modelgen.dmdl.model.StoreInfo;
@JobFlow(name = "byCategory")
public class CategorySummaryJob extends FlowDescription {
final In<SalesDetail> salesDetail;
final In<StoreInfo> storeInfo;
final In<ItemInfo> itemInfo;
final Out<CategorySummary> categorySummary;
final Out<ErrorRecord> errorRecord;
public CategorySummaryJob(
@Import(name = "salesDetail", description = SalesDetailFromCsv.class)
In<SalesDetail> salesDetail,
@Import(name = "storeInfo", description = StoreInfoFromCsv.class)
In<StoreInfo> storeInfo,
@Import(name = "itemInfo", description = ItemInfoFromCsv.class)
In<ItemInfo> itemInfo,
@Export(name = "categorySummary", description = CategorySummaryToCsv.class)
Out<CategorySummary> categorySummary,
@Export(name = "errorRecord", description = ErrorRecordToCsv.class)
Out<ErrorRecord> errorRecord) {
this.salesDetail = salesDetail;
this.storeInfo = storeInfo;
this.itemInfo = itemInfo;
this.categorySummary = categorySummary;
this.errorRecord = errorRecord;
}
まず、フロー部品と同様にジョブフロークラスにデータフローの入出力を保持するインスタンスフィールドを作成します。
final In<SalesDetail> salesDetail;
final In<StoreInfo> storeInfo;
final In<ItemInfo> itemInfo;
final Out<CategorySummary> categorySummary;
final Out<ErrorRecord> errorRecord;
コンストラクタでは、仮引数にこのフロー部品が受け取る入力と出力を宣言します。 ここまではフロー部品と同様です。
public CategorySummaryJob(
In<SalesDetail> salesDetail,
In<StoreInfo> storeInfo,
In<ItemInfo> itemInfo,
Out<CategorySummary> categorySummary,
Out<ErrorRecord> errorRecord) {
}
そして、引数の入力に対して注釈 Import [6] を付与し、要素 name に入力に対する任意の名前を、要素 description に インポータ記述のクラスリテラルを指定します。
ここで指定したインポート処理の結果が、この入力を通して利用できます。
public CategorySummaryJob(
@Import(name = "salesDetail", description = SalesDetailFromCsv.class)
In<SalesDetail> salesDetail,
@Import(name = "storeInfo", description = StoreInfoFromCsv.class)
In<StoreInfo> storeInfo,
@Import(name = "itemInfo", description = ItemInfoFromCsv.class)
In<ItemInfo> itemInfo,
Out<CategorySummary> categorySummary,
Out<ErrorRecord> errorRecord) {
}
同様に、引数の出力に対して注釈 Export [7] を付与し、要素 name に出力に対する任意の名前を、要素 description に エクスポータ記述のクラスリテラルを指定します。
この出力に対するジョブフローの実行結果が、エクスポート処理によって外部システムに書き出されるようになります。
public CategorySummaryJob(
@Import(name = "salesDetail", description = SalesDetailFromCsv.class)
In<SalesDetail> salesDetail,
@Import(name = "storeInfo", description = StoreInfoFromCsv.class)
In<StoreInfo> storeInfo,
@Import(name = "itemInfo", description = ItemInfoFromCsv.class)
In<ItemInfo> itemInfo,
@Export(name = "categorySummary", description = CategorySummaryToCsv.class)
Out<CategorySummary> categorySummary,
@Export(name = "errorRecord", description = ErrorRecordToCsv.class)
Out<ErrorRecord> errorRecord) {
}
コンストラクタの本体では、引数で受け取った入力と出力をインスタンスフィールドに代入します。
public CategorySummaryJob(
@Import(name = "salesDetail", description = SalesDetailFromCsv.class)
In<SalesDetail> salesDetail,
@Import(name = "storeInfo", description = StoreInfoFromCsv.class)
In<StoreInfo> storeInfo,
@Import(name = "itemInfo", description = ItemInfoFromCsv.class)
In<ItemInfo> itemInfo,
@Export(name = "categorySummary", description = CategorySummaryToCsv.class)
Out<CategorySummary> categorySummary,
@Export(name = "errorRecord", description = ErrorRecordToCsv.class)
Out<ErrorRecord> errorRecord) {
this.salesDetail = salesDetail;
this.storeInfo = storeInfo;
this.itemInfo = itemInfo;
this.categorySummary = categorySummary;
this.errorRecord = errorRecord;
}
| [6] | com.asakusafw.vocabulary.flow.Import |
| [7] | com.asakusafw.vocabulary.flow.Export |
18.6. フロー記述メソッドを作成する¶
ジョブフローで扱うデータフローの処理内容は、フロー部品と同様にフロー記述メソッド describe に記述します。
@Override
protected void describe() {
}
このフロー記述メソッド describe はフロー部品と記述方法は全く同じですが、
ジョブフローのフロー記述メソッドで扱う入力はインポータ記述の定義内容に従って外部入力となること、
同様に出力はエクスポータ記述の定義内容に従った外部出力になる点がフロー部品と異なります。
このチュートリアルではデータフローの処理は フロー部品の作成 でフロー部品として作成してあるため、 ここでのフロー記述メソッドはフロー部品から生成されたフロー演算子を呼び出す処理のみを記述します。
18.6.1. 演算子ファクトリを用意する¶
ここでは フロー部品のコンパイル で作成したフロー部品用の演算子ファクトリクラス CategorySummaryFlowPartFactory を生成します。
...
import com.example.flowpart.CategorySummaryFlowPartFactory;
...
@Override
protected void describe() {
CategorySummaryFlowPartFactory flowPartFactory = new CategorySummaryFlowPartFactory();
}
18.6.2. 入力と演算子を接続する¶
このデータフローの入力である売上明細、店舗マスタ、商品マスタを指定してフロー部品を実行します。
フロー部品の演算子ファクトリクラスに対しては create メソッドを呼び出すことで該当のフロー部品を実行するデータフローを記述することができます。
...
import com.example.flowpart.CategorySummaryFlowPartFactory.CategorySummaryFlowPart;
...
@Override
protected void describe() {
CategorySummaryFlowPartFactory flowPartFactory = new CategorySummaryFlowPartFactory();
CategorySummaryFlowPart flowPart = flowPartFactory.create(salesDetail, storeInfo, itemInfo);
}
18.6.3. 演算子と出力を接続する¶
このデータフローの出力であるカテゴリ別売上明細とエラー情報に対して、それぞれフロー部品の出力ポートを指定してデータフローを構築します。
@Override
protected void describe() {
CategorySummaryFlowPartFactory flowPartFactory = new CategorySummaryFlowPartFactory();
CategorySummaryFlowPart flowPart = flowPartFactory.create(salesDetail, storeInfo, itemInfo);
categorySummary.add(flowPart.categorySummary);
errorRecord.add(flowPart.errorRecord);
}
これでジョブフローからフロー部品を呼び出すデータフローが完成しました。
18.7. 終わりに¶
このチュートリアル終了時点のジョブフロー関連クラスは、次のようになります。
18.7.1. インポータ記述¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractSalesDetailCsvInputDescription;
public class SalesDetailFromCsv extends AbstractSalesDetailCsvInputDescription {
@Override
public String getBasePath() {
return "sales";
}
@Override
public String getResourcePattern() {
return "**/${date}.csv";
}
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractStoreInfoCsvInputDescription;
public class StoreInfoFromCsv extends AbstractStoreInfoCsvInputDescription {
@Override
public String getBasePath() {
return "master";
}
@Override
public String getResourcePattern() {
return "store_info.csv";
}
@Override
public DataSize getDataSize() {
return DataSize.TINY;
}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractItemInfoCsvInputDescription;
public class ItemInfoFromCsv extends AbstractItemInfoCsvInputDescription {
@Override
public String getBasePath() {
return "master";
}
@Override
public String getResourcePattern() {
return "item_info.csv";
}
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
}
|
18.7.2. エクスポータ記述¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | package com.example.jobflow;
import java.util.Arrays;
import java.util.List;
import com.example.modelgen.dmdl.csv.AbstractCategorySummaryCsvOutputDescription;
public class CategorySummaryToCsv extends AbstractCategorySummaryCsvOutputDescription {
@Override
public String getBasePath() {
return "result/category";
}
@Override
public String getResourcePattern() {
return "result.csv";
}
@Override
public List<String> getOrder() {
return Arrays.asList("-selling_price_total");
}
@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | package com.example.jobflow;
import java.util.Arrays;
import java.util.List;
import com.example.modelgen.dmdl.csv.AbstractErrorRecordCsvOutputDescription;
public class ErrorRecordToCsv extends AbstractErrorRecordCsvOutputDescription {
@Override
public String getBasePath() {
return "result/error";
}
@Override
public String getResourcePattern() {
return "${date}.csv";
}
@Override
public List<String> getOrder() {
return Arrays.asList("+file_name");
}
@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
}
|
18.7.3. ジョブフロークラス¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.flowpart.CategorySummaryFlowPartFactory;
import com.example.flowpart.CategorySummaryFlowPartFactory.CategorySummaryFlowPart;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ErrorRecord;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.modelgen.dmdl.model.StoreInfo;
@JobFlow(name = "byCategory")
public class CategorySummaryJob extends FlowDescription {
final In<SalesDetail> salesDetail;
final In<StoreInfo> storeInfo;
final In<ItemInfo> itemInfo;
final Out<CategorySummary> categorySummary;
final Out<ErrorRecord> errorRecord;
public CategorySummaryJob(
@Import(name = "salesDetail", description = SalesDetailFromCsv.class)
In<SalesDetail> salesDetail,
@Import(name = "storeInfo", description = StoreInfoFromCsv.class)
In<StoreInfo> storeInfo,
@Import(name = "itemInfo", description = ItemInfoFromCsv.class)
In<ItemInfo> itemInfo,
@Export(name = "categorySummary", description = CategorySummaryToCsv.class)
Out<CategorySummary> categorySummary,
@Export(name = "errorRecord", description = ErrorRecordToCsv.class)
Out<ErrorRecord> errorRecord) {
this.salesDetail = salesDetail;
this.storeInfo = storeInfo;
this.itemInfo = itemInfo;
this.categorySummary = categorySummary;
this.errorRecord = errorRecord;
}
@Override
protected void describe() {
CategorySummaryFlowPartFactory flowPartFactory = new CategorySummaryFlowPartFactory();
CategorySummaryFlowPart flowPart = flowPartFactory.create(salesDetail, storeInfo, itemInfo);
categorySummary.add(flowPart.categorySummary);
errorRecord.add(flowPart.errorRecord);
}
}
|