@@ -23,7 +23,7 @@
import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.WrappedArray;
import scala.collection.mutable.Seq;
Member Author:

WrappedArray is gone in 2.13; this should be an equivalent superclass.
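(Illustration only, not part of the diff: a minimal Scala sketch of the same idea, assuming a local SparkSession. Typing the UDF parameter as Seq keeps it source-compatible on both Scala versions, since the concrete wrapper Spark passes in (WrappedArray on 2.12, ArraySeq on 2.13) is a Seq subtype.)

import org.apache.spark.sql.SparkSession

object CountTokensSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CountTokensSketch").master("local[*]").getOrCreate()
    // Declaring the parameter as Seq[String] works regardless of the runtime collection type.
    spark.udf.register("countTokens", (words: Seq[String]) => words.size)
    spark.sql("SELECT countTokens(array('a', 'b', 'c')) AS n").show()
    spark.stop()
  }
}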


import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
@@ -69,7 +69,7 @@ public static void main(String[] args) {
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
"countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
"countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
@@ -82,7 +82,7 @@ object SparkKMeans {
while(tempDist > convergeDist) {
val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))

val pointStats = closest.reduceByKey{case ((p1, c1), (p2, c2)) => (p1 + p2, c1 + c2)}
val pointStats = closest.reduceByKey(mergeResults)
Member Author:

Not quite sure why, but a few calls to reduceByKey didn't like the existing syntax in 2.13 ("missing parameter type for expanded function"), so I broke the merge logic out into a typed method.
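(A stripped-down sketch of the workaround, with the value type simplified to (Double, Int); this is not the PR's actual code. Under 2.13, a pattern-matching closure such as reduceByKey { case ((s1, n1), (s2, n2)) => ... } may fail to infer its parameter type, while a helper method with explicit parameter types compiles on both 2.12 and 2.13.)

import org.apache.spark.rdd.RDD

object MergeSketch {
  // Explicit parameter types sidestep the "missing parameter type" inference failure.
  private def mergeCounts(a: (Double, Int), b: (Double, Int)): (Double, Int) =
    (a._1 + b._1, a._2 + b._2)

  def sumAndCountPerKey(pairs: RDD[(Int, (Double, Int))]): RDD[(Int, (Double, Int))] =
    pairs.reduceByKey(mergeCounts)  // was: reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
}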


val newPoints = pointStats.map {pair =>
(pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap()
@@ -102,5 +102,10 @@ object SparkKMeans {
kPoints.foreach(println)
spark.stop()
}

private def mergeResults(a: (Vector[Double], Int),
b: (Vector[Double], Int)): (Vector[Double], Int) = {
Member:

Nit: indentation?

(a._1 + b._1, a._2 + b._2)
}
}
// scalastyle:on println
@@ -85,7 +85,7 @@ object SchemaConverters {
StructField(f.name, schemaType.dataType, schemaType.nullable)
}

SchemaType(StructType(fields), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false)

case ARRAY =>
val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
@@ -126,7 +126,7 @@ object SchemaConverters {
StructField(s"member$i", schemaType.dataType, nullable = true)
}

SchemaType(StructType(fields), nullable = false)
SchemaType(StructType(fields.toSeq), nullable = false)
}

case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
@@ -336,7 +336,7 @@ private[kafka010] class KafkaOffsetReader(
}
})
}
incorrectOffsets
incorrectOffsets.toSeq
}

// Retry to fetch latest offsets when detecting incorrect offsets. We don't use
@@ -1540,8 +1540,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
makeSureGetOffsetCalled,
Execute { q =>
// wait to reach the last offset in every partition
q.awaitOffset(
0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)), streamingTimeout.toMillis)
q.awaitOffset(0,
KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis)
},
CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
StopStream,
3 changes: 2 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ml

import scala.annotation.varargs
import scala.reflect.ClassTag

import org.apache.spark.annotation.Since
import org.apache.spark.ml.param.{ParamMap, ParamPair}
@@ -26,7 +27,7 @@ import org.apache.spark.sql.Dataset
/**
* Abstract class for estimators that fit models to data.
*/
abstract class Estimator[M <: Model[M]] extends PipelineStage {
abstract class Estimator[M <: Model[M] : ClassTag] extends PipelineStage {
Member Author:

I don't quite get why 2.13 thinks this needs a ClassTag (and thus some subclasses need one too), but I'm just going with it. Will see if MiMa is OK with it.
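(For reference, plain Scala with hypothetical class names: a context bound desugars to an extra implicit constructor parameter, which also explains the changed constructor signature that MiMa flags below.)

import scala.reflect.ClassTag

abstract class WithBound[M : ClassTag]                       // what the diff writes
abstract class Desugared[M](implicit evidence: ClassTag[M])  // roughly what the compiler generates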

Member:

Unfortunately, MiMa seems to complain about this and a few others like it:

[error]  * method this()Unit in class org.apache.spark.ml.Estimator does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.Estimator.this")
[error]  * method this()Unit in class org.apache.spark.ml.Predictor does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.Predictor.this")
[error]  * method this()Unit in class org.apache.spark.ml.classification.ProbabilisticClassifier does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.ProbabilisticClassifier.this")
[error]  * method this()Unit in class org.apache.spark.ml.classification.Classifier does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.Classifier.this")
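(If these removals are accepted as intentional, the usual remedy is adding the suggested filters to project/MimaExcludes.scala. A hedged sketch follows; the object and value names here are placeholders, and the entries are copied from the output above.)

import com.typesafe.tools.mima.core._

object ClassTagExcludesSketch {
  val excludes = Seq(
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.Estimator.this"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.Predictor.this"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.ProbabilisticClassifier.this"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.Classifier.this")
  )
}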


/**
* Fits a single model to the input data with optional parameters.
4 changes: 3 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -17,6 +17,8 @@

package org.apache.spark.ml

import scala.reflect.ClassTag

import org.apache.spark.annotation.Since
import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.functions.checkNonNegativeWeight
@@ -115,7 +117,7 @@ private[ml] trait PredictorParams extends Params
abstract class Predictor[
FeaturesType,
Learner <: Predictor[FeaturesType, Learner, M],
M <: PredictionModel[FeaturesType, M]]
M <: PredictionModel[FeaturesType, M] : ClassTag]
extends Estimator[M] with PredictorParams {

/** @group setParam */
@@ -17,6 +17,8 @@

package org.apache.spark.ml.classification

import scala.reflect.ClassTag

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
Expand Down Expand Up @@ -73,7 +75,7 @@ private[spark] trait ClassifierParams
abstract class Classifier[
FeaturesType,
E <: Classifier[FeaturesType, E, M],
M <: ClassificationModel[FeaturesType, M]]
M <: ClassificationModel[FeaturesType, M] : ClassTag]
extends Predictor[FeaturesType, E, M] with ClassifierParams {

/** @group setParam */
@@ -17,6 +17,8 @@

package org.apache.spark.ml.classification

import scala.reflect.ClassTag

import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.{DenseVector, Vector, VectorUDT}
import org.apache.spark.ml.param.ParamMap
@@ -51,7 +53,7 @@ private[ml] trait ProbabilisticClassifierParams
abstract class ProbabilisticClassifier[
FeaturesType,
E <: ProbabilisticClassifier[FeaturesType, E, M],
M <: ProbabilisticClassificationModel[FeaturesType, M]]
M <: ProbabilisticClassificationModel[FeaturesType, M] : ClassTag]
extends Classifier[FeaturesType, E, M] with ProbabilisticClassifierParams {

/** @group setParam */
@@ -492,12 +492,7 @@ class GaussianMixture @Since("2.0.0") (
(i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
}
} else Iterator.empty
}.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
// update the weights, means and covariances for i-th distributions
BLAS.axpy(1.0, mean2, mean1)
BLAS.axpy(1.0, cov2, cov1)
(mean1, cov1, w1 + w2, ws1 + ws2)
}.mapValues { case (mean, cov, w, ws) =>
}.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
@@ -560,12 +555,7 @@ class GaussianMixture @Since("2.0.0") (
agg.meanIter.zip(agg.covIter).zipWithIndex
.map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
} else Iterator.empty
}.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
// update the weights, means and covariances for i-th distributions
BLAS.axpy(1.0, mean2, mean1)
BLAS.axpy(1.0, cov2, cov1)
(mean1, cov1, w1 + w2, ws1 + ws2)
}.mapValues { case (mean, cov, w, ws) =>
}.reduceByKey(GaussianMixture.mergeWeightsMeans).mapValues { case (mean, cov, w, ws) =>
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
@@ -624,8 +614,8 @@ class GaussianMixture @Since("2.0.0") (
val gaussians = Array.tabulate(numClusters) { i =>
val start = i * numSamples
val end = start + numSamples
val sampleSlice = samples.view(start, end)
val weightSlice = sampleWeights.view(start, end)
val sampleSlice = samples.view.slice(start, end)
val weightSlice = sampleWeights.view.slice(start, end)
val localWeightSum = weightSlice.sum
weights(i) = localWeightSum / weightSum

@@ -691,6 +681,16 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
new DenseMatrix(n, n, symmetricValues)
}

private def mergeWeightsMeans(
a: (DenseVector, DenseVector, Double, Double),
b: (DenseVector, DenseVector, Double, Double)): (DenseVector, DenseVector, Double, Double) =
{
// update the weights, means and covariances for i-th distributions
BLAS.axpy(1.0, b._1, a._1)
BLAS.axpy(1.0, b._2, a._2)
(a._1, a._2, a._3 + b._3, a._4 + b._4)
}

/**
* Update the weight, mean and covariance of gaussian distribution.
*
3 changes: 2 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
@@ -17,6 +17,7 @@

package org.apache.spark.ml.feature

import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.ml.{Estimator, Model}
@@ -324,7 +325,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
* (2) Wang, Jingdong et al. "Hashing for similarity search: A survey." arXiv preprint
* arXiv:1408.2927 (2014).
*/
private[ml] abstract class LSH[T <: LSHModel[T]]
private[ml] abstract class LSH[T <: LSHModel[T] : ClassTag]
extends Estimator[T] with LSHParams with DefaultParamsWritable {
self: Estimator[T] =>

@@ -201,7 +201,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] {
}
Iterator.tabulate(numFeatures)(i => (i, summaries(i).compress))
} else Iterator.empty
}.reduceByKey { case (s1, s2) => s1.merge(s2) }
}.reduceByKey { (s1, s2) => s1.merge(s2) }
} else {
val scale = math.max(math.ceil(math.sqrt(vectors.getNumPartitions)).toInt, 2)
vectors.mapPartitionsWithIndex { case (pid, iter) =>
@@ -214,7 +214,7 @@ object RobustScaler extends DefaultParamsReadable[RobustScaler] {
seqOp = (s, v) => s.insert(v),
combOp = (s1, s2) => s1.compress.merge(s2.compress)
).map { case ((_, i), s) => (i, s)
}.reduceByKey { case (s1, s2) => s1.compress.merge(s2.compress) }
}.reduceByKey { (s1, s2) => s1.compress.merge(s2.compress) }
}
}

@@ -18,6 +18,7 @@
package org.apache.spark.ml.feature

import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag

import org.apache.spark.annotation.Since
import org.apache.spark.ml._
@@ -155,7 +156,7 @@ private[feature] trait SelectorParams extends Params
* By default, the selection method is `numTopFeatures`, with the default number of top features
* set to 50.
*/
private[ml] abstract class Selector[T <: SelectorModel[T]]
private[ml] abstract class Selector[T <: SelectorModel[T] : ClassTag]
extends Estimator[T] with SelectorParams with DefaultParamsWritable {

/** @group setParam */
@@ -291,7 +291,7 @@ class Word2VecModel private[ml] (
val outputSchema = transformSchema(dataset.schema, logging = true)
val vectors = wordVectors.getVectors
.mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
.map(identity) // mapValues doesn't return a serializable map (SI-7005)
.map(identity).toMap // mapValues doesn't return a serializable map (SI-7005)
val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
val d = $(vectorSize)
val emptyVec = Vectors.sparse(d, Array.emptyIntArray, Array.emptyDoubleArray)
@@ -937,7 +937,7 @@ final class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any])

/** Put param pairs with a `java.util.List` of values for Python. */
private[ml] def put(paramPairs: JList[ParamPair[_]]): this.type = {
put(paramPairs.asScala: _*)
put(paramPairs.asScala.toSeq: _*)
}

/**
@@ -17,6 +17,8 @@

package org.apache.spark.ml.regression

import scala.reflect.ClassTag

import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}


@@ -30,7 +32,7 @@ import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
abstract class Regressor[
FeaturesType,
Learner <: Regressor[FeaturesType, Learner, M],
M <: RegressionModel[FeaturesType, M]]
M <: RegressionModel[FeaturesType, M] : ClassTag]
extends Predictor[FeaturesType, Learner, M] with PredictorParams {

// TODO: defaultEvaluator (follow-up PR)
@@ -1223,28 +1223,28 @@ private[python] class PythonMLLibAPI extends Serializable {
* Python-friendly version of [[MLUtils.convertVectorColumnsToML()]].
*/
def convertVectorColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertVectorColumnsToML(dataset, cols.asScala: _*)
MLUtils.convertVectorColumnsToML(dataset, cols.asScala.toSeq: _*)
}

/**
* Python-friendly version of [[MLUtils.convertVectorColumnsFromML()]]
*/
def convertVectorColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertVectorColumnsFromML(dataset, cols.asScala: _*)
MLUtils.convertVectorColumnsFromML(dataset, cols.asScala.toSeq: _*)
}

/**
* Python-friendly version of [[MLUtils.convertMatrixColumnsToML()]].
*/
def convertMatrixColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertMatrixColumnsToML(dataset, cols.asScala: _*)
MLUtils.convertMatrixColumnsToML(dataset, cols.asScala.toSeq: _*)
}

/**
* Python-friendly version of [[MLUtils.convertMatrixColumnsFromML()]]
*/
def convertMatrixColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = {
MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala: _*)
MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala.toSeq: _*)
}
}

@@ -225,7 +225,7 @@ class BisectingKMeans private (
divisibleIndices.contains(parentIndex(index))
}
newClusters = summarize(d, newAssignments, dMeasure)
newClusterCenters = newClusters.mapValues(_.center).map(identity)
newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap
}
if (preIndices != null) {
preIndices.unpersist()
@@ -17,8 +17,6 @@

package org.apache.spark.mllib.clustering

import scala.collection.mutable.IndexedSeq

import breeze.linalg.{diag, DenseMatrix => BreezeMatrix, DenseVector => BDV, Vector => BV}

import org.apache.spark.annotation.Since
@@ -189,8 +187,8 @@ class GaussianMixture private (
case None =>
val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
(Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
val slice = samples.view(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
val slice = samples.view.slice(i * nSamples, (i + 1) * nSamples)
new MultivariateGaussian(vectorMean(slice.toSeq), initCovariance(slice.toSeq))
})
}

@@ -259,7 +257,7 @@ class GaussianMixture private (
}

/** Average of dense breeze vectors */
private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
private def vectorMean(x: Seq[BV[Double]]): BDV[Double] = {
val v = BDV.zeros[Double](x(0).length)
x.foreach(xi => v += xi)
v / x.length.toDouble
@@ -269,7 +267,7 @@
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
*/
private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
private def initCovariance(x: Seq[BV[Double]]): BreezeMatrix[Double] = {
val mu = vectorMean(x)
val ss = BDV.zeros[Double](x(0).length)
x.foreach { xi =>
@@ -335,7 +335,7 @@ object PrefixSpan extends Logging {
largePrefixes = newLargePrefixes
}

var freqPatterns = sc.parallelize(localFreqPatterns, 1)
var freqPatterns = sc.parallelize(localFreqPatterns.toSeq, 1)

val numSmallPrefixes = smallPrefixes.size
logInfo(s"number of small prefixes for local processing: $numSmallPrefixes")
@@ -97,7 +97,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
}
if (sizes(i) + tail.length >= offset + windowSize) {
partitions +=
new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail, offset)
new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail.toSeq, offset)
partitionIndex += 1
}
}
@@ -112,7 +112,7 @@ private[spark] class EntropyAggregator(numClasses: Int)
* @param offset Start index of stats for this (node, feature, bin).
*/
def getCalculator(allStats: Array[Double], offset: Int): EntropyCalculator = {
new EntropyCalculator(allStats.view(offset, offset + statsSize - 1).toArray,
new EntropyCalculator(allStats.view.slice(offset, offset + statsSize - 1).toArray,
allStats(offset + statsSize - 1).toLong)
}
}