Skip to content

Commit ec2b0b6

Browse files
authored
Merge pull request #39 from jpzk/release/2.1
Release/2.1
2 parents fb313bf + a372233 commit ec2b0b6

File tree

6 files changed

+86
-22
lines changed

6 files changed

+86
-22
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Mocked Streams 2.1
4+
5+
* Added .inputWithTime to allow custom timestamps
6+
* Thanks to Dan Hamilton for .inputWithTime implementation
7+
* Added Dan Hamilton to CONTRIBUTORS.md
8+
39
## Mocked Streams 2.0
410

511
* Build against Apache Kafka 2.0

CONTRIBUTORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
* Svend Vanderveken
66
* Daniel Wojda
77
* Michal Dziemianko
8+
* Dan Hamilton

README.md

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@
44
[![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)
55

66

7-
Mocked Streams 2.0.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
7+
Mocked Streams 2.1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
88

9-
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.0.0" % "test"
9+
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.1.0" % "test"
1010

1111
Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka)
1212

1313
## Apache Kafka Compatibility
1414

1515
| Mocked Streams Version | Apache Kafka Version |
1616
|------------- |-------------|
17-
| 2.0.0 | 2.0.0.0 |
17+
| 2.1.0 | 2.0.0.0 |
18+
2.0.0 | 2.0.0.0 |
1819
| 1.8.0 | 1.1.1.0 |
1920
| 1.7.0 | 1.1.0.0 |
2021
| 1.6.0 | 1.0.1.0 |
@@ -110,6 +111,22 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 and
110111
mstreams.windowStateTable("store-name", "x") shouldEqual someMapX
111112
mstreams.windowStateTable("store-name", "y") shouldEqual someMapY
112113

114+
## Adding Timestamps
115+
116+
With .input the input records timestamps are set to 0 default timestamp of 0. This e.g. prevents testing Join windows of Kafka streams as it cannot produce records with different timestamps. However, using .inputWithTime allows adding timestamps like in the following example:
117+
118+
val inputA = Seq(
119+
("x", int(1), 1000L),
120+
("x", int(1), 1001L),
121+
("x", int(1), 1002L)
122+
)
123+
124+
val builder = MockedStreams()
125+
.topology(topology1WindowOutput)
126+
.inputWithTime(InputCTopic, strings, ints, inputA)
127+
.stores(Seq(StoreName))
128+
129+
113130
## Custom Streams Configuration
114131

115132
Sometimes you need to pass a custom configuration to Kafka Streams:

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
lazy val commonSettings = Seq(
33
organization := "com.madewithtea",
4-
version := "2.0.0",
4+
version := "2.1.0",
55
scalaVersion := "2.12.6",
66
crossScalaVersions := Seq("2.12.6","2.11.12"),
77
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",

src/main/scala/MockedStreams.scala

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,11 @@ object MockedStreams {
5151

5252
def stores(stores: Seq[String]) = this.copy(stateStores = stores)
5353

54-
def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = {
55-
val keySer = key.serializer
56-
val valSer = value.serializer
57-
58-
val factory = new ConsumerRecordFactory[K, V](keySer, valSer)
59-
60-
val updatedRecords = newRecords.foldLeft(inputs) {
61-
case (events, (k, v)) =>
62-
val newRecord = factory.create(topic, k, v)
63-
events :+ newRecord
64-
}
54+
def input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V)]) =
55+
_input(topic, key, value, Left(records))
6556

66-
this.copy(inputs = updatedRecords)
67-
}
57+
def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]) =
58+
_input(topic, key, value, Right(records))
6859

6960
def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
7061
if (size <= 0) throw new ExpectedOutputIsEmpty
@@ -78,17 +69,19 @@ object MockedStreams {
7869
}
7970
}
8071

81-
def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
72+
def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) =
8273
output[K, V](topic, key, value, size).toMap
8374

84-
def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
75+
def stateTable(name: String) = withProcessedDriver { driver =>
8576
val records = driver.getKeyValueStore(name).all()
8677
val list = records.asScala.toList.map { record => (record.key, record.value) }
8778
records.close()
8879
list.toMap
8980
}
9081

91-
def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0,
82+
def windowStateTable[K, V](name: String,
83+
key: K,
84+
timeFrom: Long = 0,
9285
timeTo: Long = Long.MaxValue) = withProcessedDriver { driver =>
9386
val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]]
9487
val records = store.fetch(key, timeFrom, timeTo)
@@ -97,6 +90,23 @@ object MockedStreams {
9790
list.toMap
9891
}
9992

93+
private def _input[K, V](topic: String, key: Serde[K], value: Serde[V],
94+
records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = {
95+
val keySer = key.serializer
96+
val valSer = value.serializer
97+
val factory = new ConsumerRecordFactory[K, V](keySer, valSer)
98+
99+
val updatedRecords = records match {
100+
case Left(withoutTime) => withoutTime.foldLeft(inputs) {
101+
case (events, (k, v)) => events :+ factory.create(topic, k, v)
102+
}
103+
case Right(withTime) => withTime.foldLeft(inputs) {
104+
case (events, (k, v, timestamp)) => events :+ factory.create(topic, k, v, timestamp)
105+
}
106+
}
107+
this.copy(inputs = updatedRecords)
108+
}
109+
100110
// state store is temporarily created in ProcessorTopologyTestDriver
101111
private def stream = {
102112
val props = new Properties
@@ -110,8 +120,7 @@ object MockedStreams {
110120
inputs.foreach(driver.pipeInput)
111121

112122
private def withProcessedDriver[T](f: Driver => T): T = {
113-
if(inputs.isEmpty)
114-
throw new NoInputSpecified
123+
if (inputs.isEmpty) throw new NoInputSpecified
115124

116125
val driver = stream
117126
produce(driver)
@@ -126,4 +135,5 @@ object MockedStreams {
126135
class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.")
127136

128137
class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.")
138+
129139
}

src/test/scala/MockedStreamsSpec.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,20 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
174174
output shouldEqual expected
175175
}
176176

177+
178+
it should "accept consumer records with custom timestamps" in {
179+
180+
import Fixtures.Multi._
181+
182+
val builder = MockedStreams()
183+
.topology(topology1WindowOutput)
184+
.inputWithTime(InputCTopic, strings, ints, inputCWithTimeStamps)
185+
.stores(Seq(StoreName))
186+
187+
builder.windowStateTable(StoreName, "x")
188+
.shouldEqual(expectedCWithTimeStamps.toMap)
189+
}
190+
177191
class LastInitializer extends Initializer[Integer] {
178192
override def apply() = 0
179193
}
@@ -216,11 +230,27 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
216230
val inputA = Seq(("x", int(1)), ("y", int(2)))
217231
val inputB = Seq(("x", int(4)), ("y", int(3)))
218232
val inputC = Seq(("x", int(1)), ("x", int(1)), ("x", int(2)), ("y", int(1)))
233+
234+
val inputCWithTimeStamps = Seq(
235+
("x", int(1), 1000L),
236+
("x", int(1), 1000L),
237+
("x", int(1), 1001L),
238+
("x", int(1), 1001L),
239+
("x", int(1), 1002L)
240+
)
241+
219242
val expectedA = Seq(("x", int(5)), ("y", int(5)))
220243
val expectedB = Seq(("x", int(3)), ("y", int(1)))
244+
221245
val expectedCx = Seq((1, 2), (2, 1))
222246
val expectedCy = Seq((1, 1))
223247

248+
val expectedCWithTimeStamps = Seq(
249+
1000 -> 2,
250+
1001 -> 2,
251+
1002 -> 1
252+
)
253+
224254
val strings = Serdes.String()
225255
val ints = Serdes.Integer()
226256
val serdes = Consumed.`with`(strings, ints)

0 commit comments

Comments
 (0)