30 changes: 3 additions & 27 deletions .travis.yml
@@ -5,37 +5,13 @@ cache:
- $HOME/.ivy2
matrix:
include:
# Spark 1.3.0
- jdk: openjdk6
scala: 2.10.5
env: TEST_HADOOP_VERSION="1.2.1" TEST_SPARK_VERSION="1.3.0"
- jdk: openjdk6
scala: 2.11.7
env: TEST_HADOOP_VERSION="1.0.4" TEST_SPARK_VERSION="1.3.0"
# Spark 1.4.1
# We only test Spark 1.4.1 with Hadoop 2.2.0 because
# https://github.com/apache/spark/pull/6599 is not present in 1.4.1,
# so the published Spark Maven artifacts will not work with Hadoop 1.x.
- jdk: openjdk6
scala: 2.10.5
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.4.1"
- jdk: openjdk7
scala: 2.11.7
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.4.1"
# Spark 1.5.0
- jdk: openjdk7
scala: 2.10.5
env: TEST_HADOOP_VERSION="1.0.4" TEST_SPARK_VERSION="1.5.0"
- jdk: openjdk7
scala: 2.11.7
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.5.0"
# Spark 1.6.0
# Spark 2.0.0
- jdk: openjdk7
scala: 2.10.5
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.6.0"
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0"
- jdk: openjdk7
scala: 2.11.7
env: TEST_HADOOP_VERSION="1.2.1" TEST_SPARK_VERSION="1.6.0"
env: TEST_HADOOP_VERSION="2.7.0" TEST_SPARK_VERSION="2.0.0"
script:
- sbt -Dhadoop.testVersion=$TEST_HADOOP_VERSION -Dspark.testVersion=$TEST_SPARK_VERSION ++$TRAVIS_SCALA_VERSION coverage test
- sbt ++$TRAVIS_SCALA_VERSION assembly
134 changes: 5 additions & 129 deletions README.md
@@ -10,23 +10,25 @@ The structure and test tools are mostly copied from [CSV Data Source for Spark](

## Requirements

This library requires Spark 1.3+
This library requires Spark 2.0+ for 0.4.x. For Spark 1.3.x, use the 0.3.x versions.
Member Author: I will create a branch for 0.3.x soon and correct this documentation.



## Linking
You can link against this library in your program at the following coordinates:

### Scala 2.10

```
groupId: com.databricks
artifactId: spark-xml_2.10
version: 0.3.3
version: 0.4.0-SNAPSHOT
```
### Scala 2.11

```
groupId: com.databricks
artifactId: spark-xml_2.11
version: 0.3.3
version: 0.4.0-SNAPSHOT
```
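
For reference, here is a minimal sbt sketch that wires in these coordinates; the `%%` operator appends the matching `_2.10` or `_2.11` suffix for you (this assumes the snapshot or release is resolvable from your configured repositories):

```scala
// Add spark-xml as a dependency; %% selects the artifact for your Scala binary version.
libraryDependencies += "com.databricks" %% "spark-xml" % "0.4.0-SNAPSHOT"
```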

## Using with Spark shell
@@ -169,7 +171,6 @@ OPTIONS (path "books.xml", rowTag "book")
```

### Scala API
__Spark 1.4+:__

```scala
import org.apache.spark.sql.SQLContext
@@ -218,50 +219,7 @@ selectedData.write
.save("newbooks.xml")
```
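
For context, here is a minimal end-to-end sketch of the Spark 2.0-style read/write path the remaining example relies on. It assumes a `books.xml` file whose records are wrapped in `<book>` elements; the option names follow this library's `rowTag`/`rootTag` options:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Read: each <book> element becomes one row.
val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load("books.xml")

// Write a projection back out, wrapping the rows in a <books> root element.
df.select("author", "@id").write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .save("newbooks.xml")
```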

Member Author: In README.md, I just removed the old examples.

__Spark 1.3:__

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.load(
"com.databricks.spark.xml",
Map("path" -> "books.xml", "rowTag" -> "book"))

val selectedData = df.select("author", "@id")
selectedData.save("com.databricks.spark.xml",
SaveMode.ErrorIfExists,
Map("path" -> "newbooks.xml", "rootTag" -> "books", "rowTag" -> "book"))
```

You can manually specify the schema when reading data:
```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};

val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("@id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType ,nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))

val df = sqlContext.load(
"com.databricks.spark.xml",
schema = customSchema,
Map("path" -> "books.xml", "rowTag" -> "book"))

val selectedData = df.select("author", "@id")
selectedData.save("com.databricks.spark.xml",
SaveMode.ErrorIfExists,
Map("path" -> "newbooks.xml", "rootTag" -> "books", "rowTag" -> "book"))
```

### Java API
__Spark 1.4+:__

```java
import org.apache.spark.sql.SQLContext
@@ -309,58 +267,8 @@ df.select("author", "@id").write()
```



__Spark 1.3:__

```java
import org.apache.spark.sql.SQLContext

SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("rowTag", "book");
options.put("path", "books.xml");
DataFrame df = sqlContext.load("com.databricks.spark.xml", options);

HashMap<String, String> options = new HashMap<String, String>();
options.put("rowTag", "book");
options.put("rootTag", "books");
options.put("path", "newbooks.xml");
df.select("author", "@id").save("com.databricks.spark.xml", SaveMode.ErrorIfExists, options)
```

You can manually specify schema:
```java
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
new StructField("@id", DataTypes.StringType, true, Metadata.empty()),
new StructField("author", DataTypes.StringType, true, Metadata.empty()),
new StructField("description", DataTypes.StringType, true, Metadata.empty()),
new StructField("genre", DataTypes.StringType, true, Metadata.empty()),
new StructField("price", DataTypes.DoubleType, true, Metadata.empty()),
new StructField("publish_date", DataTypes.StringType, true, Metadata.empty()),
new StructField("title", DataTypes.StringType, true, Metadata.empty())
});

HashMap<String, String> options = new HashMap<String, String>();
options.put("rowTag", "book");
options.put("path", "books.xml");
DataFrame df = sqlContext.load("com.databricks.spark.xml", customSchema, options);

HashMap<String, String> options = new HashMap<String, String>();
options.put("rowTag", "book");
options.put("rootTag", "books");
options.put("path", "newbooks.xml");
df.select("author", "@id").save("com.databricks.spark.xml", SaveMode.ErrorIfExists, options)
```

### Python API

__Spark 1.4+:__

```python
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
@@ -399,38 +307,7 @@ df.select("author", "@id").write \
```


__Spark 1.3:__

```python
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.load(source="com.databricks.spark.xml", rowTag = 'book', path = 'books.xml')
df.select("author", "@id").save('newbooks.xml', rootTag = 'books', rowTag = 'book', path = 'newbooks.xml')
```

You can manually specify schema:
```python
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)
customSchema = StructType([ \
StructField("@id", StringType(), True), \
StructField("author", StringType(), True), \
StructField("description", StringType(), True), \
StructField("genre", StringType(), True), \
StructField("price", DoubleType(), True), \
StructField("publish_date", StringType(), True), \
StructField("title", StringType(), True)])

df = sqlContext.load(source="com.databricks.spark.xml", rowTag = 'book', schema = customSchema, path = 'books.xml')
df.select("author", "@id").save('newbooks.xml', rootTag = 'books', rowTag = 'book', path = 'newbooks.xml')
```


### R API
__Spark 1.4+:__

Automatically infer schema (data types)
```R
@@ -492,4 +369,3 @@ This library is built with [SBT](http://www.scala-sbt.org/0.13/docs/Command-Line
## Acknowledgements

This project was initially created by [HyukjinKwon](https://github.com/HyukjinKwon) and donated to [Databricks](https://databricks.com).

4 changes: 2 additions & 2 deletions build.sbt
@@ -1,6 +1,6 @@
name := "spark-xml"

version := "0.3.3"
version := "0.4.0-SNAPSHOT"

organization := "com.databricks"

@@ -10,7 +10,7 @@ spName := "databricks/spark-xml"

crossScalaVersions := Seq("2.10.5", "2.11.7")

sparkVersion := "1.6.0"
sparkVersion := "2.0.0"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")
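For illustration, here is a sketch of how a test-only Spark version override like this is commonly wired up in sbt. The property name matches the `-Dspark.testVersion` flag used in `.travis.yml`, but the exact dependency wiring below is an assumption rather than this project's actual build definition:

```scala
// Fall back to the compile-time sparkVersion unless -Dspark.testVersion overrides it.
testSparkVersion := sys.props.getOrElse("spark.testVersion", sparkVersion.value)

// Pull in Spark SQL at the test version for the test classpath only.
libraryDependencies += "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test"
```

Running `sbt -Dspark.testVersion=2.0.0 test` would then exercise the build against that Spark version, matching the Travis matrix above.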

4 changes: 1 addition & 3 deletions src/main/scala/com/databricks/spark/xml/DefaultSource.scala
@@ -90,9 +90,7 @@ class DefaultSource
}
if (doSave) {
// Only save data when the save mode is not ignore.
val codecClass =
CompressionCodecs.getCodecClass(XmlOptions(parameters).codec)
data.saveAsXmlFile(filesystemPath.toString, parameters, codecClass)
XmlFile.saveAsXmlFile(data, filesystemPath.toString, parameters)
Member Author: Here, I moved the codec handling inside saveAsXmlFile so that the codec option in parameters is also taken into account.

}
createRelation(sqlContext, parameters, data.schema)
}
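
To make the refactoring above concrete, here is a hedged, REPL-style sketch of what resolving a `codec` option inside a save helper can look like. The helper name and short-name table are illustrative only, not the actual `XmlFile` internals:

```scala
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, GzipCodec}

// Map common short names to Hadoop codec classes, falling back to a fully
// qualified class name; None means "write uncompressed".
def resolveCodec(parameters: Map[String, String]): Option[Class[_ <: CompressionCodec]] = {
  val shortNames: Map[String, Class[_ <: CompressionCodec]] = Map(
    "gzip" -> classOf[GzipCodec],
    "bzip2" -> classOf[BZip2Codec])
  parameters.get("codec").map { name =>
    shortNames.getOrElse(name.toLowerCase,
      Class.forName(name).asSubclass(classOf[CompressionCodec]))
  }
}
```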
39 changes: 13 additions & 26 deletions src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala
@@ -70,12 +70,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {

override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
val fileSplit: FileSplit = split.asInstanceOf[FileSplit]
val conf: Configuration = {
// Use reflection to get the Configuration. This is necessary because TaskAttemptContext is
// a class in Hadoop 1.x and an interface in Hadoop 2.x.
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}
val conf: Configuration = context.getConfiguration
val charset =
Charset.forName(conf.get(XmlInputFormat.ENCODING_KEY, XmlOptions.DEFAULT_CHARSET))
startTag = conf.get(XmlInputFormat.START_TAG_KEY).getBytes(charset)
@@ -97,25 +92,18 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
val codec = new CompressionCodecFactory(conf).getCodec(path)
if (null != codec) {
decompressor = CodecPool.getDecompressor(codec)
// Use reflection to get the splittable compression codec and stream. This is necessary
// because SplittableCompressionCodec does not exist in Hadoop 1.0.x.
def isSplitCompressionCodec(obj: Any) = {
val splittableClassName = "org.apache.hadoop.io.compress.SplittableCompressionCodec"
obj.getClass.getInterfaces.map(_.getName).contains(splittableClassName)
}
// Here I made separate variables to avoid to try to find SplitCompressionInputStream at
// runtime.
val (inputStream, seekable) = codec match {
case c: CompressionCodec if isSplitCompressionCodec(c) =>
// At Hadoop 1.0.x, this case would not be executed.
val cIn = {
val sc = c.asInstanceOf[SplittableCompressionCodec]
sc.createInputStream(fsin, decompressor, start,
end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
}
codec match {
case sc: SplittableCompressionCodec =>
val cIn = sc.createInputStream(
fsin,
decompressor,
start,
Member Author: Not a big change. I just removed the reflection part that was needed for Hadoop 1.x.

end,
SplittableCompressionCodec.READ_MODE.BYBLOCK)
start = cIn.getAdjustedStart
end = cIn.getAdjustedEnd
(cIn, cIn)
in = cIn
filePosition = cIn
case c: CompressionCodec =>
if (start != 0) {
// So we have a split that is only part of a file stored using
Expand All @@ -124,10 +112,9 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
codec.getClass.getSimpleName + " compressed stream")
}
val cIn = c.createInputStream(fsin, decompressor)
(cIn, fsin)
in = cIn
filePosition = fsin
}
in = inputStream
filePosition = seekable
} else {
in = fsin
filePosition = fsin
5 changes: 2 additions & 3 deletions src/main/scala/com/databricks/spark/xml/XmlRelation.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{PrunedScan, InsertableRelation, BaseRelation, TableScan}
import org.apache.spark.sql.types._
import com.databricks.spark.xml.util.{CompressionCodecs, InferSchema}
import com.databricks.spark.xml.util.{InferSchema, XmlFile}
import com.databricks.spark.xml.parsers.StaxXmlParser

case class XmlRelation protected[spark] (
@@ -106,8 +106,7 @@ case class XmlRelation protected[spark] (
+ s" to INSERT OVERWRITE a XML table:\n${e.toString}")
}
// Write the data. We assume that schema isn't changed, and we won't update it.
val codecClass = CompressionCodecs.getCodecClass(options.codec)
data.saveAsXmlFile(filesystemPath.toString, parameters, codecClass)
XmlFile.saveAsXmlFile(data, filesystemPath.toString, parameters)
} else {
sys.error("XML tables only support INSERT OVERWRITE for now.")
}