Skip to content

Commit 334816a

Browse files
eason-yuchen-liuHeartSaVioR
authored andcommitted
[SPARK-48411][SS][PYTHON] Add E2E test for DropDuplicateWithinWatermark
### What changes were proposed in this pull request? This PR adds a test for API DropDuplicateWithinWatermark in Python, which was previously missing. ### Why are the changes needed? Check the correctness of API DropDuplicateWithinWatermark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed: ``` python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming python/run-tests --testnames pyspark.sql.tests.connect.streaming.test_parity_streaming ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46740 from eason-yuchen-liu/DropDuplicateWithinWatermark_test. Authored-by: Yuchen Liu <yuchen.liu@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 72df3cb commit 334816a

2 files changed

Lines changed: 35 additions & 1 deletion

File tree

python/pyspark/sql/tests/streaming/test_streaming.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from pyspark.sql import Row
2424
from pyspark.sql.functions import lit
25-
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
25+
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
2626
from pyspark.testing.sqlutils import ReusedSQLTestCase
2727
from pyspark.errors import PySparkValueError
2828

@@ -413,6 +413,30 @@ def test_streaming_with_temporary_view(self):
413413
set([Row(value="view_a"), Row(value="view_b"), Row(value="view_c")]), set(result)
414414
)
415415

416+
def test_streaming_drop_duplicate_within_watermark(self):
417+
"""
418+
This verifies dropDuplicatesWithinWatermark works with a streaming dataframe.
419+
"""
420+
user_schema = StructType().add("time", TimestampType()).add("id", "integer")
421+
df = (
422+
self.spark.readStream.option("sep", ";")
423+
.schema(user_schema)
424+
.csv("python/test_support/sql/streaming/time")
425+
)
426+
q1 = (
427+
df.withWatermark("time", "2 seconds")
428+
.dropDuplicatesWithinWatermark(["id"])
429+
.writeStream.outputMode("update")
430+
.format("memory")
431+
.queryName("test_streaming_drop_duplicates_within_wm")
432+
.start()
433+
)
434+
self.assertTrue(q1.isActive)
435+
q1.processAllAvailable()
436+
q1.stop()
437+
result = self.spark.sql("SELECT * FROM test_streaming_drop_duplicates_within_wm").collect()
438+
self.assertTrue(len(result) >= 6 and len(result) <= 9)
439+
416440

417441
class StreamingTests(StreamingTestsMixin, ReusedSQLTestCase):
418442
def _assert_exception_tree_contains_msg(self, exception, msg):
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
2024-05-24 15:03:20;1
2+
2024-05-24 15:03:21;2
3+
2024-05-24 15:03:24;3
4+
2024-05-24 15:03:25;3
5+
2024-05-24 15:03:31;4
6+
2024-05-24 15:03:31;1
7+
2024-05-24 15:03:32;3
8+
2024-05-24 15:03:45;2
9+
2024-05-24 15:03:46;5
10+
2024-05-24 15:03:50;6

0 commit comments

Comments
 (0)