core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (11 additions, 6 deletions)

@@ -21,6 +21,7 @@ import java.io.IOException
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
+import java.util.concurrent.ConcurrentHashMap
[Member] nit: unneeded import.
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

@@ -143,14 +144,18 @@ class SparkHadoopUtil extends Logging {
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
    * return the bytes read on r since t.
-   *
-   * @return None if the required method can't be found.
[Contributor] Why remove this line instead of the doc?

[Contributor] This doesn't return a None, but the rest of the doc is still correct about the behavior.
    */
   private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
[Contributor] Let's update the doc to say that the returned function may be called in multiple threads.
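One possible wording, as a sketch (the exact phrasing here is only a suggestion, not taken from the patch):

        /**
         * Returns a function that can be called to find Hadoop FileSystem bytes read. If
         * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback
         * will return the bytes read on r since t. The returned function may be called in
         * multiple threads; reads from each calling thread are tracked and summed together.
         */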

-    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
-    val f = () => threadStats.map(_.getBytesRead).sum
-    val baselineBytesRead = f()
-    () => f() - baselineBytesRead
+    val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
[Contributor] Why are you changing this?

[Author] In the previous code, threadStats and the f function could be executed in two different threads, so the metrics we got could be wrong.
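Concretely, getThreadStatistics returns per-thread counters, so the old code pinned the statistics of whichever thread created the callback. A minimal sketch of the failure mode (illustrative only, not from the patch):

    // Old shape: per-thread statistics captured once, at creation time.
    val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
    val f = () => threadStats.map(_.getBytesRead).sum // sums only the creating thread's counters

    // If the task later reads data in a spawned thread, those bytes land in the spawned
    // thread's counters, which f() never looks at, so they are silently dropped.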

+    val baseline = (Thread.currentThread().getId, f())
+    val bytesReadMap = new ConcurrentHashMap[Long, Long]()
+
+    () => {
[Contributor] I think it's better to create an anonymous Function0 instance, treat bytesReadMap as a member variable, and document the multi-thread semantics of the apply method.

[Author] That's a good idea, let me change the code.

+      bytesReadMap.put(Thread.currentThread().getId, f())
+      bytesReadMap.asScala.map { case (k, v) =>
[Contributor] This is not atomic; shall we synchronize on bytesReadMap when calculating the sum?

[Author] I see. Let me fix it.

+        v - (if (k == baseline._1) baseline._2 else 0)
+      }.sum
+    }
   }

   /**
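Putting the review suggestions together (an anonymous Function0 with bytesReadMap as a member, and synchronized aggregation), the callback might end up in roughly this shape. This is a sketch of where the thread was heading, not the exact code that was merged:

    private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
      val f = () => FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
      val baseline = (Thread.currentThread().getId, f())

      new Function0[Long] {
        // One entry per thread that has invoked apply(); guarded by its own lock.
        private val bytesReadMap = new scala.collection.mutable.HashMap[Long, Long]()

        /**
         * Returns the bytes read by all threads that have called apply() so far,
         * minus the baseline recorded when the callback was created.
         */
        override def apply(): Long = bytesReadMap.synchronized {
          bytesReadMap.put(Thread.currentThread().getId, f())
          bytesReadMap.map { case (k, v) =>
            v - (if (k == baseline._1) baseline._2 else 0)
          }.sum
        }
      }
    }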
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala (7 additions, 1 deletion)

@@ -251,7 +251,13 @@ class HadoopRDD[K, V](
         null
       }
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      context.addTaskCompletionListener { context =>
+        // Update the bytes read before closing to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
[Contributor] Will this duplicate what we do in close()?

[jerryshao (author), May 27, 2017] As I remember, close can be called in another thread, so I added this here to avoid lingering bytesRead in the task-running thread (some bytes can be read when creating the InputFormat). Also, it does no harm to call updateBytesRead again.

+        closeIfNeeded()
+      }

       private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
       private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
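On why the duplicate call is harmless: updateBytesRead (defined elsewhere in HadoopRDD, not shown in this hunk) sets the metric rather than incrementing it, so invoking it from both the completion listener and close() is idempotent. Roughly (a sketch of the surrounding code, not part of this diff):

    private def updateBytesRead(): Unit = {
      getBytesReadCallback.foreach { getBytesRead =>
        // setBytesRead overwrites the previous value, so repeated calls are safe.
        inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
      }
    }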
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala (7 additions, 1 deletion)

@@ -191,7 +191,13 @@ class NewHadoopRDD[K, V](
       }

       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener(context => close())
+      context.addTaskCompletionListener { context =>
+        // Update the bytesRead before closing to make sure lingering bytesRead statistics in
+        // this thread get correctly added.
+        updateBytesRead()
+        close()
+      }

       private var havePair = false
       private var recordsSinceMetricsUpdate = 0
core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala

@@ -319,6 +319,45 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     }
     assert(bytesRead >= tmpFile.length())
   }

test("input metrics with old Hadoop API in different thread") {
val bytesRead = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 4).mapPartitions { iter =>
val buf = new ArrayBuffer[String]()
val thread = new Thread() {
[Contributor] nit: We could use ThreadUtils.runInNewThread() to make this shorter, like:

        ThreadUtils.runInNewThread("TestThread") {
          iter.flatMap(_.split(" ")).foreach(buf.append(_))
        }
+          override def run(): Unit = {
+            iter.flatMap(_.split(" ")).foreach(buf.append(_))
+          }
+        }
+        thread.start()
+        thread.join()
+
+        buf.iterator
+      }.count()
+    }
+    assert(bytesRead != 0)
[Contributor] This assert is unnecessary.

[Author] Done.
+    assert(bytesRead >= tmpFile.length())
+  }

test("input metrics with new Hadoop API in different thread") {
val bytesRead = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
classOf[Text]).mapPartitions { iter =>
val buf = new ArrayBuffer[String]()
val thread = new Thread() {
[Contributor] nit: Same as above, we could rewrite this to:

        ThreadUtils.runInNewThread("TestThread") {
          iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_))
        }

+          override def run(): Unit = {
+            iter.map(_._2.toString).flatMap(_.split(" ")).foreach(buf.append(_))
+          }
+        }
+        thread.start()
+        thread.join()
+
+        buf.iterator
+      }.count()
+    }
+    assert(bytesRead != 0)
+    assert(bytesRead >= tmpFile.length())
+  }
 }

 /**
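The behavior these tests guard against is easy to reproduce outside Spark, because Hadoop's FileSystem statistics are kept per thread. A self-contained sketch (hypothetical demo code, not part of the patch; assumes a Hadoop 2.x client on the classpath):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.JavaConverters._

    object PerThreadStatsDemo {
      // Bytes read as seen from the *calling* thread's statistics.
      private def bytesReadHere(): Long =
        FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

      def main(args: Array[String]): Unit = {
        val fs = FileSystem.getLocal(new Configuration())
        val path = new Path(args(0)) // any local file

        val t = new Thread(new Runnable {
          override def run(): Unit = {
            val in = fs.open(path)
            val buf = new Array[Byte](4096)
            while (in.read(buf) != -1) {} // read the whole file in this thread
            in.close()
            println(s"child thread sees: ${bytesReadHere()} bytes")
          }
        })
        t.start()
        t.join()

        // The main thread's counters never observed the child's reads,
        // which is exactly what the per-thread aggregation in this PR fixes.
        println(s"main thread sees: ${bytesReadHere()} bytes")
      }
    }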