
Commit f1db7f1

Merge pull request #15 from harishreedharan/hdfsrdd

Initial implementation of HDFSBackedBlockRDD.

2 parents 6fe8af0 + 1fe3567 commit f1db7f1

4 files changed

Lines changed: 263 additions & 0 deletions

File tree

core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala

Lines changed: 4 additions & 0 deletions
@@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
+
+  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
+    locations_
+  }
 }
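
The new protected accessor simply exposes the driver-side locations_ map to subclasses, so an RDD backed by an external store can consult the cached block locations before falling back to its own lookup. A minimal, illustrative sketch of the pattern this enables (the real subclass, HDFSBackedBlockRDD, appears later in this commit; the class and its empty-Seq fallback here are hypothetical):

import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.BlockId

// Hypothetical subclass: prefer the cached block locations, and fall back to
// an empty location list when the BlockManager no longer tracks the block.
class FallbackBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient override val blockIds: Array[BlockId])
  extends BlockRDD[T](sc, blockIds) {

  override def getPreferredLocations(split: Partition): Seq[String] = {
    getBlockIdLocations().getOrElse(blockIds(split.index), Seq.empty)
  }
}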

streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala

Lines changed: 10 additions & 0 deletions
@@ -59,4 +59,14 @@ private[streaming] object HdfsUtils {
     }
   }

+  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+    val dfsPath = new Path(path)
+    val dfs =
+      this.synchronized {
+        dfsPath.getFileSystem(conf)
+      }
+    val fileStatus = dfs.getFileStatus(dfsPath)
+    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
+    blockLocs.map(_.flatMap(_.getHosts))
+  }
 }
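
A minimal sketch of how the new helper might be called; the standalone object and the WAL file path are illustrative, and the None case covers files for which the NameNode reports no block locations:

package org.apache.spark.streaming.storage

import org.apache.hadoop.conf.Configuration

// Hypothetical example, placed in the same package because HdfsUtils is
// private[streaming].
object BlockLocationExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical path to a write ahead log file on HDFS: the helper returns
    // the hosts holding the file's blocks, or None if no locations are reported.
    val hosts: Seq[String] =
      HdfsUtils.getBlockLocations("hdfs://namenode:8020/checkpoints/wal-0", new Configuration())
        .map(_.toSeq)
        .getOrElse(Seq.empty)
    println(hosts.mkString(", "))
  }
}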
streaming/src/main/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDD.scala

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.storage.rdd

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.storage.{FileSegment, HdfsUtils, WriteAheadLogRandomReader}
import org.apache.spark._

private[streaming]
class HDFSBackedBlockRDDPartition(val blockId: BlockId, idx: Int, val segment: FileSegment)
  extends Partition {
  val index = idx
}

private[streaming]
class HDFSBackedBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient hadoopConfiguration: Configuration,
    @transient override val blockIds: Array[BlockId],
    @transient val segments: Array[FileSegment],
    val storeInBlockManager: Boolean,
    val storageLevel: StorageLevel
  ) extends BlockRDD[T](sc, blockIds) {

  if (blockIds.length != segments.length) {
    throw new IllegalStateException("Number of block ids must be the same as number of segments!")
  }

  // The Hadoop Configuration is not serializable, so wrap it in a
  // SerializableWritable and broadcast that instead.
  val broadcastedHadoopConf = sc.broadcast(new SerializableWritable(hadoopConfiguration))
    .asInstanceOf[Broadcast[SerializableWritable[Configuration]]]

  override def getPartitions: Array[Partition] = {
    assertValid()
    (0 until blockIds.size).map { i =>
      new HDFSBackedBlockRDDPartition(blockIds(i), i, segments(i))
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val hadoopConf = broadcastedHadoopConf.value.value
    val blockManager = SparkEnv.get.blockManager
    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
    val blockId = partition.blockId
    blockManager.get(blockId) match {
      // The data is in the BlockManager, so grab it from there.
      case Some(block) =>
        block.data.asInstanceOf[Iterator[T]]
      // The data was not found in the BlockManager, so grab it from HDFS.
      case None =>
        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
        val dataRead = reader.read(partition.segment)
        reader.close()
        // Currently, storing the data back in the BlockManager is supported
        // only in serialized form, not in deserialized form.
        if (storeInBlockManager) {
          blockManager.putBytes(blockId, dataRead, storageLevel)
        }
        dataRead.rewind()
        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
    val locations = getBlockIdLocations()
    locations.getOrElse(partition.blockId,
      HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration)
        .getOrElse(new Array[String](0)).toSeq)
  }
}
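
A minimal sketch of how this RDD might be constructed, assuming the block ids and their matching write ahead log FileSegments were recorded when the blocks were written (the test suite below builds them the same way); the ReadBackExample object and its arguments are illustrative:

package org.apache.spark.streaming.storage.rdd

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.storage.FileSegment

// Hypothetical driver-side helper, placed in the same package because the RDD
// is private[streaming]: blocks still held by the BlockManager are served
// locally, the rest are read back from their WAL segments on HDFS.
object ReadBackExample {
  def readBack(
      sc: SparkContext,
      hadoopConf: Configuration,
      blockIds: Array[BlockId],
      segments: Array[FileSegment]): Array[String] = {
    val rdd = new HDFSBackedBlockRDD[String](sc, hadoopConf, blockIds, segments,
      storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY_SER)
    rdd.collect()
  }
}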
streaming/src/test/scala/org/apache/spark/streaming/storage/rdd/HDFSBackedBlockRDDSuite.scala

Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.storage.rdd

import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.storage.{FileSegment, WriteAheadLogWriter}
import org.apache.spark.{SparkConf, SparkContext}

class HDFSBackedBlockRDDSuite extends FunSuite with BeforeAndAfter {
  // Name of the framework for the Spark context
  def framework = this.getClass.getSimpleName

  // Master for the Spark context
  def master = "local[2]"

  val conf = new SparkConf()
    .setMaster(master)
    .setAppName(framework)
  val sparkContext = new SparkContext(conf)
  val hadoopConf = new Configuration()
  val blockManager = sparkContext.env.blockManager
  // Since the same BlockManager is reused in all tests, use an atomic int to generate ids
  val idGenerator = new AtomicInteger(0)
  var file: File = null
  var dir: File = null

  before {
    dir = Files.createTempDir()
    file = new File(dir, "BlockManagerWrite")
  }

  after {
    file.delete()
    dir.delete()
  }

  test("Verify all data is available when all data is in BM and HDFS") {
    doTestHDFSBackedRDD(5, 5, 20, 5)
  }

  test("Verify all data is available when all data is in BM but not in HDFS") {
    doTestHDFSBackedRDD(5, 0, 20, 5)
  }

  test("Verify all data is available when all data is in HDFS and no data is in BM") {
    doTestHDFSBackedRDD(0, 5, 20, 5)
  }

  test("Verify part of the data is in BM, and the remaining in HDFS") {
    doTestHDFSBackedRDD(3, 2, 20, 5)
  }

  /**
   * Writes a set of Strings as blocks, putting some of the blocks into the BlockManager
   * and writing the rest to HDFS, so that not all reads have to happen from HDFS. Then
   * reads everything back through an HDFSBackedBlockRDD and verifies the data.
   * @param writeToBMCount Number of blocks to put into the BlockManager
   * @param writeToHDFSCount Number of blocks to write to HDFS
   * @param total Total number of Strings to write
   * @param blockCount Number of blocks to write (therefore, number of Strings per block =
   *                   total / blockCount)
   */
  private def doTestHDFSBackedRDD(
      writeToBMCount: Int,
      writeToHDFSCount: Int,
      total: Int,
      blockCount: Int
    ) {
    val countPerBlock = total / blockCount
    val blockIds = (0 until blockCount).map { _ =>
      StreamBlockId(idGenerator.incrementAndGet(), idGenerator.incrementAndGet())
    }

    val writtenStrings = generateData(total, countPerBlock)

    if (writeToBMCount != 0) {
      (0 until writeToBMCount).foreach { i =>
        blockManager
          .putIterator(blockIds(i), writtenStrings(i).iterator, StorageLevel.MEMORY_ONLY_SER)
      }
    }

    val segments = new ArrayBuffer[FileSegment]
    if (writeToHDFSCount != 0) {
      // Generate some fake segments for the blocks in BM so the RDD does not complain
      segments ++= generateFakeSegments(writeToBMCount)
      segments ++= writeDataToHDFS(writtenStrings.slice(writeToBMCount, blockCount),
        blockIds.slice(writeToBMCount, blockCount))
    } else {
      segments ++= generateFakeSegments(blockCount)
    }
    val rdd = new HDFSBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
      segments.toArray, false, StorageLevel.MEMORY_ONLY)

    val dataFromRDD = rdd.collect()
    // Verify the data pulled out of the RDD matches the data that was written
    assert(writtenStrings.flatten === dataFromRDD)
  }

  /**
   * Generates random Strings grouped into per-block Seqs, in which each inner Seq
   * represents the data that goes into one block.
   * @param count Number of Strings to generate
   * @param countPerBlock Number of Strings per block
   * @return Seq of Seqs, each inner Seq holding the data for one block
   */
  private def generateData(
      count: Int,
      countPerBlock: Int
    ): Seq[Seq[String]] = {
    val strings = (0 until count).map { _ => scala.util.Random.nextString(50) }
    strings.grouped(countPerBlock).toSeq
  }

  private def writeDataToHDFS(
      blockData: Seq[Seq[String]],
      blockIds: Seq[BlockId]
    ): Seq[FileSegment] = {
    assert(blockData.size === blockIds.size)
    val segments = new ArrayBuffer[FileSegment]()
    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
    blockData.zip(blockIds).foreach { case (data, id) =>
      segments += writer.write(blockManager.dataSerialize(id, data.iterator))
    }
    writer.close()
    segments
  }

  private def generateFakeSegments(count: Int): Seq[FileSegment] = {
    (0 until count).map { _ => new FileSegment("random", 0L, 0) }
  }
}
