Skip to content

Commit 7826c07

Browse files
committed
Set job status to failed when exception is thrown in listener.
Fixes #2216.
1 parent a1a130a commit 7826c07

1 file changed

Lines changed: 108 additions & 89 deletions

File tree

src/main/java/org/ohdsi/webapi/cohortdefinition/GenerationJobExecutionListener.java

Lines changed: 108 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -40,98 +40,117 @@
4040
import java.util.Objects;
4141

4242
import static org.ohdsi.webapi.Constants.Params.TARGET_DATABASE_SCHEMA;
43+
import org.springframework.batch.core.ExitStatus;
4344

4445
/**
4546
* @author cknoll1
4647
*/
4748
public class GenerationJobExecutionListener implements JobExecutionListener {
48-
private static final Logger log = LoggerFactory.getLogger(GenerationJobExecutionListener.class);
49-
private final SourceService sourceService;
50-
private final CohortDefinitionRepository cohortDefinitionRepository;
51-
private final TransactionTemplate transactionTemplate;
52-
private final JdbcTemplate sourceTemplate;
53-
54-
public GenerationJobExecutionListener(SourceService sourceService,
55-
CohortDefinitionRepository cohortDefinitionRepository,
56-
TransactionTemplate transactionTemplate,
57-
JdbcTemplate sourceTemplate) {
58-
this.sourceService = sourceService;
59-
this.cohortDefinitionRepository = cohortDefinitionRepository;
60-
this.transactionTemplate = transactionTemplate;
61-
this.sourceTemplate = sourceTemplate;
62-
}
63-
64-
private CohortGenerationInfo findBySourceId(CohortDefinition df, Integer sourceId) {
65-
return df.getGenerationInfoList().stream()
66-
.filter(info -> info.getId().getSourceId().equals(sourceId))
67-
.findFirst()
68-
.orElseThrow(() -> new IllegalStateException(String.format("Cannot find cohortGenerationInfo for cohortDefinition[{}] sourceId[%s]", df.getId(), sourceId)));
69-
}
70-
71-
@Override
72-
public void afterJob(JobExecution je) {
73-
74-
JobParameters jobParams = je.getJobParameters();
75-
Integer defId = Integer.valueOf(jobParams.getString("cohort_definition_id"));
76-
Integer sourceId = Integer.valueOf(jobParams.getString("source_id"));
77-
Source source = sourceService.findBySourceId(sourceId);
78-
String cohortTable = jobParams.getString(TARGET_DATABASE_SCHEMA) + ".cohort";
79-
80-
DefaultTransactionDefinition completeTx = new DefaultTransactionDefinition();
81-
completeTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
82-
TransactionStatus completeStatus = this.transactionTemplate.getTransactionManager().getTransaction(completeTx);
83-
CohortDefinition df = this.cohortDefinitionRepository.findOne(defId);
84-
CohortGenerationInfo info = findBySourceId(df, sourceId);
85-
setExecutionDurationIfPossible(je, info);
86-
info.setStatus(GenerationStatus.COMPLETE);
87-
88-
if (je.getStatus() == BatchStatus.FAILED || je.getStatus() == BatchStatus.STOPPED) {
89-
info.setIsValid(false);
90-
info.setRecordCount(null);
91-
info.setPersonCount(null);
92-
info.setCanceled(je.getStepExecutions().stream().anyMatch(se -> Objects.equals(Constants.CANCELED, se.getExitStatus().getExitCode())));
93-
info.setFailMessage(StringUtils.abbreviateMiddle(je.getAllFailureExceptions().get(0).getMessage(), "... [truncated] ...", 2000));
94-
} else {
95-
info.setIsValid(true);
96-
info.setFailMessage(null);
97-
98-
// query summary results from source
99-
String statsQuery = "SELECT count(distinct subject_id) as person_count, count(*) as record_count from @cohort_table where cohort_definition_id = @cohort_definition_id";
100-
String statsSql = SqlTranslate.translateSql(statsQuery, source.getSourceDialect(), null, null);
101-
String renderedSql = SqlRender.renderSql(statsSql, new String[]{"cohort_table", "cohort_definition_id"}, new String[]{cohortTable, defId.toString()});
102-
Map<String, Object> stats = this.sourceTemplate.queryForMap(renderedSql);
103-
info.setPersonCount(Long.parseLong(stats.get("person_count").toString()));
104-
info.setRecordCount(Long.parseLong(stats.get("record_count").toString()));
105-
}
106-
107-
this.cohortDefinitionRepository.save(df);
108-
this.transactionTemplate.getTransactionManager().commit(completeStatus);
109-
}
110-
111-
private void setExecutionDurationIfPossible(JobExecution je, CohortGenerationInfo info) {
112-
if (Objects.isNull(je.getEndTime()) || Objects.isNull(je.getStartTime())) {
113-
log.error("Cannot set duration time for cohortGenerationInfo[{}]. startData[{}] and endData[{}] cannot be empty.", info.getId(), je.getStartTime(), je.getEndTime());
114-
return;
115-
}
116-
info.setExecutionDuration((int) (je.getEndTime().getTime() - je.getStartTime().getTime()));
117-
}
118-
119-
@Override
120-
public void beforeJob(JobExecution je) {
121-
Date startTime = Calendar.getInstance().getTime();
122-
JobParameters jobParams = je.getJobParameters();
123-
Integer defId = Integer.valueOf(jobParams.getString("cohort_definition_id"));
124-
Integer sourceId = Integer.valueOf(jobParams.getString("source_id"));
125-
126-
DefaultTransactionDefinition initTx = new DefaultTransactionDefinition();
127-
initTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
128-
TransactionStatus initStatus = this.transactionTemplate.getTransactionManager().getTransaction(initTx);
129-
CohortDefinition df = this.cohortDefinitionRepository.findOne(defId);
130-
CohortGenerationInfo info = findBySourceId(df, sourceId);
131-
info.setIsValid(false);
132-
info.setStartTime(startTime);
133-
info.setStatus(GenerationStatus.RUNNING);
134-
this.cohortDefinitionRepository.save(df);
135-
this.transactionTemplate.getTransactionManager().commit(initStatus);
136-
}
49+
50+
private static final Logger log = LoggerFactory.getLogger(GenerationJobExecutionListener.class);
51+
private final SourceService sourceService;
52+
private final CohortDefinitionRepository cohortDefinitionRepository;
53+
private final TransactionTemplate transactionTemplate;
54+
private final JdbcTemplate sourceTemplate;
55+
56+
public GenerationJobExecutionListener(SourceService sourceService,
57+
CohortDefinitionRepository cohortDefinitionRepository,
58+
TransactionTemplate transactionTemplate,
59+
JdbcTemplate sourceTemplate) {
60+
this.sourceService = sourceService;
61+
this.cohortDefinitionRepository = cohortDefinitionRepository;
62+
this.transactionTemplate = transactionTemplate;
63+
this.sourceTemplate = sourceTemplate;
64+
}
65+
66+
private CohortGenerationInfo findBySourceId(CohortDefinition df, Integer sourceId) {
67+
return df.getGenerationInfoList().stream()
68+
.filter(info -> info.getId().getSourceId().equals(sourceId))
69+
.findFirst()
70+
.orElseThrow(() -> new IllegalStateException(String.format("Cannot find cohortGenerationInfo for cohortDefinition[{}] sourceId[%s]", df.getId(), sourceId)));
71+
}
72+
73+
@Override
74+
public void afterJob(JobExecution je) {
75+
76+
JobParameters jobParams = je.getJobParameters();
77+
Integer defId = Integer.valueOf(jobParams.getString("cohort_definition_id"));
78+
Integer sourceId = Integer.valueOf(jobParams.getString("source_id"));
79+
String cohortTable = jobParams.getString(TARGET_DATABASE_SCHEMA) + ".cohort";
80+
81+
DefaultTransactionDefinition completeTx = new DefaultTransactionDefinition();
82+
completeTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
83+
TransactionStatus completeStatus = this.transactionTemplate.getTransactionManager().getTransaction(completeTx);
84+
85+
try {
86+
Source source = sourceService.findBySourceId(sourceId);
87+
CohortDefinition df = this.cohortDefinitionRepository.findOne(defId);
88+
CohortGenerationInfo info = findBySourceId(df, sourceId);
89+
setExecutionDurationIfPossible(je, info);
90+
info.setStatus(GenerationStatus.COMPLETE);
91+
92+
if (je.getStatus() == BatchStatus.FAILED || je.getStatus() == BatchStatus.STOPPED) {
93+
info.setIsValid(false);
94+
info.setRecordCount(null);
95+
info.setPersonCount(null);
96+
info.setCanceled(je.getStepExecutions().stream().anyMatch(se -> Objects.equals(Constants.CANCELED, se.getExitStatus().getExitCode())));
97+
info.setFailMessage(StringUtils.abbreviateMiddle(je.getAllFailureExceptions().get(0).getMessage(), "... [truncated] ...", 2000));
98+
} else {
99+
info.setIsValid(true);
100+
info.setFailMessage(null);
101+
102+
// query summary results from source
103+
String statsQuery = "SELECT count(distinct subject_id) as person_count, count(*) as record_count from @cohort_table where cohort_definition_id = @cohort_definition_id";
104+
String statsSql = SqlTranslate.translateSql(statsQuery, source.getSourceDialect(), null, null);
105+
String renderedSql = SqlRender.renderSql(statsSql, new String[]{"cohort_table", "cohort_definition_id"}, new String[]{cohortTable, defId.toString()});
106+
Map<String, Object> stats = this.sourceTemplate.queryForMap(renderedSql);
107+
info.setPersonCount(Long.parseLong(stats.get("person_count").toString()));
108+
info.setRecordCount(Long.parseLong(stats.get("record_count").toString()));
109+
}
110+
111+
this.cohortDefinitionRepository.save(df);
112+
this.transactionTemplate.getTransactionManager().commit(completeStatus);
113+
} catch (Exception e) {
114+
this.transactionTemplate.getTransactionManager().rollback(completeStatus);
115+
log.error(e.getMessage());
116+
je.addFailureException(e);
117+
je.setExitStatus(new ExitStatus("FAILED", "Could not complete cohort generation job"));
118+
je.setStatus(BatchStatus.FAILED);
119+
}
120+
}
121+
122+
private void setExecutionDurationIfPossible(JobExecution je, CohortGenerationInfo info) {
123+
if (Objects.isNull(je.getEndTime()) || Objects.isNull(je.getStartTime())) {
124+
log.error("Cannot set duration time for cohortGenerationInfo[{}]. startData[{}] and endData[{}] cannot be empty.", info.getId(), je.getStartTime(), je.getEndTime());
125+
return;
126+
}
127+
info.setExecutionDuration((int) (je.getEndTime().getTime() - je.getStartTime().getTime()));
128+
}
129+
130+
@Override
131+
public void beforeJob(JobExecution je) {
132+
Date startTime = Calendar.getInstance().getTime();
133+
JobParameters jobParams = je.getJobParameters();
134+
Integer defId = Integer.valueOf(jobParams.getString("cohort_definition_id"));
135+
Integer sourceId = Integer.valueOf(jobParams.getString("source_id"));
136+
137+
DefaultTransactionDefinition initTx = new DefaultTransactionDefinition();
138+
initTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
139+
TransactionStatus initStatus = this.transactionTemplate.getTransactionManager().getTransaction(initTx);
140+
try {
141+
CohortDefinition df = this.cohortDefinitionRepository.findOne(defId);
142+
CohortGenerationInfo info = findBySourceId(df, sourceId);
143+
info.setIsValid(false);
144+
info.setStartTime(startTime);
145+
info.setStatus(GenerationStatus.RUNNING);
146+
this.cohortDefinitionRepository.save(df);
147+
this.transactionTemplate.getTransactionManager().commit(initStatus);
148+
} catch (Exception e) {
149+
this.transactionTemplate.getTransactionManager().rollback(initStatus);
150+
log.error(e.getMessage());
151+
je.addFailureException(e);
152+
je.setExitStatus(new ExitStatus("FAILED", "Could not start cohort generation job"));
153+
je.setStatus(BatchStatus.FAILED);
154+
}
155+
}
137156
}

0 commit comments

Comments
 (0)