[BUG][pylibcudf] Inconsistent parquet read results #20187

@JigaoLuo

Description

Describe the bug
Hi all,
I believe I’ve encountered a bug when reading a Parquet file with a unique key column: the read results occasionally contain duplicate entries in that column, which should never happen.

Steps/Code to reproduce bug

Build & Run

I am building from commit d5749db on branch-25.12; the full details are in the print_env.sh output attached at the end.

$ conda env create --name cudf_dev --file conda/environments/all_cuda-129_arch-x86_64.yaml
$ conda activate cudf_dev
$ ./build.sh libcudf pylibcudf
$ pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cu129 #https://pytorch.org/get-started/locally/

$ python pylibcudf_rowgroup_repro.py /xssd/0/tpch-sf100/customer.parquet 8 32 c_custkey
# Repeating the run makes the error more likely to appear:
# $ for i in {1..10}; do python pylibcudf_rowgroup_repro.py /xssd/0/tpch-sf100/customer.parquet 8 32 c_custkey; done
Across 10 runs I see output like the following:
$ for i in {1..10}; do python pylibcudf_rowgroup_repro.py /xssd/0/tpch-sf100/customer.parquet 8 32 c_custkey; done
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread1][Repeat9] Found 2 duplicates in a unique-key-column
[StorageBug][Thread2][Repeat13] Found 3 duplicates in a unique-key-column
[StorageBug][Thread3][Repeat21] Found 22 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread2][Repeat3] Found 1 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread3][Repeat2] Found 8 duplicates in a unique-key-column
[StorageBug][Thread4][Repeat21] Found 7 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread5][Repeat8] Found 3 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread3][Repeat28] Found 13 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread5][Repeat21] Found 11 duplicates in a unique-key-column
[StorageBug][Thread1][Repeat31] Found 56 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread2][Repeat8] Found 15 duplicates in a unique-key-column
[StorageBug][Thread3][Repeat11] Found 2 duplicates in a unique-key-column
num_rowgroups:  2
columns:  ['c_custkey']
threads:  8
READ_REPEAT:  32
[StorageBug][Thread5][Repeat19] Found 13 duplicates in a unique-key-column
[StorageBug][Thread7][Repeat27] Found 3 duplicates in a unique-key-column

Repro: Parquet file

I uploaded the Parquet file to my GitHub repo (sorry, GitHub): https://github.com/JigaoLuo/20187-file

  • The file is just the TPC-H SF100 customer table, about 56 MB.
  • If you have the file locally, you can inspect it with this viewer: https://parquet-viewer.xiangpeng.systems/
  • The reproduction code reads only the c_custkey column.

The c_custkey column in my Parquet file has no duplicates; I verified this via DuckDB:

$ duckdb-bin 
v1.2.0 5f5512b827
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D SELECT count(c_custkey), count(distinct c_custkey) FROM '/xssd/0/tpch-sf100/customer.parquet';
┌──────────────────┬───────────────────────────┐
│ count(c_custkey) │ count(DISTINCT c_custkey) │
│      int64       │           int64           │
├──────────────────┼───────────────────────────┤
│     15000000     │         15000000          │
│ (15.00 million)  │      (15.00 million)      │
└──────────────────┴───────────────────────────┘
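For reference, the duplicate count the repro script reports is just total rows minus distinct rows, the same quantity as count(c_custkey) - count(DISTINCT c_custkey) in the DuckDB query above (0 for this file). A minimal CPU-side sketch of that arithmetic (plain Python, hypothetical helper name):

```python
def count_duplicates(values):
    # Total rows minus distinct rows, matching the
    # count(...) - count(DISTINCT ...) check above.
    return len(values) - len(set(values))

assert count_duplicates([1, 2, 3, 4]) == 0    # unique key column: no duplicates
assert count_duplicates([1, 2, 2, 3, 3]) == 2 # two extra, non-distinct rows
```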

Repro: Python code

Here is the Python file pylibcudf_rowgroup_repro.py; each thread simply reads the row groups repeatedly:

import sys
import pylibcudf as plc
import rmm
import threading
import torch
import cupy

# With this RMM memory resource enabled, the issue reproduces much more often:
# mr = rmm.mr.CudaAsyncMemoryResource()
# rmm.mr.set_current_device_resource(mr)

file = sys.argv[1]
NUM_THREAD = int(sys.argv[2])
READ_REPEAT = int(sys.argv[3])
col_name = sys.argv[4]
if "," in col_name:
    columns = [col.strip() for col in col_name.split(",")]
else:
    columns = [col_name]

meta = plc.io.parquet_metadata.read_parquet_metadata(plc.io.SourceInfo([file]))
print("num_rowgroups: ", meta.num_rowgroups())
print("columns: ", columns)
print("threads: ", NUM_THREAD)
print("READ_REPEAT: ", READ_REPEAT)

def thread_read(file, th_id, columns):
    torch_stream = torch.cuda.Stream(device="cuda:0")
    cupy_stream = cupy.cuda.ExternalStream(torch_stream.cuda_stream)
    rmm_stream = rmm.pylibrmm.stream.Stream(cupy_stream)
    for repeat in range(READ_REPEAT):
        for rg in range(meta.num_rowgroups()):
            options = plc.io.parquet.ParquetReaderOptions.builder(
                plc.io.SourceInfo([file])
            ).build()
            options.set_row_groups([[rg]])
            options.set_columns(columns)
            plc_table_w_meta = plc.io.parquet.read_parquet(options, rmm_stream)
            plc_table = plc_table_w_meta.tbl
            plc_column = plc_table.columns()[0]
            rmm_stream.synchronize()   # this should be unnecessary, but just to be sure
            torch_stream.synchronize() # this should be unnecessary, but just to be sure
            with torch.cuda.stream(torch_stream):
                tensor = torch.as_tensor(plc_column.data(), device="cuda").view(torch.int64)  # zero-copy
                unique = torch.unique(tensor)
            rmm_stream.synchronize()   # this should be unnecessary, but just to be sure
            torch_stream.synchronize() # this should be unnecessary, but just to be sure
            if unique.numel() != tensor.numel():
                print(f"[StorageBug][Thread{th_id}][Repeat{repeat}] Found {tensor.numel() - unique.numel()} duplicates in a unique-key-column")

threads = []
for th_id in range(NUM_THREAD):
    t = threading.Thread(
        target=thread_read,
        args=(file, th_id, columns),
    )
    threads.append(t)
    t.start()
for t in threads:
    t.join()

The same script is attached:

pylibcudf_rowgroup_repro.py

Expected behavior

The print [StorageBug][ThreadX][RepeatY] Found Z duplicates in a unique-key-column should never happen.

Environment overview

  • Environment location: Bare-metal
  • Method of cuDF install: conda

Environment details
The output of the cudf/print_env.sh:

printenv.txt

Additional context

  • The issue occurs both with PTDS enabled and disabled.
  • The Parquet file is stored on an SSD and read via GDS.
  • I’ve experimented with tuning the pinned memory environment variable LIBCUDF_PINNED_POOL_SIZE, but the issue persists.
  • I’ve previously discussed stream interoperability between PyTorch and RMM with @bdice , which might be relevant here: [QST] [PyTorch] Stream interoperability between PyTorch and RMM rmm#1831
  • I also tested a CuPy variant of the uniqueness check, and the bug still appears there. I haven’t checked uniqueness using pylibcudf itself, but feel free to include that.
  • Regarding the commented-out RMM memory resource: enabling it improves performance and seems to increase the likelihood of triggering the bug. However, it is not a necessary condition for the issue to appear, which is why I’ve left it commented out.
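For clarity, the uniqueness check the script performs on the GPU (torch.unique, or cupy.unique in the CuPy variant mentioned above) can be mirrored on the CPU with NumPy. This is only an illustrative stand-in for the arithmetic, not the GPU repro itself:

```python
import numpy as np

def duplicate_count(arr):
    # Mirrors the GPU-side check in the repro script:
    # tensor.numel() - torch.unique(tensor).numel()
    return int(arr.size - np.unique(arr).size)

keys = np.arange(15, dtype=np.int64)          # stand-in for a unique key column
assert duplicate_count(keys) == 0
corrupted = np.concatenate([keys, keys[:3]])  # simulate duplicated rows
assert duplicate_count(corrupted) == 3
```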
