|
16 | 16 | */ |
17 | 17 | package com.ctrip.framework.apollo.biz.service; |
18 | 18 |
|
| 19 | +import static com.ctrip.framework.apollo.biz.config.BizConfig.DEFAULT_RELEASE_HISTORY_RETENTION_SIZE; |
| 20 | + |
| 21 | +import com.ctrip.framework.apollo.biz.config.BizConfig; |
19 | 22 | import com.ctrip.framework.apollo.biz.entity.Audit; |
20 | 23 | import com.ctrip.framework.apollo.biz.entity.ReleaseHistory; |
21 | 24 | import com.ctrip.framework.apollo.biz.repository.ReleaseHistoryRepository; |
| 25 | +import com.ctrip.framework.apollo.biz.repository.ReleaseRepository; |
| 26 | +import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory; |
| 27 | +import com.ctrip.framework.apollo.tracer.Tracer; |
| 28 | +import com.google.common.collect.Queues; |
22 | 29 | import com.google.gson.Gson; |
| 30 | +import java.util.List; |
| 31 | +import java.util.Optional; |
| 32 | +import java.util.concurrent.BlockingQueue; |
| 33 | +import java.util.concurrent.ExecutorService; |
| 34 | +import java.util.concurrent.Executors; |
| 35 | +import java.util.concurrent.TimeUnit; |
| 36 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 37 | +import java.util.stream.Collectors; |
| 38 | +import javax.annotation.PostConstruct; |
| 39 | +import javax.annotation.PreDestroy; |
| 40 | +import org.slf4j.Logger; |
| 41 | +import org.slf4j.LoggerFactory; |
23 | 42 | import org.springframework.data.domain.Page; |
| 43 | +import org.springframework.data.domain.PageRequest; |
24 | 44 | import org.springframework.data.domain.Pageable; |
25 | 45 | import org.springframework.stereotype.Service; |
| 46 | +import org.springframework.transaction.TransactionStatus; |
26 | 47 | import org.springframework.transaction.annotation.Transactional; |
27 | 48 |
|
28 | 49 | import java.util.Date; |
29 | 50 | import java.util.Map; |
30 | 51 | import java.util.Set; |
| 52 | +import org.springframework.transaction.support.TransactionCallbackWithoutResult; |
| 53 | +import org.springframework.transaction.support.TransactionTemplate; |
31 | 54 |
|
32 | 55 | /** |
33 | 56 | * @author Jason Song(song_s@ctrip.com) |
34 | 57 | */ |
35 | 58 | @Service |
36 | 59 | public class ReleaseHistoryService { |
| 60 | + private static final Logger logger = LoggerFactory.getLogger(ReleaseHistoryService.class); |
37 | 61 | private static final Gson GSON = new Gson(); |
| 62 | + private static final int CLEAN_QUEUE_MAX_SIZE = 100; |
| 63 | + private final BlockingQueue<ReleaseHistory> releaseClearQueue = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE); |
| 64 | + private final ExecutorService cleanExecutorService = Executors.newSingleThreadExecutor( |
| 65 | + ApolloThreadFactory.create("ReleaseHistoryService", true)); |
| 66 | + private final AtomicBoolean cleanStopped = new AtomicBoolean(false); |
38 | 67 |
|
39 | 68 | private final ReleaseHistoryRepository releaseHistoryRepository; |
| 69 | + private final ReleaseRepository releaseRepository; |
40 | 70 | private final AuditService auditService; |
| 71 | + private final BizConfig bizConfig; |
| 72 | + private final TransactionTemplate transactionManager; |
41 | 73 |
|
42 | 74 | public ReleaseHistoryService( |
43 | 75 | final ReleaseHistoryRepository releaseHistoryRepository, |
44 | | - final AuditService auditService) { |
| 76 | + final ReleaseRepository releaseRepository, |
| 77 | + final AuditService auditService, |
| 78 | + final BizConfig bizConfig, |
| 79 | + final TransactionTemplate transactionManager) { |
45 | 80 | this.releaseHistoryRepository = releaseHistoryRepository; |
| 81 | + this.releaseRepository = releaseRepository; |
46 | 82 | this.auditService = auditService; |
| 83 | + this.bizConfig = bizConfig; |
| 84 | + this.transactionManager = transactionManager; |
47 | 85 | } |
48 | 86 |
|
| 87 | + @PostConstruct |
| 88 | + private void initialize() { |
| 89 | + cleanExecutorService.submit(() -> { |
| 90 | + while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) { |
| 91 | + try { |
| 92 | + ReleaseHistory releaseHistory = releaseClearQueue.poll(1, TimeUnit.SECONDS); |
| 93 | + if (releaseHistory != null) { |
| 94 | + this.cleanReleaseHistory(releaseHistory); |
| 95 | + } else { |
| 96 | + TimeUnit.MINUTES.sleep(1); |
| 97 | + } |
| 98 | + } catch (Throwable ex) { |
| 99 | + logger.error("Clean releaseHistory failed", ex); |
| 100 | + Tracer.logError(ex); |
| 101 | + } |
| 102 | + } |
| 103 | + }); |
| 104 | + } |
49 | 105 |
|
50 | 106 | public Page<ReleaseHistory> findReleaseHistoriesByNamespace(String appId, String clusterName, |
51 | 107 | String namespaceName, Pageable |
@@ -92,11 +148,86 @@ public ReleaseHistory createReleaseHistory(String appId, String clusterName, Str |
92 | 148 | auditService.audit(ReleaseHistory.class.getSimpleName(), releaseHistory.getId(), |
93 | 149 | Audit.OP.INSERT, releaseHistory.getDataChangeCreatedBy()); |
94 | 150 |
|
| 151 | + int releaseHistoryRetentionLimit = this.getReleaseHistoryRetentionLimit(releaseHistory); |
| 152 | + if (releaseHistoryRetentionLimit != DEFAULT_RELEASE_HISTORY_RETENTION_SIZE) { |
| 153 | + if (!releaseClearQueue.offer(releaseHistory)) { |
| 154 | + logger.warn("releaseClearQueue is full, failed to add task to clean queue, " + |
| 155 | + "clean queue max size:{}", CLEAN_QUEUE_MAX_SIZE); |
| 156 | + } |
| 157 | + } |
95 | 158 | return releaseHistory; |
96 | 159 | } |
97 | 160 |
|
98 | 161 | @Transactional |
99 | 162 | public int batchDelete(String appId, String clusterName, String namespaceName, String operator) { |
100 | 163 | return releaseHistoryRepository.batchDelete(appId, clusterName, namespaceName, operator); |
101 | 164 | } |
| 165 | + |
| 166 | + private Optional<Long> releaseHistoryRetentionMaxId(ReleaseHistory releaseHistory, int releaseHistoryRetentionSize) { |
| 167 | + Page<ReleaseHistory> releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc( |
| 168 | + releaseHistory.getAppId(), |
| 169 | + releaseHistory.getClusterName(), |
| 170 | + releaseHistory.getNamespaceName(), |
| 171 | + releaseHistory.getBranchName(), |
| 172 | + PageRequest.of(releaseHistoryRetentionSize, 1) |
| 173 | + ); |
| 174 | + if (releaseHistoryPage.isEmpty()) { |
| 175 | + return Optional.empty(); |
| 176 | + } |
| 177 | + return Optional.of( |
| 178 | + releaseHistoryPage |
| 179 | + .getContent() |
| 180 | + .get(0) |
| 181 | + .getId() |
| 182 | + ); |
| 183 | + } |
| 184 | + |
| 185 | + private void cleanReleaseHistory(ReleaseHistory cleanRelease) { |
| 186 | + String appId = cleanRelease.getAppId(); |
| 187 | + String clusterName = cleanRelease.getClusterName(); |
| 188 | + String namespaceName = cleanRelease.getNamespaceName(); |
| 189 | + String branchName = cleanRelease.getBranchName(); |
| 190 | + |
| 191 | + int retentionLimit = this.getReleaseHistoryRetentionLimit(cleanRelease); |
| 192 | + //Second check, if retentionLimit is default value, do not clean |
| 193 | + if (retentionLimit == DEFAULT_RELEASE_HISTORY_RETENTION_SIZE) { |
| 194 | + return; |
| 195 | + } |
| 196 | + |
| 197 | + Optional<Long> maxId = this.releaseHistoryRetentionMaxId(cleanRelease, retentionLimit); |
| 198 | + if (!maxId.isPresent()) { |
| 199 | + return; |
| 200 | + } |
| 201 | + |
| 202 | + boolean hasMore = true; |
| 203 | + while (hasMore && !Thread.currentThread().isInterrupted()) { |
| 204 | + List<ReleaseHistory> cleanReleaseHistoryList = releaseHistoryRepository.findFirst100ByAppIdAndClusterNameAndNamespaceNameAndBranchNameAndIdLessThanEqualOrderByIdAsc( |
| 205 | + appId, clusterName, namespaceName, branchName, maxId.get()); |
| 206 | + Set<Long> releaseIds = cleanReleaseHistoryList.stream() |
| 207 | + .map(ReleaseHistory::getReleaseId) |
| 208 | + .collect(Collectors.toSet()); |
| 209 | + |
| 210 | + transactionManager.execute(new TransactionCallbackWithoutResult() { |
| 211 | + @Override |
| 212 | + protected void doInTransactionWithoutResult(TransactionStatus status) { |
| 213 | + releaseHistoryRepository.deleteAll(cleanReleaseHistoryList); |
| 214 | + releaseRepository.deleteAllById(releaseIds); |
| 215 | + } |
| 216 | + }); |
| 217 | + hasMore = cleanReleaseHistoryList.size() == 100; |
| 218 | + } |
| 219 | + } |
| 220 | + |
| 221 | + private int getReleaseHistoryRetentionLimit(ReleaseHistory releaseHistory) { |
| 222 | + String overrideKey = String.format("%s+%s+%s+%s", releaseHistory.getAppId(), |
| 223 | + releaseHistory.getClusterName(), releaseHistory.getNamespaceName(), releaseHistory.getBranchName()); |
| 224 | + |
| 225 | + Map<String, Integer> overrideMap = bizConfig.releaseHistoryRetentionSizeOverride(); |
| 226 | + return overrideMap.getOrDefault(overrideKey, bizConfig.releaseHistoryRetentionSize()); |
| 227 | + } |
| 228 | + |
| 229 | + @PreDestroy |
| 230 | + void stopClean() { |
| 231 | + cleanStopped.set(true); |
| 232 | + } |
102 | 233 | } |
0 commit comments