Duckherder - DuckDB Remote and Distributed Execution Extension

Duckherder is a DuckDB extension, built on DuckDB's storage extension interface, that enables distributed execution of supported queries across multiple worker nodes, using Arrow Flight for efficient data transfer. It lets you work seamlessly with remote tables and execute queries in parallel across distributed workers while keeping DuckDB's familiar SQL interface.

Overview

Duckherder implements a client-server architecture with distributed query execution:

  • Client (Duckherder): Coordinates queries, manages remote table references, and initiates distributed execution
  • Server (Duckling): Consists of two components:
    • Driver Node: Analyzes queries, creates partition plans, and coordinates worker execution
    • Worker Nodes: Execute partitioned queries on local data and return results via Arrow Flight
  • Distributed Executor: Runs on the driver node, partitions queries based on DuckDB's physical plan analysis, and distributes tasks to workers

The extension transparently handles query routing, allowing you to run CREATE, SELECT, INSERT, DELETE, and ALTER operations on remote tables through DuckDB's storage extension interface.

Architecture

High-Level System Architecture

┌─────────────────────────────────────────┐
│         Client DuckDB Instance          │
│  ┌───────────────────────────────────┐  │
│  │   Duckherder Catalog (dh)         │  │
│  │   - Remote table references       │  │
│  │   - Query routing logic           │  │
│  └───────────────────────────────────┘  │
│              │                          │
│              │ Arrow Flight Protocol    │
│              │                          │
└──────────────┼──────────────────────────┘
               │
┌──────────────┴──────────────────────────┐
│         Server DuckDB Instance(s)       │
│  ┌───────────────────────────────────┐  │
│  │   Driver Node                     │  │
│  │   - Distributed executor          │  │
│  │   - Query plan analysis           │  │
│  │   - Task coordination             │  │
│  └───────────┬───────────────────────┘  │
│              │                          │
│  ┌───────────┴───────────────────────┐  │
│  │   Worker Nodes                    │  │
│  │   - Actual table data             │  │
│  │   - Partitioned execution         │  │
│  │   - Result streaming              │  │
│  └───────────────────────────────────┘  │
└─────────────────────────────────────────┘

Distributed Execution Flow

┌─────────────┐
│ COORDINATOR │ Extract query plan
└──────┬──────┘
       │
       ├─ Phase 1: Extract & validate logical plan
       ├─ Phase 2: Analyze DuckDB's natural parallelism
       ├─ Phase 3: Create partition plans
       ├─ Phase 4: Prepare result schema
       │
       ├──────────┬──────────┬──────────┐
       ▼          ▼          ▼          ▼
   ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
   │WORKER 0│ │WORKER 1│ │WORKER 2│ │WORKER N│
   └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘
       │          │          │          │
       │ Execute  │ Execute  │ Execute  │ Execute
       │ partition│ partition│ partition│ partition
       │ (Local)  │ (Local)  │ (Local)  │ (Local)
       │          │          │          │
       └──────────┴──────────┴──────────┘
              │
              ▼
       ┌─────────────┐
       │ COORDINATOR │  Merge results
       │   Combine   │  Smart aggregation
       │  Finalize   │  Return final result
       └─────────────┘

Distributed Execution System

Partitioning Strategy

The distributed executor analyzes DuckDB's physical plan to create optimal partitions:

1. Natural Parallelism Analysis

  • Queries EstimatedThreadCount() from DuckDB's physical plan
  • Understands how many parallel tasks DuckDB would naturally create
  • Extracts cardinality and operator information

2. Partition Strategy Selection (Priority Order)

The system uses a three-tier hierarchy to select the optimal partitioning strategy:

| Priority | Condition | Strategy | Example Predicate | Notes |
|----------|-----------|----------|-------------------|-------|
| 1st | Row groups detected (cardinality known) | ROW GROUP-BASED | WHERE rowid BETWEEN 0 AND 245759 | Assigns whole row groups to tasks |
| 2nd | TABLE_SCAN + ≥100 rows/worker | RANGE-BASED | WHERE rowid BETWEEN 0 AND 2499 | Contiguous ranges, not row-group-aligned |
| 3rd | Small table / non-TABLE_SCAN / unknown cardinality | MODULO | WHERE (rowid % node_id) = 0 | Fallback for all other cases. TODO(hjiang): small tables don't need to be distributed; execute them directly on the driver node. |
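
The selection logic, fed by the natural-parallelism information gathered in the previous step, boils down to a small decision function. The sketch below is illustrative only; PartitionStrategy, PlanInfo, and the constants are assumed names for this example, not identifiers from the extension's source:

```cpp
#include <cstdint>

// Illustrative sketch of the three-tier strategy selection described in the
// table above. All names here are assumptions made for this example.
enum class PartitionStrategy { kRowGroup, kRange, kModulo };

struct PlanInfo {
    bool row_groups_detected = false;  // Row-group layout of the scanned table is known.
    bool is_table_scan = false;        // Root operator is a TABLE_SCAN.
    bool cardinality_known = false;    // Cardinality could be estimated.
    uint64_t cardinality = 0;          // Estimated row count.
};

constexpr uint64_t kMinRowsPerWorker = 100;  // Threshold from the table above.

PartitionStrategy ChooseStrategy(const PlanInfo &plan, uint64_t num_workers) {
    // 1st priority: whole row groups can be assigned to tasks.
    if (plan.row_groups_detected && plan.cardinality_known) {
        return PartitionStrategy::kRowGroup;
    }
    // 2nd priority: contiguous ranges when the scan is large enough.
    if (plan.is_table_scan && plan.cardinality_known &&
        plan.cardinality / num_workers >= kMinRowsPerWorker) {
        return PartitionStrategy::kRange;
    }
    // Fallback: modulo partitioning always works, at the cost of cache locality.
    return PartitionStrategy::kModulo;
}
```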

Row group-based partitioning (preferred):

  • Aligned with DuckDB's storage structure (122,880 rows per group)
  • Tasks process complete row groups (e.g., Task 0: groups [0,1], Task 1: groups [2,3])
  • Optimal cache locality and I/O efficiency
  • Respects DuckDB's natural parallelism boundaries

Range-based partitioning (fallback):

  • Used when row groups can't be detected but cardinality is known
  • Contiguous row access
  • Not aligned with row group boundaries

Modulo partitioning (final fallback):

  • Scattered row access (poor cache locality)
  • Works for any table size and operator type
  • Simple and always correct

3. How Row Group-Based Partitioning Works

When row groups are detected (the common case), the system:

  • Calculates total row groups: cardinality / 122,880
  • Assigns whole row groups to tasks (e.g., Task 0 gets groups [0,1], Task 1 gets groups [2,3])
  • Converts row group ranges to rowid ranges: rowid BETWEEN (rg_start * 122880) AND (rg_end * 122880 - 1)
  • Workers scan complete row groups for optimal performance
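
To make the arithmetic concrete, here is a small self-contained sketch (illustrative only; the function and variable names are not taken from the extension) that assigns row groups to tasks and derives the corresponding rowid predicates:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

// Sketch of row group-based partitioning: split row groups across tasks and
// convert each task's row-group span into a "rowid BETWEEN ..." predicate.
constexpr uint64_t kRowGroupSize = 122880;  // DuckDB rows per row group.

std::vector<std::string> MakeRowGroupPredicates(uint64_t cardinality, uint64_t num_tasks) {
    const uint64_t total_groups = (cardinality + kRowGroupSize - 1) / kRowGroupSize;
    const uint64_t groups_per_task = (total_groups + num_tasks - 1) / num_tasks;
    std::vector<std::string> predicates;
    for (uint64_t task = 0; task < num_tasks; ++task) {
        const uint64_t rg_start = task * groups_per_task;  // First row group (inclusive).
        if (rg_start >= total_groups) {
            break;
        }
        const uint64_t rg_end = std::min(rg_start + groups_per_task, total_groups);  // Exclusive end.
        const uint64_t row_start = rg_start * kRowGroupSize;
        const uint64_t row_end = rg_end * kRowGroupSize - 1;
        predicates.push_back("WHERE rowid BETWEEN " + std::to_string(row_start) +
                             " AND " + std::to_string(row_end));
    }
    return predicates;
}

int main() {
    // Example: 4 row groups (491,520 rows) split across 2 tasks.
    for (const auto &predicate : MakeRowGroupPredicates(/*cardinality=*/491520, /*num_tasks=*/2)) {
        std::printf("%s\n", predicate.c_str());
    }
    // Prints:
    //   WHERE rowid BETWEEN 0 AND 245759
    //   WHERE rowid BETWEEN 245760 AND 491519
    return 0;
}
```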

DuckDB Thread Model → Distributed Model Mapping

| DuckDB Thread Model | Distributed Model | Implementation |
|---------------------|-------------------|----------------|
| Thread | Worker Node | Physical machine/process |
| LocalSinkState | Worker Result | QueryResult → Arrow batches |
| Sink() | Worker Execute | HandleExecutePartition() |
| GlobalSinkState | ColumnDataCollection | Coordinator's result collection |
| Combine() | CollectAndMergeResults() | Merging worker outputs |
| Finalize() | Return MaterializedQueryResult | Final result to client |
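
In terms of code shape, the mapping above corresponds roughly to the following coordinator-side sketch. All types here (WorkerTask, WorkerResult, ResultCollection) are simplified stand-ins invented for illustration; the real implementation works with QueryResult, Arrow record batches streamed over Flight, and DuckDB's ColumnDataCollection:

```cpp
#include <string>
#include <vector>

// Simplified stand-ins for the real pieces; purely illustrative.
struct WorkerTask { std::string partitioned_sql; };                       // One partition query.
struct WorkerResult { std::vector<std::vector<std::string>> rows; };      // Worker's local result (LocalSinkState analogue).
struct ResultCollection { std::vector<std::vector<std::string>> rows; };  // Coordinator's collection (GlobalSinkState analogue).

// Sink() analogue: a worker executes its partition and returns its rows.
// In the real extension this happens remotely via Arrow Flight.
WorkerResult ExecuteOnWorker(const WorkerTask &task) {
    (void)task;
    return WorkerResult{};  // Stub for illustration.
}

// Combine() analogue: merge every worker's local result into one collection.
ResultCollection CollectAndMerge(const std::vector<WorkerResult> &worker_results) {
    ResultCollection combined;
    for (const auto &result : worker_results) {
        combined.rows.insert(combined.rows.end(), result.rows.begin(), result.rows.end());
    }
    return combined;
}

// Putting it together: dispatch tasks (Sink), merge (Combine), and return the
// combined collection, which Finalize() hands back to the client.
ResultCollection RunDistributed(const std::vector<WorkerTask> &tasks) {
    std::vector<WorkerResult> worker_results;
    worker_results.reserve(tasks.size());
    for (const auto &task : tasks) {
        worker_results.push_back(ExecuteOnWorker(task));
    }
    return CollectAndMerge(worker_results);
}
```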

Installation

Building from Source

Prerequisites

  1. Install VCPKG for dependency management:
git clone https://github.com/Microsoft/vcpkg.git
./vcpkg/bootstrap-vcpkg.sh
export VCPKG_TOOLCHAIN_PATH=`pwd`/vcpkg/scripts/buildsystems/vcpkg.cmake
  2. Build the extension:
# Clone the repo.
git clone --recurse-submodules https://github.com/dentiny/duckdb-distributed-execution.git
cd duckdb-distributed-execution

# Build with release mode.
export VCPKG_TOOLCHAIN_PATH=/path/to/vcpkg/scripts/buildsystems/vcpkg.cmake
CMAKE_BUILD_PARALLEL_LEVEL=$(nproc) make

The build produces:

  • ./build/release/duckdb - DuckDB shell with extension pre-loaded
  • ./build/release/test/unittest - Test runner
  • ./build/release/extension/duckherder/duckherder.duckdb_extension - Loadable extension
  • ./build/release/distributed_server - Standalone distributed driver node
  • ./build/release/distributed_worker - Standalone distributed worker node

Running Standalone Server and Workers

For production deployments or testing distributed execution across multiple machines, you can run standalone server and worker processes:

Starting the Driver/Coordinator Server

# Start the driver server on default host (0.0.0.0) and port (8815) without workers
./build/release/distributed_server

# Start the driver server on a specific host and port
./build/release/distributed_server 192.168.1.100 8815

# Start the driver server with 4 local workers (for single-machine distributed execution)
./build/release/distributed_server 0.0.0.0 8815 4

Starting Standalone Worker Nodes

For multi-machine setups, start worker nodes on separate machines:

# Start a worker on default host (0.0.0.0) and port (8816) with default worker ID (worker-1)
./build/release/distributed_worker

# Start a worker on a specific port with custom worker ID
./build/release/distributed_worker 0.0.0.0 8817 worker-2

Usage

Local Server Management

-- Start a distributed server on port 8815.
SELECT duckherder_start_local_server(8815);

-- Stop the local distributed server.
SELECT duckherder_stop_local_server();

Attach to the Server

-- Attach to the duckherder server as database 'dh'
-- TODO(hjiang): currently only the database name 'dh' is supported
ATTACH DATABASE 'dh' (TYPE duckherder, server_host 'localhost', server_port 8815);

Register and Unregister Remote Tables

-- Register a remote table mapping.
-- Syntax: duckherder_register_remote_table(local_table_name, remote_table_name)
PRAGMA duckherder_register_remote_table('my_table', 'my_table');

-- Unregister a remote table mapping.
-- Syntax: duckherder_unregister_remote_table(local_table_name)
PRAGMA duckherder_unregister_remote_table('my_table');

Load Extensions on Server

-- Load an extension on the remote server
SELECT duckherder_load_extension('parquet');
SELECT duckherder_load_extension('json');

Driver/Worker Node Registration

Starting and Registering Standalone Workers

For distributed execution, you can either use local workers (managed by the driver) or register external standalone workers:

Option 1: Start standalone workers in the same process (for testing, debugging and development)

-- Start a local server with driver node
SELECT duckherder_start_local_server(8815);

-- Start standalone workers within the same process
SELECT duckherder_start_standalone_worker(8816);
SELECT duckherder_start_standalone_worker(8817);
SELECT duckherder_start_standalone_worker(8818);

-- Check how many workers are registered
SELECT duckherder_get_worker_count();  -- Returns: 3

Option 2: Register external workers (for multi-machine setups)

-- Start a local server with driver node
SELECT duckherder_start_local_server(8815);

-- Register external worker nodes running on different machines
-- Syntax: duckherder_register_worker(worker_id, location)
SELECT duckherder_register_worker('worker-1', 'grpc://192.168.1.101:8816');
SELECT duckherder_register_worker('worker-2', 'grpc://192.168.1.102:8816');
SELECT duckherder_register_worker('worker-3', 'grpc://192.168.1.103:8816');

-- Verify workers are registered
SELECT duckherder_get_worker_count();  -- Returns: 3

Option 3: Using the standalone distributed_server executable with local workers

# Start the server with 4 automatically managed local workers
./build/release/distributed_server 0.0.0.0 8815 4

Complete Multi-Machine Setup Example

On the driver machine:

-- Start the driver node
SELECT duckherder_start_local_server(8815);

On each worker machine, run:

# Worker 1
./build/release/distributed_worker 0.0.0.0 8816 worker-1

# Worker 2  
./build/release/distributed_worker 0.0.0.0 8816 worker-2

# Worker 3
./build/release/distributed_worker 0.0.0.0 8816 worker-3

Back on the driver, register the workers:

-- Register each worker with its network location
SELECT duckherder_register_worker('worker-1', 'grpc://192.168.1.101:8816');
SELECT duckherder_register_worker('worker-2', 'grpc://192.168.1.102:8816');
SELECT duckherder_register_worker('worker-3', 'grpc://192.168.1.103:8816');

-- Confirm registration
SELECT duckherder_get_worker_count();

Working with Remote Tables

-- Create a table.
CREATE TABLE dh.users (
    id INTEGER,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP
);

-- Insert data.
INSERT INTO dh.users VALUES 
    (1, 'Alice', '[email protected]', '2024-01-15 10:30:00'),
    (2, 'Bob', '[email protected]', '2024-01-16 14:20:00'),
    (3, 'Charlie', '[email protected]', '2024-01-17 09:15:00');

-- Create an index.
CREATE INDEX idx_users_email ON dh.users(email);

-- Simple SELECT.
SELECT * FROM dh.users WHERE id > 1;

-- Distributed query, which automatically gets parallelized.
SELECT * FROM dh.large_table WHERE value > 1000;

-- Drop a table.
DROP TABLE dh.users;

Query Execution Monitoring and Statistics

Duckherder provides built-in statistics (currently not persisted anywhere) for executed queries, including the start timestamp, execution duration, and distribution mode.

D SELECT * FROM duckherder_get_query_execution_stats();
┌───────────────────────────────────────┬────────────────┬────────────────┬───────────────────┬──────────────────┬─────────────────────┬────────────────────────────┐
│                  sql                  │ execution_mode │ merge_strategy │ query_duration_ms │ num_workers_used │ num_tasks_generated │    execution_start_time    │
│                varchar                │    varchar     │    varchar     │       int64       │      int64       │        int64        │         timestamp          │
├───────────────────────────────────────┼────────────────┼────────────────┼───────────────────┼──────────────────┼─────────────────────┼────────────────────────────┤
│ SELECT * FROM distributed_basic_table │ DELEGATED      │ CONCATENATE    │                17 │                4 │                   1 │ 2025-11-17 02:10:27.482089 │
│ SELECT * FROM distributed_basic_table │ DELEGATED      │ CONCATENATE    │                 1 │                4 │                   1 │ 2025-11-17 02:10:30.436037 │
│ SELECT * FROM distributed_basic_table │ DELEGATED      │ CONCATENATE    │                 2 │                4 │                   1 │ 2025-11-17 02:10:33.675752 │
│ SELECT * FROM distributed_basic_table │ DELEGATED      │ CONCATENATE    │                 1 │                4 │                   1 │ 2025-11-17 02:10:36.992988 │
└───────────────────────────────────────┴────────────────┴────────────────┴───────────────────┴──────────────────┴─────────────────────┴────────────────────────────┘

Clear Query Statistics

-- Clear all recorded query history and statistics
SELECT duckherder_clear_query_recorder_stats();

-- Verify the history is cleared
SELECT COUNT(*) FROM duckherder_get_query_history();  -- Returns: 0

Roadmap

Table and Index Operations

  • Create/drop table
  • Create/drop index
  • Update table schema
  • Update index

Data Type Support

  • Primitive type support
  • List type support
  • Map type support
  • Struct type support

Distributed Query Support

  • Intelligent query partitioning
  • Natural parallelism analysis
  • Row group-aligned execution
  • Range-based partitioning
  • Aggregation pushdown (infrastructure ready)
  • GROUP BY distributed execution
  • JOIN optimization (broadcast/co-partition)
  • ORDER BY support (distributed sort)
  • Driver-side collection of partition and execution stats

Multi-Client Support

  • Query server authentication and authorization
  • Multiple DuckDB instance support on servers
  • Connection pooling

Full Write Support

  • Persist server-side database file
  • Recover DuckDB instance via database file
  • Transaction support

Additional Features

  • Query timing statistics
  • Comprehensive execution logging
  • Support installing and loading official extensions
  • Query resource consumption tracking
  • Support installing and loading community extensions
  • Dynamic worker scaling
  • Query result caching
  • Utility function to register driver and worker nodes

Contributing

See CONTRIBUTING.md for development guidelines.

License

See LICENSE for license information.
