Merged
244 changes: 166 additions & 78 deletions README.md
@@ -1,7 +1,7 @@
[![Scala CI](https://github.com/databrickslabs/dataframe-rules-engine/actions/workflows/scala.yml/badge.svg?branch=master)](https://github.com/databrickslabs/dataframe-rules-engine/actions/workflows/scala.yml)
[![codecov](https://codecov.io/gh/databrickslabs/dataframe-rules-engine/branch/master/graph/badge.svg?token=6DEXO6I0BG)](https://codecov.io/gh/databrickslabs/dataframe-rules-engine)
# dataframe-rules-engine
Simplified Validation at scale for Production Spark Workloads on streaming / standard DataFrames and DataSets

## Project Description
As pipelines move from bronze to gold, it's very common that some level of governance be performed in
As of version 0.2 streaming dataframes are fully supported

## Quickstart
The basic steps to validating data with the rules engine are:
* create rules
* create ruleset
* validate

Below are some examples to demonstrate the basic process.
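The three steps above can be sketched without Spark as a tiny rules engine. Everything below (`SimpleRule`, `SimpleRuleSet`) is an illustrative, library-free sketch, not the library's actual API:

```scala
// Library-free sketch of the three steps: create rules, create a rule set, validate.
// SimpleRule / SimpleRuleSet are illustrative names, not the library's API.
case class SimpleRule(name: String, passes: Map[String, Double] => Boolean)

case class SimpleRuleSet(rules: Seq[SimpleRule]) {
  // Returns one (ruleName, passed) pair per row per rule
  def validate(rows: Seq[Map[String, Double]]): Seq[(String, Boolean)] =
    for (row <- rows; rule <- rules) yield rule.name -> rule.passes(row)
}

val myRules = Seq(SimpleRule("positive_price", row => row("retail_price") > 0.0))
val report = SimpleRuleSet(myRules).validate(Seq(Map("retail_price" -> 4.99)))
```

The real library follows the same shape, but the check columns are Spark `Column` expressions evaluated lazily on the DataFrame rather than Scala functions.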

```scala
val myRules = ??? // Definition of my base rules
val validationResults = RuleSet(df, by = Array("myGroupA", "myGroupB"))
```

## Rules

There are four primary rule types:
* [Simple Rules](#simple-rule)
* [Boundary Rules](#boundary-rules)
* [Implicit Boolean Rules](#implicit-boolean-rules)
* [Categorical Rules](#categorical-rules)

These rule types can be applied to:
* Streaming Datasets
* Stateful Streaming Datasets
* Grouped Datasets
  * Important distinction: when a grouped dataset is passed, the rules apply only within the scope of the grouping keys

### Simple Rule
A rule with a name, a check column, and an allowed value
```scala
Rule("Require_specific_version", col("version"), lit(0.2))
Rule("Require_version>=0.2", col("version") >= 0.2, lit(true))
```

### Implicit Boolean Rules
These rules are the same as columnar expression-based rules except they don't require an explicit comparison
against `lit(true)`. A type validation is performed on the column before validation begins to ensure that the
expression resolves to a boolean type.
```scala
// Passes where result is true
Rule("Require_version>=0.2", col("version") >= 0.2)
Rule("Require_boolean_col", col("myDFBooleanCol"))
```

Note that the two forms below are conceptually equivalent, since an implicit boolean rule compares against an
implicit true. When a rule's check column already resolves to true or false, you don't have to state the
comparison explicitly.
```scala
Rule("Require_version>=0.2", col("version") >= 0.2, lit(true)) == Rule("Require_version>=0.2", col("version") >= 0.2)
```

### Boundary Rules

**Example:** `Rule("Longitude Range", col("longitude"), Bounds(-180, 180, lowerInclusive = true, upperInclusive = true))`
* Rules that fail if the input column is outside of the specified boundaries
  * `Bounds(lower = 0.0)` **FAILS** when value <= 0.0
* Boundary rules are created when the validation is of type `Bounds()`.
* The default Bounds() are `Bounds(lower: Double = Double.NegativeInfinity, upper: Double = Double.PositiveInfinity,
lowerInclusive: Boolean = false, upperInclusive: Boolean = false)`;
therefore, if a lower / upper is not specified, any numeric value will pass.
* **Inclusive Vs Exclusive:** When `Bounds` are defined, the user can decide whether to make a boundary
inclusive or exclusive. The **default** is exclusive.
  * **Exclusive Example:** `Bounds(0.0, 10.0)` **FAILS** when value <= 0.0 or value >= 10.0
  * **Inclusive Example:** `Bounds(0.0, lowerInclusive = true)` **FAILS** when value < 0.0
  * **Inclusive Example:** `Bounds(0.0, 10.0, lowerInclusive = true, upperInclusive = true)` **FAILS** when value < 0.0 or value > 10.0
  * **Mixed Inclusion Example:** `Bounds(0.0, 10.0, lowerInclusive = true)` **FAILS** when value < 0.0 or value >= 10.0
* Grouped boundaries often come in pairs; to minimize the number of rules that must be created manually,
helper logic is provided to define [MinMax Rules](#minmax-rules)
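The inclusive/exclusive semantics above can be reproduced in plain Scala. This standalone sketch mirrors the library's `Bounds` defaults but evaluates a plain `Double` instead of a Spark column:

```scala
// Standalone sketch of Bounds pass/fail semantics (mirrors the library's defaults).
case class Bounds(
    lower: Double = Double.NegativeInfinity,
    upper: Double = Double.PositiveInfinity,
    lowerInclusive: Boolean = false,
    upperInclusive: Boolean = false) {
  def passes(value: Double): Boolean = {
    val lowerOk = if (lowerInclusive) value >= lower else value > lower
    val upperOk = if (upperInclusive) value <= upper else value < upper
    lowerOk && upperOk
  }
}

// Exclusive by default: the endpoints themselves fail
assert(!Bounds(0.0, 10.0).passes(0.0))
assert(Bounds(0.0, 10.0).passes(5.0))
// Inclusive lower: 0.0 now passes
assert(Bounds(0.0, 10.0, lowerInclusive = true).passes(0.0))
// No bounds specified: any numeric value passes
assert(Bounds().passes(-1e9))
```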

#### Additional Boundary Rule Examples
**Non-grouped RuleSet** - Passes when the retail_price in a record falls strictly (exclusively) between the Bounds
```scala
// Passes when retail_price > 0.0 AND retail_price < 6.99
Rule("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 6.99))
// Passes when retail_price >= 0.0 AND retail_price <= 6.99
Rule("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 6.99, lowerInclusive = true, upperInclusive = true))
// Passes when retail_price > 0.0
Rule("Retail_Price_GT0", col("retail_price"), Bounds(lower = 0.0))
// Passes when retail_price >= 0.0
Rule("Retail_Price_GTE0", col("retail_price"), Bounds(lower = 0.0, lowerInclusive = true))
```
**Grouped RuleSet** - Passes when the minimum value in the group is within (exclusive) the boundary
```scala
Rule("Retail_Price_Validation", col("retail_price"), Bounds(lower = 0.0))
Rule("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 1000.0))
```

### Categorical Rules
There are two types of categorical rules, both used to validate against a pre-defined list of valid
values. As of 0.2 the accepted categorical types are String, Double, Int, and Long, but other types can
be supplied as an `array()` column of any type so long as it can be evaluated against the input column.

```scala
// Passes where result is true
Rule("Require_version>=0.2", col("version") >= 0.2)
Rule("Require_version>=0.2", col("myDFBooleanCol"))
val catNumerics = Array(
Rule("Valid_Stores", col("store_id"), Lookups.validStoreIDs),
Rule("Valid_Skus", col("sku"), Lookups.validSkus),
Rule("Valid_zips", array_contains(col("zips"), expr("x -> f(x)")), lit(true))
)

val catStrings = Array(
Rule("Valid_Regions", col("region"), Lookups.validRegions)
)
```

An optional `ignoreCase` parameter can be specified when evaluating against a list of String values to ignore or apply
case-sensitivity. By default, input columns will be evaluated against a list of Strings with case-sensitivity applied.
```scala
Rule("Valid_Regions", col("region"), Lookups.validRegions, ignoreCase=true)
```

Furthermore, the evaluation of categorical rules can be inverted by specifying `invertMatch=true` as a parameter.
This can be handy when defining a Rule that an input column must not match a list of invalid values. For example:
```scala
Rule("Invalid_Skus", col("sku"), Lookups.invalidSkus, invertMatch=true)
```
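The `ignoreCase` and `invertMatch` behavior can be sketched with a plain Scala predicate. The helper below is illustrative (the parameter names match the library's, but the function itself is not part of its API):

```scala
// Illustrative sketch of how a String categorical check evaluates.
def categoricalPasses(
    value: String,
    validValues: Seq[String],
    ignoreCase: Boolean = false,
    invertMatch: Boolean = false): Boolean = {
  val matched =
    if (ignoreCase) validValues.exists(_.equalsIgnoreCase(value))
    else validValues.contains(value)
  if (invertMatch) !matched else matched
}

assert(categoricalPasses("NE", Seq("NE", "SE")))                     // exact match passes
assert(!categoricalPasses("ne", Seq("NE", "SE")))                    // case-sensitive by default
assert(categoricalPasses("ne", Seq("NE", "SE"), ignoreCase = true))  // case ignored on request
assert(categoricalPasses("XX", Seq("NE", "SE"), invertMatch = true)) // not in the bad list
```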

### MinMax Rules
MinMax is not actually a rule type but rather a helper that builds the boundary rules for you when
validating grouped datasets with aggregate functions.

It's very common to build rules on a grouped dataset that validate an upper and a lower boundary within
each group, so a helper function exists to speed up this process. MinMax only makes sense when specifying
both an upper and a lower bound on a grouped dataset; with a single bound, a plain boundary rule suffices.

Using this method in the example below will only require three lines of code instead of the 6 if each rule were built manually.
The same inclusive / exclusive overrides are available here as defined above.
```scala
val minMaxPriceDefs = Array(
MinMaxRuleDef("MinMax_Sku_Price", col("retail_price"), Bounds(0.0, 29.99)),
MinMaxRuleDef("MinMax_Scan_Price", col("scan_price"), Bounds(0.0, 29.99, upperInclusive = true)),
MinMaxRuleDef("MinMax_Cost", col("cost"), Bounds(0.0, 12.0))
)

```scala
val minMaxPriceRules = RuleSet.generateMinMaxRules(minMaxPriceDefs: _*)
```
Or simply add the list of MinMax rules, or individual rule definitions, to an existing RuleSet (if not
using the builder pattern):
```scala
val someRuleSet = RuleSet(df, by = "region_id")
someRuleSet.addMinMaxRules(minMaxPriceDefs: _*)
someRuleSet.addMinMaxRules("Retail_Price_Validation", col("retail_price"), Bounds(0.0, 6.99))
```
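Conceptually, one MinMaxRuleDef checks that both the group's minimum and its maximum land inside the same Bounds. A standalone sketch of that idea (the function name and signature are illustrative):

```scala
// Standalone sketch: a MinMax check passes when both the min and the max of a
// group's values fall inside the same Bounds.
def minMaxPasses(
    groupValues: Seq[Double],
    lower: Double,
    upper: Double,
    lowerInclusive: Boolean = false,
    upperInclusive: Boolean = false): Boolean = {
  def inBounds(v: Double): Boolean = {
    val lowerOk = if (lowerInclusive) v >= lower else v > lower
    val upperOk = if (upperInclusive) v <= upper else v < upper
    lowerOk && upperOk
  }
  inBounds(groupValues.min) && inBounds(groupValues.max)
}

assert(minMaxPasses(Seq(1.0, 5.0, 29.0), 0.0, 29.99))
assert(!minMaxPasses(Seq(0.0, 5.0), 0.0, 29.99)) // min hits the exclusive lower bound
```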


Without the MinMax helper, the same validation requires a rule per bound:
```scala
import com.databricks.labs.validation.RuleSet
val validationReport = RuleSet(df, by = "region_id")
.add(Rule("Min_Sku_Price", min(col("retail_price")), Bounds(0.0)))
.add(Rule("Max_Sku_Price", max(col("retail_price")), Bounds(upper = 29.99, upperInclusive = true)))
// PLUS 4 more rules.
//.add(Rule(...))
//.add(Rule(...))
//.add(Rule(...))
//.add(Rule(...))
```

## Lists of Rules
A list of rules can be created as an Array and added to a RuleSet to simplify Rule management. It's very common
for more complex sets of rules to be rolled up and packaged by business group / region / etc. These are also
commonly packaged into logical structures (like case classes) and later unpacked into the appropriate rule
sets. This is made easy through the ability to add lists of rules in various ways.
```scala
val specializedRules = Array(
// Example of aggregate column
Rule("Reasonable_sku_counts", count(col("sku")), Bounds(lower = 20.0, upper = 200.0)),
// Example of a calculated column from a catalyst UDF:
//   def getDiscountPercentage(retailPrice: Column, scanPrice: Column): Column = ???
Rule("Max_allowed_discount",
max(getDiscountPercentage(col("retail_price"), col("scan_price"))),
Bounds(upper = 90.0)),
// Example distinct values rule
Rule("Unique_Skus", countDistinct("sku"), Bounds(upper = 1.0))
)
RuleSet(df, by = "store").add(specializedRules)
```
Common Real World Example
```scala
case class GlobalRules(regionID: Int, bu: String, subOrg: String, rules: Array[Rule]*)
// A structure like this can be fed from teams around the world, each with their own specific rules,
// all tested against the global source of truth
```

An optional `ignoreCase` parameter can be specified when evaluating against a list of String values to ignore or apply
case-sensitivity. By default, input columns will be evaluated against a list of Strings with case-sensitivity applied.
## Constructing the Check Column
So far, we've only discussed simple column references as the input column, but remember, a column is just an
expression and thus, the check column can actually be a check expression
* simple column references `col("my_column_name")`
* complex columns `col("Revenue") - col("Cost")`
* aggregates `min("ColumnName")`
  * It can be confusing to mix aggregate and non-aggregate input columns. It's generally better to create two Rule Sets
  * If any of the rules' input columns are aggregates and no groupBy columns are provided to the RuleSet,
the dataframe will be grouped by all df columns.

## Grouped Datasets
Rules can be applied to simple DataFrames or grouped DataFrames. To use a grouped DataFrame, simply pass
your DataFrame into the RuleSet along with one or more `by` columns. This applies each rule
at the group level, which can be helpful at times. Any input column expression passed into a grouped RuleSet
must be able to be evaluated inside the `.agg()` of a grouped DataFrame.
```scala
Rule("Valid_Regions", col("region"), Lookups.validRegions, ignoreCase=true)
RuleSet(df, by = "region_id")
// or group by multiple columns
RuleSet(df, by = Seq("region_id", "store_id"))
```
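Group-level evaluation can be pictured as: aggregate each group down to one value, then apply the rule's check to that value. A library-free sketch (all names here are illustrative):

```scala
// Library-free sketch of group-level rule evaluation.
def groupedCheck(
    rows: Seq[(String, Double)],    // (groupKey, value) pairs
    agg: Seq[Double] => Double,     // aggregate evaluated per group
    passes: Double => Boolean): Map[String, Boolean] =
  rows.groupBy(_._1).map { case (key, group) =>
    key -> passes(agg(group.map(_._2)))
  }

val sales = Seq("east" -> 10.0, "east" -> 20.0, "west" -> -5.0)
// Rule: the minimum sale within each region must be positive
val result = groupedCheck(sales, _.min, _ > 0.0)
assert(result == Map("east" -> true, "west" -> false))
```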

Below is a more realistic example of validating a dataset, and another way to instantiate a RuleSet.
```scala
def momValue(c: Column): Column = coalesce(lag(c, 1).over(regionalTimeW), c) / c

val regionalTimeW = Window.partitionBy(col("region_id")).orderBy(col("year"), col("month"))
val regionalRules = Array(
// No region has more than 42 stores, thus 100 is a safe fat-finger check number
Rule("storeCount", countDistinct(col("store_id")), Bounds(0, 100, lowerInclusive = true)),
// month over month sales should be pretty stable within the region; if not, flag for review
Rule("momSalesIncrease", momValue(col("total_sales")), Bounds(0.25, 4.0, lowerInclusive = true))
)
RuleSet(df, regionalRules, by = "region_id")
```


### Validation
Now that you have some rules built up, it's time to build the RuleSet and validate it. The validation
returns a `ValidationResults` object:
```scala
ValidationResults(completeReport: DataFrame, summaryReport: DataFrame)
```
As you can see, there are two reports included, a `completeReport` and a `summaryReport`.
#### The completeReport
`validationResults.completeReport.show()`

The complete report is verbose and appends all rule validations to the right side of the original df
passed into the RuleSet. Note that if the RuleSet is grouped, the result will include the groupBy columns
and all rule evaluation specs and results.

#### The summaryReport
`validationResults.summaryReport.show()`

The summary report is meant to be just that, a summary of the failed rules. This will return only the records that
failed and only the rules that failed for that record; thus, if the `summaryReport.isEmpty` then all rules passed.
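The relationship between the two reports can be sketched as a simple filter; the record and column names below are illustrative, not the library's actual schema:

```scala
// Illustrative: the summary is the complete report filtered to failures only,
// so an empty summary means every rule passed for every record.
case class RuleResult(record: String, ruleName: String, passed: Boolean)

val completeReport = Seq(
  RuleResult("row1", "Retail_Price_Validation", passed = true),
  RuleResult("row2", "Retail_Price_Validation", passed = false)
)
val summaryReport = completeReport.filterNot(_.passed)

assert(summaryReport.map(_.record) == Seq("row2"))
assert(summaryReport.nonEmpty) // at least one rule failed
```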

## Next Steps
Clearly, this is just a start. This is a small package and, as such, a GREAT place to start if you've never
contributed to a project before. Please feel free to fork the repo and/or submit PRs. We'd love to see what
you come up with. If you're not much of a developer or don't have the time you can still contribute! Please
post your ideas in the issues and label them appropriately (i.e. bug/enhancement) and someone will review it
and add it as soon as possible.
@@ -23,8 +23,7 @@ class Validator(ruleSet: RuleSet, detailLvl: Int) extends SparkSessionWrapper {
case RuleType.ValidateBounds =>
struct(
lit(rule.ruleName).alias("ruleName"),
(rule.inputColumn > rule.boundaries.lower && rule.inputColumn < rule.boundaries.upper)
.alias("passed"),
rule.boundaries.validationLogic(rule.inputColumn).alias("passed"),
array(lit(rule.boundaries.lower), lit(rule.boundaries.upper)).cast("string").alias("permitted"),
rule.inputColumn.cast("string").alias("actual")
).alias(rule.ruleName)
@@ -21,7 +21,17 @@ object Lookups {

object Structures {

case class Bounds(lower: Double = Double.NegativeInfinity, upper: Double = Double.PositiveInfinity)
case class Bounds(
lower: Double = Double.NegativeInfinity,
upper: Double = Double.PositiveInfinity,
lowerInclusive: Boolean = false,
upperInclusive: Boolean = false) {
def validationLogic(c: Column): Column = {
val lowerLogic = if (lowerInclusive) c >= lower else c > lower
val upperLogic = if (upperInclusive) c <= upper else c < upper
lowerLogic && upperLogic
}
}

case class MinMaxRuleDef(ruleName: String, column: Column, bounds: Bounds, by: Column*)
