JobLauncher, SimpleJobLauncher
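A post written while on vacation (out of sheer boredom). Work in progress.
SimpleJobLauncher is the only implementation of JobLauncher.
1. Apart from the setters and afterPropertiesSet, its only public method is 'run', which takes a Job and JobParameters as arguments and returns a JobExecution.
2. JobExecution extends Entity; you can think of it as a class that represents the state of an execution.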
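To make the returned object a little more concrete, here is a minimal sketch in plain Java, not Spring Batch code. The Javadoc in the listing below says the job is handed to a TaskExecutor and that a synchronous executor is the default, so what the caller sees in the returned JobExecution depends on that executor. LauncherSketch, Execution, Status and demo are hypothetical names invented for this illustration, and java.util.concurrent.Executor stands in for Spring's TaskExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins only; none of these are Spring Batch types.
class LauncherSketch {

    enum Status { STARTING, COMPLETED }

    /** Plays the role of JobExecution: a mutable record of how the run is going. */
    static final class Execution {
        volatile Status status = Status.STARTING;
    }

    /** Same shape as run(job, jobParameters): hand the work to an executor, return the state object. */
    static Execution run(Runnable job, Executor executor) {
        Execution execution = new Execution();
        executor.execute(() -> {
            job.run();
            execution.status = Status.COMPLETED;
        });
        return execution;
    }

    /** Collects the status seen by the caller in three situations. */
    static List<Status> demo() {
        List<Status> seen = new ArrayList<>();

        // Synchronous executor: the job runs on the calling thread, so it has finished when run() returns.
        seen.add(run(() -> { }, Runnable::run).status);

        // Asynchronous executor: run() returns while the job is still blocked on the latch.
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Execution async = run(() -> awaitQuietly(release), pool);
        seen.add(async.status);

        // Let the job finish, wait for the worker thread, then look at the same object again.
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add(async.status);
        return seen;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the caller sees COMPLETED straight away with the synchronous executor, but STARTING with the asynchronous one until the worker thread has finished. That is why the state object has to be read again later when an asynchronous executor is configured.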
/*
* Copyright 2006-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
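package org.springframework.batch.core.launch.support;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.Assert;
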
/**
* Simple implementation of the {@link JobLauncher} interface. The Spring Core
* {@link TaskExecutor} interface is used to launch a {@link Job}. This means
* that the type of executor set is very important. If a
* {@link SyncTaskExecutor} is used, then the job will be processed
* within the same thread that called the launcher. Care should
* be taken to ensure any users of this class understand fully whether or not
* the implementation of TaskExecutor used will start tasks synchronously or
* asynchronously. The default setting uses a synchronous task executor.
*
* There is only one required dependency of this Launcher, a
* {@link JobRepository}. The JobRepository is used to obtain a valid
* JobExecution. The Repository must be used because the provided {@link Job}
* could be a restart of an existing {@link JobInstance}, and only the
* Repository can reliably recreate it.
*
* @author Lucas Ward
* @author Dave Syer
*
* @since 1.0
*
* @see JobRepository
* @see TaskExecutor
*/
public class SimpleJobLauncher implements JobLauncher, InitializingBean {

    protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);

    private JobRepository jobRepository;

    private TaskExecutor taskExecutor;

    /**
     * Run the provided job with the given {@link JobParameters}. The
     * {@link JobParameters} will be used to determine if this is an execution
     * of an existing job instance, or if a new one should be created.
     *
     * @param job the job to be run.
     * @param jobParameters the {@link JobParameters} for this particular
     * execution.
     * @return the {@link JobExecution} created for this launch.
     * @throws JobExecutionAlreadyRunningException if the JobInstance already
     * exists and has an execution already running.
     * @throws JobRestartException if the execution would be a re-start, but a
     * re-start is either not allowed or not needed.
     * @throws JobInstanceAlreadyCompleteException if this instance has already
     * completed successfully
     * @throws JobParametersInvalidException
     */
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {

        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        final JobExecution jobExecution;
        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
        if (lastExecution != null) {
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
        }

        // Check the validity of the parameters before creating anything
        // in the repository...
        job.getJobParametersValidator().validate(jobParameters);

        /*
         * There is a very small probability that a non-restartable job can be
         * restarted, but only if another process or thread manages to launch
         * and fail a job execution for this instance between the last
         * assertion and the next method returning successfully.
         */
        jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

        try {
            taskExecutor.execute(new Runnable() {

                public void run() {
                    try {
                        logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                                + "]");
                        job.execute(jobExecution);
                        logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                                + "] and the following status: [" + jobExecution.getStatus() + "]");
                    }
                    catch (Throwable t) {
                        logger.info("Job: [" + job
                                + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                                + "]", t);
                        rethrow(t);
                    }
                }

                private void rethrow(Throwable t) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException) t;
                    }
                    else if (t instanceof Error) {
                        throw (Error) t;
                    }
                    throw new IllegalStateException(t);
                }
            });
        }
        catch (TaskRejectedException e) {
            jobExecution.upgradeStatus(BatchStatus.FAILED);
            if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
                jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
            }
            jobRepository.update(jobExecution);
        }

        return jobExecution;
    }

    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.state(jobRepository != null, "A JobRepository has not been set.");
        if (taskExecutor == null) {
            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
            taskExecutor = new SyncTaskExecutor();
        }
    }
}
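The first check in run is easy to miss, so here is a small plain-Java model of it: the job name together with the parameters identifies an instance, and a second launch of an existing instance is refused when the job is not restartable. This is a sketch under assumptions rather than the library's logic. RestartCheckSketch, RestartNotAllowedException and launch are hypothetical names, an in-memory map stands in for the JobRepository, and the model ignores the already-running and already-complete cases that run declares in its throws clause.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model; the map below stands in for the JobRepository.
class RestartCheckSketch {

    /** Hypothetical stand-in for JobRestartException. */
    static final class RestartNotAllowedException extends RuntimeException {
        RestartNotAllowedException(String message) {
            super(message);
        }
    }

    // Key: [job name, parameters]. Value: number of executions launched for that instance.
    private final Map<List<Object>, Integer> executionsByInstance = new HashMap<>();

    /** Returns the execution count for the instance (1 for a first launch). */
    int launch(String jobName, Map<String, String> parameters, boolean restartable) {
        List<Object> instanceKey = List.of(jobName, parameters);
        Integer previous = executionsByInstance.get(instanceKey);
        // Mirrors the listing: an earlier execution plus a non-restartable job means refusal.
        if (previous != null && !restartable) {
            throw new RestartNotAllowedException("JobInstance already exists and is not restartable");
        }
        int count = (previous == null) ? 1 : previous + 1;
        executionsByInstance.put(instanceKey, count);
        return count;
    }

    public static void main(String[] args) {
        RestartCheckSketch launcher = new RestartCheckSketch();
        Map<String, String> monday = Map.of("date", "2012-08-25");
        Map<String, String> tuesday = Map.of("date", "2012-08-26");

        System.out.println(launcher.launch("importJob", monday, true));   // first execution of the instance
        System.out.println(launcher.launch("importJob", monday, true));   // restart of the same instance
        System.out.println(launcher.launch("importJob", tuesday, false)); // different parameters: new instance
        try {
            launcher.launch("importJob", tuesday, false);                 // same instance, not restartable
        } catch (RestartNotAllowedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Changing a single parameter value is enough to get a fresh instance in this model, which matches the Javadoc's statement that the JobParameters decide whether an existing instance is being executed again.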
---
public class JobExecution extends Entity {

    private JobInstance jobInstance;

    private volatile Collection<StepExecution> stepExecutions = new CopyOnWriteArraySet<StepExecution>();

    private volatile BatchStatus status = BatchStatus.STARTING;

    private volatile Date startTime = null;

    private volatile Date createTime = new Date(System.currentTimeMillis());

    private volatile Date endTime = null;

    private volatile Date lastUpdated = null;

    private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;

    private volatile ExecutionContext executionContext = new ExecutionContext();

    private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>();
(The rest is omitted ... it is mostly constructors and getters/setters.)
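The fields above are almost all volatile, and the collections are copy-on-write. My reading, which the source does not state, is that this is because the launching thread and the thread running the job may both touch the same JobExecution. The sketch below shows just one property of the copy-on-write collections: an iterator works on a snapshot, so a list can be modified while it is being walked. It is single-threaded to keep the outcome deterministic, and CopyOnWriteSketch and iterateWhileAdding are hypothetical names.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteSketch {

    /** Iterates the list while appending to it; returns how many elements the loop visited. */
    static int iterateWhileAdding(List<String> list) {
        int visited = 0;
        for (String item : list) {
            visited++;
            list.add(item + "'");
        }
        return visited;
    }

    public static void main(String[] args) {
        // Copy-on-write: the loop sees the two original elements, and the additions land in a new array.
        List<String> copyOnWrite = new CopyOnWriteArrayList<>(List.of("a", "b"));
        int visited = iterateWhileAdding(copyOnWrite);
        System.out.println(visited + " visited, size now " + copyOnWrite.size());

        // A plain ArrayList detects the modification and fails fast on the next step of the loop.
        try {
            iterateWhileAdding(new ArrayList<>(List.of("a", "b")));
        } catch (ConcurrentModificationException e) {
            System.out.println("ArrayList: ConcurrentModificationException");
        }
    }
}
```

The price is that every write copies the backing array, which is usually acceptable for something like a short list of failure exceptions that is read far more often than it is written.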
Reference #1: http://chanwook.tistory.com/797
Summary: I already felt this when I took his training four years ago, but... Chanwook Park (박찬욱) is the best :)