diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala index 27b8bdcad2c17..0e6c43aec7a7a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala @@ -26,11 +26,12 @@ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.util._ -import org.apache.spark.mllib.feature.HashingTF.murmur3Hash +import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types.{ArrayType, StructType} import org.apache.spark.util.Utils +import org.apache.spark.util.VersionUtils.majorMinorVersion /** * Maps a sequence of terms to their term frequencies using the hashing trick. @@ -44,7 +45,7 @@ import org.apache.spark.util.Utils class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Transformer with HasInputCol with HasOutputCol with DefaultParamsWritable { - private[this] val hashFunc: Any => Int = murmur3Hash + private var hashFunc: Any => Int = FeatureHasher.murmur3Hash @Since("1.2.0") def this() = this(Identifiable.randomUID("hashingTF")) @@ -142,6 +143,29 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object HashingTF extends DefaultParamsReadable[HashingTF] { + private class HashingTFReader extends MLReader[HashingTF] { + + private val className = classOf[HashingTF].getName + + override def load(path: String): HashingTF = { + val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + val hashingTF = new HashingTF(metadata.uid) + metadata.getAndSetParams(hashingTF) + + // We support loading old `HashingTF` saved by previous Spark versions. + // Previous `HashingTF` uses `mllib.feature.HashingTF.murmur3Hash`, but new `HashingTF` uses + // `ml.Feature.FeatureHasher.murmur3Hash`. + val (majorVersion, _) = majorMinorVersion(metadata.sparkVersion) + if (majorVersion < 3) { + hashingTF.hashFunc = OldHashingTF.murmur3Hash + } + hashingTF + } + } + + @Since("3.0.0") + override def read: MLReader[HashingTF] = new HashingTFReader + @Since("1.6.0") override def load(path: String): HashingTF = super.load(path) } diff --git a/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/.part-00000.crc b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/.part-00000.crc new file mode 100644 index 0000000000000..1ac377ac0dac4 Binary files /dev/null and b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/.part-00000.crc differ diff --git a/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/_SUCCESS b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/_SUCCESS new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/part-00000 b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/part-00000 new file mode 100644 index 0000000000000..492a07ae06d5a --- /dev/null +++ b/mllib/src/test/resources/test-data/hashingTF-pre3.0/metadata/part-00000 @@ -0,0 +1 @@ +{"class":"org.apache.spark.ml.feature.HashingTF","timestamp":1564446310495,"sparkVersion":"2.3.0-SNAPSHOT","uid":"hashingTF_8ced2ab477c1","paramMap":{"binary":true,"numFeatures":100,"outputCol":"features","inputCol":"words"}} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala index d768a4078325e..d65646e236f77 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala @@ -83,11 +83,24 @@ class HashingTFSuite extends MLTest with DefaultReadWriteTest { .setInputCol("words") .setOutputCol("features") .setNumFeatures(n) - val mLlibHashingTF = new MLlibHashingTF(n) - assert(hashingTF.indexOf("a") === mLlibHashingTF.indexOf("a")) - assert(hashingTF.indexOf("b") === mLlibHashingTF.indexOf("b")) - assert(hashingTF.indexOf("c") === mLlibHashingTF.indexOf("c")) - assert(hashingTF.indexOf("d") === mLlibHashingTF.indexOf("d")) + assert(hashingTF.indexOf("a") === 67) + assert(hashingTF.indexOf("b") === 65) + assert(hashingTF.indexOf("c") === 68) + assert(hashingTF.indexOf("d") === 90) + } + + test("SPARK-23469: Load HashingTF prior to Spark 3.0") { + val hashingTFPath = testFile("test-data/hashingTF-pre3.0") + val loadedHashingTF = HashingTF.load(hashingTFPath) + val mLlibHashingTF = new MLlibHashingTF(100) + assert(loadedHashingTF.indexOf("a") === mLlibHashingTF.indexOf("a")) + assert(loadedHashingTF.indexOf("b") === mLlibHashingTF.indexOf("b")) + assert(loadedHashingTF.indexOf("c") === mLlibHashingTF.indexOf("c")) + assert(loadedHashingTF.indexOf("d") === mLlibHashingTF.indexOf("d")) + + val metadata = spark.read.json(s"$hashingTFPath/metadata") + val sparkVersionStr = metadata.select("sparkVersion").first().getString(0) + assert(sparkVersionStr == "2.3.0-SNAPSHOT") } test("read/write") { @@ -103,7 +116,7 @@ class HashingTFSuite extends MLTest with DefaultReadWriteTest { object HashingTFSuite { private[feature] def murmur3FeatureIdx(numFeatures: Int)(term: Any): Int = { - Utils.nonNegativeMod(MLlibHashingTF.murmur3Hash(term), numFeatures) + Utils.nonNegativeMod(FeatureHasher.murmur3Hash(term), numFeatures) } } diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 1196da53cfff5..fe8ac6239a60b 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -902,19 +902,19 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures, Java >>> df = spark.createDataFrame([(["a", "b", "c"],)], ["words"]) >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features") >>> hashingTF.transform(df).head().features - SparseVector(10, {0: 1.0, 1: 1.0, 2: 1.0}) + SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0}) >>> hashingTF.setParams(outputCol="freqs").transform(df).head().freqs - SparseVector(10, {0: 1.0, 1: 1.0, 2: 1.0}) + SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0}) >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"} >>> hashingTF.transform(df, params).head().vector - SparseVector(5, {0: 1.0, 1: 1.0, 2: 1.0}) + SparseVector(5, {0: 1.0, 2: 1.0, 3: 1.0}) >>> hashingTFPath = temp_path + "/hashing-tf" >>> hashingTF.save(hashingTFPath) >>> loadedHashingTF = HashingTF.load(hashingTFPath) >>> loadedHashingTF.getNumFeatures() == hashingTF.getNumFeatures() True >>> hashingTF.indexOf("b") - 1 + 5 .. versionadded:: 1.3.0 """ diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py index e2fc4e5111e31..6b0d1dc9d0624 100644 --- a/python/pyspark/ml/tests/test_feature.py +++ b/python/pyspark/ml/tests/test_feature.py @@ -296,7 +296,7 @@ def test_apply_binary_term_freqs(self): hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True) output = hashingTF.transform(df) features = output.select("features").first().features.toArray() - expected = Vectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray() + expected = Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0]).toArray() for i in range(0, n): self.assertAlmostEqual(features[i], expected[i], 14, "Error at " + str(i) + ": expected " + str(expected[i]) + ", got " + str(features[i]))