Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ trait QueueActorMessageOps extends Logging {
receiveMessages(visibilityTimeout, count, receiveRequestAttemptId)
case DeleteMessage(deliveryReceipt) => deleteMessage(deliveryReceipt)
case LookupMessage(messageId) => messageQueue.byId.get(messageId.id).map(_.toMessageData)
case MoveMessage(message) => moveMessage(message)
}

private def handleOrRedirectMessage(message: NewMessageData): ReplyAction[MessageData] = {
Expand Down Expand Up @@ -123,7 +124,7 @@ trait QueueActorMessageOps extends Logging {
case default => Some(default)
}
})
.getOrElse(getMessagesFromQueue(visibilityTimeout, count))
.getOrElse(getMessagesFromQueue(count))
.map { internalMessage =>
// Putting the msg again into the queue, with a new next delivery
val newNextDelivery = computeNextDelivery(visibilityTimeout)
Expand All @@ -150,12 +151,12 @@ trait QueueActorMessageOps extends Logging {
}
}

private def getMessagesFromQueue(visibilityTimeout: VisibilityTimeout, count: Int) = {
private def getMessagesFromQueue(count: Int) = {
val deliveryTime = nowProvider.nowMillis
messageQueue.dequeue(count, deliveryTime).flatMap { internalMessage =>
if (queueData.deadLettersQueue.map(_.maxReceiveCount).exists(_ <= internalMessage.receiveCount)) {
logger.debug(s"${queueData.name}: send message $internalMessage to dead letters actor $deadLettersActorRef")
deadLettersActorRef.foreach(_ ! SendMessage(internalMessage.toNewMessageData))
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage))
internalMessage.deliveryReceipts.foreach(dr => deleteMessage(DeliveryReceipt(dr)))
None
} else {
Expand Down Expand Up @@ -185,4 +186,27 @@ trait QueueActorMessageOps extends Logging {
}
}
}

private def moveMessage(message: InternalMessage): Unit = {
def moveMessageToQueue(internalMessage: InternalMessage): Unit = {
messageQueue += internalMessage
logger.debug(s"Moved message with id ${internalMessage.id} to DLQ ${queueData.name}")
}

copyMessagesToActorRef.foreach { _ ! SendMessage(message.toNewMessageData) }

if (queueData.isFifo) {
// Ensure a message with the same deduplication id is not on the queue already. If the message is already on the
// queue do nothing.
// TODO: A message dedup id should be checked up to 5 mins after it has been received. If it has been deleted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we'd have to maintain a cache of already received message ids (for the last 5 mins)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems so. This comment was already there (in similar function). I have copied it to not forgot about this place once someone will do this TODO.
I have not extracted those functions to one common as I feel that behaviour is quite similar but the differences between input/output parameters are too big.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok :) We'll get back to these 5 minutes in #354 ;)

// during that period, it should _still_ be used when deduplicating new messages. If there's a match with a
// deleted message (that was sent less than 5 minutes ago, the new message should not be added).
messageQueue.byId.values.find(isDuplicate(message.toNewMessageData, _)) match {
case Some(_) => ()
case None => moveMessageToQueue(message)
}
} else {
moveMessageToQueue(message)
}
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/elasticmq/msg/QueueMsg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.elasticmq.msg

import akka.actor.ActorRef
import org.elasticmq._
import org.elasticmq.actor.queue.InternalMessage
import org.elasticmq.actor.reply.Replyable
import org.joda.time.Duration

Expand All @@ -25,6 +26,7 @@ case class GetQueueStatistics(deliveryTime: Long) extends QueueQueueMsg[QueueSta
case class ClearQueue() extends QueueQueueMsg[Unit]

case class SendMessage(message: NewMessageData) extends QueueMessageMsg[MessageData]
case class MoveMessage(message: InternalMessage) extends QueueMessageMsg[Unit]
case class UpdateVisibilityTimeout(messageId: MessageId, visibilityTimeout: VisibilityTimeout)
extends QueueMessageMsg[Either[MessageDoesNotExist, Unit]]
case class ReceiveMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,66 @@ class AmazonJavaSdkTestSuite extends AnyFunSuite with Matchers with BeforeAndAft
newQueueAttributes(redrivePolicyAttribute) should be(redrivePolicy)
}

test(
"When message is moved to dead letter queue, its SentTimestamp and ApproximateFirstReceiveTimestamp attributes should not be changed"
) {
import org.elasticmq.rest.sqs.model.RedrivePolicyJson._
import spray.json._

def getOneMessage(queueUrl: String) = {
client
.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(2).withAttributeNames("All"))
.getMessages
.asScala
.headOption
.orNull
}

// Given
val deadLetterQueueUrl = client.createQueue(new CreateQueueRequest("dlq1")).getQueueUrl
val redrivePolicy = RedrivePolicy("dlq1", awsRegion, awsAccountId, 2).toJson.toString()
val createQueueResult = client.createQueue(
new CreateQueueRequest("q1")
.withAttributes(Map(defaultVisibilityTimeoutAttribute -> "1", redrivePolicyAttribute -> redrivePolicy).asJava)
)

// When

client.sendMessage(new SendMessageRequest(createQueueResult.getQueueUrl, "test message"))

val msg1 = getOneMessage(createQueueResult.getQueueUrl)
val msg2 = getOneMessage(createQueueResult.getQueueUrl)
val emptyMsg = getOneMessage(createQueueResult.getQueueUrl)

val msgFromDeadLetterQueue = getOneMessage(deadLetterQueueUrl)

// Then
val msg1SentTimestamp = msg1.getAttributes.get("SentTimestamp")
val msg1ApproximateReceiveCount = msg1.getAttributes.get("ApproximateReceiveCount")
val msg1ApproximateFirstReceiveTimestamp = msg1.getAttributes.get("SentTimestamp")

val msg2SentTimestamp = msg2.getAttributes.get("SentTimestamp")
val msg2ApproximateReceiveCount = msg2.getAttributes.get("ApproximateReceiveCount")
val msg2ApproximateFirstReceiveTimestamp = msg2.getAttributes.get("SentTimestamp")

val msgFromDeadLetterQueueSentTimestamp = msgFromDeadLetterQueue.getAttributes.get("SentTimestamp")
val msgFromDeadLetterQueueApproximateReceiveCount =
msgFromDeadLetterQueue.getAttributes.get("ApproximateReceiveCount")
val msgFromDeadLetterQueueApproximateFirstReceiveTimestamp =
msgFromDeadLetterQueue.getAttributes.get("SentTimestamp")

msg1SentTimestamp shouldBe msg2SentTimestamp
msg2SentTimestamp shouldBe msgFromDeadLetterQueueSentTimestamp

msg2ApproximateReceiveCount.toInt shouldBe (msg1ApproximateReceiveCount.toInt + 1)
msgFromDeadLetterQueueApproximateReceiveCount.toInt shouldBe (msg2ApproximateReceiveCount.toInt + 1)

msg1ApproximateFirstReceiveTimestamp shouldBe msg2ApproximateFirstReceiveTimestamp
msg2ApproximateFirstReceiveTimestamp shouldBe msgFromDeadLetterQueueApproximateFirstReceiveTimestamp

emptyMsg shouldBe null
}

test("should return an error when the deadletter queue does not exist") {
import org.elasticmq.rest.sqs.model.RedrivePolicyJson._
import spray.json._
Expand Down