Java EasyBatch 実装サンプルを作成してみました。


j-easy/easy-batch: The simple, stupid batch framework for Java の example を作成してみました。
作った example について説明を記載します。


注意点

EasyBatch の version について

作成時期が 2016年1月になります。当時の EasyBatch の version は 4.0.0 で、現在 2017年10月時点での最新 version は 5.1.0 になります。
5.0.0 で パッケージ構成等に変更がありますので、5.0.0 以上を使用される方 は 参考情報レベルでご確認ください。

Echonest について

以下のようなメールを頂きました。
2016/05/29 日に Echonest API が使用できなくなりましたので、この Example も動作しないかと思います。

Greetings again from The Echo Nest Team at Spotify. If you are receiving this email we have noticed you are still making active API calls to The Echo Nest API.
As a follow-up to the news on March 29th, this is a friendly reminder that on Tuesday, May 31st, The Echo Nest platform will no longer serve requests and you will need to move over to the Spotify API. If you currently have an app that uses The Echo Nest API, check out the Spotify Web API. Thanks for helping us build the future of music together! The Echo Nest

Spotify Echo Nest API - Spotify Developer に Spotify API と統合される旨が記載されています。 試していないので推測ですが、API URL を見る限り、一部 API は生きているのかもしれません。
ただ、Example 内で使用している Artist API についてはエラーが返ってきたので、はおそらく動作しないかと思います。


Github repository

以下になります。
kemsakurai/easybatch-framework-example


実装した内容

本家と全く同じような処理を作成しても意味がないので、以下のような実装にしてみました。

  1. 本家では Twitter API と連携する Sample があるので、The Echo Nest の API と連携する。

  2. RecordMapper で 1. で取得した Artist 情報を別のオブジェクトに変換する。

  3. 2. で変換、作成したオブジェクトを csv に出力する。

  4. Job の処理結果を HTML レポートとして出力する。1


QuickStart

Echonest の account の取得は現在できません。
QuickStart


easybatch-framework-example の 説明

1. Main.java の説明

以下、Main クラスの記述抜粋です。

        // Execute jobs
        JobReport jobReport = JobBuilder.aNewJob()
                .reader(new EchonestReader())
                .mapper(new ArtistCsvRowMapper())
                .marshaller(new DelimitedRecordMarshaller(ArtistCsvRowMapper.Result.class, new String[]{"name", "familiarity", "hotttnesss"}))
                .writer(new FileRecordWriter("./outputs/echonest_result.csv"))
                .call();

        // Output Reports
        HtmlJobReportFormatter htmlJobReportFormatter = new HtmlJobReportFormatter();
        String html = htmlJobReportFormatter.formatReport(jobReport);
        try (FileWriter fw = new FileWriter("./outputs/job_report.html")) {
            fw.write(html);
        }

補足説明

  1. JobBuilder.aNewJob() で JobBuilder の生成を行います。new JobBuilder() でも動作に変わりはありません。

  2. JobBuilder#reader() で RecordReader を設定します。RecordReader で複数件データを読み込んでも、後続処理では、1件ずつ処理が行われます。2

  3. JobBuilder#mapper() で RecordMapper を設定します。
    RecordReader で読み込んだ Artist を、別オブジェクトに変換します。

  4. JobBuilder#marshaller() で、DelimitedRecordMarshaller を設定。
    オブジェクトを CSV 形式に変換します。使用可能な Marshaller は marshallers · j-easy/easy-batch Wiki に記載されています。
    DelimitedRecordMarshaller の変換元 Java クラスは、フィールド の getter , setter を実装する必要があり、未実装だと4.0.0ではエラーになります。

  5. JobBuilder#writer() で RecordWriter を設定します。CSV に書き込みたいので、FileRecordWriter を使用します。

  6. HtmlJobReportFormatter で、JobBuilder#call() の戻りの JobReport を HTML に変換して、File に書き出します。
    HTML レポートとして以下のようなファイルが出力されます。
    HTML Report


2. EchonestReader.java

以下、RecordReader クラス の記述の抜粋です。

/**
 * EchonestReader
 *
 * @author Kem
 */
public class EchonestReader implements RecordReader {

    private final List<Artist> artists = new ArrayList<>();
    private int index = 0;
    private EchoNestAPI en;
    private final String key = "XXXXXXXXXXXXXXXXX";

    public EchonestReader() {
        en = new EchoNestAPI(key);
    }

    public boolean hasNextRecord() {
        return index < artists.size();
    }

    public Long getTotalRecords() {
        return (long) artists.size();
    }

    public void open() throws RecordReaderOpeningException {
        int start = 0;
        while (start % 100 == 0) {
            try {
                Params param = new Params();
                param.add("genre", "hip house");
                param.add("start", start);
                param.add("results", 100);
                param.add("bucket", "familiarity");
                param.add("bucket", "hotttnesss");
                List<Artist> results = en.searchArtists(param);
                start += results.size();
                artists.addAll(results);
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(EchonestReader.class.getName()).log(Level.SEVERE, null, ex);
                }
            } catch (EchoNestException ex) {
                Logger.getLogger(EchonestReader.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    public Record readNextRecord() throws RecordReadingException {
        Artist artist = artists.get(index);
        Header header = new Header((long) index, artist.toString(), new Date());
        index++;
        return new GenericRecord<>(header, artist);
    }

    public String getDataSourceName() {
        return "Echonest result";
    }

    public void close() throws RecordReaderClosingException {
        en = null;
    }
}

補足説明

  1. RecordReader#open()
    データ取得を行います。EchonestReader は Echonest に対してジャンル hip house に属する artist を問い合わせします。
    問い合わせた後、4秒 sleep して、リクエスト制限を超えないようにしています。
    デフォルトの取得属性に加えて、familiarity (どれくらい一般的に知られているかを示す指標)、hotttnesss (どれくらいアツい artist かを示す指標) を取得します。

  2. RecordReader#hasNextRecord()
    次に処理すべきレコードがあるのかを boolean 値で返します。

  3. RecordReader#getTotalRecords()
    open() で読み込んだレコード総数を返します。

  4. RecordReader#readNextRecord()
    open() メソッドで読み込んだデータを1件ずつ取得します。
    取得して、次の1件へのロジックを記載する必要があります。

  5. RecordReader#getDataSourceName()
    データソース名を返します。ログに出力したり、Header名称として使用したりします。

  6. RecordReader#close()
    open() したものに対しての close 処理を記述します。
    jdbc リソースであれば、Connection#close()、ResultSet#close() を行う必要があります。


3. ArtistCsvRowMapper.java

/**
 * ArtistCsvRowMapper
 *
 * @author Kem
 */
public class ArtistCsvRowMapper implements RecordMapper<Record<Artist>, Record<ArtistCsvRowMapper.Result>> {

    @Override
    public GenericRecord<Result> processRecord(Record<Artist> record) throws RecordMappingException {
        Result result = null;
        try {
            Artist artist = record.getPayload();
            result = new Result();
            result.name = artist.getName();
            result.familiarity = String.valueOf(artist.getFamiliarity());
            result.hotttnesss = String.valueOf(artist.getHotttnesss());
        } catch (EchoNestException ex) {
            Logger.getLogger(ArtistCsvRowMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
        Header header = record.getHeader();
        return new GenericRecord<>(header, result);
    }

    public static class Result {

        private String name;
        private String familiarity;
        private String hotttnesss;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getFamiliarity() {
            return familiarity;
        }

        public void setFamiliarity(String familiarity) {
            this.familiarity = familiarity;
        }

        public String getHotttnesss() {
            return hotttnesss;
        }

        public void setHotttnesss(String hotttnesss) {
            this.hotttnesss = hotttnesss;
        }
    }
}

補足説明

  1. ArtistCsvRowMapper#processRecord()
    Record から Record へ変換するメソッドです。
    入力 Record の Record#getPayload() でオブジェクト実体を取得して、出力 Record に変換します。

  2. Result.java
    戻り値の出力 Record に設定するデータクラスです。


その他補足 と 感想

  • HtmlJobReportFormatter、FileRecordWriterを使用するには、以下のライブラリを追加する必要があります。以下、build.gradleの抜粋です。

        compile 'org.easybatch:easybatch-tools:4.0.0'
        compile 'org.easybatch:easybatch-flatfile:4.0.0'
    

  • Writer や Marshaller はライブラリで提供されているものを使いましたが、提供される実装は他にもあります。
    Home · j-easy/easy-batch WikiComponent reference のページに詳しく記載されています。

  • 拡張ライブラリもありますが、MongoDB の依存関係で pom 地獄に陥りました。 途中導入はたいへんかもしれません。

  • 個人的には設定ファイルレスは非常によいかと思います。書きやすいです。

  • reader()wirter() メソッドに 対象クラスを インスタンス化 して設定して、JOB定義を作る方式なので、guice 経由でインスタンス化すれば、DI も可能です。3


EasyBatch の Example を作成しました。
説明は以上です。


  1. HTML レポート、CSV は outputs ディレクトリ配下に出力されます。 

  2. 5.1.0 だと、batchSize() で処理件数を設定できるようです。Home · j-easy/easy-batch Wiki 

  3. DI を実行する JobBuilder を作ってもよいかもしれません。 

コメント