|
57 | 57 | import com.oceanbase.odc.metadb.task.JobEntity; |
58 | 58 | import com.oceanbase.odc.metadb.task.JobRepository; |
59 | 59 | import com.oceanbase.odc.service.task.config.TaskFrameworkProperties; |
| 60 | +import com.oceanbase.odc.service.task.constants.JobAttributeEntityColumn; |
60 | 61 | import com.oceanbase.odc.service.task.constants.JobEntityColumn; |
61 | 62 | import com.oceanbase.odc.service.task.enums.JobStatus; |
62 | 63 | import com.oceanbase.odc.service.task.enums.TaskRunMode; |
@@ -315,14 +316,34 @@ private void updateJobScheduleEntity(TaskResult taskResult) { |
315 | 316 | jobRepository.update(jse); |
316 | 317 |
|
317 | 318 | if (taskResult.getLogMetadata() != null && taskResult.getStatus().isTerminated()) { |
318 | | - taskResult.getLogMetadata().forEach((k, v) -> { |
| 319 | + saveOrUpdateLogMetadata(taskResult, jse); |
| 320 | + } |
| 321 | + } |
| 322 | + |
| 323 | + private void saveOrUpdateLogMetadata(TaskResult taskResult, JobEntity jse) { |
| 324 | + taskResult.getLogMetadata().forEach((k, v) -> { |
| 325 | + // log key may exist if job is retrying |
| 326 | + Optional<String> logValue = findByJobIdAndAttributeKey(jse.getId(), k); |
| 327 | + if (logValue.isPresent()) { |
| 328 | + updateJobAttributeValue(jse.getId(), k, v); |
| 329 | + } else { |
319 | 330 | JobAttributeEntity jobAttribute = new JobAttributeEntity(); |
320 | 331 | jobAttribute.setJobId(jse.getId()); |
321 | 332 | jobAttribute.setAttributeKey(k); |
322 | 333 | jobAttribute.setAttributeValue(v); |
323 | 334 | jobAttributeRepository.save(jobAttribute); |
324 | | - }); |
325 | | - } |
| 335 | + } |
| 336 | + }); |
| 337 | + } |
| 338 | + |
| 339 | + private void updateJobAttributeValue(Long id, String key, String value) { |
| 340 | + CriteriaBuilder cb = entityManager.getCriteriaBuilder(); |
| 341 | + CriteriaUpdate<JobAttributeEntity> update = cb.createCriteriaUpdate(JobAttributeEntity.class); |
| 342 | + Root<JobAttributeEntity> e = update.from(JobAttributeEntity.class); |
| 343 | + update.set(JobAttributeEntityColumn.ATTRIBUTE_VALUE, value); |
| 344 | + update.where(cb.equal(e.get(JobAttributeEntityColumn.ID), id), |
| 345 | + cb.equal(e.get(JobAttributeEntityColumn.ATTRIBUTE_KEY), key)); |
| 346 | + entityManager.createQuery(update).executeUpdate(); |
326 | 347 | } |
327 | 348 |
|
328 | 349 | @Transactional(rollbackFor = Exception.class) |
@@ -467,4 +488,5 @@ public Optional<String> findByJobIdAndAttributeKey(Long jobId, String attributeK |
467 | 488 | JobAttributeEntity attributeEntity = jobAttributeRepository.findByJobIdAndAttributeKey(jobId, attributeKey); |
468 | 489 | return Objects.isNull(attributeEntity) ? Optional.empty() : Optional.of(attributeEntity.getAttributeValue()); |
469 | 490 | } |
| 491 | + |
470 | 492 | } |
0 commit comments