こちらのページで環境構築した Spring Boot でバッチ処理アプリケーションを作成します。内部的に Spring Batch を利用します。CSV ファイルを読み込んで、文字列加工して、MySQL DB に出力するバッチ処理です。
公式ドキュメント
.
|-- build.gradle
|-- gradle
| `-- wrapper
| |-- gradle-wrapper.jar
| `-- gradle-wrapper.properties
|-- gradlew
|-- gradlew.bat
`-- src
`-- main
|-- java
| `-- hello
| |-- Application.java
| |-- BatchConfiguration.java
| |-- JobCompletionNotificationListener.java
| |-- Person.java
| `-- PersonItemProcessor.java
`-- resources
|-- application.properties
|-- sample-data.csv
`-- schema-all.sql
設定内容についてはこちらをご参照ください。今回は Spring Batch を直接利用する場合は想定しておらず、あくまでも、Spring Batch を内部的に利用した Spring Boot アプリケーションです。
buildscript {
ext {
springBootVersion = '1.5.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
jar {
baseName = 'gs-batch-processing'
version = '0.1.0'
}
repositories {
mavenCentral()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies {
// Spring Batch を内部的に利用します。
// 今回は Web アプリケーションではないため `spring-boot-starter-web` は不要です。
compile('org.springframework.boot:spring-boot-starter-batch')
// http://search.maven.org/#artifactdetails|mysql|mysql-connector-java|6.0.6|jar
compile('mysql:mysql-connector-java:6.0.6')
}
今回のサンプル入力となる CSV ファイルです。
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
出力先 MySQL DB への接続情報です。Spring Boot では、こちらのページに記載の優先順位でプロパティを読み込みます。今回は特に、「15. Application properties packaged inside your jar (JAR 内のアプリケーション設定ファイル)」を利用していることになります。ちなみに、「14. Application properties outside of your packaged jar (JAR 外のアプリケーション設定ファイル)」の方が優先順位が高いため、実行時に設定を上書きできます。設定項目の一覧はこちらです。
spring.datasource.url=jdbc:mysql://localhost:3306/mydb
spring.datasource.username=myuser
spring.datasource.password=myuser
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
application.yml を利用することもできます。
spring:
datasource:
url: jdbc:mysql://localhost:3306/mydb
username: myuser
password: myuser
driver-class-name: com.mysql.jdbc.Driver
出力先 MySQL テーブルの DDL です。こちらのページに記載されているとおり、schema-all.sql
というファイル名の SQL がアプリケーション起動時に実行されます。より柔軟かつ高度な初期化処理が必要な場合は Flyway と連携するように設定します。
DROP TABLE IF EXISTS people;
CREATE TABLE people (
person_id BIGINT NOT NULL AUTO_INCREMENT,
first_name VARCHAR(20),
last_name VARCHAR(20),
PRIMARY KEY (person_id)
);
先程記載した優先順位に関するページで紹介されている @Value
アノテーションを利用すると application.properties
などで設定したプロパティをメンバ変数に設定して利用できます。Lombok の @Value と区別して利用します。
package hello;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
@RestController
public class HelloController {
@Value("${spring.datasource.url}")
private String datasourceUrl;
@RequestMapping("/")
public String index() {
System.out.println(datasourceUrl);
return "Greetings from Spring Boot! " + datasourceUrl;
}
}
mydb.people テーブル内の 1 レコードに対応するクラスです。
package hello;
public class Person {
private String firstName;
private String lastName;
public Person() {
}
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public String getLastName() {
return lastName;
}
@Override
public String toString() {
return "firstName: " + firstName + ", lastName: " + lastName;
}
}
一連バッチ処理「Read → Process → Write」における Process 処理のためのクラスです。Spring Batch が提供する ItemProcessor
インターフェースを実装します。
package hello;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
return transformedPerson;
}
}
Spring Boot のエントリーポイントとなるクラスです。アノテーションの意味については、こちらをご参照ください。
package hello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}
}
必須ではありませんが、以下のように記述することで、バッチ処理が完了した後に実行する処理を設定できます。spring-jdbc を利用して DB からデータを SELECT しています。
package hello;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
List<Person> results = jdbcTemplate.query("SELECT first_name, last_name FROM people", new RowMapper<Person>() {
@Override
public Person mapRow(ResultSet rs, int row) throws SQLException {
return new Person(rs.getString(1), rs.getString(2));
}
});
for (Person person : results) {
log.info("Found <" + person + "> in the database.");
}
}
}
}
バッチ処理を設定するクラスです。
package hello;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
// reader/processor/writer
@Bean
public FlatFileItemReader<Person> reader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
reader.setResource(new ClassPathResource("sample-data.csv"));
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "firstName", "lastName" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer() {
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
writer.setDataSource(dataSource);
return writer;
}
// job/step
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1())
.end()
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Person, Person> chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
項目 | 概要 |
---|---|
@Configuration |
@Bean 定義を行うクラスに設定します。 |
@EnableBatchProcessing |
Spring Batch のバッチ処理を定義するクラスに設定します。 |
@Autowired |
他のクラスの Bean 定義で生成されたシングルトンを関連付けて利用できるようにします。 |
@Bean |
@Configuration が設定されたクラスのメソッドに対して設定できます。@Bean 設定されたメソッドはインスタンスを返します。これは設定値をもつシングルトンとして、アプリケーション全体で利用できます。 |
./gradlew build && java -jar build/libs/gs-batch-processing-0.1.0.jar
(or ./gradlew bootRun)
実行結果は以下のとおりです。すべて大文字化されて、レコードとして格納されました。
mysql> show tables from mydb;
+------------------------------+
| Tables_in_mydb |
+------------------------------+
| BATCH_JOB_EXECUTION |
| BATCH_JOB_EXECUTION_CONTEXT |
| BATCH_JOB_EXECUTION_PARAMS |
| BATCH_JOB_EXECUTION_SEQ |
| BATCH_JOB_INSTANCE |
| BATCH_JOB_SEQ |
| BATCH_STEP_EXECUTION |
| BATCH_STEP_EXECUTION_CONTEXT |
| BATCH_STEP_EXECUTION_SEQ |
| people | ←結果
+------------------------------+
10 rows in set (0.00 sec)
mysql> select * from mydb.people;
+-----------+------------+-----------+
| person_id | first_name | last_name |
+-----------+------------+-----------+
| 1 | JILL | DOE |
| 2 | JOE | DOE |
| 3 | JUSTIN | DOE |
| 4 | JANE | DOE |
| 5 | JOHN | DOE |
+-----------+------------+-----------+
5 rows in set (0.00 sec)