docs/structured-streaming-kafka-integration.md (37 changes: 23 additions & 14 deletions)
@@ -123,7 +123,7 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
</div>
</div>

### Creating a Kafka Source for Batch Queries
If you have a use case that is better suited to batch processing,
you can create a Dataset/DataFrame for a defined range of offsets.

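For instance (a minimal sketch; the broker addresses, topic name, and per-partition offsets below are placeholders), a bounded read over an explicit offset range might look like:

{% highlight scala %}
// Batch read over a fixed offset range; assumes an active SparkSession named `spark`.
// In the offset JSON, -2 means "earliest" and -1 means "latest".
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% endhighlight %}
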
@@ -374,17 +374,24 @@ The following configurations are optional:
<td>streaming and batch</td>
<td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
</tr>
<tr>
<td>groupIdPrefix</td>
<td>string</td>
<td>spark-kafka-source</td>
<td>streaming and batch</td>
<td>Prefix of consumer group identifiers (`group.id`) that are generated by Structured Streaming queries.</td>
</tr>
</table>
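
As a rough illustration of how `maxOffsetsPerTrigger` and `groupIdPrefix` are passed (the broker addresses and topic name are placeholders), a streaming read might be configured as:

{% highlight scala %}
// Streaming read with a per-trigger rate limit and a custom consumer-group prefix;
// assumes an active SparkSession named `spark`.
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("maxOffsetsPerTrigger", "10000") // at most 10000 offsets per trigger, split across partitions
  .option("groupIdPrefix", "my-app")       // generated group.id becomes my-app-<random UUID>-<hash>
  .load()
{% endhighlight %}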

## Writing Data to Kafka

Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
Apache Kafka only supports at-least-once write semantics. Consequently, when writing---either Streaming Queries
or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
if writing the query is successful, then you can assume that the query output was written at least once. A possible
solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
that can be used to perform de-duplication when reading.
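
As a sketch of that idea (assuming, purely for illustration, that each record's value is JSON carrying a unique `eventId` field), the written data could be de-duplicated on read:

{% highlight scala %}
import org.apache.spark.sql.functions.{col, get_json_object}

// Read back what was written and drop duplicates on the assumed unique key;
// assumes an active SparkSession named `spark` and placeholder broker/topic names.
val deduped = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .withColumn("eventId", get_json_object(col("value"), "$.eventId"))
  .dropDuplicates("eventId")
{% endhighlight %}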

The Dataframe being written to Kafka should have the following columns in schema:
@@ -405,8 +412,8 @@ The Dataframe being written to Kafka should have the following columns in schema
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>

The value column is the only required option. If a key column is not specified then
a ```null``` valued key column will be automatically added (see Kafka semantics on
how ```null``` valued key values are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set, i.e., the "topic" configuration option overrides the topic column.
@@ -568,31 +575,33 @@ df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()

{% endhighlight %}
</div>
</div>


## Kafka Specific Configurations

Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g.,
`stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see
[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
for parameters related to writing data.
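
For example (a sketch; the particular consumer properties below are arbitrary choices, not requirements), additional consumer settings are forwarded by prefixing them with `kafka.`:

{% highlight scala %}
// Options prefixed with "kafka." are stripped of the prefix and handed to the Kafka consumer;
// assumes an active SparkSession named `spark` and placeholder broker/topic names.
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.fetch.max.bytes", "1048576")   // forwarded as fetch.max.bytes
  .option("kafka.session.timeout.ms", "30000")  // forwarded as session.timeout.ms
  .option("subscribe", "topic1")
  .load()
{% endhighlight %}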

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

- **group.id**: Kafka source will create a unique group id for each query automatically. The user can
set the prefix of the automatically generated group.id values via the optional source option `groupIdPrefix`;
the default value is "spark-kafka-source".
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
where to start instead. Structured Streaming manages which offsets are consumed internally, rather
than relying on the Kafka consumer to do it. This will ensure that no data is missed when new
topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
DataFrame operations to explicitly deserialize the keys.
- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
Use DataFrame operations to explicitly deserialize the values.
- **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
@@ -77,7 +77,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
@@ -119,7 +119,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
@@ -159,7 +159,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+    val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
@@ -538,6 +538,18 @@ private[kafka010] object KafkaSourceProvider extends Logging {
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
.build()

/**
 * Returns a unique consumer group id (group.id), allowing the user to set its prefix
 * via the optional `groupIdPrefix` source option.
 */
private def streamingUniqueGroupId(
parameters: Map[String, String],
metadataPath: String): String = {
val groupIdPrefix = parameters
.getOrElse("groupIdPrefix", "spark-kafka-source")
s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}

/** Class to conveniently update Kafka config params, while logging the changes */
private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)