Commits
61 commits
fd93e59
add small text files input API
yinxusen Mar 18, 2014
9bf87d4
Merge branch 'master' into small-files-input
yinxusen Mar 18, 2014
e3681f2
Spark 1246 add min max to stat counter
dwmclary Mar 18, 2014
e7423d4
Revert "SPARK-1236 - Upgrade Jetty to 9.1.3.v20140225."
pwendell Mar 18, 2014
c27a7ab
fix errors and refine code
yinxusen Mar 18, 2014
2fa26ec
SPARK-1102: Create a saveAsNewAPIHadoopDataset method
CodingCat Mar 18, 2014
79e547f
Update copyright year in NOTICE to 2014
mateiz Mar 18, 2014
e108b9a
[SPARK-1260]: faster construction of features with intercept
mengxr Mar 18, 2014
f9d8a83
[SPARK-1266] persist factors in implicit ALS
mengxr Mar 19, 2014
cc2655a
Fix SPARK-1256: Master web UI and Worker web UI returns a 404 error
witgo Mar 19, 2014
a18ea00
Bundle tachyon: SPARK-1269
nicklan Mar 19, 2014
d55ec86
bugfix: Wrong "Duration" in "Active Stages" in stages page
BlackNiuza Mar 19, 2014
6112270
SPARK-1203 fix saving to hdfs from yarn
tgravescs Mar 19, 2014
ab747d3
Bugfixes/improvements to scheduler
mridulm Mar 19, 2014
79d07d6
[SPARK-1132] Persisting Web UI through refactoring the SparkListener …
andrewor14 Mar 19, 2014
67fa71c
Added doctest for map function in rdd.py
jyotiska Mar 19, 2014
1678931
SPARK-1099:Spark's local mode should probably respect spark.cores.max…
Mar 19, 2014
ffe272d
Revert "SPARK-1099:Spark's local mode should probably respect spark.c…
aarondav Mar 20, 2014
7d22941
remove merge process from smallTextFiles interface
yinxusen Mar 20, 2014
66a03e5
Principal Component Analysis
rezazadeh Mar 20, 2014
ca76423
[Hot Fix #42] Do not stop SparkUI if bind() is not called
andrewor14 Mar 20, 2014
9aadcff
SPARK-1251 Support for optimizing and executing structured queries
marmbrus Mar 21, 2014
e09139d
Fix maven jenkins: Add explicit init for required tables in SQLQueryS…
marmbrus Mar 21, 2014
78c0f25
remove useless code and consideration, neaten the code style
yinxusen Mar 21, 2014
7e17fe6
Add hive test files to repository. Remove download script.
marmbrus Mar 21, 2014
2c0aa22
SPARK-1279: Fix improper use of SimpleDateFormat
zsxwing Mar 21, 2014
dab5439
Make SQL keywords case-insensitive
mateiz Mar 21, 2014
d780983
Add asCode function for dumping raw tree representations.
marmbrus Mar 21, 2014
646e554
Fix to Stage UI to display numbers on progress bar
emtiazahmed Mar 22, 2014
d348362
fix code style problem and rewrite the testsuite for simplicity.
yinxusen Mar 22, 2014
abf6714
SPARK-1254. Supplemental fix for HTTPS on Maven Central
srowen Mar 23, 2014
57a4379
[SPARK-1292] In-memory columnar representation for Spark SQL
liancheng Mar 23, 2014
8265dc7
Fixed coding style issues in Spark SQL
liancheng Mar 23, 2014
80c2968
[SPARK-1212] Adding sparse data support and update KMeans
mengxr Mar 24, 2014
eae90e4
refine code documents
yinxusen Mar 24, 2014
839bd3f
remove the use of commons-io
yinxusen Mar 24, 2014
21109fb
SPARK-1144 Added license and RAT to check licenses.
ScrapCodes Mar 24, 2014
56db8a2
HOT FIX: Exclude test files from RAT
pwendell Mar 24, 2014
8043b7b
SPARK-1294 Fix resolution of uppercase field names using a HiveContext.
marmbrus Mar 25, 2014
dc126f2
SPARK-1094 Support MiMa for reporting binary compatibility accross ve…
pwendell Mar 25, 2014
5140598
SPARK-1128: set hadoop task properties when constructing HadoopRDD
CodingCat Mar 25, 2014
b637f2d
Unify the logic for column pruning, projection, and filtering of tabl…
marmbrus Mar 25, 2014
007a733
SPARK-1286: Make usage of spark-env.sh idempotent
aarondav Mar 25, 2014
05ed628
move wholefile interface from MLUtils to MLContext
yinxusen Mar 25, 2014
134ace7
Add more hive compatability tests to whitelist
marmbrus Mar 25, 2014
71d4ed2
SPARK-1316. Remove use of Commons IO
srowen Mar 25, 2014
f8111ea
SPARK-1319: Fix scheduler to account for tasks using > 1 CPUs.
shivaram Mar 25, 2014
8237df8
Avoid Option while generating call site
witgo Mar 25, 2014
f87dab8
fix logic error
yinxusen Mar 26, 2014
4f7d547
Initial experimentation with Travis CI configuration
marmbrus Mar 26, 2014
b859853
SPARK-1321 Use Guava's top k implementation rather than our BoundedPr…
rxin Mar 26, 2014
3b69987
change from Java code to Scala
yinxusen Mar 26, 2014
a0853a3
SPARK-1322, top in pyspark should sort result in descending order.
ScrapCodes Mar 26, 2014
345825d
Unified package definition format in Spark SQL
liancheng Mar 26, 2014
b0ea02a
modify scala doc, and add space after 'if'
yinxusen Mar 26, 2014
32cbdfd
[SQL] Un-ignore a test that is now passing.
marmbrus Mar 27, 2014
e15e574
[SQL] Add a custom serializer for maps since they do not have a no-ar…
marmbrus Mar 27, 2014
be6d96c
SPARK-1324: SparkUI Should Not Bind to SPARK_PUBLIC_DNS
pwendell Mar 27, 2014
3e63d98
Spark 1095 : Adding explicit return types to all public methods
NirmalReddy Mar 27, 2014
1fa48d9
SPARK-1325. The maven build error for Spark Tools
srowen Mar 27, 2014
4ed60d1
rebase to the latest trunk to merge
yinxusen Mar 27, 2014
1 change: 1 addition & 0 deletions .gitignore
@@ -45,3 +45,4 @@ dist/
spark-*-bin.tar.gz
unit-tests.log
/lib/
mllib/build/
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.util;
Contributor
Instead of util, how about io or input?

Contributor Author
I think input is better, because the input to a machine learning algorithm is usually a dataset (batch or streaming files in various formats), while the output is usually models or prediction results, which have a different semantic meaning.


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
* An InputFormat that reads files from HDFS or local disk. It will be called by
* HadoopRDD to generate new BatchFileRecordReader instances.
*/
public class BatchFileInputFormat
Contributor
This is basically the WholeFileInputFormat for Text:

https://github.com/tomwhite/hadoop-book/blob/master/ch07/src/main/java/WholeFileInputFormat.java

Shall we call it WholeTextFileInputFormat?

Contributor Author
I think not; the meaning here is different. A WholeTextFileInputFormat extending FileInputFormat would be used to read an entire, possibly huge, file from HDFS or local disk, but I called this one BatchFileInputFormat because it reads a bunch of files, not just a single file, and it extends CombineFileInputFormat. I think I should rename it BatchFilesInputFormat.

Contributor
What is the value here? Is it a line from a text file or the whole file? If the value is an arbitrary Text converted from bytes, how do you split a file?

Contributor Author
It depends. If the file length is smaller than MAX_BYTES_ALLOCATION, it reads the whole file out in one pass. Otherwise, it reads the file out in several chunks. Fragments of a file are then merged back together in the smallTextFiles() interface.

extends CombineFileInputFormat<String, Text> {
Contributor
The indentation you used is not consistent with the Spark code style. This line may fit on the line above.

Contributor Author
Sorry for my carelessness. I'll fix it.


@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public RecordReader<String, Text> createRecordReader(
InputSplit split,
TaskAttemptContext context) throws IOException {
return new CombineFileRecordReader<String, Text>(
(CombineFileSplit)split,
Contributor
put a space after )

Contributor Author
Yep, I will add it.

context,
(Class)BatchFileRecordReader.class);
}
}
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.util;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
* Reads an entire file out in bytes format in <filename, content> format.
Contributor
Are you reading the entire file in bytes?

Contributor Author
Over the life cycle of an instance of this class, yes. The split(index) here refers to a single file, because I set isSplitable() to false in BatchFileInputFormat.

nextKeyValue() can be called several times, until no bytes are left in split(index); it is driven by the CombineFileRecordReader's nextKeyValue() and, eventually, by compute() in HadoopRDD. It reads an entire file in bytes, and I encapsulate those bytes in a Text.

Contributor
CombineFileInputFormat is meant to combine splits, not records. If the file is long and you cut it at MAX_BYTES_ALLOCATION, how do you guarantee that this is a valid cut for UTF-8 text?

Contributor Author
This is indeed a mistake that I overlooked.

I use MAX_BYTES_ALLOCATION here to avoid the scenario where a file length is larger than what an Int.MaxValue can represent. The Mahout implementation simply ignores this case and casts the long directly to an int. Even though the semantics of the interface are "small files", we should not limit the input file length for end users.

I just treat the content as a byte array encapsulated in a Text, and I will join the slices of a single file back together in the smallTextFiles() interface. Due to the locality of the split, I can merge them without a shuffle. But I forgot that in this version; I'll fix it now.
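As an illustration of that merge step, a minimal sketch in Scala of how per-file fragments could be reassembled within a partition is shown below. mergeFragments is a hypothetical helper, not part of this patch, and it assumes that all chunks of a given file arrive consecutively in the same partition:

import scala.collection.mutable

import org.apache.spark.rdd.RDD

// Hypothetical sketch: glue per-file chunks back together without a shuffle,
// assuming every chunk of a given file sits in the same partition, in order.
def mergeFragments(chunks: RDD[(String, String)]): RDD[(String, String)] = {
  chunks.mapPartitions { iter =>
    val merged = mutable.LinkedHashMap.empty[String, StringBuilder]
    iter.foreach { case (fileName, chunk) =>
      merged.getOrElseUpdate(fileName, new StringBuilder).append(chunk)
    }
    merged.iterator.map { case (fileName, content) => (fileName, content.toString) }
  }
}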

*/

public class BatchFileRecordReader extends RecordReader<String, Text> {
private long startOffset;
private long end;
private long pos;
private Path path;

private static final int MAX_BYTES_ALLOCATION = 64 * 1024 * 1024;

private String key = null;
private Text value = null;

private FSDataInputStream fileIn;

public BatchFileRecordReader(
CombineFileSplit split,
TaskAttemptContext context,
Integer index)
throws IOException {
path = split.getPath(index);
startOffset = split.getOffset(index);
pos = startOffset;
end = startOffset + split.getLength(index);

FileSystem fs = path.getFileSystem(context.getConfiguration());
fileIn = fs.open(path);
fileIn.seek(startOffset);
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {}

@Override
public void close() throws IOException {
if (fileIn != null) {
fileIn.close();
}
}

@Override
public float getProgress() throws IOException {
if (startOffset == end) return 0;
Contributor
Should this return 1.0f?

Contributor Author
Yeah, but the check should be pos == end rather than startOffset == end; I made a mistake here.

if (pos == end) return 1.0f; would be correct.
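For illustration only, the intended progress calculation transliterated to Scala (the actual patch is Java; this is just a sketch of the fix discussed above):

// Hypothetical sketch of the corrected logic: report 1.0 once the split is
// fully consumed, otherwise the fraction of the split read so far.
def progress(startOffset: Long, pos: Long, end: Long): Float = {
  if (pos >= end) 1.0f
  else math.min(1.0f, (pos - startOffset).toFloat / (end - startOffset))
}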

return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
}

@Override
public String getCurrentKey() throws IOException, InterruptedException {
return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException{
return value;
}

@Override
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = path.getName();
}
if (value == null) {
value = new Text();
}

if (pos >= end) {
return false;
}

int maxBufferLength = end - pos < Integer.MAX_VALUE ? (int) (end - pos) : Integer.MAX_VALUE;
if (maxBufferLength > MAX_BYTES_ALLOCATION) {
maxBufferLength = MAX_BYTES_ALLOCATION;
}

byte[] innerBuffer = new byte[maxBufferLength];

int len = fileIn.read(pos, innerBuffer, 0, maxBufferLength);
pos += len;

value.set(innerBuffer, 0, len);

return true;
}
}
23 changes: 21 additions & 2 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,11 +17,12 @@

package org.apache.spark.mllib.util

import org.apache.hadoop.io.Text
Contributor
organize imports

Contributor Author
I think org.apache.hadoop.io.Text should come first here, because it belongs to a third-party library. The following import, org.jblas.DoubleMatrix, is also a third-party class, so I think keeping them in lexicographical order is better.

Contributor
okay

import org.jblas.DoubleMatrix

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

import org.jblas.DoubleMatrix
import org.apache.spark.mllib.regression.LabeledPoint

/**
@@ -120,4 +121,22 @@ object MLUtils {
}
sum
}

/**
* Reads a bunch of small files from HDFS, or a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and returns an RDD[(String, String)].
*
* @param path The directory you should specify, such as
* hdfs://[address]:[port]/[dir]
*
* @return RDD[(fileName: String, content: String)]
* i.e. the first element is the file name and the second is its content.
Contributor
No need to repeat RDD[(fileName: String, content: String)].

*/
def smallTextFiles(sc: SparkContext, path: String): RDD[(String, String)] = {
Contributor
The current API uses textFile to load text files. I suggest using wholeTextFile to load the entire content of each file. small is not a good name here because it is not defined.

sc.newAPIHadoopFile(
path,
classOf[BatchFileInputFormat],
classOf[String],
classOf[Text]).mapValues(_.toString)
}
}
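To round out the discussion, a hedged usage sketch of the proposed API follows. The directory path and application name are hypothetical, and the method name follows this revision of the patch rather than the wholeTextFile rename suggested above:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.util.MLUtils

// Hypothetical driver: load every file under a directory as (fileName, content)
// pairs and print the content length of each file.
object SmallTextFilesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "SmallTextFilesExample")
    val files = MLUtils.smallTextFiles(sc, "hdfs://localhost:9000/user/data/")
    files.mapValues(_.length).collect().foreach { case (name, len) =>
      println(name + ": " + len + " characters")
    }
    sc.stop()
  }
}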