diff --git a/README.md b/README.md index 3cfaae7..7939491 100644 --- a/README.md +++ b/README.md @@ -174,7 +174,8 @@ someRuleSet.addMinMaxRules("Retail_Price_Validation", col("retail_price"), Bound ### Categorical Rules There are two types of categorical rules which are used to validate against a pre-defined list of valid values. As of 0.2 accepted categorical types are String, Double, Int, Long but any types outside of this can -be input as an array() column of any type so long as it can be evaulated against the intput column +be input as an array() column of any type so long as it can be evaluated against the input column. + ```scala val catNumerics = Array( Rule("Valid_Stores", col("store_id"), Lookups.validStoreIDs), @@ -187,6 +188,18 @@ Rule("Valid_Regions", col("region"), Lookups.validRegions) ) ``` +An optional `ignoreCase` parameter can be specified when evaluating against a list of String values to ignore or apply +case-sensitivity. By default, input columns will be evaluated against a list of Strings with case-sensitivity applied. +```scala +Rule("Valid_Regions", col("region"), Lookups.validRegions, ignoreCase=true) +``` + +Furthermore, the evaluation of categorical rules can be inverted by specifying `invertMatch=true` as a parameter. +This can be handy when defining a Rule that an input column cannot match a list of invalid values. For example: +```scala +Rule("Invalid_Skus", col("sku"), Lookups.invalidSkus, invertMatch=true) +``` + ### Validation Now that you have some rules built up... it's time to build the ruleset and validate it. 
As mentioned above, the dataframe can be a simple df or a grouped df by passing column[s] to perform validation at the diff --git a/demo/Example.scala b/demo/Example.scala index ec278ed..1f52a12 100644 --- a/demo/Example.scala +++ b/demo/Example.scala @@ -50,11 +50,12 @@ object Example extends App with SparkSessionWrapper { val catNumerics = Array( Rule("Valid_Stores", col("store_id"), Lookups.validStoreIDs), - Rule("Valid_Skus", col("sku"), Lookups.validSkus) + Rule("Valid_Skus", col("sku"), Lookups.validSkus), + Rule("Invalid_Skus", col("sku"), Lookups.invalidSkus, invertMatch=true) ) val catStrings = Array( - Rule("Valid_Regions", col("region"), Lookups.validRegions) + Rule("Valid_Regions", col("region"), Lookups.validRegions, ignoreCase=true) ) //TODO - validate datetime @@ -76,18 +77,18 @@ object Example extends App with SparkSessionWrapper { .withColumn("create_dt", 'create_ts.cast("date")) // Doing the validation - // The validate method will return the rules report dataframe which breaks down which rules passed and which - // rules failed and how/why. The second return value returns a boolean to determine whether or not all tests passed -// val (rulesReport, passed) = RuleSet(df, Array("store_id")) - val (rulesReport, passed) = RuleSet(df) + // The validate method will return two reports - a complete report and a summary report. + // The complete report is verbose and will add all rule validations to the right side of the original + // df passed into RuleSet, while the summary report will contain all of the rows that failed one or more + // Rule evaluations. 
+ val validationResults = RuleSet(df) .add(specializedRules) .add(minMaxPriceRules) .add(catNumerics) .add(catStrings) - .validate(2) + .validate() - rulesReport.show(200, false) -// rulesReport.printSchema() + validationResults.completeReport.show(200, false) } diff --git a/demo/Rules_Engine_Examples.dbc b/demo/Rules_Engine_Examples.dbc index f4770a7..3d21bf7 100644 Binary files a/demo/Rules_Engine_Examples.dbc and b/demo/Rules_Engine_Examples.dbc differ diff --git a/demo/Rules_Engine_Examples.html b/demo/Rules_Engine_Examples.html index 15b5d5a..b85b426 100644 --- a/demo/Rules_Engine_Examples.html +++ b/demo/Rules_Engine_Examples.html @@ -10,33 +10,34 @@ - - - - - - - + + + + + + diff --git a/src/main/scala/com/databricks/labs/validation/Rule.scala b/src/main/scala/com/databricks/labs/validation/Rule.scala index 6e4b869..3f4006e 100644 --- a/src/main/scala/com/databricks/labs/validation/Rule.scala +++ b/src/main/scala/com/databricks/labs/validation/Rule.scala @@ -1,11 +1,8 @@ package com.databricks.labs.validation -import com.databricks.labs.validation.utils.Structures.{Bounds, ValidationException} +import com.databricks.labs.validation.utils.Structures.Bounds import org.apache.spark.sql.Column import org.apache.spark.sql.functions.{array, lit} -import org.apache.spark.sql.types.BooleanType - -import java.util.UUID /** * Definition of a rule @@ -21,6 +18,8 @@ class Rule( private var _validNumerics: Column = array(lit(null).cast("double")) private var _validStrings: Column = array(lit(null).cast("string")) private var _implicitBoolean: Boolean = false + private var _ignoreCase: Boolean = false + private var _invertMatch: Boolean = false val inputColumnName: String = inputColumn.expr.toString().replace("'", "") override def toString: String = { @@ -47,8 +46,8 @@ class Rule( this } - private def setValidStrings(value: Array[String]): this.type = { - _validStrings = lit(value) + private def setValidStrings(value: Array[String], ignoreCase: Boolean): this.type = { + 
_validStrings = if(ignoreCase) lit(value.map(_.toLowerCase)) else lit(value) inputColumn.expr.children.map(_.prettyName) this } @@ -63,6 +62,16 @@ class Rule( this } + private def setIgnoreCase(value: Boolean): this.type = { + _ignoreCase = value + this + } + + private def setInvertMatch(value: Boolean): this.type = { + _invertMatch = value + this + } + def boundaries: Bounds = _boundaries def validNumerics: Column = _validNumerics @@ -73,6 +82,10 @@ class Rule( def isImplicitBool: Boolean = _implicitBoolean + def ignoreCase: Boolean = _ignoreCase + + def invertMatch: Boolean = _invertMatch + def isAgg: Boolean = { inputColumn.expr.prettyName == "aggregateexpression" || inputColumn.expr.children.map(_.prettyName).contains("aggregateexpression") @@ -114,6 +127,18 @@ object Rule { .setValidExpr(validExpr) } + def apply( + ruleName: String, + column: Column, + validNumerics: Array[Double], + invertMatch: Boolean + ): Rule = { + + new Rule(ruleName, column, RuleType.ValidateNumerics) + .setValidNumerics(validNumerics) + .setInvertMatch(invertMatch) + } + def apply( ruleName: String, column: Column, @@ -122,6 +147,19 @@ object Rule { new Rule(ruleName, column, RuleType.ValidateNumerics) .setValidNumerics(validNumerics) + .setInvertMatch(false) + } + + def apply( + ruleName: String, + column: Column, + validNumerics: Array[Long], + invertMatch: Boolean + ): Rule = { + + new Rule(ruleName, column, RuleType.ValidateNumerics) + .setValidNumerics(validNumerics.map(_.toString.toDouble)) + .setInvertMatch(invertMatch) } def apply( @@ -132,6 +170,19 @@ object Rule { new Rule(ruleName, column, RuleType.ValidateNumerics) .setValidNumerics(validNumerics.map(_.toString.toDouble)) + .setInvertMatch(false) + } + + def apply( + ruleName: String, + column: Column, + validNumerics: Array[Int], + invertMatch: Boolean + ): Rule = { + + new Rule(ruleName, column, RuleType.ValidateNumerics) + .setValidNumerics(validNumerics.map(_.toString.toDouble)) + .setInvertMatch(invertMatch) } def 
apply( @@ -142,16 +193,21 @@ object Rule { new Rule(ruleName, column, RuleType.ValidateNumerics) .setValidNumerics(validNumerics.map(_.toString.toDouble)) + .setInvertMatch(false) } def apply( ruleName: String, column: Column, - validStrings: Array[String] + validStrings: Array[String], + ignoreCase: Boolean = false, + invertMatch: Boolean = false ): Rule = { new Rule(ruleName, column, RuleType.ValidateStrings) - .setValidStrings(validStrings) + .setValidStrings(validStrings, ignoreCase) + .setIgnoreCase(ignoreCase) + .setInvertMatch(invertMatch) } } diff --git a/src/main/scala/com/databricks/labs/validation/RuleSet.scala b/src/main/scala/com/databricks/labs/validation/RuleSet.scala index 54f092a..9b07ba7 100644 --- a/src/main/scala/com/databricks/labs/validation/RuleSet.scala +++ b/src/main/scala/com/databricks/labs/validation/RuleSet.scala @@ -35,7 +35,7 @@ class RuleSet extends SparkSessionWrapper { private def setGroupByCols(value: Seq[String]): this.type = { _groupBys = value - _isGrouped = true + _isGrouped = value.nonEmpty this } @@ -110,15 +110,16 @@ class RuleSet extends SparkSessionWrapper { } /** - * Merge two rule sets by adding one rule set to another - * - * @param ruleSet RuleSet to be added - * @return RuleSet - */ + * Merge two rule sets by adding one rule set to another + * + * @param ruleSet RuleSet to be added + * @return RuleSet + */ def add(ruleSet: RuleSet): RuleSet = { - new RuleSet().setDF(ruleSet.getDf) - .setIsGrouped(ruleSet.isGrouped) - .add(ruleSet.getRules) + val addtnlGroupBys = ruleSet.getGroupBys diff this.getGroupBys + val mergedGroupBys = this.getGroupBys ++ addtnlGroupBys + this.add(ruleSet.getRules) + .setGroupByCols(mergedGroupBys) } /** diff --git a/src/main/scala/com/databricks/labs/validation/Validator.scala b/src/main/scala/com/databricks/labs/validation/Validator.scala index afd782b..d005311 100644 --- a/src/main/scala/com/databricks/labs/validation/Validator.scala +++ 
b/src/main/scala/com/databricks/labs/validation/Validator.scala @@ -29,16 +29,19 @@ class Validator(ruleSet: RuleSet, detailLvl: Int) extends SparkSessionWrapper { rule.inputColumn.cast("string").alias("actual") ).alias(rule.ruleName) case RuleType.ValidateNumerics => + val ruleExpr = if(rule.invertMatch) not(array_contains(rule.validNumerics, rule.inputColumn)) else array_contains(rule.validNumerics, rule.inputColumn) struct( lit(rule.ruleName).alias("ruleName"), - array_contains(rule.validNumerics, rule.inputColumn).alias("passed"), + ruleExpr.alias("passed"), rule.validNumerics.cast("string").alias("permitted"), rule.inputColumn.cast("string").alias("actual") ).alias(rule.ruleName) case RuleType.ValidateStrings => + val ruleValue = if(rule.ignoreCase) lower(rule.inputColumn) else rule.inputColumn + val ruleExpr = if(rule.invertMatch) not(array_contains(rule.validStrings, ruleValue)) else array_contains(rule.validStrings, ruleValue) struct( lit(rule.ruleName).alias("ruleName"), - array_contains(rule.validStrings, rule.inputColumn).alias("passed"), + ruleExpr.alias("passed"), rule.validStrings.cast("string").alias("permitted"), rule.inputColumn.cast("string").alias("actual") ).alias(rule.ruleName) diff --git a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala index eb7bda0..b5aec4a 100644 --- a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala +++ b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala @@ -13,7 +13,9 @@ object Lookups { final val validRegions = Array("Northeast", "Southeast", "Midwest", "Northwest", "Southcentral", "Southwest") - final val validSkus = Array(123456, 122987,123256, 173544, 163212, 365423, 168212) + final val validSkus = Array(123456, 122987, 123256, 173544, 163212, 365423, 168212) + + final val invalidSkus = Array(9123456, 9122987, 9123256, 9173544, 9163212, 9365423, 9168212) } diff --git 
a/src/test/scala/com/databricks/labs/validation/RuleSetTestSuite.scala b/src/test/scala/com/databricks/labs/validation/RuleSetTestSuite.scala new file mode 100644 index 0000000..2cdad2e --- /dev/null +++ b/src/test/scala/com/databricks/labs/validation/RuleSetTestSuite.scala @@ -0,0 +1,169 @@ +package com.databricks.labs.validation + +import com.databricks.labs.validation.utils.Structures.Bounds +import org.apache.spark.sql.functions._ +import org.scalatest.funsuite.AnyFunSuite + + +class RuleSetTestSuite extends AnyFunSuite with SparkSessionFixture { + + import spark.implicits._ + + spark.sparkContext.setLogLevel("ERROR") + + test("A rule set should be created from a DataFrame.") { + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 9) + ).toDF("retail_price", "scan_price", "cost") + val testRuleSet = RuleSet(testDF) + + // Ensure that the RuleSet DataFrame is set properly + assert(testRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the RuleSet properties are set properly + assert(!testRuleSet.isGrouped) + assert(testRuleSet.getGroupBys.isEmpty) + assert(testRuleSet.getRules.isEmpty) + + } + + test("A rule set should be created from a DataFrame grouped by multiple columns.") { + val testDF = Seq( + ("food_a", 2.51, 3, 111111111111111L), + ("food_b", 5.11, 6, 211111111111111L), + ("food_b", 5.32, 7, 311111111111111L), + ("food_d", 8.22, 99, 411111111111111L) + ).toDF("product_name", "scan_price", "cost", "id") + val testRuleSet = RuleSet(testDF, Array("product_name", "id")) + + // Ensure that the RuleSet DataFrame is set properly + assert(testRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the group-by columns are set properly + assert(testRuleSet.isGrouped) + assert(testRuleSet.getGroupBys.length == 2) + assert(testRuleSet.getGroupBys.contains("product_name")) + assert(testRuleSet.getGroupBys.contains("id")) + + // 
Ensure that the RuleSet properties are set properly + assert(testRuleSet.getRules.isEmpty) + + } + + test("A rule set should be created from a DataFrame grouped by a single column.") { + val testDF = Seq( + ("food_a", 2.51, 3, 111111111111111L), + ("food_b", 5.11, 6, 211111111111111L), + ("food_b", 5.32, 7, 311111111111111L), + ("food_d", 8.22, 99, 411111111111111L) + ).toDF("product_name", "scan_price", "cost", "id") + val testRuleSet = RuleSet(testDF, "product_name") + + // Ensure that the RuleSet DataFrame is set properly + assert(testRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the group-by columns are set properly + assert(testRuleSet.isGrouped) + assert(testRuleSet.getGroupBys.length == 1) + assert(testRuleSet.getGroupBys.head == "product_name") + + // Ensure that the RuleSet properties are set properly + assert(testRuleSet.getRules.isEmpty) + + } + + test("A rule set should be created from a DataFrame and list of rules.") { + val testDF = Seq( + ("Toyota", "Camry", 30000.00, 111111111111111L), + ("Ford", "Escape", 18750.00, 211111111111111L), + ("Ford", "Mustang", 32000.00, 311111111111111L), + ("Nissan", "Maxima", 25000.00, 411111111111111L) + ).toDF("make", "model", "msrp", "id") + val makeLovRule = Rule("Valid_Auto_Maker_Rule", col("make"), Array("Ford", "Toyota", "Nissan", "BMW", "Chevrolet")) + val modelLovRule = Rule("Valid_Auto_Models_Rule", col("model"), Array("Camry", "Mustang", "Maxima", "Escape", "330i")) + val groupedRuleSet = RuleSet(testDF, Array(makeLovRule, modelLovRule), Array("make")) + + // Ensure that the RuleSet DataFrame is set properly + assert(groupedRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the RuleSet properties are set properly + assert(groupedRuleSet.isGrouped) + assert(groupedRuleSet.getGroupBys.length == 1) + assert(groupedRuleSet.getGroupBys.head == "make") + 
assert(groupedRuleSet.getRules.length == 2) + assert((groupedRuleSet.getRules.map(_.ruleName) diff Seq("Valid_Auto_Maker_Rule", "Valid_Auto_Models_Rule")).isEmpty) + + // Ensure a RuleSet can be created with a non-grouped DataFrame + val nonGroupedRuleSet = RuleSet(testDF, Array(makeLovRule, modelLovRule)) + + // Ensure that the RuleSet DataFrame is set properly + assert(nonGroupedRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the RuleSet properties are set properly + assert(nonGroupedRuleSet.getGroupBys.isEmpty) + assert(nonGroupedRuleSet.getRules.length == 2) + assert((nonGroupedRuleSet.getRules.map(_.ruleName) diff Seq("Valid_Auto_Maker_Rule", "Valid_Auto_Models_Rule")).isEmpty) + } + + test("A rule set should be created from a DataFrame and list of MinMax rules.") { + val testDF = Seq( + ("Toyota", "Camry", 30000.00, 111111111111111L), + ("Ford", "Escape", 18750.00, 211111111111111L), + ("Ford", "Mustang", 32000.00, 311111111111111L), + ("Nissan", "Maxima", 25000.00, 411111111111111L) + ).toDF("make", "model", "msrp", "id") + val msrpBoundsRuleSet = RuleSet(testDF).addMinMaxRules("Valid_Auto_MSRP_Rule", col("msrp"), Bounds(1.0, 100000.0)) + + // Ensure that the RuleSet DataFrame is set properly + assert(msrpBoundsRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the RuleSet properties are set properly + assert(msrpBoundsRuleSet.getGroupBys.isEmpty) + assert(msrpBoundsRuleSet.getRules.length == 2) + assert(Seq("Valid_Auto_MSRP_Rule_min", "Valid_Auto_MSRP_Rule_max").contains(msrpBoundsRuleSet.getRules(0).ruleName)) + assert(Seq("Valid_Auto_MSRP_Rule_min", "Valid_Auto_MSRP_Rule_max").contains(msrpBoundsRuleSet.getRules(1).ruleName)) + + } + + test("Two rule sets can be merged together.") { + + val testDF = Seq( + ("Toyota", "Camry", 30000.00, 111111111111111L), + ("Ford", "Escape", 18750.00, 211111111111111L), + 
("Ford", "Mustang", 32000.00, 311111111111111L), + ("Nissan", "Maxima", 25000.00, 411111111111111L) + ).toDF("make", "model", "msrp", "id") + + // Create a bounds RuleSet + val msrpBoundsRuleSet = RuleSet(testDF).addMinMaxRules("Valid_Auto_MSRP_Rule", col("msrp"), Bounds(1.0, 100000.0)) + + // Create a LOV RuleSet + val makeLovRule = Rule("Valid_Auto_Maker_Rule", col("make"), Array("Ford", "Toyota", "Nissan", "BMW", "Chevrolet")) + val modelLovRule = Rule("Valid_Auto_Models_Rule", col("model"), Array("Camry", "Mustang", "Maxima", "Escape", "330i")) + val groupedRuleSet = RuleSet(testDF, Array(makeLovRule, modelLovRule), Array("make")) + + // Merge both RuleSets + val mergedRuleSet = groupedRuleSet.add(msrpBoundsRuleSet) + + // Ensure that the RuleSet DataFrame is set properly + assert(mergedRuleSet.getGroupBys.length == 1) + assert(mergedRuleSet.getDf.exceptAll(testDF).count() == 0, "RuleSet DataFrame is not equal to the input DataFrame.") + + // Ensure that the RuleSet properties are set properly + assert(mergedRuleSet.getRules.length == 4) + val mergedRuleNames = Seq("Valid_Auto_MSRP_Rule_min", "Valid_Auto_MSRP_Rule_max", "Valid_Auto_Maker_Rule", "Valid_Auto_Models_Rule") + assert(mergedRuleSet.getRules.count(r => mergedRuleNames.contains(r.ruleName)) == 4) + + // Ensure groupBy columns are merged properly + val groupedLovRuleSet = RuleSet(testDF, Array(makeLovRule, modelLovRule), Array("make")) + val mergedTheOtherWay = msrpBoundsRuleSet.add(groupedLovRuleSet) + assert(mergedTheOtherWay.getGroupBys.length == 1) + assert(mergedTheOtherWay.getGroupBys.head == "make") + assert(mergedTheOtherWay.getDf.exceptAll(testDF).count() == 0) + assert(mergedTheOtherWay.getRules.count(r => mergedRuleNames.contains(r.ruleName)) == 4) + + } + +} diff --git a/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala b/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala new file mode 100644 index 0000000..82158af --- /dev/null +++ 
b/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala @@ -0,0 +1,113 @@ +package com.databricks.labs.validation + +import com.databricks.labs.validation.utils.Structures.Bounds +import org.apache.spark.sql.functions.col +import org.scalatest.funsuite.AnyFunSuite + + +class RuleTestSuite extends AnyFunSuite with SparkSessionFixture { + + import spark.implicits._ + + spark.sparkContext.setLogLevel("ERROR") + + test("A MinMaxRule should be instantiated correctly.") { + + val minMaxRule = Rule("Temperature_MinMax_Rule", col("temperature"), Bounds(34.0, 85.0)) + + // Ensure that all attributes are set correctly + assert(minMaxRule.ruleName == "Temperature_MinMax_Rule", "Rule name is not set as expected.") + assert(minMaxRule.inputColumnName == "temperature", "Input column name is not set as expected.") + assert(minMaxRule.ruleType == RuleType.ValidateBounds, "The rule type is not set as expected.") + assert(!minMaxRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!minMaxRule.isAgg, "The rule should not be an aggregation.") + + // Ensure that the boundaries are set correctly + assert(minMaxRule.boundaries.lower == 34.0, "Lower boundary is not set as expected.") + assert(minMaxRule.boundaries.upper == 85.0, "Upper boundary is not set as expected.") + + } + + test("An implicit boolean expression should be instantiated correctly.") { + + // Ensure a single column of type boolean can be instantiated correctly + val coolingBoolRule = Rule("Implicit_Cooling_Rule", col("cooling_bool")) + + // Ensure that all attributes are set correctly + assert(coolingBoolRule.ruleName == "Implicit_Cooling_Rule", "Rule name is not set as expected.") + assert(coolingBoolRule.inputColumnName == "cooling_bool", "Input column name is not set as expected.") + assert(coolingBoolRule.ruleType == RuleType.ValidateExpr, "The rule type is not set as expected.") + assert(coolingBoolRule.isImplicitBool, "The rule should not be an implicit boolean 
expression.") + assert(!coolingBoolRule.isAgg, "The rule should not be an aggregation.") + + // Ensure that a boolean expression can be used to create an implicit boolean rule + val coolingExprRule = Rule("Implicit_Cooling_Expr", col("current_temp") > col("target_temp")) + + // Ensure that all attributes are set correctly + assert(coolingExprRule.ruleName == "Implicit_Cooling_Expr", "Rule name is not set as expected.") + assert(coolingExprRule.inputColumnName == "(current_temp > target_temp)", "Input column name is not set as expected.") + assert(coolingExprRule.ruleType == RuleType.ValidateExpr, "The rule type is not set as expected.") + assert(coolingExprRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!coolingExprRule.isAgg, "The rule should not be an aggregation.") + + } + + test("A column can be ruled equivalent to an expression.") { + + // Ensure that an equivalence comparison can be made between a column and an expression + val coolingBoolRule = Rule("Thermostat_Cooling_Rule", col("cooling_bool"), (col("current_temp") - col("target_temp")) >= 7.0) + + // Ensure that all attributes are set correctly + assert(coolingBoolRule.ruleName == "Thermostat_Cooling_Rule", "Rule name is not set as expected.") + assert(coolingBoolRule.inputColumnName == "cooling_bool", "Input column name is not set as expected.") + assert(coolingBoolRule.ruleType == RuleType.ValidateExpr, "The rule type is not set as expected.") + assert(!coolingBoolRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!coolingBoolRule.isAgg, "The rule should not be an aggregation.") + + } + + test("A list of numerical values rule can be instantiated correctly.") { + + // Ensure that a rule with a numerical LOV can be created + val heatingRateIntRule = Rule("Heating_Rate_Int_Rule", col("heating_rate"), Array(0, 1, 5, 10, 15)) + + // Ensure that all attributes are set correctly for Integers + assert(heatingRateIntRule.ruleName == 
"Heating_Rate_Int_Rule", "Rule name is not set as expected.") + assert(heatingRateIntRule.inputColumnName == "heating_rate", "Input column name is not set as expected.") + assert(heatingRateIntRule.ruleType == RuleType.ValidateNumerics, "The rule type is not set as expected.") + assert(!heatingRateIntRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!heatingRateIntRule.isAgg, "The rule should not be an aggregation.") + + // Ensure that all attributes are set correctly for Doubles + val heatingRateDoubleRule = Rule("Heating_Rate_Double_Rule", col("heating_rate"), Array(0.0, 0.1, 0.5, 0.10, 0.15)) + assert(heatingRateDoubleRule.ruleName == "Heating_Rate_Double_Rule", "Rule name is not set as expected.") + assert(heatingRateDoubleRule.inputColumnName == "heating_rate", "Input column name is not set as expected.") + assert(heatingRateDoubleRule.ruleType == RuleType.ValidateNumerics, "The rule type is not set as expected.") + assert(!heatingRateDoubleRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!heatingRateDoubleRule.isAgg, "The rule should not be an aggregation.") + + // Ensure that all attributes are set correctly for Longs + val heatingRateLongRule = Rule("Heating_Rate_Long_Rule", col("heating_rate"), Array(111111111111111L, 211111111111111L, 311111111111111L)) + assert(heatingRateLongRule.ruleName == "Heating_Rate_Long_Rule", "Rule name is not set as expected.") + assert(heatingRateLongRule.inputColumnName == "heating_rate", "Input column name is not set as expected.") + assert(heatingRateLongRule.ruleType == RuleType.ValidateNumerics, "The rule type is not set as expected.") + assert(!heatingRateLongRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!heatingRateLongRule.isAgg, "The rule should not be an aggregation.") + + } + + test("A list of string values rule can be instantiated correctly.") { + + // Ensure that a rule with a numerical LOV can be 
created + val buildingNameRule = Rule("Building_LOV_Rule", col("site_name"), Array("SiteA", "SiteB", "SiteC")) + + // Ensure that all attributes are set correctly for Integers + assert(buildingNameRule.ruleName == "Building_LOV_Rule", "Rule name is not set as expected.") + assert(buildingNameRule.inputColumnName == "site_name", "Input column name is not set as expected.") + assert(buildingNameRule.ruleType == RuleType.ValidateStrings, "The rule type is not set as expected.") + assert(!buildingNameRule.isImplicitBool, "The rule should not be an implicit boolean expression.") + assert(!buildingNameRule.isAgg, "The rule should not be an aggregation.") + + } + +} diff --git a/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala b/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala index 6003887..5bd6c50 100644 --- a/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala +++ b/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala @@ -1,280 +1,619 @@ package com.databricks.labs.validation import com.databricks.labs.validation.utils.Structures.{Bounds, MinMaxRuleDef} -import org.apache.spark.sql.functions.{col, min} -import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.sql.functions._ +import org.apache.spark.sql.expressions.Window +import org.scalatest.funsuite.AnyFunSuite -case class ValidationValue(validDateTime: java.lang.Long, validNumerics: Array[Double], bounds: Array[Double], validStrings: Array[String]) +case class ValidationValue(ruleName: String, passed: Boolean, permitted: String, actual: String) class ValidatorTestSuite extends AnyFunSuite with SparkSessionFixture { import spark.implicits._ + spark.sparkContext.setLogLevel("ERROR") -// -// test("The input dataframe should have no rule failures on MinMaxRule") { -// val expectedDF = Seq( -// ("MinMax_Cost_Generated_max","bounds",ValidationValue(null,null,Array(0.0, 12.0),null),0,false), -// 
("MinMax_Cost_Generated_min","bounds",ValidationValue(null,null,Array(0.0, 12.0),null),0,false), -// ("MinMax_Cost_manual_max","bounds",ValidationValue(null,null,Array(0.0, 12.0),null),0,false), -// ("MinMax_Cost_manual_min","bounds",ValidationValue(null,null,Array(0.0, 12.0),null),0,false), -// ("MinMax_Cost_max","bounds",ValidationValue(null,null,Array(0.0, 12.0),null),0,false), -// ("MinMax_Cost_min","bounds",ValidationValue(null,null,Array(0.0, 12.0),null),0,false), -// ("MinMax_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Sku_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Sku_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false) -// ).toDF("Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") -// val data = Seq() -// // 2 per rule so 2 MinMax_Sku_Price + 2 MinMax_Scan_Price + 2 MinMax_Cost + 2 MinMax_Cost_Generated -// // + 2 MinMax_Cost_manual = 10 rules -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// (7, 8, 9) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Array( -// MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0)) -// ) -// -// // Generate the array of Rules from the minmax generator -// val rulesArray = RuleSet.generateMinMaxRules(MinMaxRuleDef("MinMax_Cost_Generated", col("cost"), Bounds(0.0, 12.0))) -// -// val someRuleSet = RuleSet(testDF) -// someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) -// someRuleSet.addMinMaxRules("MinMax_Cost_manual", col("cost"), Bounds(0.0,12.0)) -// someRuleSet.add(rulesArray) -// val (rulesReport, passed) = someRuleSet.validate() -// assert(rulesReport.except(expectedDF).count() == 0) -// 
assert(passed) -// assert(rulesReport.count() == 10) -// } -// -// test("The input rule should have 1 invalid count for MinMax_Scan_Price_Minus_Retail_Price_min and max for failing complex type.") { -// val expectedDF = Seq( -// ("MinMax_Retail_Price_Minus_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),1,true), -// ("MinMax_Retail_Price_Minus_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),1,true), -// ("MinMax_Scan_Price_Minus_Retail_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Scan_Price_Minus_Retail_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false) -// ).toDF("Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") -// -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// (7, 8, 9) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Array( -// MinMaxRuleDef("MinMax_Retail_Price_Minus_Scan_Price", col("retail_price")-col("scan_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Scan_Price_Minus_Retail_Price", col("scan_price")-col("retail_price"), Bounds(0.0, 29.99)) -// ) -// -// // Generate the array of Rules from the minmax generator -// val someRuleSet = RuleSet(testDF) -// someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) -// val (rulesReport, passed) = someRuleSet.validate() -// assert(rulesReport.except(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(!passed) -// assert(rulesReport.count() == 4) -// } -// -// test("The input rule should have 3 invalid count for failing aggregate type.") { -// val expectedDF = Seq( -// ("MinMax_Min_Retail_Price","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Min_Scan_Price","bounds",ValidationValue(null,null,Array(3.0, 29.99),null),1,true) -// ).toDF("Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// 
(7, 8, 9) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Seq( -// Rule("MinMax_Min_Retail_Price", min("retail_price"), Bounds(0.0, 29.99)), -// Rule("MinMax_Min_Scan_Price", min("scan_price"), Bounds(3.0, 29.99)) -// ) -// -// -// // Generate the array of Rules from the minmax generator -// val someRuleSet = RuleSet(testDF) -// someRuleSet.add(minMaxPriceDefs) -// val (rulesReport, passed) = someRuleSet.validate() -// assert(rulesReport.except(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(!passed) -// assert(rulesReport.count() == 2) -// } -// -// test("The input dataframe should have exactly 1 rule failure on MinMaxRule") { -// val expectedDF = Seq( -// ("MinMax_Cost_max","bounds",ValidationValue(null,null,Array(0.0, 12.00),null),1,true), -// ("MinMax_Cost_min","bounds",ValidationValue(null,null,Array(0.0, 12.00),null),0,false), -// ("MinMax_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Sku_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// ("MinMax_Sku_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false) -// ).toDF("Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// (7, 8, 99) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Array( -// MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0)) -// ) -// // Generate the array of Rules from the minmax generator -// -// val someRuleSet = RuleSet(testDF) -// someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) -// val (rulesReport, passed) = someRuleSet.validate() -// val failedResults 
= rulesReport.filter(rulesReport("Invalid_Count") > 0).collect() -// assert(failedResults.length == 1) -// assert(rulesReport.except(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(failedResults(0)(0) == "MinMax_Cost_max") -// assert(!passed) -// } -// -// test("The DF in the rulesset object is the same as the input test df") { -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// (7, 8, 99) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Array( -// MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0)) -// ) -// // Generate the array of Rules from the minmax generator -// -// val someRuleSet = RuleSet(testDF) -// someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) -// val rulesDf = someRuleSet.getDf -// assert(testDF.except(rulesDf).count() == 0) -// } -// -// test("The group by columns are the correct group by clauses in the validation") { -// val expectedDF = Seq( -// (3,"MinMax_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (6,"MinMax_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (3,"MinMax_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (6,"MinMax_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (3,"MinMax_Sku_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (6,"MinMax_Sku_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (3,"MinMax_Sku_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (6,"MinMax_Sku_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false) -// ).toDF("cost","Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") 
-// // 2 groups so count of the rules should yield (2 minmax rules * 2 columns) * 2 groups in cost (8 rows) -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// (7, 8, 3) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Array( -// MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), -// MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)) -// ) -// -// val someRuleSet = RuleSet(testDF, "cost") -// someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) -// val groupBys = someRuleSet.getGroupBys -// val (groupByValidated, passed) = someRuleSet.validate() -// -// assert(groupBys.length == 1) -// assert(groupBys.head == "cost") -// assert(someRuleSet.isGrouped) -// assert(passed) -// assert(groupByValidated.count() == 8) -// assert(groupByValidated.except(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(groupByValidated.filter(groupByValidated("Invalid_Count") > 0).count() == 0) -// assert(groupByValidated.filter(groupByValidated("Failed") === true).count() == 0) -// } -// -// test("The group by columns are with rules failing the validation") { -// val expectedDF = Seq( -// (3,"MinMax_Sku_Price_max","bounds",ValidationValue(null,null,Array(0.0, 0.0),null),1,true), -// (6,"MinMax_Sku_Price_max","bounds",ValidationValue(null,null,Array(0.0, 0.0),null),1,true), -// (3,"MinMax_Sku_Price_min","bounds",ValidationValue(null,null,Array(0.0, 0.0),null),1,true), -// (6,"MinMax_Sku_Price_min","bounds",ValidationValue(null,null,Array(0.0, 0.0),null),1,true), -// (3,"MinMax_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (6,"MinMax_Scan_Price_max","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (3,"MinMax_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false), -// (6,"MinMax_Scan_Price_min","bounds",ValidationValue(null,null,Array(0.0, 29.99),null),0,false) -// 
).toDF("cost","Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") -// // 2 groups so count of the rules should yield (2 minmax rules * 2 columns) * 2 groups in cost (8 rows) -// val testDF = Seq( -// (1, 2, 3), -// (4, 5, 6), -// (7, 8, 3) -// ).toDF("retail_price", "scan_price", "cost") -// val minMaxPriceDefs = Array( -// MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 0.0)), -// MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)) -// ) -// -// val someRuleSet = RuleSet(testDF, "cost") -// someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) -// val groupBys = someRuleSet.getGroupBys -// val (groupByValidated, passed) = someRuleSet.validate() -// -// assert(groupBys.length == 1, "Group by length is not 1") -// assert(groupBys.head == "cost", "Group by column is not cost") -// assert(someRuleSet.isGrouped) -// assert(!passed, "Rule set did not fail.") -// assert(groupByValidated.count() == 8, "Rule count should be 8") -// assert(groupByValidated.except(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(groupByValidated.filter(groupByValidated("Invalid_Count") > 0).count() == 4, "Invalid count is not 4.") -// assert(groupByValidated.filter(groupByValidated("Failed") === true).count() == 4, "Failed count is not 4.") -// } -// - test("Validate list of values with numeric types, string types and long types.") { + test("The input dataframe should have no rule failures on MinMaxRule") { + // 2 per rule so 2 MinMax_Sku_Price + 2 MinMax_Scan_Price + 2 MinMax_Cost + 2 MinMax_Cost_Generated + // + 2 MinMax_Cost_manual = 10 rules + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 9) + ).toDF("retail_price", "scan_price", "cost") + + val expectedColumns = testDF.columns ++ Seq("MinMax_Sku_Price_min", "MinMax_Sku_Price_max", "MinMax_Scan_Price_min", + "MinMax_Scan_Price_max", "MinMax_Cost_min", "MinMax_Cost_max", "MinMax_Cost_manual_min", "MinMax_Cost_manual_max", + 
"MinMax_Cost_Generated_min", "MinMax_Cost_Generated_max") + val expectedDF = Seq( + (1, 2, 3, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "2"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "2"), + ValidationValue("MinMax_Cost_min", passed = true, "[0.0, 12.0]", "3"), + ValidationValue("MinMax_Cost_max", passed = true, "[0.0, 12.0]", "3"), + ValidationValue("MinMax_Cost_manual_min", passed = true, "[0.0, 12.0]", "3"), + ValidationValue("MinMax_Cost_manual_max", passed = true, "[0.0, 12.0]", "3"), + ValidationValue("MinMax_Cost_Generated_min", passed = true, "[0.0, 12.0]", "3"), + ValidationValue("MinMax_Cost_Generated_max", passed = true, "[0.0, 12.0]", "3") + ), + (4, 5, 6, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "5"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "5"), + ValidationValue("MinMax_Cost_min", passed = true, "[0.0, 12.0]", "6"), + ValidationValue("MinMax_Cost_max", passed = true, "[0.0, 12.0]", "6"), + ValidationValue("MinMax_Cost_manual_min", passed = true, "[0.0, 12.0]", "6"), + ValidationValue("MinMax_Cost_manual_max", passed = true, "[0.0, 12.0]", "6"), + ValidationValue("MinMax_Cost_Generated_min", passed = true, "[0.0, 12.0]", "6"), + ValidationValue("MinMax_Cost_Generated_max", passed = true, "[0.0, 12.0]", "6") + ), + (7, 8, 9, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "7"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "7"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "8"), + ValidationValue("MinMax_Scan_Price_max", passed = 
true, "[0.0, 29.99]", "8"), + ValidationValue("MinMax_Cost_min", passed = true, "[0.0, 12.0]", "9"), + ValidationValue("MinMax_Cost_max", passed = true, "[0.0, 12.0]", "9"), + ValidationValue("MinMax_Cost_manual_min", passed = true, "[0.0, 12.0]", "9"), + ValidationValue("MinMax_Cost_manual_max", passed = true, "[0.0, 12.0]", "9"), + ValidationValue("MinMax_Cost_Generated_min", passed = true, "[0.0, 12.0]", "9"), + ValidationValue("MinMax_Cost_Generated_max", passed = true, "[0.0, 12.0]", "9") + ) + ).toDF(expectedColumns: _*) + + // Create an Array of MinMax Rules + val minMaxPriceDefs = Array( + MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0)) + ) + + // Generate the array of Rules from the minmax generator + val rulesArray = RuleSet.generateMinMaxRules(MinMaxRuleDef("MinMax_Cost_Generated", col("cost"), Bounds(0.0, 12.0))) + val someRuleSet = RuleSet(testDF) + someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) + + // Manually add a Rule + someRuleSet.addMinMaxRules("MinMax_Cost_manual", col("cost"), Bounds(0.0, 12.0)) + someRuleSet.add(rulesArray) + val validationResults = someRuleSet.validate() + + // Ensure that validate report is expected + assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") + + // Ensure that there are 2 Rules per MinMax Rule added as separate columns + assert(validationResults.completeReport.count() == 3) + assert((validationResults.completeReport.columns diff testDF.columns).length == 10) + + // Ensure that all Rules passed;there should be no failed Rules + assert(validationResults.summaryReport.count() == 0) + } + + test("The input rule should have 3 invalid count for MinMax_Scan_Price_Minus_Retail_Price_min and max for failing complex type.") { + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 
8, 9) + ).toDF("retail_price", "scan_price", "cost") + val expectedColumns = testDF.columns ++ Seq("MinMax_Retail_Price_Minus_Scan_Price_min", "MinMax_Retail_Price_Minus_Scan_Price_max", + "MinMax_Scan_Price_Minus_Retail_Price_min", "MinMax_Scan_Price_Minus_Retail_Price_max") + val expectedDF = Seq( + (1, 2, 3, + ValidationValue("MinMax_Retail_Price_Minus_Scan_Price_min", passed = false, "[0.0, 29.99]", "-1"), + ValidationValue("MinMax_Retail_Price_Minus_Scan_Price_max", passed = false, "[0.0, 29.99]", "-1"), + ValidationValue("MinMax_Scan_Price_Minus_Retail_Price_min", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Scan_Price_Minus_Retail_Price_max", passed = true, "[0.0, 29.99]", "1") + ), + (4, 5, 6, + ValidationValue("MinMax_Retail_Price_Minus_Scan_Price_min", passed = false, "[0.0, 29.99]", "-1"), + ValidationValue("MinMax_Retail_Price_Minus_Scan_Price_max", passed = false, "[0.0, 29.99]", "-1"), + ValidationValue("MinMax_Scan_Price_Minus_Retail_Price_min", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Scan_Price_Minus_Retail_Price_max", passed = true, "[0.0, 29.99]", "1") + ), + (7, 8, 9, + ValidationValue("MinMax_Retail_Price_Minus_Scan_Price_min", passed = false, "[0.0, 29.99]", "-1"), + ValidationValue("MinMax_Retail_Price_Minus_Scan_Price_max", passed = false, "[0.0, 29.99]", "-1"), + ValidationValue("MinMax_Scan_Price_Minus_Retail_Price_min", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Scan_Price_Minus_Retail_Price_max", passed = true, "[0.0, 29.99]", "1") + ) + ).toDF(expectedColumns: _*) + + val minMaxPriceDefs = Array( + MinMaxRuleDef("MinMax_Retail_Price_Minus_Scan_Price", col("retail_price") - col("scan_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Scan_Price_Minus_Retail_Price", col("scan_price") - col("retail_price"), Bounds(0.0, 29.99)) + ) + + // Generate the array of Rules from the minmax generator + val someRuleSet = RuleSet(testDF) + someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) + 
val validationResults = someRuleSet.validate() + + // Ensure that validate report is expected + assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") + + // Ensure that there are failed rows in summary report + assert(validationResults.summaryReport.count() > 0) + assert(validationResults.summaryReport.count() == 3) + } + + test("The input rule should have 1 invalid count for failing aggregate type.") { + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 9) + ).toDF("retail_price", "scan_price", "cost") + val expectedColumns = testDF.columns ++ Seq("MinMax_Min_Retail_Price", "MinMax_Min_Scan_Price") + val expectedDF = Seq( + (1, 2, 3, + ValidationValue("MinMax_Min_Retail_Price", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Min_Scan_Price", passed = false, "[3.0, 29.99]", "2") + ), + (4, 5, 6, + ValidationValue("MinMax_Min_Retail_Price", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Min_Scan_Price", passed = true, "[3.0, 29.99]", "5") + ), + (7, 8, 9, + ValidationValue("MinMax_Min_Retail_Price", passed = true, "[0.0, 29.99]", "7"), + ValidationValue("MinMax_Min_Scan_Price", passed = true, "[3.0, 29.99]", "8") + ) + ).toDF(expectedColumns: _*) + val minMaxPriceDefs = Seq( + Rule("MinMax_Min_Retail_Price", min("retail_price"), Bounds(0.0, 29.99)), + Rule("MinMax_Min_Scan_Price", min("scan_price"), Bounds(3.0, 29.99)) + ) + + // Generate the array of Rules from the minmax generator + val someRuleSet = RuleSet(testDF) + someRuleSet.add(minMaxPriceDefs) + val validationResults = someRuleSet.validate() + + // Ensure that validate report is expected + assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") + + // Ensure that there is a failed row + assert(validationResults.summaryReport.count() > 0) + assert(validationResults.summaryReport.count() == 1) + } + + test("The input 
dataframe should have exactly 1 rule failure on MinMaxRule") { + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 99) + ).toDF("retail_price", "scan_price", "cost") + val expectedColumns = testDF.columns ++ Seq("MinMax_Sku_Price_min", "MinMax_Sku_Price_max", + "MinMax_Scan_Price_min", "MinMax_Scan_Price_max", "MinMax_Cost_min", "MinMax_Cost_max" + ) + val expectedDF = Seq( + (1, 2, 3, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "2"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "2"), + ValidationValue("MinMax_Cost_min", passed = true, "[0.0, 12.0]", "3"), + ValidationValue("MinMax_Cost_max", passed = true, "[0.0, 12.0]", "3"), + ), + (4, 5, 6, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "5"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "5"), + ValidationValue("MinMax_Cost_min", passed = true, "[0.0, 12.0]", "6"), + ValidationValue("MinMax_Cost_max", passed = true, "[0.0, 12.0]", "6"), + ), + (7, 8, 99, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "7"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "7"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "8"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "8"), + ValidationValue("MinMax_Cost_min", passed = false, "[0.0, 12.0]", "99"), + ValidationValue("MinMax_Cost_max", passed = false, "[0.0, 12.0]", "99"), + ) + ).toDF(expectedColumns: _*) + + val minMaxPriceDefs = Array( + MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), + 
MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0)) + ) + + // Generate the array of Rules from the minmax generator + val someRuleSet = RuleSet(testDF) + someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) + val validationResults = someRuleSet.validate() + + // Ensure that validate report is expected + assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") + + // Ensure that there is a failed row + assert(validationResults.summaryReport.count() > 0) + assert(validationResults.summaryReport.count() == 1) + + // Ensure that the failed Rules are MinMax_Cost_min, MinMax_Cost_max (the clue strings name the rule each assertion expects) + assert(validationResults.summaryReport.select("failed_rules.ruleName").as[Array[String]].collect()(0)(0) == "MinMax_Cost_min", "First failed rule should be MinMax_Cost_min.") + assert(validationResults.summaryReport.select("failed_rules.ruleName").as[Array[String]].collect()(0)(1) == "MinMax_Cost_max", "Second failed rule should be MinMax_Cost_max.") + } + + test("The DF in the rulesset object is the same as the input test df") { + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 99) + ).toDF("retail_price", "scan_price", "cost") + val minMaxPriceDefs = Array( + MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0)) + ) + // Generate the array of Rules from the minmax generator + val someRuleSet = RuleSet(testDF) + someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) + val rulesDf = someRuleSet.getDf + assert(testDF.except(rulesDf).count() == 0) + } + + test("The group by columns are the correct group by clauses in the validation") { + // 2 groups so count of the rules should yield (2 minmax rules * 2 columns) * 2 groups in cost (8 rows) + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 3) + ).toDF("retail_price", 
"scan_price", "cost") + val expectedColumns = Seq("cost", "MinMax_Sku_Price_min", "MinMax_Sku_Price_max", "MinMax_Scan_Price_min", "MinMax_Scan_Price_max") + val expectedDF = Seq( + (3, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "1"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "7"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "2"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "8") + ), + (6, + ValidationValue("MinMax_Sku_Price_min", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Sku_Price_max", passed = true, "[0.0, 29.99]", "4"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "5"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "5") + ) + ).toDF(expectedColumns: _*) + + val minMaxPriceDefs = Array( + MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)), + MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)) + ) + + val someRuleSet = RuleSet(testDF, "cost") + someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) + val groupBys = someRuleSet.getGroupBys + val validationResults = someRuleSet.validate() + + // Ensure that input DF was grouped by "cost" column + assert(groupBys.length == 1) + assert(groupBys.head == "cost") + assert(someRuleSet.isGrouped) + + // Ensure that all rows passed + assert(validationResults.summaryReport.count() == 0) + + // Ensure that the complete report matches the expected output + assert(validationResults.completeReport.count() == 2) + assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") + } + + test("The group by columns are with rules failing the validation") { + // 2 groups so count of the rules should yield (2 minmax rules * 2 columns) * 2 groups in cost (8 rows) + val testDF = Seq( + (1, 2, 3), + (4, 5, 6), + (7, 8, 3) + 
).toDF("retail_price", "scan_price", "cost") + val expectedColumns = Seq("cost", "MinMax_Sku_Price_min", "MinMax_Sku_Price_max", "MinMax_Scan_Price_min", "MinMax_Scan_Price_max") + val expectedDF = Seq( + (3, + ValidationValue("MinMax_Sku_Price_min", passed = false, "[0.0, 0.0]", "1"), + ValidationValue("MinMax_Sku_Price_max", passed = false, "[0.0, 0.0]", "7"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "2"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "8") + ), + (6, + ValidationValue("MinMax_Sku_Price_min", passed = false, "[0.0, 0.0]", "4"), + ValidationValue("MinMax_Sku_Price_max", passed = false, "[0.0, 0.0]", "4"), + ValidationValue("MinMax_Scan_Price_min", passed = true, "[0.0, 29.99]", "5"), + ValidationValue("MinMax_Scan_Price_max", passed = true, "[0.0, 29.99]", "5") + ) + ).toDF(expectedColumns: _*) + val minMaxPriceDefs = Array( + MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 0.0)), + MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)) + ) + + val someRuleSet = RuleSet(testDF, "cost") + someRuleSet.addMinMaxRules(minMaxPriceDefs: _*) + val groupBys = someRuleSet.getGroupBys + val validationResults = someRuleSet.validate() + + assert(groupBys.length == 1, "Group by length is not 1") + assert(groupBys.head == "cost", "Group by column is not cost") + assert(someRuleSet.isGrouped) + + // Ensure that there are failed rows + assert(validationResults.summaryReport.count() > 0, "Rule set did not fail.") + assert(validationResults.summaryReport.count() == 2, "Failed row count should be 2") + assert(validationResults.completeReport.count() == 2, "Row count should be 2") + + // Ensure that the complete report matches expected output + assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected df is not equal to the returned rules report.") + } + + test("Validate list of values with integer, double, and long types.") { val testDF = 
Seq( ("food_a", 2.51, 3, 111111111111111L), ("food_b", 5.11, 6, 211111111111111L), ("food_c", 8.22, 99, 311111111111111L) ).toDF("product_name", "scan_price", "cost", "id") + val expectedColumns = testDF.columns ++ Seq("CheckIfCostIsInLOV", "CheckIfScanPriceIsInLOV", "CheckIfIdIsInLOV") val numericLovExpectedDF = Seq( - ("CheckIfCostIsInLOV","validNumerics",ValidationValue(null,Array(3,6,99),null,null),0,false), - ("CheckIfScanPriceIsInLOV","validNumerics",ValidationValue(null,Array(2.51,5.11,8.22),null,null),0,false), - ("CheckIfIdIsInLOV","validNumerics",ValidationValue(null,Array(111111111111111L,211111111111111L,311111111111111L),null,null),0,false) - ).toDF("Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") + ("food_a", 2.51, 3, 111111111111111L, + ValidationValue("CheckIfCostIsInLOV", passed = true, "[3.0, 6.0, 99.0]", "3"), + ValidationValue("CheckIfScanPriceIsInLOV", passed = true, "[2.51, 5.11, 8.22]", "2.51"), + ValidationValue("CheckIfIdIsInLOV", passed = true, "[1.11111111111111E14, 2.11111111111111E14, 3.11111111111111E14]", "111111111111111") + ), + ("food_b", 5.11, 6, 211111111111111L, + ValidationValue("CheckIfCostIsInLOV", passed = true, "[3.0, 6.0, 99.0]", "6"), + ValidationValue("CheckIfScanPriceIsInLOV", passed = true, "[2.51, 5.11, 8.22]", "5.11"), + ValidationValue("CheckIfIdIsInLOV", passed = true, "[1.11111111111111E14, 2.11111111111111E14, 3.11111111111111E14]", "211111111111111") + ), + ("food_c", 8.22, 99, 311111111111111L, + ValidationValue("CheckIfCostIsInLOV", passed = true, "[3.0, 6.0, 99.0]", "99"), + ValidationValue("CheckIfScanPriceIsInLOV", passed = true, "[2.51, 5.11, 8.22]", "8.22"), + ValidationValue("CheckIfIdIsInLOV", passed = true, "[1.11111111111111E14, 2.11111111111111E14, 3.11111111111111E14]", "311111111111111") + ) + ).toDF(expectedColumns: _*) + val numericRules = Array( - Rule("CheckIfCostIsInLOV", col("cost"), Array(3,6,99)), - Rule("CheckIfScanPriceIsInLOV", col("scan_price"), 
Array(2.51,5.11,8.22)), - Rule("CheckIfIdIsInLOV", col("id"), Array(111111111111111L,211111111111111L,311111111111111L)) + Rule("CheckIfCostIsInLOV", col("cost"), Array(3, 6, 99)), + Rule("CheckIfScanPriceIsInLOV", col("scan_price"), Array(2.51, 5.11, 8.22)), + Rule("CheckIfIdIsInLOV", col("id"), Array(111111111111111L, 211111111111111L, 311111111111111L)) ) - // Generate the array of Rules from the minmax generator + // Generate the array of Rules from the minmax generator val numericRuleSet = RuleSet(testDF) numericRuleSet.add(numericRules) val numericValidationResults = numericRuleSet.validate() + + // Ensure that all ruleTypes are ValidateNumerics assert(numericRules.map(_.ruleType == RuleType.ValidateNumerics).reduce(_ && _), "Not every value is validate numerics.") - assert(numericRules.map(_.boundaries == null).reduce(_ && _), "Boundaries are not null.") + + // Ensure that there are infinite boundaries, by default + assert(numericRules.map(_.boundaries.lower == Double.NegativeInfinity).reduce(_ && _), "Lower boundaries are not negatively infinite.") + assert(numericRules.map(_.boundaries.upper == Double.PositiveInfinity).reduce(_ && _), "Upper boundaries are not positively infinite.") + + // Ensure that the complete report matches expected output + assert(numericValidationResults.completeReport.exceptAll(numericLovExpectedDF).count() == 0, "Expected numeric df is not equal to the returned rules report.") + + // Ensure that all rows passed the Rules assert(numericValidationResults.summaryReport.isEmpty) -// assert(numericValidated.except(numericLovExpectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(numericValidated.filter(numericValidated("Invalid_Count") > 0).count() == 0) -// assert(numericValidated.filter(numericValidated("Failed") === true).count() == 0) - val stringRule = Rule("CheckIfProductNameInLOV", col("product_name"), Array("food_a","food_b","food_c")) - // Generate the array of Rules from the minmax 
generator + // Ensure rows can be validated against a list of invalid numerics (invertMatch = true) + // Expected rule columns are named after the Invalid_* rules defined below + val invalidNumColumns = testDF.columns ++ Seq("Invalid_Price_Rule", "Invalid_Id_Rule", "Invalid_Cost_Rule") + val invalidNumsExpectedDF = Seq( + ("food_a", 2.51, 3, 111111111111111L, + ValidationValue("Invalid_Price_Rule", passed = true, "[-1.0, -5.0, 0.0, 1000.0]", "2.51"), + ValidationValue("Invalid_Id_Rule", passed = true, "[7.11111111111111E14, 8.11111111111111E14, 9.11111111111111E14]", "111111111111111"), + ValidationValue("Invalid_Cost_Rule", passed = true, "[99.0, 10000.0, 100000.0, 1000000.0]", "3") + ), + ("food_b", 5.11, 6, 211111111111111L, + ValidationValue("Invalid_Price_Rule", passed = true, "[-1.0, -5.0, 0.0, 1000.0]", "5.11"), + ValidationValue("Invalid_Id_Rule", passed = true, "[7.11111111111111E14, 8.11111111111111E14, 9.11111111111111E14]", "211111111111111"), + ValidationValue("Invalid_Cost_Rule", passed = true, "[99.0, 10000.0, 100000.0, 1000000.0]", "6") + ), + ("food_c", 8.22, 99, 311111111111111L, + ValidationValue("Invalid_Price_Rule", passed = true, "[-1.0, -5.0, 0.0, 1000.0]", "8.22"), + ValidationValue("Invalid_Id_Rule", passed = true, "[7.11111111111111E14, 8.11111111111111E14, 9.11111111111111E14]", "311111111111111"), + ValidationValue("Invalid_Cost_Rule", passed = false, "[99.0, 10000.0, 100000.0, 1000000.0]", "99") + ) + ).toDF(invalidNumColumns: _*) + + val invalidPrices = Array(-1.00, -5.00, 0.00, 1000.0) + val invalidIds = Array(711111111111111L, 811111111111111L, 911111111111111L) + val invalidCosts = Array(99, 10000, 100000, 1000000) + val invalidNumericalRules = Array( + Rule("Invalid_Price_Rule", col("scan_price"), invalidPrices, invertMatch = true), + Rule("Invalid_Id_Rule", col("id"), invalidIds, invertMatch = true), + Rule("Invalid_Cost_Rule", col("cost"), invalidCosts, invertMatch = true), + ) + val invalidNumericalResults = RuleSet(testDF).add(invalidNumericalRules).validate() + + // Ensure that there is 1 failed row + 
assert(invalidNumericalResults.summaryReport.count() == 1) + + // Ensure that the invertMatch attribute is set properly + assert(invalidNumericalRules.count(_.invertMatch) == 3) + + // Ensure that the validation report matches expected output + assert(invalidNumericalResults.completeReport.exceptAll(invalidNumsExpectedDF).count() == 0, "Expected invalid numerics df is not equal to the returned rules report.") + + } + test("The input df should have no rule failures for valid string LOVs.") { + val testDF = Seq( + ("food_a", 2.51, 3, 111111111111111L), + ("food_b", 5.11, 6, 211111111111111L), + ("food_c", 8.22, 99, 311111111111111L) + ).toDF("product_name", "scan_price", "cost", "id") + + // Create a String List of Values Rule + val validProductNamesRule = Rule("CheckIfProductNameInLOV", col("product_name"), Array("food_a", "food_b", "food_c")) + val stringIgnoreCaseRule = Rule("IgnoreCaseProductNameLOV", col("product_name"), Array("Food_B", "food_A", "FOOD_C"), ignoreCase = true) + val invalidFoodsRule = Rule("InvalidProductNameLOV", col("product_name"), Array("food_x", "food_y", "food_z"), invertMatch = true) + + val expectedStringLovColumns = testDF.columns ++ Seq("CheckIfProductNameInLOV", "IgnoreCaseProductNameLOV", "InvalidProductNameLOV") val stringLovExpectedDF = Seq( - ("CheckIfProductNameInLOV","validStrings",ValidationValue(null,null,null,Array("food_a", "food_b", "food_c")),0,false) - ).toDF("Rule_Name","Rule_Type","Validation_Values","Invalid_Count","Failed") + ("food_a", 2.51, 3, 111111111111111L, + ValidationValue("CheckIfProductNameInLOV", passed = true, "[food_a, food_b, food_c]", "food_a"), + ValidationValue("IgnoreCaseProductNameLOV", passed = true, "[food_b, food_a, food_c]", "food_a"), + ValidationValue("InvalidProductNameLOV", passed = true, "[food_x, food_y, food_z]", "food_a") + ), + ("food_b", 5.11, 6, 211111111111111L, + ValidationValue("CheckIfProductNameInLOV", passed = true, "[food_a, food_b, food_c]", "food_b"), + 
ValidationValue("IgnoreCaseProductNameLOV", passed = true, "[food_b, food_a, food_c]", "food_b"), + ValidationValue("InvalidProductNameLOV", passed = true, "[food_x, food_y, food_z]", "food_b") + ), + ("food_c", 8.22, 99, 311111111111111L, + ValidationValue("CheckIfProductNameInLOV", passed = true, "[food_a, food_b, food_c]", "food_c"), + ValidationValue("IgnoreCaseProductNameLOV", passed = true, "[food_b, food_a, food_c]", "food_c"), + ValidationValue("InvalidProductNameLOV", passed = true, "[food_x, food_y, food_z]", "food_c") + ) + ).toDF(expectedStringLovColumns: _*) + + // Validate testDF against String LOV Rule + val productNameRules = Array(validProductNamesRule, stringIgnoreCaseRule, invalidFoodsRule) + val stringRuleSet = RuleSet(testDF).add(productNameRules) - val stringRuleSet = RuleSet(testDF) - stringRuleSet.add(stringRule) val stringValidationResults = stringRuleSet.validate() - assert(stringRule.ruleType == RuleType.ValidateStrings) - assert(stringRule.boundaries == null) + + // Ensure that the ruleType is set properly + assert(validProductNamesRule.ruleType == RuleType.ValidateStrings) + + // Ensure that the complete report matches expected output + assert(stringValidationResults.completeReport.exceptAll(stringLovExpectedDF).count() == 0, "Expected String LOV df is not equal to the returned rules report.") + + // Ensure that there are infinite boundaries, by default + assert(validProductNamesRule.boundaries.lower == Double.NegativeInfinity, "Lower boundaries are not negatively infinite.") + assert(validProductNamesRule.boundaries.upper == Double.PositiveInfinity, "Upper boundaries are not positively infinite.") + + // Ensure that all rows passed; there are no failed rows assert(stringValidationResults.summaryReport.isEmpty) -// assert(stringValidated.except(stringLovExpectedDF).count() == 0, "Expected df is not equal to the returned rules report.") -// assert(stringValidated.filter(stringValidated("Invalid_Count") > 0).count() == 0) -// 
assert(stringValidated.filter(stringValidated("Failed") === true).count() == 0)
   }
+
+  // Implicit boolean expression rule: the rule column itself evaluates to true/false.
+  // Every device is within 50 degrees of its target, so no row should fail.
+  test("The input df should have no rule failures for an implicit expression rule.") {
+
+    val testDF = Seq(
+      (1, "iot_thermostat_1", 84.00, 74.00),
+      (2, "iot_thermostat_2", 67.05, 72.00),
+      (3, "iot_thermostat_3", 91.14, 76.00)
+    ).toDF("device_id", "device_name", "current_temp", "target_temp")
+
+    val expectedColumns = testDF.columns ++ Seq("TemperatureDiffExpressionRule")
+    val expectedDF = Seq(
+      (1, "iot_thermostat_1", 84.00, 74.00, ValidationValue("TemperatureDiffExpressionRule", passed = true, "(abs((current_temp - target_temp)) < 50.0)", "true")),
+      (2, "iot_thermostat_2", 67.05, 72.00, ValidationValue("TemperatureDiffExpressionRule", passed = true, "(abs((current_temp - target_temp)) < 50.0)", "true")),
+      (3, "iot_thermostat_3", 91.14, 76.00, ValidationValue("TemperatureDiffExpressionRule", passed = true, "(abs((current_temp - target_temp)) < 50.0)", "true"))
+    ).toDF(expectedColumns: _*)
+
+    val exprRuleSet = RuleSet(testDF)
+    exprRuleSet.add(Rule("TemperatureDiffExpressionRule", abs(col("current_temp") - col("target_temp")) < 50.00))
+
+    val validationResults = exprRuleSet.validate()
+
+    // Ensure that there are no failed rows for rule expression
+    assert(validationResults.summaryReport.isEmpty)
+
+    // Ensure that the ruleType is set correctly
+    assert(exprRuleSet.getRules.head.ruleType == RuleType.ValidateExpr)
+    assert(exprRuleSet.getRules.head.isImplicitBool)
+
+    // Ensure that the complete report matches the expected output.
+    // NOTE(review): exceptAll only checks that no returned row is missing from expectedDF;
+    // it does not detect rows that are present in expectedDF but absent from the report.
+    assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected expression df is not equal to the returned rules report.")
+
+  }
+
+  // Explicit expression rule: abs(cooling_rate) is validated against abs(temp_diff).
+  // Only device 3 has a temp_diff (-20) whose magnitude differs from its cooling_rate (-10).
+  test("The input df should have a single rule failure for an expression rule.") {
+
+    val testDF = Seq(
+      (1, "iot_thermostat_1", 84.00, 74.00, -10.00, -10.00),
+      (2, "iot_thermostat_2", 76.00, 66.00, -10.00, -10.00),
+      (3, "iot_thermostat_3", 91.00, 69.00, -20.00, -10.00)
+    ).toDF("device_id", "device_name", "current_temp", "target_temp", "temp_diff", "cooling_rate")
+
+    // The appended column carries the rule's name, matching the ValidationValue rows below
+    val expectedColumns = testDF.columns ++ Seq("CoolingExpressionRule")
+    val expectedDF = Seq(
+      (1, "iot_thermostat_1", 84, 74, -10, -10,
+        ValidationValue("CoolingExpressionRule", passed = true, "abs(cooling_rate)", "10.0")
+      ),
+      (2, "iot_thermostat_2", 76, 66, -10, -10,
+        ValidationValue("CoolingExpressionRule", passed = true, "abs(cooling_rate)", "10.0")
+      ),
+      (3, "iot_thermostat_3", 91, 69, -20, -10,
+        ValidationValue("CoolingExpressionRule", passed = false, "abs(cooling_rate)", "10.0")
+      )
+    ).toDF(expectedColumns: _*)
+
+    val exprRuleSet = RuleSet(testDF)
+    // Create a rule that ensures the cooling rate can accommodate the temp difference
+    exprRuleSet.add(Rule("CoolingExpressionRule", abs(col("cooling_rate")), expr("abs(temp_diff)")))
+
+    val validationResults = exprRuleSet.validate()
+
+    // Ensure that there is a single row failure
+    assert(validationResults.summaryReport.count() == 1)
+
+    // Ensure that the ruleType is set correctly
+    assert(exprRuleSet.getRules.head.ruleType == RuleType.ValidateExpr)
+    assert(!exprRuleSet.getRules.head.isImplicitBool)
+
+    // Ensure that the complete report matches the expected output
+    assert(validationResults.completeReport.exceptAll(expectedDF).count() == 0, "Expected explicit expression df is not equal to the returned rules report.")
+
+  }
+
+  // Grouped rule sets built from windowed, date-range and higher-order array expressions.
+  // Two rule sets are validated: the date rules (2 failures) and the price rule (1 failure).
+  test("The input df should have 3 rule failures for complex expression rules.") {
+
+    val testDF = Seq(
+      ("Northwest", 1001, 123256, 9.32, 8.99, 4.23, "2021-04-01", "2020-02-01 12:00:00.000"), // bad expiration date
+      ("Northwest", 1001, 123456, 19.99, 16.49, 12.99, "2021-07-26", "2020-02-02 12:08:00.000"),
+      ("Northwest", 1001, 123456, 0.99, 0.99, 0.10, "2021-07-26", "2020-02-02 12:10:00.000"), // price change too rapid -- same day
+      ("Northwest", 1001, 123456, 0.98, 0.90, 0.10, "2021-07-26", "2020-02-05 12:13:00.000"),
+      ("Northwest", 1001, 123456, 0.99, 0.99, 0.10, "2021-07-26", "2020-02-07 00:00:00.000"),
+      ("Northwest", 1001, 122987, -9.99, -9.49, -6.49, "2021-07-26", "2021-02-01 00:00:00.000")
+    ).toDF("region", "store_id", "sku", "retail_price", "scan_price", "cost", "expiration_date", "create_ts")
+      .withColumn("create_ts", col("create_ts").cast("timestamp"))
+      .withColumn("create_dt", col("create_ts").cast("date"))
+
+    // Limit price updates to at most one per day
+    val window = Window.partitionBy("region", "store_id", "sku").orderBy("create_ts")
+    val skuUpdateRule = Rule("One_Update_Per_Day_Rule", unix_timestamp(col("create_ts")) - unix_timestamp(lag("create_ts", 1).over(window)) > 60 * 60 * 24)
+
+    // Limit expiration date to be within a range
+    val expirationDateRule = Rule("Expiration_Date_Rule", col("expiration_date").cast("date").between("2021-05-01", "2021-12-31"))
+
+    // Group by region, store_id, sku, expiration_date, create_ts
+    val validDatesRuleset = RuleSet(testDF, Array(skuUpdateRule, expirationDateRule), Seq("region", "store_id", "sku", "expiration_date", "create_ts"))
+    val validDatesResults = validDatesRuleset.validate()
+
+    // Collect the SKUs of the failing rows once per rule instead of re-running the filter
+    val failedUpdateSkus = validDatesResults.completeReport
+      .filter(not(col("One_Update_Per_Day_Rule.passed")))
+      .select("sku").as[Int].collect().toSeq
+    val failedExpirationSkus = validDatesResults.completeReport
+      .filter(not(col("Expiration_Date_Rule.passed")))
+      .select("sku").as[Int].collect().toSeq
+
+    // Ensure that there are 2 rule failures: exactly one row per rule, with the expected sku
+    assert(validDatesResults.summaryReport.count() == 2)
+    assert(failedUpdateSkus == Seq(123456))
+    assert(failedExpirationSkus == Seq(123256))
+
+    // Ensure that the ruleTypes are set correctly
+    assert(validDatesRuleset.getRules.count(_.ruleType == RuleType.ValidateExpr) == 2)
+    assert(validDatesRuleset.getRules.count(_.isImplicitBool) == 2)
+    assert(validDatesRuleset.getGroupBys.length == 5)
+
+    // Limit price columns to be strictly positive amounts (zero or negative values fail the rule)
+    val nonNegativeColumns = array(col("retail_price"), col("scan_price"), col("cost"))
+    val nonNegativeValueRule = Rule("Non_Negative_Values_Rule", size(filter(nonNegativeColumns, c => c <= 0.0)) === 0)
+
+    // Group by region, store_id, sku, retail_price, scan_price, cost
+    val nonNegativeValuesRuleset = RuleSet(testDF, Array(nonNegativeValueRule), Seq("region", "store_id", "sku", "retail_price", "scan_price", "cost"))
+    val nonNegativeValuesResults = nonNegativeValuesRuleset.validate()
+
+    val failedPriceSkus = nonNegativeValuesResults.completeReport
+      .filter(not(col("Non_Negative_Values_Rule.passed")))
+      .select("sku").as[Int].collect().toSeq
+
+    // Ensure that there is 1 rule failure, on the sku with negative prices
+    assert(nonNegativeValuesResults.summaryReport.count() == 1)
+    assert(failedPriceSkus == Seq(122987))
+
+    // Ensure that the ruleType is set correctly
+    assert(nonNegativeValuesRuleset.getRules.head.ruleType == RuleType.ValidateExpr)
+    assert(nonNegativeValuesRuleset.getRules.head.isImplicitBool)
+    assert(nonNegativeValuesRuleset.getGroupBys.length == 6)
+
+  }
 }