diff --git a/README.md b/README.md
index c763d4b..9a04923 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 [![Scala CI](https://github.com/databrickslabs/dataframe-rules-engine/actions/workflows/scala.yml/badge.svg?branch=master)](https://github.com/databrickslabs/dataframe-rules-engine/actions/workflows/scala.yml)
 [![codecov](https://codecov.io/gh/databrickslabs/dataframe-rules-engine/branch/master/graph/badge.svg?token=6DEXO6I0BG)](https://codecov.io/gh/databrickslabs/dataframe-rules-engine)
 # dataframe-rules-engine
-Simplified Validation for Production Workloads
+Simplified validation at scale for production Spark workloads on streaming and standard DataFrames and Datasets
 ## Project Description
 As pipelines move from bronze to gold, it's very common that some level of governance be performed in
@@ -48,7 +48,12 @@ import com.databricks.labs.validation._
 As of version 0.2 streaming dataframes are fully supported
 ## Quickstart
+The basic steps for validating data with the rules engine are:
+* create rules
+* create a RuleSet
+* validate
+Below are some examples to demonstrate the basic process.
 ```scala
 val myRules = ??? // Definition of my base rules
@@ -70,50 +75,76 @@ val validationResults = RuleSet(df, by = Array("myGroupA", "myGroupB"))
 ## Rules
-As of version 0.2 There are three primary rule types
-* Boundary Rules
-  * Rules that fail if the input column is outside of the specified boundaries
-  * Boundary Rules are **EXCLUSIVE** on both sides meaning Bounds(0.0, 10.0) will fail all values between and
-    including 0.0 and 10.0
-  * Boundary rules are created when the validation is of type `Bounds()`.
-  * The default Bounds() are `Bounds(lower: Double = Double.NegativeInfinity, upper: Double = Double.PositiveInfinity)`;
-    therefore, if a lower or an upper is not specified, any value will pass
-  * Group Boundaries often come in pairs, to minimize the amount of rules that must be manually created, helper logic
-    was created to define [MinMax Rules](#minmax-rules)
-* Categorical Rules (Strings and Numerical)
-  * Rules that fail if the result of the input column is not in a list of values
-* Expression Rules
-  * Rules that fail if the input column != defined expression
+There are four primary rule types:
+* [Simple Rules](#simple-rule)
+* [Boundary Rules](#boundary-rules)
+* [Implicit Boolean Rules](#implicit-boolean-rules)
+* [Categorical Rules](#categorical-rules)
-Rules' input columns can be composed of:
-* simple column references `col("my_column_name")`
-* complex columns `col("Revenue") - col("Cost")`
-* implicit boolean evaluation `lit(true)`
-  * These rules only take a single input column and it must resolve to true or false. All records in which it resolves
-    to true will be considered passed
-* aggregate columns `min("ColumnName")`
-  * Do not mix aggregate input columns with non-aggregate input columns, instead create two Rule Sets
-  * If the rules' input columns are a mixture of aggregates and non-aggregates and no groupBy columns are passed
-    into the RuleSet the dataframe will be grouped by all df columns
-
-Rules can be applied to simple DataFrames or grouped Dataframes. To use a grouped dataframe simply pass
-your dataframe into the RuleSet and pass one or more columns in as `by` columns. This will apply the rule
-at the group level which can be helpful at times.
-Any input column expressions passed into a RuleSet must be able
-to be evaluated inside of the `.agg()` of a groupedDataframe
+These rule types can be applied to:
+* Streaming Datasets
+* Stateful Streaming Datasets
+* Grouped Datasets
+  * Important distinction: the rules apply only within the range of the grouped keys when a grouped dataset
+    is passed for testing
 
 ### Simple Rule
+A rule with a name, a check column, and an allowed value.
 ```scala
 Rule("Require_specific_version", col("version"), lit(0.2))
 Rule("Require_version>=0.2", col("version") >= 0.2, lit(true))
 ```
 
-### Simple Boundary Rule
+### Implicit Boolean Rules
+These rules are the same as columnar-expression-based rules except they don't require the comparison against `lit(true)`.
+A type validation is done on the column before validation begins to ensure that the expression resolves to
+a boolean type.
 ```scala
+// Passes where result is true
+Rule("Require_version>=0.2", col("version") >= 0.2)
+Rule("Require_version>=0.2", col("myDFBooleanCol"))
+```
+
+Note that the following is true, conceptually, since the implicit boolean compares against an implicit true. This
+just means that when you're using simple rules that resolve to true or false, you don't have to state it explicitly.
+```scala
+Rule("Require_version>=0.2", col("version") >= 0.2, lit(true)) == Rule("Require_version>=0.2", col("version") >= 0.2)
+```
+
+### Boundary Rules
+* Boundary Rules
+
+  **Example:** `Rule("Longitude Range", col("longitude"), Bounds(-180, 180, lowerInclusive = true, upperInclusive = true))`
+  * Rules that fail if the input column is outside of the specified boundaries
+  * `Bounds(lower = 0.0)` **FAILS** when value <= 0.0
+  * Boundary rules are created when the validation is of type `Bounds()`.
+  * The default Bounds() are `Bounds(lower: Double = Double.NegativeInfinity, upper: Double = Double.PositiveInfinity,
+    lowerInclusive: Boolean = false, upperInclusive: Boolean = false)`;
+    therefore, if a lower / upper is not specified, any numeric value will pass.
+  * **Inclusive Vs Exclusive:** When `Bounds` are defined, the user can decide whether to make a Boundary
+    inclusive or exclusive. The **default** is exclusive.
+    * **Inclusive Example:** `Bounds(0.0, lowerInclusive = true)` **FAILS** when value < 0.0
+    * **Exclusive Example:** `Bounds(0.0, 10.0)` **FAILS** when value <= 0.0 OR value >= 10.0
+    * **Inclusive Example:** `Bounds(0.0, 10.0, lowerInclusive = true, upperInclusive = true)` **FAILS** when value < 0.0 OR value > 10.0
+    * **Mixed Inclusion Example:** `Bounds(0.0, 10.0, lowerInclusive = true)` **FAILS** when value < 0.0 OR value >= 10.0
+  * Grouped Boundaries often come in pairs; to minimize the number of rules that must be manually created, helper logic
+    was created to define [MinMax Rules](#minmax-rules)
+
+#### Additional Boundary Rule Examples
 **Non-grouped RuleSet** - Passes when the retail_price in a record is exclusively between the Bounds
 ```scala
 // Passes when retail_price > 0.0 AND retail_price < 6.99
 Rule("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 6.99))
+// Passes when retail_price >= 0.0 AND retail_price <= 6.99
+Rule("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 6.99, lowerInclusive = true, upperInclusive = true))
 // Passes when retail_price > 0.0
 Rule("Retail_Price_GT0", col("retail_price"), Bounds(lower = 0.0))
+// Passes when retail_price >= 0.0
+Rule("Retail_Price_GTE0", col("retail_price"), Bounds(lower = 0.0, lowerInclusive = true))
 ```
 **Grouped RuleSet** - Passes when the minimum value in the group is within
 (exclusive) the boundary
 ```scala
@@ -123,42 +154,50 @@ Rule("Retail_Price_Validation", col("retail_price"), Bounds(lower = 0.0))
 Rule("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 1000.0))
 ```
 
-### Implicit Boolean Rules
-These rules are the same as columnar expression based rules except they don't require the comparison against `lit(true)`.
-A type validation is done on the column before validation begins to ensure that the resolved expression resolves to
-a boolean type.
+### Categorical Rules
+There are two types of categorical rules, which are used to validate against a pre-defined list of valid
+values. As of 0.2 the accepted categorical types are String, Double, Int, and Long, but other types can
+be input as an `array()` column of any type so long as it can be evaluated against the input column.
+
 ```scala
-// Passes where result is true
-Rule("Require_version>=0.2", col("version") >= 0.2)
-Rule("Require_version>=0.2", col("myDFBooleanCol"))
+val catNumerics = Array(
+  Rule("Valid_Stores", col("store_id"), Lookups.validStoreIDs),
+  Rule("Valid_Skus", col("sku"), Lookups.validSkus),
+  Rule("Valid_zips", array_contains(col("zips"), expr("x -> f(x)")), lit(true))
+)
+
+val catStrings = Array(
+  Rule("Valid_Regions", col("region"), Lookups.validRegions)
+)
 ```
-### List of Rules
-A list of rules can be created as an Array and passed into the RuleSet to simplify Rule management
+An optional `ignoreCase` parameter can be specified when evaluating against a list of String values to ignore or apply
+case-sensitivity. By default, input columns will be evaluated against a list of Strings with case-sensitivity applied.
 ```scala
-val specializedRules = Array(
-  // Example of aggregate column
-  Rule("Reasonable_sku_counts", count(col("sku")), Bounds(lower = 20.0, upper = 200.0)),
-  // Example of calculated column from catalyst UDF def getDiscountPercentage(retailPrice: Column, scanPrice: Column): Column = ???
-  Rule("Max_allowed_discount",
-    max(getDiscountPercentage(col("retail_price"), col("scan_price"))),
-    Bounds(upper = 90.0)),
-  // Example distinct values rule
-  Rule("Unique_Skus", countDistinct("sku"), Bounds(upper = 1.0))
-
-  RuleSet(df, by = "store").add(specializedRules)
-)
+Rule("Valid_Regions", col("region"), Lookups.validRegions, ignoreCase = true)
 ```
+Furthermore, the evaluation of categorical rules can be inverted by specifying `invertMatch=true` as a parameter.
+This can be handy when defining a Rule that an input column must not match a list of invalid values. For example:
+```scala
+Rule("Invalid_Skus", col("sku"), Lookups.invalidSkus, invertMatch = true)
+```
+
 ### MinMax Rules
-It's very common to build rules to validate min and max allowable values so there's a helper function
-to speed up this process. It really only makes sense to use minmax when specifying both an upper and a lower bound
-in the Bounds object. Using this method in the example below will only require three lines of code instead of the 6
-if each rule were built manually
+This is not a rule type in itself but rather a helper that builds the min and max boundary
+rules for you when validating grouped datasets with aggregate functions.
+
+It's very common to build rules on a grouped dataset to validate some upper and lower boundary within a group,
+so there's a helper function to speed up this process.
+It really only makes sense to use minMax when specifying both an upper and a lower bound in the Bounds object;
+with a single bound, a simple Rule is sufficient.
+
+Using this method in the example below will only require three lines of code instead of the six if each rule were built manually.
+The same inclusive / exclusive overrides are available here as defined above.
 ```scala
 val minMaxPriceDefs = Array(
   MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)),
-  MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99)),
+  MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99, upperInclusive = true)),
   MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0))
 )
@@ -168,39 +207,84 @@ val minMaxPriceRules = RuleSet.generateMinMaxRules(minMaxPriceDefs: _*)
 
 OR -- simply add the list of minmax rules or simple individual rule definitions to an existing RuleSet (if not using the builder pattern)
 ```scala
-val someRuleSet = RuleSet(df)
+val someRuleSet = RuleSet(df, by = "region_id")
 someRuleSet.addMinMaxRules(minMaxPriceDefs: _*)
 someRuleSet.addMinMaxRules("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 6.99))
 ```
 
-### Categorical Rules
-There are two types of categorical rules which are used to validate against a pre-defined list of valid
-values. As of 0.2 accepted categorical types are String, Double, Int, Long but any types outside of this can
-be input as an array() column of any type so long as it can be evaluated against the input column.
-
+Without the minMax helper, the same checks would require a rule per bound:
 ```scala
-val catNumerics = Array(
-Rule("Valid_Stores", col("store_id"), Lookups.validStoreIDs),
-Rule("Valid_Skus", col("sku"), Lookups.validSkus),
-Rule("Valid_zips", array_contains(col("zips"), expr("x -> f(x)")), lit(true))
-)
+import com.databricks.labs.validation.RuleSet
+val validationReport = RuleSet(df, by = "region_id")
+  .add(Rule("Min_Sku_Price", min(col("retail_price")), Bounds(0.0)))
+  .add(Rule("Max_Sku_Price", max(col("retail_price")), Bounds(29.99, upperInclusive = true)))
+// PLUS 4 more rules.
+//.add(Rule(...))
+//.add(Rule(...))
+//.add(Rule(...))
+//.add(Rule(...))
+```
-val catStrings = Array(
-Rule("Valid_Regions", col("region"), Lookups.validRegions)
-)
 
+## Lists of Rules
+A list of rules can be created as an Array and added to a RuleSet to simplify Rule management.
+It's very common
+for more complex sets of rules to be rolled up and packaged by business group / region / etc. These are also
+commonly packaged into logical structures (like case classes) and unpacked later into the right
+rule sets. This is made easy through the ability to add lists of rules in various ways.
+```scala
+// Catalyst UDF referenced below (stub)
+def getDiscountPercentage(retailPrice: Column, scanPrice: Column): Column = ???
+
+val specializedRules = Array(
+  // Example of aggregate column
+  Rule("Reasonable_sku_counts", count(col("sku")), Bounds(lower = 20.0, upper = 200.0)),
+  // Example of calculated column from catalyst UDF
+  Rule("Max_allowed_discount",
+    max(getDiscountPercentage(col("retail_price"), col("scan_price"))),
+    Bounds(upper = 90.0)),
+  // Example distinct values rule
+  Rule("Unique_Skus", countDistinct("sku"), Bounds(upper = 1.0))
+)
+RuleSet(df, by = "store").add(specializedRules)
+```
+A common real-world example:
+```scala
+case class GlobalRules(regionID: Int, bu: String, subOrg: String, rules: Array[Rule]*)
+// a structure like this will be fed from all over the world with their own specific rules that can all be tested
+// on the global source of truth
+```
-An optional `ignoreCase` parameter can be specified when evaluating against a list of String values to ignore or apply
-case-sensitivity. By default, input columns will be evaluated against a list of Strings with case-sensitivity applied.
+## Constructing the Check Column
+So far, we've only discussed simple column references as the input column, but remember, a column is just an
+expression and thus, the check column can actually be a check expression:
+* simple column references `col("my_column_name")`
+* complex columns `col("Revenue") - col("Cost")`
+* aggregates `min("ColumnName")`
+  * It can be confusing to mix aggregate and non-aggregate input columns.
+    It's generally better to create two Rule Sets
+  * If any of the rules' input columns are aggregates and no groupBy columns are provided
+    to the RuleSet, the dataframe will be grouped by all df columns.
+
+## Grouped Datasets
+Rules can be applied to simple DataFrames or grouped Dataframes. To use a grouped dataframe, simply pass
+your dataframe into the RuleSet and pass one or more columns in as `by` columns. This will apply the rule
+at the group level which can be helpful at times. Any input column expressions passed into a RuleSet must be able
+to be evaluated inside of the `.agg()` of a `groupedDataframe`.
 ```scala
-Rule("Valid_Regions", col("region"), Lookups.validRegions, ignoreCase=true)
+RuleSet(df, by = "region_id")
+//
+RuleSet(df, by = Seq("region_id", "store_id"))
 ```
-Furthermore, the evaluation of categorical rules can be inverted by specifying `invertMatch=true` as a parameter.
-This can be handy when defining a Rule that an input column cannot match list of invalid values. For example:
+Below is a more real-world example of validating a dataset, and another way to instantiate a RuleSet.
 ```scala
-Rule("Invalid_Skus", col("sku"), Lookups.invalidSkus, invertMatch=true)
-```
+val regionalTimeW = Window.partitionBy(col("region_id")).orderBy(col("year"), col("month"))
+def momValue(c: Column): Column = coalesce(lag(c, 1).over(regionalTimeW), c) / c
+
+val regionalRules = Array(
+  // No region has more than 42 stores, thus 100 is a safe fat-finger check number
+  Rule("storeCount", countDistinct(col("store_id")), Bounds(0, 100, lowerInclusive = true)),
+  // month-over-month sales should be pretty stable within the region; if it's not, flag for review
+  Rule("momSalesIncrease", momValue(col("total_sales")), Bounds(0.25, 4.0, lowerInclusive = true))
+)
+RuleSet(df, regionalRules, by = "region_id")
+```
 
 ### Validation
 Now that you have some rules built up... it's time to build the ruleset and validate it.
 As mentioned above,
@@ -232,17 +316,21 @@ ValidationResults(completeReport: DataFrame, summaryReport: DataFrame)
 ```
 As you can see, there are two reports included, a `completeReport` and a `summaryReport`.
 #### The completeReport
+`validationResults.completeReport.show()`
+
 The complete report is verbose and will add all rule validations to the right side of the original df passed
 into RuleSet. Note that if the RuleSet is grouped, the result will include the groupBy columns and all rule
 evaluation specs and results
 #### The summaryReport
+`validationResults.summaryReport.show()`
+
 The summary report is meant to be just that, a summary of the failed rules. This will return only the records
 that failed and only the rules that failed for that record; thus, if `summaryReport.isEmpty` then all rules passed.
 
 ## Next Steps
 Clearly, this is just a start. This is a small package and, as such, a GREAT place to start if you've never
-contributed to a project before. Please feel free to fork the repo and/or submit PRs. I'd love to see what
+contributed to a project before. Please feel free to fork the repo and/or submit PRs. We'd love to see what
 you come up with. If you're not much of a developer or don't have the time you can still contribute! Please
 post your ideas in the issues and label them appropriately (e.g. bug/enhancement) and someone will review it
 and add it as soon as possible.
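The `summaryReport.isEmpty` check described above lends itself to a simple promote/quarantine gate in a pipeline. A minimal, Spark-free sketch of that pattern (the `FailedRule` case class and `gate` helper here are illustrative stand-ins, not part of the library's API):

```scala
// Stand-in for a row of the summary report (hypothetical, for illustration only).
case class FailedRule(ruleName: String, actual: String)

// Promote the data only when no rules failed; mirrors the `summaryReport.isEmpty` check.
def gate(summaryReport: Seq[FailedRule]): Boolean = summaryReport.isEmpty

val cleanRun = gate(Seq.empty[FailedRule])                        // no failures: safe to promote
val dirtyRun = gate(Seq(FailedRule("Retail_Price_GT0", "-1.0")))  // at least one failure: quarantine
```

In a real job, the gate would wrap the `summaryReport` DataFrame returned by `validate()` rather than a plain sequence.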
diff --git a/src/main/scala/com/databricks/labs/validation/Validator.scala b/src/main/scala/com/databricks/labs/validation/Validator.scala
index d005311..bf3f5e6 100644
--- a/src/main/scala/com/databricks/labs/validation/Validator.scala
+++ b/src/main/scala/com/databricks/labs/validation/Validator.scala
@@ -23,8 +23,7 @@ class Validator(ruleSet: RuleSet, detailLvl: Int) extends SparkSessionWrapper {
       case RuleType.ValidateBounds =>
         struct(
           lit(rule.ruleName).alias("ruleName"),
-          (rule.inputColumn > rule.boundaries.lower && rule.inputColumn < rule.boundaries.upper)
-            .alias("passed"),
+          rule.boundaries.validationLogic(rule.inputColumn).alias("passed"),
           array(lit(rule.boundaries.lower), lit(rule.boundaries.upper)).cast("string").alias("permitted"),
           rule.inputColumn.cast("string").alias("actual")
         ).alias(rule.ruleName)
diff --git a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala
index b5aec4a..113c55d 100644
--- a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala
+++ b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala
@@ -21,7 +21,17 @@ object Lookups {
 
 object Structures {
 
-  case class Bounds(lower: Double = Double.NegativeInfinity, upper: Double = Double.PositiveInfinity)
+  case class Bounds(
+    lower: Double = Double.NegativeInfinity,
+    upper: Double = Double.PositiveInfinity,
+    lowerInclusive: Boolean = false,
+    upperInclusive: Boolean = false) {
+    def validationLogic(c: Column): Column = {
+      val lowerLogic = if (lowerInclusive) c >= lower else c > lower
+      val upperLogic = if (upperInclusive) c <= upper else c < upper
+      lowerLogic && upperLogic
+    }
+  }
 
   case class MinMaxRuleDef(ruleName: String, column: Column, bounds: Bounds, by: Column*)
 
diff --git a/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala b/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala
index f59ed33..0ff72d5 100644
--- a/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala
+++ b/src/test/scala/com/databricks/labs/validation/ValidatorTestSuite.scala
@@ -92,6 +92,40 @@ class ValidatorTestSuite extends AnyFunSuite with SparkSessionFixture {
     assert(validationResults.summaryReport.count() == 0)
   }
 
+  test("There should be no rule failures for inclusive boundary rules.") {
+    val testDF = Seq(
+      (1001, 1.00, 2.55),
+      (1002, 4.25, 5.55),
+      (1003, 7.35, 8.99),
+      (1003, 5.00, 7.99)
+    ).toDF("sku", "retail_price", "scan_price")
+
+    // Ensure upperInclusive boundaries can be validated
+    val scanPriceRule = MinMaxRuleDef("Scan_Price_Rule", col("scan_price"), Bounds(0.0, 8.99, upperInclusive = true))
+    val scanPriceRuleSet = RuleSet(testDF).addMinMaxRules(scanPriceRule)
+    val scanPriceResults = scanPriceRuleSet.validate()
+    assert(!scanPriceRule.bounds.lowerInclusive)
+    assert(scanPriceRule.bounds.upperInclusive)
+    assert(scanPriceRule.bounds.lower == 0.0)
+    assert(scanPriceRule.bounds.upper == 8.99)
+    assert(scanPriceResults.summaryReport.count() == 0)
+
+    // Ensure that both lowerInclusive and upperInclusive boundaries can be validated
+    val retailPriceRule = Rule("Retail_Price_Rule", col("retail_price"), Bounds(1.0, 7.35, lowerInclusive = true, upperInclusive = true))
+    val retailPriceRuleSet = RuleSet(testDF).add(retailPriceRule)
+    val retailPriceResults = retailPriceRuleSet.validate()
+    assert(retailPriceRule.boundaries.lowerInclusive)
+    assert(retailPriceRule.boundaries.upperInclusive)
+    assert(retailPriceRule.boundaries.lower == 1.0)
+    assert(retailPriceRule.boundaries.upper == 7.35)
+    assert(retailPriceResults.summaryReport.count() == 0)
+
+    // Ensure that inclusive boundaries can be applied to a grouped DataFrame
+    val groupedRuleSet = RuleSet(testDF, Array("sku", "retail_price")).add(retailPriceRule)
+    val groupedValidationResults = groupedRuleSet.validate()
+    assert(groupedValidationResults.summaryReport.count() == 0)
+  }
+
   test("The input rule should have 3 invalid count for MinMax_Scan_Price_Minus_Retail_Price_min and max for failing complex type.") {
     val testDF = Seq(
       (1, 2, 3),
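The inclusive/exclusive comparison logic added to `Structures.Bounds` above can be sketched over plain doubles to show all four combinations (this `Bounds` is a Spark-free, local stand-in for illustration, not the library class; `passes` mirrors `validationLogic` but evaluates a `Double` instead of a `Column`):

```scala
// Local stand-in mirroring Structures.Bounds.validationLogic, evaluated on a Double.
case class Bounds(
    lower: Double = Double.NegativeInfinity,
    upper: Double = Double.PositiveInfinity,
    lowerInclusive: Boolean = false,
    upperInclusive: Boolean = false) {
  def passes(v: Double): Boolean = {
    val lowerOk = if (lowerInclusive) v >= lower else v > lower
    val upperOk = if (upperInclusive) v <= upper else v < upper
    lowerOk && upperOk
  }
}

val exclusive = Bounds(0.0, 10.0)                                               // default: both edges fail
val inclusive = Bounds(0.0, 10.0, lowerInclusive = true, upperInclusive = true) // both edges pass
val mixed     = Bounds(0.0, 10.0, lowerInclusive = true)                        // lower edge passes, upper edge fails

val edgeExclusive = exclusive.passes(0.0)  // false
val edgeInclusive = inclusive.passes(0.0)  // true
val mixedUpper    = mixed.passes(10.0)     // false
```

With no arguments, `Bounds()` spans negative to positive infinity exclusively, so any finite numeric value passes — matching the README's note that unspecified bounds accept anything.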