[spark] Empty projection in batch read throws Invalid metadata length with COUNT(*)/COUNT(1) #2724

@beryllw

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main (development)

Please describe the bug 🐞

When an aggregate query such as COUNT(*) or COUNT(1) runs on a Fluss table through Spark SQL, the server throws IllegalStateException: Invalid metadata length in FileLogProjection.project(). The likely cause is that Spark pushes down an empty column projection for count-only queries. The LogFetcher keeps retrying the failed fetch indefinitely, so the query hangs.
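The hang follows from the retry behavior: the projection error is deterministic, so every retry of the same fetch fails in the same way. The model below is hypothetical and is not the actual LogFetcher code. BoundedFetchRetry and fetchWithLimit are illustrative names, and the failing supplier stands in for the server error. It shows how capping the number of attempts turns an endless loop into an error that reaches the caller.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical model of a fetch retry loop (not the actual Fluss LogFetcher).
 * A deterministic server error fails identically on every attempt,
 * so an unbounded loop around it never exits.
 */
final class BoundedFetchRetry {

    private BoundedFetchRetry() {}

    /** Calls fetch up to maxAttempts times, then rethrows with the last failure as cause. */
    static <T> T fetchWithLimit(Supplier<T> fetch, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.get();
            } catch (RuntimeException e) {
                // Keep the latest failure so it can be attached as the cause.
                last = e;
            }
        }
        throw new IllegalStateException("giving up after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            fetchWithLimit(() -> {
                attempts.incrementAndGet();
                // Stand-in for the server's "Invalid metadata length" failure.
                throw new IllegalStateException("Invalid metadata length");
            }, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", cause: " + e.getCause().getMessage());
        }
        System.out.println("attempts: " + attempts.get());
    }
}
```

A cap like this only surfaces the error sooner. It does not fix the empty projection itself.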

How to Reproduce:

CREATE TABLE log_orders (
    order_id BIGINT,
    item_id BIGINT,
    amount INT,
    address STRING
);

INSERT INTO log_orders VALUES
    (1, 101, 10, 'Beijing'),
    (2, 102, 20, 'Shanghai'),
    (3, 103, 30, 'Hangzhou'),
    (4, 104, 40, 'Shenzhen'),
    (5, 105, 50, 'Guangzhou');

SELECT COUNT(*) FROM log_orders;

Stack Trace:

26/02/25 12:31:07 ERROR LogFetcher: Failed to fetch log from node 0 for bucket TableBucket{tableId=0, bucket=0}
org.apache.fluss.exception.UnknownServerException: The server experienced an unexpected error when processing the request.
26/02/25 12:31:07 WARN LogFetchCollector: Unknown server error while fetching offset -2 for bucket TableBucket{tableId=0, bucket=0}: org.apache.fluss.exception.UnknownServerException: java.lang.IllegalStateException: Invalid metadata length
        at org.apache.fluss.utils.Preconditions.checkState(Preconditions.java:161)
        at org.apache.fluss.record.FileLogProjection.project(FileLogProjection.java:232)
        at org.apache.fluss.server.log.LogSegment.read(LogSegment.java:534)
        at org.apache.fluss.server.log.LocalLog.read(LocalLog.java:409)
        at org.apache.fluss.server.log.LogTablet.read(LogTablet.java:413)
        at org.apache.fluss.server.replica.Replica.readRecords(Replica.java:1498)
        at org.apache.fluss.server.replica.Replica.lambda$fetchRecords$11(Replica.java:1054)
        at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
        at org.apache.fluss.utils.concurrent.LockUtils.inReadLock(LockUtils.java:55)
        at org.apache.fluss.server.replica.Replica.fetchRecords(Replica.java:1050)
        at org.apache.fluss.server.replica.ReplicaManager.readFromLog(ReplicaManager.java:1358)
        at org.apache.fluss.server.replica.ReplicaManager.fetchLogRecords(ReplicaManager.java:562)
        at org.apache.fluss.server.tablet.TabletService.fetchLog(TabletService.java:200)
        at jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.fluss.rpc.netty.server.FlussRequestHandler.processRequest(FlussRequestHandler.java:63)
        at org.apache.fluss.rpc.netty.server.FlussRequestHandler.processRequest(FlussRequestHandler.java:34)
        at org.apache.fluss.rpc.netty.server.RequestProcessor.processRequest(RequestProcessor.java:98)
        at org.apache.fluss.rpc.netty.server.RequestProcessor.run(RequestProcessor.java:70)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

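Possible Cause: For COUNT(*) / COUNT(1), Spark only needs the row count, so its column pruning requests zero columns from the source. FileLogProjection.project() does not handle an empty projection. As a result, the checkState on the metadata length fails when LogSegment.read() projects the log segment (FileLogProjection.java:232 in the trace above).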
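To make the cause concrete, the sketch below models how a reader might turn Spark's required columns into a projection, using the columns of log_orders from the reproduction. For COUNT(*), the required list is empty. Instead of forwarding a zero-column projection, the sketch falls back to the first column. This is a hypothetical client-side workaround, not Fluss or Spark API. EmptyProjectionGuard and toServerProjection are illustrative names, and the sketch assumes that counting rows needs only one value per record.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: keeps an empty column pruning (COUNT(*) / COUNT(1))
 * from reaching the server as a zero-column projection.
 * None of these names are Fluss or Spark APIs.
 */
final class EmptyProjectionGuard {

    private EmptyProjectionGuard() {}

    /**
     * Maps the columns Spark asked for to field indexes in the table schema.
     * An empty request (count-only query) is replaced by column 0, so the
     * server never receives an empty projection.
     */
    static int[] toServerProjection(String[] tableColumns, String[] requiredColumns) {
        if (tableColumns.length == 0) {
            throw new IllegalArgumentException("table has no columns");
        }
        if (requiredColumns.length == 0) {
            // Count-only query: project a single column instead of none.
            return new int[] {0};
        }
        List<String> names = Arrays.asList(tableColumns);
        int[] indexes = new int[requiredColumns.length];
        for (int i = 0; i < requiredColumns.length; i++) {
            int idx = names.indexOf(requiredColumns[i]);
            if (idx < 0) {
                throw new IllegalArgumentException("unknown column: " + requiredColumns[i]);
            }
            indexes[i] = idx;
        }
        return indexes;
    }

    public static void main(String[] args) {
        String[] columns = {"order_id", "item_id", "amount", "address"};
        // SELECT COUNT(*) FROM log_orders: no columns required.
        System.out.println(Arrays.toString(toServerProjection(columns, new String[0])));
        // SELECT amount, order_id FROM log_orders
        System.out.println(Arrays.toString(toServerProjection(columns, new String[] {"amount", "order_id"})));
    }
}
```

Choosing column 0 is arbitrary. A narrow fixed-width column, such as the BIGINT order_id, would keep the fetched data small. The sketch leaves FileLogProjection.project() unchanged, so making the server accept an empty projection would likely still be worthwhile.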

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
