Skip to content

Commit 714f4d7

Browse files
tejasapatil authored and srowen committed
[SPARK-15601][CORE] CircularBuffer's toString() to print only the contents written if buffer isn't full
1. The class allocated 4x more space than needed, as it was using `Int` to store the `Byte` values. 2. If CircularBuffer isn't full, toString() currently prints some garbage chars along with the content written, as it tries to print the entire array allocated for the buffer. The fix is to keep track of whether the buffer has become full and not print the tail of the buffer if it isn't full (suggestion by sameeragarwal over #12194 (comment)). 3. Simplified `toString()`. Added new test case. Author: Tejas Patil <[email protected]> Closes #13351 from tejasapatil/circular_buffer. (cherry picked from commit ac38bdc) Signed-off-by: Sean Owen <[email protected]>
1 parent ea84b33 commit 714f4d7

File tree

2 files changed

+44
-25
lines changed

2 files changed

+44
-25
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory
2222
import java.net._
2323
import java.nio.ByteBuffer
2424
import java.nio.channels.Channels
25+
import java.nio.charset.StandardCharsets
2526
import java.util.concurrent._
2627
import java.util.{Locale, Properties, Random, UUID}
2728
import javax.net.ssl.HttpsURLConnection
@@ -2308,29 +2309,24 @@ private[spark] class RedirectThread(
23082309
* the toString method.
23092310
*/
23102311
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
2311-
var pos: Int = 0
2312-
var buffer = new Array[Int](sizeInBytes)
2312+
private var pos: Int = 0
2313+
private var isBufferFull = false
2314+
private val buffer = new Array[Byte](sizeInBytes)
23132315

2314-
def write(i: Int): Unit = {
2315-
buffer(pos) = i
2316+
def write(input: Int): Unit = {
2317+
buffer(pos) = input.toByte
23162318
pos = (pos + 1) % buffer.length
2319+
isBufferFull = isBufferFull || (pos == 0)
23172320
}
23182321

23192322
override def toString: String = {
2320-
val (end, start) = buffer.splitAt(pos)
2321-
val input = new java.io.InputStream {
2322-
val iterator = (start ++ end).iterator
2323-
2324-
def read(): Int = if (iterator.hasNext) iterator.next() else -1
2325-
}
2326-
val reader = new BufferedReader(new InputStreamReader(input))
2327-
val stringBuilder = new StringBuilder
2328-
var line = reader.readLine()
2329-
while (line != null) {
2330-
stringBuilder.append(line)
2331-
stringBuilder.append("\n")
2332-
line = reader.readLine()
2323+
if (!isBufferFull) {
2324+
return new String(buffer, 0, pos, StandardCharsets.UTF_8)
23332325
}
2334-
stringBuilder.toString()
2326+
2327+
val nonCircularBuffer = new Array[Byte](sizeInBytes)
2328+
System.arraycopy(buffer, pos, nonCircularBuffer, 0, buffer.length - pos)
2329+
System.arraycopy(buffer, 0, nonCircularBuffer, buffer.length - pos, pos)
2330+
new String(nonCircularBuffer, StandardCharsets.UTF_8)
23352331
}
23362332
}

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.util
1919

20-
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream}
20+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, PrintStream}
2121
import java.lang.{Double => JDouble, Float => JFloat}
2222
import java.net.{BindException, ServerSocket, URI}
2323
import java.nio.{ByteBuffer, ByteOrder}
@@ -679,14 +679,37 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
679679
assert(!Utils.isInDirectory(nullFile, childFile3))
680680
}
681681

682-
test("circular buffer") {
682+
test("circular buffer: if nothing was written to the buffer, display nothing") {
683+
val buffer = new CircularBuffer(4)
684+
assert(buffer.toString === "")
685+
}
686+
687+
test("circular buffer: if the buffer isn't full, print only the contents written") {
688+
val buffer = new CircularBuffer(10)
689+
val stream = new PrintStream(buffer, true, "UTF-8")
690+
stream.print("test")
691+
assert(buffer.toString === "test")
692+
}
693+
694+
test("circular buffer: data written == size of the buffer") {
695+
val buffer = new CircularBuffer(4)
696+
val stream = new PrintStream(buffer, true, "UTF-8")
697+
698+
// fill the buffer to its exact size so that it just hits overflow
699+
stream.print("test")
700+
assert(buffer.toString === "test")
701+
702+
// add more data to the buffer
703+
stream.print("12")
704+
assert(buffer.toString === "st12")
705+
}
706+
707+
test("circular buffer: multiple overflow") {
683708
val buffer = new CircularBuffer(25)
684-
val stream = new java.io.PrintStream(buffer, true, "UTF-8")
709+
val stream = new PrintStream(buffer, true, "UTF-8")
685710

686-
// scalastyle:off println
687-
stream.println("test circular test circular test circular test circular test circular")
688-
// scalastyle:on println
689-
assert(buffer.toString === "t circular test circular\n")
711+
stream.print("test circular test circular test circular test circular test circular")
712+
assert(buffer.toString === "st circular test circular")
690713
}
691714

692715
test("nanSafeCompareDoubles") {

0 commit comments

Comments
 (0)