Skip to content

Commit ccadd81

Browse files
committed: Add example about how to convert key and value to strings
1 parent 77208d1 commit ccadd81

File tree

2 files changed

+40
-20
lines changed

2 files changed

+40
-20
lines changed

docs/structured-streaming-kafka-integration.md

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,82 +22,94 @@ application. See the [Deploying](#deploying) subsection below.
2222
<div data-lang="scala" markdown="1">
2323

2424
// Subscribe to 1 topic
25-
spark
25+
val ds1 = spark
2626
.readStream
2727
.format("kafka")
2828
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
2929
.option("subscribe", "topic1")
3030
.load()
31+
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
32+
.as[(String, String)]
3133

3234
// Subscribe to multiple topics
33-
spark
35+
val ds2 = spark
3436
.readStream
3537
.format("kafka")
3638
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
3739
.option("subscribe", "topic1,topic2")
3840
.load()
41+
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
42+
.as[(String, String)]
3943

4044
// Subscribe to a pattern
41-
spark
45+
val ds3 = spark
4246
.readStream
4347
.format("kafka")
4448
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
4549
.option("subscribePattern", "topic.*")
4650
.load()
51+
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
52+
.as[(String, String)]
4753

4854
</div>
4955
<div data-lang="java" markdown="1">
5056

5157
// Subscribe to 1 topic
52-
spark
58+
Dataset<Row> ds1 = spark
5359
.readStream()
5460
.format("kafka")
5561
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
5662
.option("subscribe", "topic1")
5763
.load()
64+
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
5865

5966
// Subscribe to multiple topics
60-
spark
67+
Dataset<Row> ds2 = spark
6168
.readStream()
6269
.format("kafka")
6370
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
6471
.option("subscribe", "topic1,topic2")
6572
.load()
73+
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
6674

6775
// Subscribe to a pattern
68-
spark
76+
Dataset<Row> ds3 = spark
6977
.readStream()
7078
.format("kafka")
7179
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
7280
.option("subscribePattern", "topic.*")
7381
.load()
82+
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
7483

7584
</div>
7685
<div data-lang="python" markdown="1">
7786

7887
# Subscribe to 1 topic
79-
spark
88+
ds1 = spark
8089
.readStream()
8190
.format("kafka")
8291
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
8392
.option("subscribe", "topic1")
8493
.load()
94+
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
8595

8696
# Subscribe to multiple topics
87-
spark
97+
ds2 = spark
8898
.readStream
8999
.format("kafka")
90100
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
91101
.option("subscribe", "topic1,topic2")
92102
.load()
103+
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
93104

94105
# Subscribe to a pattern
95-
spark
106+
ds3 = spark
96107
.readStream()
97108
.format("kafka")
98109
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
99110
.option("subscribePattern", "topic.*")
100111
.load()
112+
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
101113

102114
</div>
103115
</div>

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
145145
.option("kafka.metadata.max.age.ms", "1")
146146
.option("subscribePattern", s"topic-.*")
147147

148-
val kafka = reader.load().select("key", "value").as[(Array[Byte], Array[Byte])]
149-
val mapped = kafka.map(kv => new String(kv._2).toInt + 1)
148+
val kafka = reader.load()
149+
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
150+
.as[(String, String)]
151+
val mapped = kafka.map(kv => kv._2.toInt + 1)
150152

151153
testStream(mapped)(
152154
StopStream
@@ -190,8 +192,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
190192
.option("kafka.metadata.max.age.ms", "1")
191193
.option("subscribePattern", s"$topicPrefix-.*")
192194

193-
val kafka = reader.load().select("key", "value").as[(Array[Byte], Array[Byte])]
194-
val mapped = kafka.map(kv => new String(kv._2).toInt + 1)
195+
val kafka = reader.load()
196+
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
197+
.as[(String, String)]
198+
val mapped = kafka.map(kv => kv._2.toInt + 1)
195199

196200
testStream(mapped)(
197201
makeSureGetOffsetCalled,
@@ -272,8 +276,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
272276
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
273277
.option("kafka.metadata.max.age.ms", "1")
274278
options.foreach { case (k, v) => reader.option(k, v) }
275-
val kafka = reader.load().select("key", "value").as[(Array[Byte], Array[Byte])]
276-
val mapped = kafka.map(kv => new String(kv._2).toInt + 1)
279+
val kafka = reader.load()
280+
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
281+
.as[(String, String)]
282+
val mapped = kafka.map(kv => kv._2.toInt + 1)
277283

278284
testStream(mapped)(
279285
makeSureGetOffsetCalled,
@@ -309,8 +315,10 @@ class KafkaSourceSuite extends KafkaSourceTest {
309315
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
310316
.option("kafka.metadata.max.age.ms", "1")
311317
options.foreach { case (k, v) => reader.option(k, v) }
312-
val kafka = reader.load().select("key", "value").as[(Array[Byte], Array[Byte])]
313-
val mapped = kafka.map(kv => new String(kv._2).toInt + 1)
318+
val kafka = reader.load()
319+
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
320+
.as[(String, String)]
321+
val mapped = kafka.map(kv => kv._2.toInt + 1)
314322

315323
testStream(mapped)(
316324
AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
@@ -368,10 +376,10 @@ class KafkaSourceStressSuite extends KafkaSourceTest with BeforeAndAfter {
368376
.option("subscribePattern", "stress.*")
369377
.option("failOnCorruptMetadata", "false")
370378
.load()
371-
.select("key", "value")
372-
.as[(Array[Byte], Array[Byte])]
379+
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
380+
.as[(String, String)]
373381

374-
val mapped = kafka.map(kv => new String(kv._2).toInt + 1)
382+
val mapped = kafka.map(kv => kv._2.toInt + 1)
375383

376384
runStressTest(
377385
mapped,

0 commit comments

Comments (0)