-
Notifications
You must be signed in to change notification settings - Fork 86
Add bounded unique count aggregation #781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
685e0b9 to
3c368e8
Compare
pengyu-hou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jbrooks-stripe the change looks good. Could you please also update the operation in the groupby.py at https://github.com/airbnb/chronon/blob/main/api/py/ai/chronon/group_by.py#L56
Could you follow the same style with the HISTOGRAM and HISTOGRAM_K? Thanks!!
3c160e0 to
d5746da
Compare
| override def irType: DataType = ListType(StringType) | ||
|
|
||
| override def merge(ir1: util.Set[String], ir2: util.Set[String]): util.Set[String] = { | ||
| ir2.asScala.foreach(v => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ir2.asScala.foreach(v => | |
| ir2.iterator().asScala.foreach(v => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
otherwise it will create intermediate collections
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call out!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
nikhil-zlai
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for putting this together. have some minor perf related comments.
| override def irType: DataType = ListType(StringType) | ||
|
|
||
| override def merge(ir1: util.Set[String], ir2: util.Set[String]): util.Set[String] = { | ||
| ir2.asScala.foreach(v => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
otherwise it will create intermediate collections
| } | ||
|
|
||
| private def md5Hex(bytes: Array[Byte]): String = | ||
| MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets say i want to unique count a bunch of user / merchant ids (long values) - won't this be less efficient than simply keeping the set of longs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made the code change to keep the numeric type as is
|
|
||
| override def update(ir: util.Set[String], input: T): util.Set[String] = { | ||
| if (ir.size() >= k) { | ||
| return ir |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
memory optimization: we can use a sentinel set when k is reached.
| return ir | |
| if(ir == Constants.SentinelSet || ir.size() >= k) return Constants.SentinelSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm.. I don't think we have sentinel set yet in OSS branch.
| ir1 | ||
| } | ||
|
|
||
| override def finalize(ir: util.Set[String]): Long = ir.size() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| override def finalize(ir: util.Set[String]): Long = ir.size() | |
| override def finalize(ir: util.Set[String]): Long = if(ir == Constants.SentinelSet) k else ir.size() |
| if (ir == BoundedUniqueCount.SentinelSet) { | ||
| val list = new util.ArrayList[Any]() | ||
| list.add(BoundedUniqueCount.SentinelMarker) | ||
| list | ||
| } else { | ||
| new util.ArrayList[Any](ir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aren't both of these same - can we just do
| if (ir == BoundedUniqueCount.SentinelSet) { | |
| val list = new util.ArrayList[Any]() | |
| list.add(BoundedUniqueCount.SentinelMarker) | |
| list | |
| } else { | |
| new util.ArrayList[Any](ir) | |
| new util.ArrayList[Any](ir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @nikhil-zlai , this is used to differentiate the empty Sentinel Set and the actual empty set.
nikhil-zlai
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great! thanks pengyu!
Summary
Adds a BOUNDED_UNIQUE_COUNT aggregation. This will allow exact unique/distinct counts, but will cap at a given value to keep memory usage constant.
Why / Goal
We have use cases where we'd prefer an exact solution instead of the approx equivalents, but want to have protections in place so that memory doesn't become an issue.
Test Plan
Checklist
Reviewers