This repository was archived by the owner on Mar 11, 2024. It is now read-only.

Commit cc4b0a5: working poc (parent 4c18cb7)

11 files changed, +86 −56 lines

flinkspector-core/src/test/scala/org/flinkspector/core/quantify/MatchTuplesSpec.scala

Lines changed: 3 additions & 3 deletions
@@ -23,9 +23,9 @@ import org.hamcrest.core.Is
 import scala.collection.JavaConversions._
 
 class MatchTuplesSpec extends CoreSpec {
-  "The MatchFields" should "store a list of [[KeyMatcherPair]]s" in {
+  "The MatchTuples" should "store a list of [[KeyMatcherPair]]s" in {
     val matcher = Is.is(1)
-    val block = new MatchFields[Fluple3[Int, Int, Int]]("1", "2", "3")
+    val block = new MatchTuples[Fluple3[Int, Int, Int]]("1", "2", "3")
     block.assertThat("1", matcher)
     block.assertThat("2", matcher)
     block.assertThat("2", matcher)
@@ -93,7 +93,7 @@ class MatchTuplesSpec extends CoreSpec {
   trait AssertBlockCase {
     val matcher = Is.is(1)
     val block =
-      new MatchFields[Fluple4[Int, Int, Int, Int]]("1", "2", "3", "4")
+      new MatchTuples[Fluple4[Int, Int, Int, Int]]("1", "2", "3", "4")
     block.assertThat("1", matcher)
     block.assertThat("2", matcher)
     block.assertThat("3", matcher)
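The spec rename tracks the production class: MatchFields is now MatchTuples. For context, a minimal sketch of the pattern the spec exercises, assuming the project's Fluple3 tuple type and Hamcrest's Is matcher (both visible in the diff above):

    import org.hamcrest.core.Is
    import org.flinkspector.core.quantify.MatchTuples

    // Name the tuple's positions, then bind a Hamcrest matcher to each name.
    val block = new MatchTuples[Fluple3[Int, Int, Int]]("1", "2", "3")
    block.assertThat("1", Is.is(1))
    block.assertThat("2", Is.is(1))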

flinkspector-dataset/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@
     </dependency>
     <dependency>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-java_${scala.binary.version}</artifactId>
+        <artifactId>flink-java</artifactId>
         <version>${flink.version}</version>
     </dependency>
 </dependencies>

flinkspector-dataset/src/test/java/org/flinkspector/dataset/examples/BatchTest.java

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.flinkspector.core.collection.ExpectedRecords;
-import org.flinkspector.core.quantify.MatchFields;
 import org.flinkspector.core.quantify.MatchTuples;
 import org.flinkspector.core.quantify.OutputMatcher;
 import org.flinkspector.core.trigger.FinishAtCount;

flinkspector-datastream-scala_2.11/pom.xml renamed to flinkspector-datastream-scala/pom.xml

Lines changed: 7 additions & 6 deletions
@@ -35,23 +35,24 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>flinkspector-parent</artifactId>
         <groupId>org.flinkspector</groupId>
-        <version>0.1-SNAPSHOT</version>
+        <artifactId>flinkspector-parent_2.11</artifactId>
+        <version>0.8-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>flinkspector-datastream-scala_${scala.binary.version}</artifactId>
+    <artifactId>flinkspector-datastream-scala_2.11</artifactId>
+    <name>flinkspector-datastream-scala</name>
 
     <dependencies>
         <dependency>
             <groupId>org.flinkspector</groupId>
-            <artifactId>flinkspector-core</artifactId>
+            <artifactId>flinkspector-core_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.flinkspector</groupId>
-            <artifactId>flinkspector-datastream</artifactId>
+            <artifactId>flinkspector-datastream_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
@@ -100,7 +101,7 @@
             <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
-                <version>3.1.4</version>
+                <version>3.2.2</version>
                 <executions>
                     <!-- Run scala compiler in the process-resources phase, so that dependencies on
                          scala classes can be resolved later in the (Java) compile phase -->
Lines changed: 12 additions & 12 deletions
@@ -17,11 +17,11 @@
 package org.flinkspector.scala.datastream
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.streaming.api.datastream.DataStreamSource
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.flinkspector.core.input.Input
 import org.flinkspector.core.runtime.OutputVerifier
 import org.flinkspector.core.trigger.VerifyFinishedTrigger
@@ -78,9 +78,9 @@ class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamT
    * @param data The array of elements to startWith the data stream from.
    * @return The data stream representing the given array of elements
    */
-  @SafeVarargs def fromElementsWithTimeStamp[OUT: ClassTag: TypeInformation](data: StreamRecord[OUT]*): DataStream[OUT] = {
+  @SafeVarargs def fromElementsWithTimeStamp[OUT: ClassTag: TypeInformation](data: StreamRecord[OUT]*): DataStreamSource[OUT] = {
     val typeInfo = implicitly[TypeInformation[OUT]]
-    testEnv.fromCollectionWithTimestamp(data.asJava,typeInfo)
+    testEnv.fromCollectionWithTimestamp(data.asJava,typeInfo, false)
   }
 
   /**
@@ -89,9 +89,9 @@ class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamT
    * @param input The { @link EventTimeInput} to startWith the data stream from.
    * @return The data stream representing the given input.
    */
-  def fromInput[OUT: ClassTag: TypeInformation](input: EventTimeInput[OUT]): DataStream[OUT] = {
+  def fromInput[OUT: ClassTag: TypeInformation](input: EventTimeInput[OUT]): DataStreamSource[OUT] = {
     val typeInfo = implicitly[TypeInformation[OUT]]
-    testEnv.fromInput(input,typeInfo)
+    testEnv.fromInput(input)
   }
 
   /**
@@ -100,9 +100,9 @@ class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamT
    * @param input The { @link Input} to startWith the data stream from.
    * @return The data stream representing the given input.
    */
-  def fromInput[OUT: ClassTag: TypeInformation](input: Input[OUT]): DataStream[OUT] = {
+  def fromInput[OUT: ClassTag: TypeInformation](input: Input[OUT]): DataStreamSource[OUT] = {
     val typeInfo = implicitly[TypeInformation[OUT]]
-    testEnv.fromInput(input,typeInfo)
+    testEnv.fromInput(input)
   }
 
   /**
@@ -119,9 +119,9 @@ class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamT
    * @param data The collection of elements to startWith the data stream from.
    * @return The data stream representing the given collection
    */
-  def fromCollectionWithTimestamp[OUT: ClassTag: TypeInformation](data: Seq[StreamRecord[OUT]]): DataStream[OUT] = {
+  def fromCollectionWithTimestamp[OUT: ClassTag: TypeInformation](data: Seq[StreamRecord[OUT]]): DataStreamSource[OUT] = {
     val typeInfo = implicitly[TypeInformation[OUT]]
-    testEnv.fromCollectionWithTimestamp(data.asJava,typeInfo)
+    testEnv.fromCollectionWithTimestamp(data.asJava, false)
   }
 
   /**
@@ -155,7 +155,7 @@ class DataStreamTestEnvironment(testEnv: org.flinkspector.datastream.DataStreamT
   }
 
   def close(): Unit = {
-    testEnv.terminate()
+    //TODO: figure this out testEnv
   }
 }
 
@@ -171,7 +171,7 @@ object DataStreamTestEnvironment {
   @throws(classOf[Exception])
   def createTestEnvironment(parallelism: Int): DataStreamTestEnvironment = {
     val tasksSlots: Int = Runtime.getRuntime.availableProcessors
-    val cluster: ForkableFlinkMiniCluster = TestBaseUtils.startCluster(1, tasksSlots, StreamingMode.STREAMING, false, false, true)
+    val cluster = TestBaseUtils.startCluster(1, tasksSlots, false, false, true)
     val env = new org.flinkspector.datastream.DataStreamTestEnvironment(cluster, parallelism)
     new DataStreamTestEnvironment(env)
   }
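The visible theme of this diff is the return type: the Scala environment now hands back Flink's Java DataStreamSource instead of a Scala DataStream. A caller that wants the Scala API has to wrap the source itself, as the FlinkDataStream trait in the next diff does. A minimal sketch, assuming a test environment named env and an Input[Int] named input:

    import org.apache.flink.streaming.api.scala._

    // Hypothetical caller: wrap the returned Java DataStreamSource in the Scala facade.
    val source = env.fromInput(input)                     // DataStreamSource[Int]
    val stream: DataStream[Int] = new DataStream(source)  // Scala API from here on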
Lines changed: 35 additions & 12 deletions
@@ -21,17 +21,20 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.datastream.DataStreamSource
 import org.apache.flink.streaming.api.scala.DataStream
-import org.flinkspector.core.collection.{MatcherBuilder, ExpectedRecords}
-import org.flinkspector.core.input.{InputBuilder, Input}
+import org.flinkspector.core.Order
+import org.flinkspector.core.collection.{ExpectedRecords, MatcherBuilder}
+import org.flinkspector.core.input.{Input, InputBuilder}
 import org.flinkspector.core.quantify.HamcrestVerifier
 import org.flinkspector.core.runtime.OutputVerifier
 import org.flinkspector.core.trigger.VerifyFinishedTrigger
 import org.flinkspector.datastream.functions.TestSink
-import org.flinkspector.datastream.input.{EventTimeInputBuilder, EventTimeInput}
-import org.flinkspector.datastream.input.time.{Before, After}
+import org.flinkspector.datastream.input.{EventTimeInput, EventTimeInputBuilder}
+import org.flinkspector.datastream.input.time.{After, Before}
 import org.hamcrest.Matcher
 import org.scalatest.{BeforeAndAfterEach, Suite}
+import org.apache.flink.streaming.api.scala._
 
 import scala.reflect.ClassTag
 
@@ -68,7 +71,7 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
    * @param input to emit.
    * @return a DataStreamSource generating the input.
    */
-  def createTestStream[OUT: ClassTag: TypeInformation](input: EventTimeInput[OUT]): DataStream[OUT] = {
+  def createTestStream[OUT: ClassTag: TypeInformation](input: EventTimeInput[OUT]): DataStreamSource[OUT] = {
     val typeInfo = implicitly[TypeInformation[OUT]]
     testEnv.fromInput(input)
   }
@@ -92,7 +95,7 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
    * @return a DataStream generating the input.
    */
   def createTestStream[OUT: ClassTag: TypeInformation](input: Input[OUT]): DataStream[OUT] = {
-    testEnv.fromInput(input)
+    new DataStream(testEnv.fromInput(input))
   }
 
   /**
@@ -116,6 +119,26 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
     testEnv.createTestSink(verifier, trigger)
   }
 
+  /**
+   * Creates a TestSink using {@link org.hamcrest.Matcher} to verify the output.
+   *
+   * @param function of generic type IN
+   * @return the created sink.
+   */
+  def createTestSink[IN](function: Iterable[IN] => Any, trigger: VerifyFinishedTrigger[_]): TestSink[IN] = {
+    testEnv.createTestSink(new FunctionVerifier[IN](function), trigger)
+  }
+
+  /**
+   * Creates a TestSink using {@link org.hamcrest.Matcher} to verify the output.
+   *
+   * @param function of generic type IN
+   * @return the created sink.
+   */
+  def createTestSink[IN](function: Iterable[IN] => Any): TestSink[IN] = {
+    testEnv.createTestSink(new FunctionVerifier[IN](function))
+  }
+
   /**
    * Creates a TestSink using {@link org.hamcrest.Matcher} to verify the output.
    *
@@ -153,8 +176,8 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
     testEnv.setParallelism(parallelism)
   }
 
-  def fulfill[T](matcher: Matcher[JIterable[T]]) : FulfillWord[T] = {
-    new FulfillWord[T](matcher)
+  def fulfill[T](function: Iterable[T] => Any) : FulfillWord[T] = {
+    new FulfillWord[T](function)
   }
 
   class StreamShouldWrapper[T](val stream: DataStream[T]){
@@ -195,7 +218,7 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
   }
 
   def startWith[T](record: T): EventTimeInputBuilder[T] = {
-    return EventTimeInputBuilder.create(record)
+    return EventTimeInputBuilder.startWith(record)
   }
 
   def emit[T](elem: T): InputBuilder[T] = {
@@ -210,8 +233,8 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
     return n
   }
 
-  val strict: MatcherBuilder.Order = MatcherBuilder.Order.STRICT
-  val notStrict: MatcherBuilder.Order = MatcherBuilder.Order.NONSTRICT
+  val strict: Order = Order.STRICT
+  val notStrict: Order = Order.NONSTRICT
   val seconds: TimeUnit = TimeUnit.SECONDS
   val minutes: TimeUnit = TimeUnit.MINUTES
   val hours: TimeUnit = TimeUnit.HOURS
@@ -220,7 +243,7 @@ trait FlinkDataStream extends BeforeAndAfterEach { this: Suite =>
 
 }
 
-final class FulfillWord[T](val matcher: Matcher[JIterable[T]]) {
+final class FulfillWord[T](val matcher: Iterable[T] => Any) {
 
   var trigger: Option[VerifyFinishedTrigger[_]] = None
 
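Taken together, these changes replace Hamcrest-based verification with plain Scala functions: fulfill and the new createTestSink overloads take an Iterable[IN] => Any and route it through the FunctionVerifier added below. A minimal sketch inside a suite that mixes in FlinkDataStream, assuming a DataStream[Int] named stream:

    // Any function over the collected output works; a thrown exception fails the test.
    stream should fulfill { output =>
      assert(output.sum == 14)
    }
    executeTest()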
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package org.flinkspector.scala.datastream
+
+import java.util
+import scala.collection.JavaConverters._
+
+import org.flinkspector.core.runtime.{FlinkTestFailedException, SimpleOutputVerifier}
+
+/**
+ * Created by willi on 17.07.17.
+ */
+class FunctionVerifier[T](val function: Iterable[T] => Any) extends SimpleOutputVerifier[T] {
+
+  @throws[FlinkTestFailedException]
+  override def verify(output: util.List[T]): Unit = {
+    function(output.asScala)
+  }
+}
+
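FunctionVerifier is the adapter behind the new function-based sinks: verify simply applies the supplied function to the collected output (converted to a Scala Iterable), so any exception the function throws fails the test. A minimal standalone sketch of the contract, with hypothetical sample data:

    import java.util

    val verifier = new FunctionVerifier[Int](out => require(out.forall(_ > 0)))
    verifier.verify(util.Arrays.asList(1, 2, 3)) // passes; a failing require would fail the test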
Lines changed: 9 additions & 19 deletions
@@ -16,29 +16,25 @@ package org.flinkspector.scala.datastream
  * limitations under the License.
  */
 
-import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala._
 import org.flinkspector.core.collection.ExpectedRecords
 import org.flinkspector.core.input.InputBuilder
 
 //needs to be defined top level
 case class Output(key: String, value: Int)
 
-class DataStreamSpec extends CoreSpec with FlinkDataStream{
+class DataStreamSpec extends CoreSpec with FlinkDataStream {
 
   "basic test" should "work" in {
 
     //create a test stream
-    val stream = createTestStream(List(1, 2, 3, 4)).map(_ + 1)
+    val stream = createTestStream[Int](List(1, 2, 3, 4))
+      .map(_ + 1)
 
-    //build a matcher
-    val expected = ExpectedRecords
-      .create(2)
-      .expect(3)
-      .expect(4)
-      .expect(5)
-
-    //use the matcher on the datastream
-    stream should fulfill(expected)
+    //test the output
+    stream should fulfill {
+      _ should contain allOf(2, 3, 4, 5)
+    }
     executeTest()
 
   }
@@ -54,13 +50,7 @@ class DataStreamSpec extends CoreSpec with FlinkDataStream{
 
     val stream = createTestStream(input)
 
-    stream should fulfill {
-      //use a case class to map the tuple
-      new AssertBlock[(String,Int),Output] {
-        field(v.key shouldBe a[String])
-        field(v.value should be > 4)
-      }
-    }
+    stream should fulfill(_ should contain(("check", 5)))
     executeTest()
   }
 }
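The second test feeds createTestStream from a prebuilt input; with the builder entry point renamed in this commit (EventTimeInputBuilder.create became startWith), building such an event-time input could look like the following hypothetical sketch, reusing the After helper and the seconds alias the FlinkDataStream trait already provides:

    // Hypothetical event-time input: the second record arrives one second after the first.
    val input = startWith(("check", 4))
      .emit(("check", 5), After.period(1, seconds))

    val stream = createTestStream(input)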
