diff --git a/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/DatabaseMessageSender.java b/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/DatabaseMessageSender.java index 48a2d340757..8fce13a0c56 100644 --- a/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/DatabaseMessageSender.java +++ b/apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/DatabaseMessageSender.java @@ -22,6 +22,7 @@ import com.ctrip.framework.apollo.tracer.Tracer; import com.ctrip.framework.apollo.tracer.spi.Transaction; import com.google.common.collect.Queues; +import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -43,7 +44,7 @@ public class DatabaseMessageSender implements MessageSender { private static final Logger logger = LoggerFactory.getLogger(DatabaseMessageSender.class); private static final int CLEAN_QUEUE_MAX_SIZE = 100; - private BlockingQueue toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE); + private final BlockingQueue toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE); private final ExecutorService cleanExecutorService; private final AtomicBoolean cleanStopped; @@ -68,7 +69,9 @@ public void sendMessage(String message, String channel) { Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage"); try { ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message)); - toClean.offer(newMessage.getId()); + if(!toClean.offer(newMessage.getId())){ + logger.warn("Queue is full, Failed to add message {} to clean queue", newMessage.getId()); + } transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { logger.error("Sending message to database failed", ex); @@ -116,6 +119,7 @@ private void cleanMessage(Long id) { } } + @PreDestroy void stopClean() { cleanStopped.set(true); }