
CHIP-10: PySpark + Notebook Integration for Chronon Feature Development #980

@camweston-stripe


Table of Contents

  1. Problem Statement
  2. Value Proposition
  3. Requirements
  4. Architecture
  5. Detailed Design
  6. User Experience
  7. Performance Improvements
  8. Extension to Other PySpark Environments
  9. Implementation Plan
  10. Conclusion
  11. Code

Problem Statement

Current feature prototyping workflows in Chronon are highly fragmented, forcing developers to continuously switch between different tools and contexts. This creates significant friction in the development process:

  1. Context Switching - Engineers must navigate between multiple environments:

    • Code editors for defining features
    • Command line for analyzing, validating, and executing features
    • Additional tools for analyzing and visualizing results
  2. Long Feedback Loops - The current workflow introduces delays at every stage:

    • Waiting for builds to complete
    • Queuing for shared cluster resources
    • Manual inspection of logs in different systems
  3. High Cognitive Load - The disjointed process requires developers to:

    • Maintain mental context across multiple tools
    • Remember complex CLI commands and flags
    • Accept slower iteration due to cross-tool workflows, shared resources, and build times

These challenges significantly impair developer productivity and extend the time required to build and refine features. The experience is particularly challenging for new users who must learn multiple interfaces simultaneously.

Value Proposition

This CHIP introduces a comprehensive PySpark integration framework for Chronon that delivers:

Unified Development Environment

We've created an extensible architecture that enables ML engineers to define, validate, execute, and analyze Chronon features entirely within notebook environments. This centralized workflow eliminates context switching and allows seamless transitions between feature engineering and model development in a single workspace. All in a single application, users can now:

  • Define GroupBys and Joins
  • Analyze and Execute their features
  • Visualize and examine the returned DataFrame
  • Interact with their models

Python-First Experience

The integration provides intuitive Python interfaces for all Chronon operations, wrapping the underlying JVM-based functionality in a developer-friendly API. This makes Chronon more accessible to data scientists and ML engineers who are primarily Python users while retaining the Scala/JVM implementation.

Bridging Python and JVM Through Py4J

At the core of our implementation is a robust Python-JVM bridge using Py4J (sketched below), which:

  • Transforms Python objects to Java equivalents for execution
  • Manages data transfer between environments
  • Captures JVM logs and presents them within the PySpark/Notebook context
  • Handles error propagation across the language boundary
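
As a minimal sketch of the bridge described above, the flow boils down to serializing the Python thrift object to JSON, invoking a Scala helper through spark._jvm, and wrapping the returned JVM DataFrame. PySparkUtils and its package path stand in for the helper proposed by this CHIP (the GroupBy counterparts of the parseJoin/runJoin calls in the sequence diagram further down), not an existing OSS entry point, and the local thrift_simple_json helper is only an illustration of the serialization step.

from py4j.protocol import Py4JJavaError
from pyspark.sql import DataFrame, SparkSession
from thrift.protocol.TSimpleJSONProtocol import TSimpleJSONProtocolFactory
from thrift.TSerialization import serialize


def thrift_simple_json(obj) -> str:
    # Serialize a Python thrift object (GroupBy/Join) to simple JSON.
    return serialize(obj, protocol_factory=TSimpleJSONProtocolFactory()).decode("utf-8")


def run_group_by_on_jvm(spark: SparkSession, group_by, end_date: str) -> DataFrame:
    json_repr = thrift_simple_json(group_by)      # Python thrift -> JSON string
    jvm = spark._jvm                              # Py4J gateway into the JVM
    try:
        java_gb = jvm.ai.chronon.spark.PySparkUtils.parseGroupBy(json_repr)
        jvm_df = jvm.ai.chronon.spark.PySparkUtils.runGroupBy(java_gb, end_date)
    except Py4JJavaError as err:                  # JVM errors surface in Python
        raise RuntimeError(f"JVM execution failed: {err.java_exception}") from err
    return DataFrame(jvm_df, spark)               # wrap the JVM DataFrame reference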

Immediate Feedback Loop

The notebook integration dramatically reduces feedback time (validation at Stripe is up to roughly 16x faster; see Performance Improvements below) by:

  • Eliminating build steps through direct execution
    • Using dedicated notebook cluster resources (e.g., no longer having to wait for your Spark job to be accepted)
  • Enabling in-notebook visualization of results by returning the execution result as a DataFrame
  • Capturing and displaying logs directly in the execution cell so that users no longer need to navigate through the Spark-UI

Platform Agnosticism

The architecture uses a flexible abstraction layer that separates core feature execution logic from platform-specific details. This enables:

  • Initial implementation on Databricks
  • Straightforward extension to Jupyter and other PySpark environments
  • Consistent user experience across platforms
  • Ability to leverage platform-specific behaviors
  • Overridable methods that allow adopters to integrate company-specific logic

By bringing together these elements in a centralized environment, the notebook integration acts as a catalyst for Chronon feature development. It transforms what was previously a powerful but burdensome workflow into an intuitive, efficient, and accessible platform that accelerates the entire ML feature prototyping development cycle, all from the familiar context of a notebook interface.

Requirements

Goals

  1. Create a unified, centralized, notebook-based interface for defining, validating, and executing Chronon features
  2. Design a platform-agnostic architecture that can support multiple PySpark environments (Databricks, Jupyter, LocalMode, etc.)
  3. Provide intuitive Python APIs for all feature operations that interact directly with the underlying Scala code
  4. Support immediate feedback and visualization of results within the notebook environment by returning execution results as a DataFrame
  5. Enable a complete feature development workflow in a single environment
  6. Match or exceed the performance of existing CLI tools

Non-Goals

  1. Adding new feature computation capabilities beyond what's already available in Chronon
  2. Replacing existing production CLI flows with PySpark executions
  3. Production orchestration from notebooks (scheduling, monitoring, etc.)
  4. Complex visualization tools or dashboards

Architecture

The proposed architecture introduces an extensible framework that bridges Python Chronon code with JVM-based Chronon functionality through a well-defined abstraction layer.

Key Components

classDiagram
    class PySparkExecutable~T~ {
        <<Abstract>>
        +obj: T
        +spark: SparkSession
        +get_platform()* PlatformInterface
    }
    
    class GroupByExecutable {
        <<Abstract>>
        +run(start_date, end_date) DataFrame
        +analyze(start_date, end_date)
    }
    
    class JoinExecutable {
        <<Abstract>>
        +run(start_date, end_date) DataFrame
        +analyze(start_date, end_date)
    }
    
    class PlatformInterface {
        <<Abstract>>
        +start_log_capture()* Any
        +end_log_capture()* void
    }
    
    PySparkExecutable <|-- GroupByExecutable
    PySparkExecutable <|-- JoinExecutable
    
    GroupByExecutable <|-- DatabricksGroupBy
    JoinExecutable <|-- DatabricksJoin
    
    PlatformInterface <|-- DatabricksPlatform
    DatabricksGroupBy --* DatabricksPlatform
    DatabricksJoin --* DatabricksPlatform
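
As a minimal Python sketch of the hierarchy in the diagram above (constructors, type hints, and exact signatures are illustrative simplifications, not the final OSS API):

from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

from pyspark.sql import DataFrame, SparkSession

T = TypeVar("T")  # a GroupBy or Join thrift object


class PlatformInterface(ABC):
    def __init__(self, spark: SparkSession):
        self.spark = spark

    @abstractmethod
    def start_log_capture(self, job_name: str) -> Any:
        """Begin capturing JVM logs; returns an opaque token."""

    @abstractmethod
    def end_log_capture(self, capture_token: Any) -> None:
        """Stop capturing and surface the logs in the notebook cell."""


class PySparkExecutable(ABC, Generic[T]):
    def __init__(self, obj: T, spark: SparkSession):
        self.obj = obj
        self.spark = spark

    @abstractmethod
    def get_platform(self) -> PlatformInterface: ...


class GroupByExecutable(PySparkExecutable[T]):
    @abstractmethod
    def run(self, start_date: str, end_date: str) -> DataFrame: ...

    @abstractmethod
    def analyze(self, start_date: str, end_date: str) -> None: ...


class JoinExecutable(PySparkExecutable[T]):
    @abstractmethod
    def run(self, start_date: str, end_date: str) -> DataFrame: ...

    @abstractmethod
    def analyze(self, start_date: str, end_date: str) -> None: ...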

Interaction between Python Chronon code and Scala Chronon code

sequenceDiagram
    participant User as User (Python Notebook)
    participant PJExec as DatabricksJoin
    participant PPlatform as DatabricksPlatform
    participant Py4J as Py4J Bridge
    participant JUtils as PySparkUtils (Scala)
    participant JCore as Chronon Core (JVM)
    participant JDF as JVM DataFrame
    participant PSpark as PySpark DataFrame

    User->>PJExec: executable = DatabricksJoin(join, spark)
    User->>PJExec: result_df = executable.run(start_date, end_date, skip_first_hole=False, sample_num_of_rows=1000)
    
    %% Preparation steps
    PJExec->>PJExec: Copy & prepare Join object
    PJExec->>PPlatform: log_operation("Executing Join...")
    PJExec->>PJExec: Update left source dates
    
    %% Handle underlying joins if needed
    PJExec->>PJExec: _execute_underlying_join_sources()
    
    %% Update join parts sources
    PJExec->>PJExec: _update_source_dates_for_join_parts()
    
    %% Start log capture
    PJExec->>PPlatform: log_token = start_log_capture()
    
    %% Convert Python object to JSON
    PJExec->>PJExec: json_representation = thrift_simple_json(join)
    
    %% Send to JVM via Py4J
    PJExec->>Py4J: Send JSON representation
    Py4J->>JUtils: PySparkUtils.parseJoin(json_representation)
    
    %% JVM parses JSON to Java object
    JUtils->>JUtils: ThriftJsonCodec.fromJsonStr[api.Join]
    
    %% Execute Join on JVM
    PJExec->>Py4J: runJoin(java_join, end_date, step_days, skip_first_hole, sample_num_of_rows...)
    Py4J->>JUtils: PySparkUtils.runJoin(...)
    
    %% Initialize Join object in JVM
    JUtils->>JCore: new Join(joinConf, endDate, tableUtils, skipFirstHole)
    
    %% Execute Join computation
    JUtils->>JCore: join.computeJoin(stepDays)
    
    %% Join processing in Chronon Core
    JCore->>JCore: Process left source
    JCore->>JCore: Process join parts
    JCore->>JCore: Perform join operations
    
    %% Return results directly as DataFrame
    JCore->>JDF: Create DataFrame with join results
    JDF->>JUtils: Return JVM DataFrame
    
    %% Return DataFrame to Python
    JUtils->>Py4J: Return JVM DataFrame reference
    Py4J->>PJExec: JVM DataFrame reference
    Note over PJExec: JVM DataFrame is converted to Python DataFrame
    PJExec->>PSpark: DataFrame(result_df_scala, self.spark)
    
    %% End log capture and return
    PJExec->>PPlatform: end_log_capture(log_token)
    PJExec->>PPlatform: log_operation("Join executed successfully")
    PJExec->>User: Return PySpark DataFrame


Detailed Design

Abstract Base Classes

At the heart of the architecture are abstract base classes that define common interfaces and behaviors:

  1. PySparkExecutable - Generic base class for all executable Chronon objects

    • Manages Python-to-JVM conversion
    • Provides utilities for source date handling
    • Handles execution of underlying join sources
  2. GroupByExecutable/JoinExecutable - Feature-specific interfaces

    • Define common operations for each feature type
    • Implement type-specific logic for run and analyze
    • Handle feature-specific parameter requirements
  3. PlatformInterface - Abstraction for platform-specific operations

    • Log capturing and display
    • UDF registration
    • At Stripe we have converted TableUtils to a trait and implemented it in subclasses (StripeTableUtils and DatabricksTableUtils). This lets us override functions to provide platform- or company-specific logic without messy merge conflicts when rebasing against the OSS repo. Ideally we can make this change in the OSS repo as well, allowing adopters to implement their own table utils for their platform. Once that is done, the platform interface can be responsible for providing the TableUtils class to use, instead of always providing the default TableUtils (see the sketch below).
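
To make those override points concrete, a platform subclass might expose hooks along these lines. This is a sketch only: register_udfs and get_table_utils are illustrative names for the UDF-registration and table-utils ideas above (not settled API), and the JVM class path is hypothetical.

from pyspark.sql.types import StringType


class StripePlatform(PlatformInterface):
    # start_log_capture / end_log_capture omitted for brevity.

    def register_udfs(self) -> None:
        # Example: make a company-specific UDF available to every notebook session.
        self.spark.udf.register(
            "normalize_merchant",
            lambda s: s.strip().lower() if s else None,
            StringType(),
        )

    def get_table_utils(self):
        # Return the JVM TableUtils implementation this platform should use
        # (e.g. a company-specific subclass) instead of the default TableUtils.
        return self.spark._jvm.ai.chronon.spark.StripeTableUtils  # hypothetical path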

Platform Implementation

A Platform can be thought of as your PySpark environment layer. It is where we define platform-specific logic, such as a distinct constants provider or TableUtils class to use on that platform. A Databricks Platform would look something like this:

import os

# Driver log path exposed by Databricks (adjust for your workspace).
LOG_FILE = "/databricks/driver/logs/log4j-active.log"


class DatabricksPlatform(PlatformInterface):

    def start_log_capture(self, job_name: str) -> tuple[int, str]:
        # Databricks-specific log capture: remember the current size of the log
        # file so we can later print only what this job appended.
        return (os.path.getsize(LOG_FILE), job_name)

    # Additional Databricks-specific methods
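
For completeness, a matching end_log_capture could read whatever the JVM appended to the driver log after the recorded offset and echo it into the cell output. A sketch, continuing the class above:

    def end_log_capture(self, capture_token: tuple[int, str]) -> None:
        # Print everything written to the driver log since start_log_capture was called.
        start_offset, job_name = capture_token
        with open(LOG_FILE, "r") as log_file:
            log_file.seek(start_offset)
            print(f"==== Captured logs for {job_name} ====")
            print(log_file.read())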

Feature Execution Flow

  1. Object Preparation

    • Deep copy the object to avoid modifying the original
    • Update source dates based on provided parameters
    • Execute any underlying join sources if needed
  2. JVM Conversion

    • Convert Python thrift object to JSON
    • Pass JSON to JVM via Py4J
    • Parse into Java thrift object
  3. Execution

    • Call appropriate JVM method with parameters
    • Capture logs during execution
    • Process resulting DataFrame
  4. Result Handling

    • Convert Java DataFrame to Python DataFrame
    • Display logs and messages
    • Return DataFrame to user
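
Stitched together, a run() implementation roughly follows the four steps above. This is a sketch built on the abstractions shown earlier: the helper names _update_source_dates_for_join_parts and _execute_underlying_join_sources come from the sequence diagram (their exact signatures here are assumed), the PySparkUtils package path is hypothetical, and parameters such as step_days, skip_first_hole, and sample_num_of_rows are omitted.

import copy

from pyspark.sql import DataFrame


class DatabricksJoin(JoinExecutable):
    def run(self, start_date: str, end_date: str) -> DataFrame:
        platform = self.get_platform()

        # 1. Object preparation: work on a copy so the user's definition is untouched.
        join = copy.deepcopy(self.obj)
        self._update_source_dates_for_join_parts(join, start_date, end_date)
        self._execute_underlying_join_sources(join, start_date, end_date)

        # 2. JVM conversion: Python thrift -> JSON -> Java thrift via Py4J.
        jvm = self.spark._jvm
        java_join = jvm.ai.chronon.spark.PySparkUtils.parseJoin(thrift_simple_json(join))

        # 3. Execution, with JVM logs captured into the notebook cell.
        token = platform.start_log_capture(f"Join run: {join.metaData.name}")
        try:
            jvm_df = jvm.ai.chronon.spark.PySparkUtils.runJoin(java_join, end_date)
        finally:
            platform.end_log_capture(token)

        # 4. Result handling: wrap the JVM DataFrame and hand it back to the user.
        return DataFrame(jvm_df, self.spark)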

User Experience

Example: Define and Execute a GroupBy

# Import required modules
from ai.chronon.pyspark.databricks import DatabricksGroupBy
from ai.chronon.group_by import GroupBy, Aggregation, Operation, Window, TimeUnit
from ai.chronon.api.ttypes import Source, EventSource
from ai.chronon.query import Query, select

# Define a GroupBy
my_group_by = GroupBy(
    sources=[my_source],
    keys=["user_id"],
    aggregations=[
        Aggregation(
            input_column="amount",
            operation=Operation.SUM,
            windows=[Window(length=7, timeUnit=TimeUnit.DAYS)]
        )
    ],
    name="notebook_example_groupby",
    # Additional parameters
)

# Create executable
executable_gb = DatabricksGroupBy(my_group_by, spark)

# Run analyze
executable_gb.analyze(start_date="20250101", end_date="20250131")

# Executes a GroupBy Backfill operation that is helpful for prototyping
result_df = executable_gb.run(start_date="20250101", end_date="20250131")

# Display results
display(result_df)

Define and Execute a Join

# Import Join-specific modules
from ai.chronon.pyspark.databricks import DatabricksJoin
from ai.chronon.join import Join, JoinPart

# Create Join definition
my_join = Join(
    left=my_source,
    right_parts=[JoinPart(group_by=my_group_by)],
    name="notebook_example_join"
    # Additional parameters
)

# Create executable
executable_join = DatabricksJoin(my_join, spark)

# Run analyze
executable_join.analyze(start_date="20250101", end_date="20250131", enable_heavy_hitter_analysis=True)

# Executes a Join Backfill Operation to generate training data for your model
result_df = executable_join.run(start_date="20250101", end_date="20250131")

# Display results
display(result_df)

Performance Improvements

Initial benchmarks from the Databricks implementation show significant performance improvements at Stripe over the traditional CLI workflow:

Operation                               CLI Workflow   Notebook Workflow   Improvement
Validation (first run)                  3m 55s         14s                 1678%
Validation (with cached Bazel build)    1m 15s         14s                 536%
Average validation                      1m 46s         14s                 757%

These improvements are primarily due to:

  1. Elimination of Bazel build times
  2. Dedicated compute resources vs. shared cluster
  3. Reduced overhead from fewer process launches

These numbers refer to the validation Spark job, which is now baked into the Analyzer in the OSS repo.

Extension to Other PySpark Environments

The abstraction layers enable straightforward extension to other PySpark environments. To add support for a new platform (e.g., Jupyter):

  1. Implement the PlatformInterface for the target platform
  2. Create concrete implementations of GroupByExecutable and JoinExecutable
  3. Configure environment-specific initialization

class JupyterPlatform(PlatformInterface):
    # Override methods here to provide Jupyter-specific logic, such as a
    # Jupyter-specific constants provider or table utils.
    pass


class JupyterGroupBy(GroupByExecutable):
    def __init__(self, group_by, spark_session):
        super().__init__(group_by, spark_session)
        # You can pass Jupyter-specific parameters to set_metadata to customize
        # things like:
        # - Which namespace is written to
        # - Table name prefixing (in the Databricks implementation we prefix the
        #   table name with the notebook username)
        # - The root dir of your existing feature definitions, if you want to
        #   import features that were defined in an IDE
        self.obj = self.set_metadata(self.obj)
        
    def get_platform(self) -> PlatformInterface:
        return JupyterPlatform(self.spark)
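
Once those classes exist, the notebook workflow looks the same as on Databricks. Illustrative usage only; result_df.show() stands in for the Databricks display() helper:

# Same workflow as the Databricks examples above, just a different platform.
executable_gb = JupyterGroupBy(my_group_by, spark)
executable_gb.analyze(start_date="20250101", end_date="20250131")
result_df = executable_gb.run(start_date="20250101", end_date="20250131")
result_df.show()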

Implementation Plan

  1. Phase 1: Core Abstractions

    • Implement abstract base classes
    • Define platform interface
    • Create utility methods for Python-JVM communication
  2. Phase 2: Databricks Implementation

    • Implement Databricks-specific platform
    • Create Databricks executable classes
    • Test and validate core functionality
  3. Phase 3: Documentation and Examples

    • Create comprehensive documentation
    • Publish user guides

The code for this CHIP can be found in this PR. This implementation is already in use at Stripe, but the OSS PR is currently a WIP.

Conclusion

The PySpark integration architecture provides a unified, efficient environment for Chronon feature development while maintaining flexibility for multiple platforms. By abstracting platform-specific details and providing intuitive interfaces, it significantly improves the developer experience without sacrificing functionality or performance.

The extensible design ensures that as new PySpark platforms emerge or requirements evolve, the system can adapt without requiring fundamental changes to the core architecture. This approach balances immediate productivity improvements with long-term maintainability and growth.

Code

The code for this CHIP can be found in this PR.
