Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/org/elasticmq/MessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ case class MessageData(
created: DateTime,
statistics: MessageStatistics,
messageGroupId: Option[String],
messageDeduplicationId: Option[String]
messageDeduplicationId: Option[String],
tracingId: Option[TracingId]
)
14 changes: 14 additions & 0 deletions core/src/main/scala/org/elasticmq/MessageSystemAttribute.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.elasticmq

import java.util.Base64

sealed trait MessageSystemAttribute
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be defined in the rest module, as that's the only place where it's used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, moved to rest module


case class StringMessageSystemAttribute(value: String) extends MessageSystemAttribute
case class NumberMessageSystemAttribute(value: String) extends MessageSystemAttribute
case class BinaryMessageSystemAttribute(value: Array[Byte]) extends MessageSystemAttribute

object BinaryMessageSystemAttribute {
def fromString(value: String): BinaryMessageSystemAttribute =
BinaryMessageSystemAttribute(Base64.getDecoder.decode(value))
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/elasticmq/NewMessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ case class NewMessageData(
nextDelivery: NextDelivery,
messageGroupId: Option[String],
messageDeduplicationId: Option[String],
orderIndex: Int
orderIndex: Int,
tracingId: Option[TracingId]
)
3 changes: 3 additions & 0 deletions core/src/main/scala/org/elasticmq/TracingId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.elasticmq

case class TracingId(id: String) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ case class InternalMessage(
var receiveCount: Int,
isFifo: Boolean,
messageGroupId: Option[String],
messageDeduplicationId: Option[String]
messageDeduplicationId: Option[String],
tracingId: Option[TracingId]
) extends Comparable[InternalMessage] {

// Priority queues have biggest elements first
Expand Down Expand Up @@ -63,7 +64,8 @@ case class InternalMessage(
created,
MessageStatistics(firstReceive, receiveCount),
messageGroupId,
messageDeduplicationId
messageDeduplicationId,
tracingId
)

def toNewMessageData =
Expand All @@ -74,7 +76,8 @@ case class InternalMessage(
MillisNextDelivery(nextDelivery),
messageGroupId,
messageDeduplicationId,
orderIndex
orderIndex,
tracingId
)

def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime
Expand All @@ -96,7 +99,8 @@ object InternalMessage {
0,
queueData.isFifo,
newMessageData.messageGroupId,
newMessageData.messageDeduplicationId
newMessageData.messageDeduplicationId,
newMessageData.tracingId
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
receiveCount = 0,
isFifo = true,
messageGroupId = None,
messageDeduplicationId = None
messageDeduplicationId = None,
tracingId = None
)

val second = first.copy(
Expand All @@ -48,7 +49,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
receiveCount = 0,
isFifo = true,
messageGroupId = None,
messageDeduplicationId = None
messageDeduplicationId = None,
tracingId = None
)

val second = first.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
receiveCount = 0,
isFifo = false,
messageGroupId = None,
messageDeduplicationId = None
messageDeduplicationId = None,
tracingId = None
)
val msg2 = msg1.copy(id = "id-2")
val msg3 = msg1.copy(id = "id-3")
Expand Down Expand Up @@ -77,7 +78,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
receiveCount = 0,
isFifo = false,
messageGroupId = None,
messageDeduplicationId = None
messageDeduplicationId = None,
tracingId = None
)
val msg2 = msg1.copy(id = "id-2")
val messageQueue = MessageQueue(isFifo = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ trait DataCreationHelpers {
nextDelivery: MillisNextDelivery,
deliveryReceipt: Option[DeliveryReceipt] = None,
messageGroupId: Option[String] = None,
messageDeduplicationId: Option[String] = None
messageDeduplicationId: Option[String] = None,
tracingId: Option[TracingId] = None
) =
MessageData(
MessageId(id),
Expand All @@ -45,7 +46,8 @@ trait DataCreationHelpers {
new DateTime(0),
MessageStatistics(NeverReceived, 0),
messageGroupId,
messageDeduplicationId
messageDeduplicationId,
tracingId
)

def createNewMessageData(
Expand All @@ -54,7 +56,8 @@ trait DataCreationHelpers {
messageAttributes: Map[String, MessageAttribute],
nextDelivery: MillisNextDelivery,
messageGroupId: Option[String] = None,
messageDeduplicationId: Option[String] = None
messageDeduplicationId: Option[String] = None,
tracingId: Option[TracingId] = None
) =
NewMessageData(
Some(MessageId(id)),
Expand All @@ -63,7 +66,8 @@ trait DataCreationHelpers {
nextDelivery,
messageGroupId,
messageDeduplicationId,
orderIndex = 0
orderIndex = 0,
tracingId
)

def createNewMessageData(messageData: MessageData) =
Expand All @@ -74,6 +78,7 @@ trait DataCreationHelpers {
messageData.nextDelivery,
messageData.messageGroupId,
messageData.messageDeduplicationId,
orderIndex = 0
orderIndex = 0,
messageData.tracingId
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object LocalPerformanceTest extends App {
}

def sendMessage(m: String): Unit = {
Await.result(currentQueue ? SendMessage(NewMessageData(None, m, Map.empty, ImmediateNextDelivery, None, None, 0)),
Await.result(currentQueue ? SendMessage(NewMessageData(None, m, Map.empty, ImmediateNextDelivery, None, None, 0, None)),
10.seconds)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,90 +6,27 @@ import java.util.UUID

import akka.http.scaladsl.model.StatusCodes
import com.amazonaws.AmazonServiceException
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.AmazonSQS
import com.amazonaws.services.sqs.model._
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import org.apache.http.HttpHost
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.message.BasicNameValuePair
import org.elasticmq._
import org.elasticmq.rest.sqs.model.RedrivePolicy
import org.elasticmq.util.Logging
import org.scalatest._
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Try
import scala.util.control.Exception._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class AmazonJavaSdkTestSuite extends AnyFunSuite with Matchers with BeforeAndAfter with Logging {
class AmazonJavaSdkTestSuite extends SqsClientServerCommunication with Matchers {
val visibilityTimeoutAttribute = "VisibilityTimeout"
val defaultVisibilityTimeoutAttribute = "VisibilityTimeout"
val redrivePolicyAttribute = "RedrivePolicy"
val delaySecondsAttribute = "DelaySeconds"
val receiveMessageWaitTimeSecondsAttribute = "ReceiveMessageWaitTimeSeconds"
val awsAccountId = "123456789"
val awsRegion = "elasticmq"

var client: AmazonSQS = _ // strict server
var relaxedClient: AmazonSQS = _
var httpClient: CloseableHttpClient = _

var currentTestName: String = _

var strictServer: SQSRestServer = _
var relaxedServer: SQSRestServer = _

before {
logger.info(s"\n---\nRunning test: $currentTestName\n---\n")

strictServer = SQSRestServerBuilder
.withPort(9321)
.withServerAddress(NodeAddress(port = 9321))
.withAWSAccountId(awsAccountId)
.withAWSRegion(awsRegion)
.start()

relaxedServer = SQSRestServerBuilder
.withPort(9322)
.withServerAddress(NodeAddress(port = 9322))
.withSQSLimits(RelaxedSQSLimits)
.start()

strictServer.waitUntilStarted()
relaxedServer.waitUntilStarted()

client = AmazonSQSClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")))
.withEndpointConfiguration(new EndpointConfiguration("http://localhost:9321", "us-east-1"))
.build()

relaxedClient = AmazonSQSClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")))
.withEndpointConfiguration(new EndpointConfiguration("http://localhost:9322", "us-east-1"))
.build()

httpClient = HttpClients.createDefault()
}

after {
client.shutdown()
relaxedClient.shutdown()
httpClient.close()

// TODO: Figure out why this intermittently isn't able to unbind cleanly
Try(strictServer.stopAndWait())
Try(relaxedServer.stopAndWait())

logger.info(s"\n---\nTest done: $currentTestName\n---\n")
}

test("should create a queue") {
client.createQueue(new CreateQueueRequest("testQueue1"))
Expand Down Expand Up @@ -2075,13 +2012,6 @@ class AmazonJavaSdkTestSuite extends AnyFunSuite with Matchers with BeforeAndAft
}
}

override protected def runTest(testName: String, args: Args): Status = {
currentTestName = testName
val result = super.runTest(testName, args)
currentTestName = null
result
}

private def createFifoQueue(suffix: Int = 1, attributes: Map[String, String] = Map.empty): String = {
val createRequest1 = new CreateQueueRequest(s"testFifoQueue$suffix.fifo")
.addAttributesEntry("FifoQueue", "true")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.elasticmq.rest.sqs

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.elasticmq.util.Logging
import org.elasticmq.{NodeAddress, RelaxedSQSLimits}
import org.scalatest.{Args, BeforeAndAfter, Status}
import org.scalatest.funsuite.AnyFunSuite

import scala.util.Try

trait SqsClientServerCommunication extends AnyFunSuite with BeforeAndAfter with Logging {

var client: AmazonSQS = _ // strict server
var relaxedClient: AmazonSQS = _
var httpClient: CloseableHttpClient = _

var currentTestName: String = _

var strictServer: SQSRestServer = _
var relaxedServer: SQSRestServer = _
val awsAccountId = "123456789"
val awsRegion = "elasticmq"

before {
logger.info(s"\n---\nRunning test: $currentTestName\n---\n")

strictServer = SQSRestServerBuilder
.withPort(9321)
.withServerAddress(NodeAddress(port = 9321))
.withAWSAccountId(awsAccountId)
.withAWSRegion(awsRegion)
.start()

relaxedServer = SQSRestServerBuilder
.withPort(9322)
.withServerAddress(NodeAddress(port = 9322))
.withSQSLimits(RelaxedSQSLimits)
.start()

strictServer.waitUntilStarted()
relaxedServer.waitUntilStarted()

client = AmazonSQSClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")))
.withEndpointConfiguration(new EndpointConfiguration("http://localhost:9321", "us-east-1"))
.build()

relaxedClient = AmazonSQSClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")))
.withEndpointConfiguration(new EndpointConfiguration("http://localhost:9322", "us-east-1"))
.build()

httpClient = HttpClients.createDefault()
}

after {
client.shutdown()
relaxedClient.shutdown()
httpClient.close()

// TODO: Figure out why this intermittently isn't able to unbind cleanly
Try(strictServer.stopAndWait())
Try(relaxedServer.stopAndWait())

logger.info(s"\n---\nTest done: $currentTestName\n---\n")
}

override protected def runTest(testName: String, args: Args): Status = {
currentTestName = testName
val result = super.runTest(testName, args)
currentTestName = null
result
}

}
Loading