Skip to content

Commit bb9f635

Browse files
committed
Remove deprecated API in KafkaTestUtils
1 parent bbc2475 commit bb9f635

1 file changed

Lines changed: 11 additions & 8 deletions

File tree

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import scala.util.control.NonFatal
3030

3131
import kafka.admin.AdminUtils
3232
import kafka.api.Request
33-
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
34-
import kafka.serializer.StringEncoder
3533
import kafka.server.{KafkaConfig, KafkaServer}
3634
import kafka.utils.ZkUtils
35+
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
36+
import org.apache.kafka.common.serialization.StringSerializer
3737
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
3838

3939
import org.apache.spark.SparkConf
@@ -68,7 +68,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
6868
private var server: KafkaServer = _
6969

7070
// Kafka producer
71-
private var producer: Producer[String, String] = _
71+
private var producer: KafkaProducer[String, String] = _
7272

7373
// Flag to test whether the system is correctly started
7474
private var zkReady = false
@@ -178,8 +178,9 @@ private[kafka010] class KafkaTestUtils extends Logging {
178178

179179
/** Send the array of messages to the Kafka broker */
180180
def sendMessages(topic: String, messages: Array[String]): Unit = {
181-
producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
182-
producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
181+
producer = new KafkaProducer[String, String](producerConfiguration)
182+
val records = messages.map { new ProducerRecord[String, String](topic, _) }
183+
records.map(producer.send)
183184
producer.close()
184185
producer = null
185186
}
@@ -198,10 +199,12 @@ private[kafka010] class KafkaTestUtils extends Logging {
198199

199200
private def producerConfiguration: Properties = {
200201
val props = new Properties()
201-
props.put("metadata.broker.list", brokerAddress)
202-
props.put("serializer.class", classOf[StringEncoder].getName)
202+
props.put("bootstrap.servers", brokerAddress)
203+
props.put("value.serializer", classOf[StringSerializer].getName)
204+
// Key serializer is required.
205+
props.put("key.serializer", classOf[StringSerializer].getName)
203206
// wait for all in-sync replicas to ack sends
204-
props.put("request.required.acks", "-1")
207+
props.put("acks", "all")
205208
props
206209
}
207210

0 commit comments

Comments
 (0)