@gabotechs gabotechs commented Oct 20, 2025

This PR needs the following other PRs to be merged first:

Which issue does this PR close?

  • No issue

Rationale for this change

Currently, the producer and consumer ends of a DynamicFilterPhysicalExpr rely on their inner pointers being the same in order to communicate updates to the inner filter values:

SortExec(DynamicFilterPhysicalExpr(Arc<RwLock<Inner>>)) // <- this Arc pointer...
  SomeOtherExec
    DataSourceExec(DynamicFilterPhysicalExpr(Arc<RwLock<Inner>>)) // <- ...must be the same as this one

However, in certain situations, the producer and consumer parts of a DynamicFilterPhysicalExpr will not necessarily share the same Arc pointer, for example when the DynamicFilterPhysicalExpr is serialized to protobuf and then deserialized.
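To make the problem concrete, here is a minimal std-only sketch. `Inner`, `roundtrip`, and `observe_update` are hypothetical stand-ins rather than DataFusion types; `roundtrip` only imitates what a serialize/deserialize cycle does to the pointer, namely copying the value into a fresh allocation.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the filter's inner state (not the real DataFusion type).
#[derive(Clone, Debug, PartialEq)]
struct Inner {
    threshold: i64,
}

/// Imitates a serialize/deserialize round trip: the current value is copied
/// into a brand-new allocation, so the returned Arc is a different pointer.
fn roundtrip(filter: &Arc<RwLock<Inner>>) -> Arc<RwLock<Inner>> {
    let snapshot = filter.read().unwrap().clone();
    Arc::new(RwLock::new(snapshot))
}

/// Updates the producer side and returns what two consumers observe:
/// (value seen through a shared Arc clone, value seen through a round-tripped copy).
fn observe_update(new_threshold: i64) -> (i64, i64) {
    let producer = Arc::new(RwLock::new(Inner { threshold: 0 }));
    let shared_consumer = Arc::clone(&producer);
    let detached_consumer = roundtrip(&producer);

    // The producer publishes a new filter value.
    producer.write().unwrap().threshold = new_threshold;

    let shared = shared_consumer.read().unwrap().threshold;
    let detached = detached_consumer.read().unwrap().threshold;
    (shared, detached)
}
```

Calling `observe_update(42)` yields `(42, 0)`: the consumer holding a clone of the same Arc sees the update, while the round-tripped copy keeps the stale value.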


This PR proposes an alternative way of storing the Arc<RwLock<Inner>> inner values of dynamic filters: a DynamicFilterRegistry structure that is global to a DataFusion session, and that different instances of a DynamicFilterPhysicalExpr in a plan can access through the SessionConfig using a unique identifier.

SessionState(
  SessionConfig(
    DynamicFilterRegistry(HashMap<Uuid, Inner>)
  )
)

SortExec(DynamicFilterPhysicalExpr(Uuid)) // <- this pushes updates to session_config.dynamic_registry[Uuid]
  SomeOtherExec
    DataSourceExec(DynamicFilterPhysicalExpr(Uuid)) // <- this reads updates from session_config.dynamic_registry[Uuid]

This allows serializing dynamic filters to protobuf and deserializing them back, while keeping the producer and consumer parts of the dynamic filter connected.
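The id-based lookup can be sketched roughly as follows, using only the standard library. This is an illustration of the idea and not the code in this PR: `FilterId` (a `u128` standing in for a Uuid), `Inner`, `DynamicFilterExpr`, `get_or_create`, `update`, `current`, and `demo` are all assumed names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical id type; the PR uses a Uuid, a u128 stands in for it here.
type FilterId = u128;

/// Hypothetical stand-in for the filter's inner state.
#[derive(Debug, Default)]
struct Inner {
    threshold: i64,
}

/// Session-scoped registry mapping filter ids to their shared state.
#[derive(Default)]
struct DynamicFilterRegistry {
    filters: RwLock<HashMap<FilterId, Arc<RwLock<Inner>>>>,
}

impl DynamicFilterRegistry {
    /// Returns the shared state for `id`, creating it on first access.
    fn get_or_create(&self, id: FilterId) -> Arc<RwLock<Inner>> {
        let mut filters = self.filters.write().unwrap();
        filters.entry(id).or_default().clone()
    }
}

/// The expression only carries the id, so serializing it is trivial.
#[derive(Clone, Copy)]
struct DynamicFilterExpr {
    id: FilterId,
}

impl DynamicFilterExpr {
    /// Producer side: publish a new value under this filter's id.
    fn update(&self, registry: &DynamicFilterRegistry, threshold: i64) {
        registry.get_or_create(self.id).write().unwrap().threshold = threshold;
    }

    /// Consumer side: read the latest value published under this filter's id.
    fn current(&self, registry: &DynamicFilterRegistry) -> i64 {
        let inner = registry.get_or_create(self.id);
        let value = inner.read().unwrap().threshold;
        value
    }
}

/// Producer and consumer are separate instances that only share the id.
fn demo() -> i64 {
    let registry = DynamicFilterRegistry::default();
    let producer = DynamicFilterExpr { id: 7 };
    // Imitates a serde round trip: only the id survives.
    let consumer = DynamicFilterExpr { id: producer.id };
    producer.update(&registry, 42);
    consumer.current(&registry)
}
```

Because both sides resolve their state through the registry, the consumer in `demo` observes the producer's update even though the two expressions never shared a pointer.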

Having a registry of dynamic filter values global to a session could allow us to subscribe to changes there in the future, so that they can be streamed and synced over the wire in a distributed query. A potential evolution for allowing this could be making DynamicFilterRegistry a trait, very roughly something like this:

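trait DynamicFilterRegistry {
    /// Publishes the latest filter expression for the dynamic filter `id`.
    fn push_changes(&self, id: Uuid, value: Arc<dyn PhysicalExpr>);

    /// Returns the latest filter expression published for the dynamic filter `id`.
    fn consume_changes(&self, id: Uuid) -> Arc<dyn PhysicalExpr>;
}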

Distributed systems could then implement this trait themselves.
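As a rough illustration of what implementing such a trait might look like, here is an in-memory version using only the standard library. `Uuid` and `PhysicalExpr` below are simplified stand-ins for the real `uuid` and DataFusion types, and `Literal`, `LessThan`, `InMemoryRegistry`, and `demo` are hypothetical names. Returning a `true` literal before anything has been published is an assumption of this sketch, not something the PR specifies.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

/// Stand-in for `uuid::Uuid` (assumption of this sketch).
type Uuid = u128;

/// Stand-in for DataFusion's `PhysicalExpr` trait (assumption of this sketch).
trait PhysicalExpr: Debug + Send + Sync {
    fn display(&self) -> String;
}

#[derive(Debug)]
struct Literal(bool);

impl PhysicalExpr for Literal {
    fn display(&self) -> String {
        self.0.to_string()
    }
}

#[derive(Debug)]
struct LessThan {
    column: String,
    bound: i64,
}

impl PhysicalExpr for LessThan {
    fn display(&self) -> String {
        format!("{} < {}", self.column, self.bound)
    }
}

trait DynamicFilterRegistry {
    fn push_changes(&self, id: Uuid, value: Arc<dyn PhysicalExpr>);
    fn consume_changes(&self, id: Uuid) -> Arc<dyn PhysicalExpr>;
}

/// Single-process implementation backed by a map behind a lock.
#[derive(Default)]
struct InMemoryRegistry {
    values: RwLock<HashMap<Uuid, Arc<dyn PhysicalExpr>>>,
}

impl DynamicFilterRegistry for InMemoryRegistry {
    fn push_changes(&self, id: Uuid, value: Arc<dyn PhysicalExpr>) {
        self.values.write().unwrap().insert(id, value);
    }

    fn consume_changes(&self, id: Uuid) -> Arc<dyn PhysicalExpr> {
        let values = self.values.read().unwrap();
        let current: Arc<dyn PhysicalExpr> = match values.get(&id) {
            Some(expr) => Arc::clone(expr),
            // Nothing published yet: fall back to a filter that keeps every row.
            None => Arc::new(Literal(true)),
        };
        current
    }
}

/// Reads the filter before and after the producer publishes an update.
fn demo() -> (String, String) {
    let registry = InMemoryRegistry::default();
    let id: Uuid = 1;
    let before = registry.consume_changes(id).display();
    registry.push_changes(id, Arc::new(LessThan { column: "a".to_string(), bound: 10 }));
    let after = registry.consume_changes(id).display();
    (before, after)
}
```

A distributed implementation would presumably keep the same two methods and forward `push_changes` over the network instead of writing to a local map.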

What changes are included in this PR?

  • Introduce a new DynamicFilterRegistry structure that holds dynamic filter values.
  • Store a DynamicFilterRegistry in the SessionConfig so that it's shared across any plan in the same session.
  • Add protobuf serialization/deserialization support for dynamic filters.
  • Add a new test that checks that dynamic filters survive a serialization roundtrip.

Are these changes tested?

Yes, by existing tests and a new test.

Are there any user-facing changes?

Users of the datafusion-proto package can now serialize/deserialize dynamic filters.

@gabotechs gabotechs force-pushed the change-physical-optimizer-rule-signature branch from 75e9a56 to 3a445e3 Compare October 20, 2025 10:06
@gabotechs gabotechs force-pushed the support-proto-serde-for-dynamic-filters branch from 4cd7688 to 2b753fc Compare October 20, 2025 10:07
@gabotechs gabotechs force-pushed the change-physical-optimizer-rule-signature branch from 3a445e3 to f722d32 Compare October 20, 2025 10:40
@gabotechs gabotechs force-pushed the support-proto-serde-for-dynamic-filters branch from 2b753fc to 89154ca Compare October 20, 2025 10:45
Comment on lines -1378 to 1382
-   let config = config.unwrap_or_default();
+   let config = config
+       .unwrap_or_default()
+       .with_extension(Arc::new(DynamicFiltersRegistry::default()));
    let runtime_env = runtime_env.unwrap_or_else(|| Arc::new(RuntimeEnv::default()));
Owner Author


I'm not sure this is the best way of propagating the DynamicFiltersRegistry. This looks more like the way I'd propagate my custom stuff in my custom project, so maybe there's a better way of propagating this in this repo. Any suggestions here are much appreciated.
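For readers unfamiliar with this propagation style, the following is a simplified std-only model of a config that stores extensions keyed by their type. It is not DataFusion's implementation: `Config` is a hypothetical stand-in for SessionConfig, the registry is an empty placeholder, and only the general shape of `with_extension` / `get_extension` is mirrored.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Empty placeholder for the registry; its contents do not matter here.
#[derive(Default)]
struct DynamicFiltersRegistry;

/// Simplified model of a config holding at most one extension per type.
#[derive(Clone, Default)]
struct Config {
    extensions: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Config {
    /// Stores `ext`, replacing any previous extension of the same type.
    fn with_extension<T: Any + Send + Sync>(mut self, ext: Arc<T>) -> Self {
        self.extensions.insert(TypeId::of::<T>(), ext);
        self
    }

    /// Looks up the extension of type `T`, if one was registered.
    fn get_extension<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.extensions
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|ext| ext.downcast::<T>().ok())
    }
}

/// Checks that a cloned config still hands out the very same registry instance.
fn demo() -> bool {
    let registry = Arc::new(DynamicFiltersRegistry::default());
    let config = Config::default().with_extension(Arc::clone(&registry));
    let cloned = config.clone();
    let found = cloned.get_extension::<DynamicFiltersRegistry>().unwrap();
    Arc::ptr_eq(&registry, &found)
}
```

The point of the model is that cloning the config clones the Arc, not the registry, so every plan built from the same session config would reach the same registry instance.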

…c values in a DynamicFiltersRegistry that lives globally to a DataFusion session
@milenkovicm

Out of curiosity, why would creating a global (static) DynamicFiltersRegistry not be an option? The current approach is quite invasive and changes quite a few things all over the place. I'm not sure if that would get you started.

When DynamicFilterPhysicalExpr::new is called, it could register itself there. The only issue is that it would not be per session but per process.

Also, maybe some kind of weak hash map would be a better fit than a dash map, but I'm not sure.
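A process-wide registry with weak values could look roughly like the sketch below. It uses only the standard library (a `Mutex<HashMap>` rather than a dash map), and `FilterId`, `Inner`, `registry`, `get_or_register`, and `prune` are hypothetical names. Note that the weak references only stop the map from keeping filter state alive; stale entries still have to be pruned explicitly.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};

/// Hypothetical id type standing in for a Uuid.
type FilterId = u128;

/// Hypothetical stand-in for the filter's inner state.
#[derive(Debug, Default)]
struct Inner {
    threshold: i64,
}

/// Process-wide map from filter id to a weak handle on its state.
fn registry() -> &'static Mutex<HashMap<FilterId, Weak<RwLock<Inner>>>> {
    static REGISTRY: OnceLock<Mutex<HashMap<FilterId, Weak<RwLock<Inner>>>>> = OnceLock::new();
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Returns the live state for `id`, or registers fresh state if none is alive.
fn get_or_register(id: FilterId) -> Arc<RwLock<Inner>> {
    let mut map = registry().lock().unwrap();
    if let Some(existing) = map.get(&id).and_then(Weak::upgrade) {
        return existing;
    }
    let fresh = Arc::new(RwLock::new(Inner::default()));
    // Only a weak handle is stored, so the map never keeps the state alive.
    map.insert(id, Arc::downgrade(&fresh));
    fresh
}

/// Removes entries whose state has been dropped; returns how many entries remain.
fn prune() -> usize {
    let mut map = registry().lock().unwrap();
    map.retain(|_, weak| weak.strong_count() > 0);
    map.len()
}
```

Two calls to `get_or_register` with the same id return the same Arc while at least one strong handle is alive. Once all handles are dropped, the entry is dead weight until `prune` runs.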

wdyt @gabotechs ?

@gabotechs

> wdyt @gabotechs ?

That could be an option, although I'd expect it to come with all the disadvantages that global mutable state usually has:

  • race conditions because of multiple threads accessing it
  • unbounded growth unless manual cleanups are performed, as it never gets Drop-ed
  • all queries in the process competing to acquire the same lock[s]
  • testability issues as multiple test cases will concurrently access this struct

@gabotechs

I'm no longer going to work on this though, so it's safe to close this one.

@gabotechs gabotechs closed this Dec 9, 2025