[Improve][Connector-v2][File] error handling for file and directory operations in HadoopFileSystemProxy #10433

Open

zhangshenghang wants to merge 1 commit into apache:dev from zhangshenghang:improve-local-file

Conversation

@zhangshenghang
Member

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@DanielCarter-stack

Issue 1: path.getParent() may return null causing NPE

Location: HadoopFileSystemProxy.java:210

Modified code:

IOException enhanced =
        enhanceMkdirsException(
                fs, path.getParent(), "create file " + path.getName(), e);

Related context:

  • Callers: all WriteStrategy (18+ locations)
  • Call chain: WriteStrategy → getOutputStream() → FileSystem.create() → enhanceMkdirsException()

Problem description:
When filePath is a relative path (e.g., "test.txt") or the root path (e.g., "/"), path.getParent() may return null. In the enhanceMkdirsException() method (L297), the code executes:

Path parent = path.getParent();
if (parent != null && !fs.exists(parent)) {  // if parent is null, this branch is skipped
    reason.append("Parent directory does not exist: ").append(parent).append(". ");
} else {
    reason.append("Directory does not exist and creation failed: ")
            .append(path)
            .append(". ");
}

Although L297 has a null check, the subsequent L306-L316 will still call fs.getFileStatus(path), which may fail in certain edge cases.
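
As a quick sanity check on the claims above, the snippet below probes how Hadoop's Path handles the parent of a root path, a single-component relative path, and a null input. The class name is made up for illustration, and the printed results depend on the Hadoop client version in use:

import org.apache.hadoop.fs.Path;

public class PathParentProbe {
    public static void main(String[] args) {
        // Root path: getParent() is expected to return null
        System.out.println("parent of \"/\": " + new Path("/").getParent());
        // Single-component relative path: print whatever the client returns
        System.out.println("parent of \"test.txt\": " + new Path("test.txt").getParent());
        // Absolute path with a parent directory
        System.out.println("parent of \"/tmp/test.txt\": " + new Path("/tmp/test.txt").getParent());
        // @NonNull only guards the proxy's parameter; probe what Path itself does with null
        try {
            new Path((String) null);
        } catch (RuntimeException e) {
            System.out.println("new Path(null) rejected: " + e);
        }
    }
}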

Potential risks:

  1. Risk 1: If path itself is null (filePath is annotated @NonNull, but new Path(null) may still slip through), an NPE will result
  2. Risk 2: The error message is inaccurate: for relative paths the parent is effectively the current working directory, but the message would report "Parent directory does not exist: null"
  3. Risk 3: At enhanceMkdirsException() L298, if parent is null, the resulting error message will be confusing

Impact scope:

  • Direct impact: getOutputStream() method
  • Indirect impact: all WriteStrategy using relative paths to create files
  • Affected surface: all File Connectors (~18 WriteStrategy implementations)

Severity: MAJOR (medium-high)

Improvement suggestion:

public FSDataOutputStream getOutputStream(String filePath) throws IOException {
    return execute(
            () -> {
                Path path = new Path(filePath);
                FileSystem fs = getFileSystem();
                try {
                    return fs.create(path, true);
                } catch (IOException e) {
                    Path parent = path.getParent();
                    // Fix: pass the (possibly null) parent; enhanceMkdirsException now handles null
                    IOException enhanced =
                            enhanceMkdirsException(
                                    fs, parent, "create file " + path.getName(), e);
                    throw CommonError.fileOperationFailed(
                            "SeaTunnel", "create", filePath, enhanced);
                }
            });
}

// Also modify enhanceMkdirsException:
private IOException enhanceMkdirsException(
        FileSystem fs, Path path, String operation, IOException cause) throws IOException {
    StringBuilder reason = new StringBuilder();
    
    // Fix: Handle null path
    if (path == null) {
        reason.append("Path is null. ");
    } else if (!fs.exists(path)) {
        Path parent = path.getParent();
        if (parent != null && !fs.exists(parent)) {
            reason.append("Parent directory does not exist: ").append(parent).append(". ");
        } else if (parent == null) {
            reason.append("Path is in current directory. ");
        } else {
            reason.append("Directory does not exist and creation failed: ")
                    .append(path)
                    .append(". ");
        }
        
        try {
            fs.getFileStatus(path);
        } catch (IOException e) {
            if (e.getMessage() != null) {
                if (e.getMessage().contains("Permission denied")) {
                    reason.append("Permission denied. ");
                } else {
                    reason.append("Hadoop error: ").append(e.getMessage()).append(". ");
                }
            }
        }
    } else {
        reason.append("Path exists but may be inaccessible: ").append(path).append(". ");
    }
    
    reason.append("Operation: ")
            .append(operation)
            .append(". ")
            .append("Current working directory: ")
            .append(fs.getWorkingDirectory());
    
    IOException enhanced = new IOException(reason.toString());
    if (cause != null) {
        enhanced.addSuppressed(cause);
    }
    return enhanced;
}

Rationale:

  1. Defensive programming: handle the case where path.getParent() returns null
  2. More accurate error messages: for relative paths, indicate "current directory" instead of "null"
  3. Improved robustness: avoid potential NPE

Issue 2: enhanceMkdirsException() method lacks JavaDoc

Location: HadoopFileSystemProxy.java:292-332

Modified code:

private IOException enhanceMkdirsException(
        FileSystem fs, Path path, String operation, IOException cause) throws IOException {
    // 45 lines of code, no JavaDoc
}

Related context:

  • Other private methods in the same class: most have comments
  • Callers: createDir() (L149), getOutputStream() (L209)

Problem description:
enhanceMkdirsException() is a complex private method (~45 lines), involving multiple FileSystem API calls and error message parsing, but lacks JavaDoc explaining its purpose, parameter meanings, and return value conventions.

Potential risks:

  1. Risk 1: Future maintainers may not understand the method's purpose and incorrectly modify or delete it
  2. Risk 2: The meaning of the operation parameter is unclear (is it the "current operation" or the "target operation"?)
  3. Risk 3: The structure and format of the returned IOException are not documented, making it hard for callers to rely on them

Impact scope:

  • Direct impact: code maintainability
  • Indirect impact: future extension and debugging
  • Affected surface: single class, but affects long-term maintenance

Severity: MINOR (low)

Improvement suggestion:

/**
 * Enhances IOException with detailed diagnostic information for directory/file creation failures.
 * <p>
 * This method performs the following diagnostic checks:
 * <ul>
 *   <li>Checks if the parent directory exists</li>
 *   <li>Detects permission denied errors from Hadoop</li>
 *   <li>Captures Hadoop-specific error messages</li>
 *   <li>Includes current working directory for context</li>
 * </ul>
 * 
 * @param fs the FileSystem instance to perform diagnostic checks on
 * @param path the path that failed to create (can be null for relative paths)
 * @param operation the operation being performed (e.g., "create directory", "create file")
 * @param cause the original IOException from FileSystem.mkdirs() or FileSystem.create() (can be null)
 * @return an enhanced IOException with detailed diagnostic information in the message
 * @throws IOException if diagnostic checks (e.g., fs.exists(), fs.getFileStatus()) fail
 * 
 * @see CommonError#fileOperationFailed(String, String, String, Throwable)
 */
private IOException enhanceMkdirsException(
        FileSystem fs, Path path, String operation, IOException cause) throws IOException {
    // ...
}

Rationale:

  1. Apache top-level projects should have comprehensive documentation
  2. Complex methods need clear contract descriptions
  3. Facilitate understanding and extension by future maintainers

Issue 3: Missing unit tests for this modification

Location: seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/

Modified code:
No new test files

Related context:

  • Existing tests: HadoopFileSystemProxyKerberosRenewTest.java
  • Modified methods: createDir(), getOutputStream(), renameFile()
  • New methods: enhanceMkdirsException()

Problem description:
The PR modifies core error handling logic but does not add corresponding unit tests. According to Apache project standards, all new features and bug fixes should have test coverage.

Potential risks:

  1. Risk 1: Regression risk: future modifications may break this improvement
  2. Risk 2: Boundary conditions not tested: e.g., when path.getParent() returns null
  3. Risk 3: Cannot verify whether the fix is effective: e.g., whether the "directory already exists" scenario really no longer throws exceptions

Impact scope:

  • Direct impact: code quality assurance
  • Indirect impact: confidence in future maintenance
  • Affected surface: entire File Connector module

Severity: MAJOR (medium-high)

Improvement suggestion:
Create new test file HadoopFileSystemProxyTest.java:

package org.apache.seatunnel.connectors.seatunnel.file.hadoop;

import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.*;

class HadoopFileSystemProxyTest {
    
    private HadoopFileSystemProxy proxy;
    private HadoopConf hadoopConf;
    
    @TempDir
    java.nio.file.Path tempDir;
    
    @BeforeEach
    void setUp() throws IOException {
        hadoopConf = new HadoopConf();
        hadoopConf.setHdfsPath("file://" + tempDir.toString());
        proxy = new HadoopFileSystemProxy(hadoopConf);
    }
    
    @AfterEach
    void tearDown() throws IOException {
        if (proxy != null) {
            proxy.close();
        }
    }
    
    @Test
    void testCreateDirWhenAlreadyExists() throws IOException {
        // Given: Directory already exists
        String dirPath = tempDir.resolve("test-dir").toString();
        proxy.createDir(dirPath);
        
        // When: Create again
        // Then: Should not throw exception
        assertDoesNotThrow(() -> proxy.createDir(dirPath));
    }
    
    @Test
    void testCreateDirWhenParentNotExists() {
        // Given: Parent directory does not exist
        String dirPath = tempDir.resolve("non-existent-parent/child").toString();
        
        // When & Then: Should throw exception with detailed information
        SeaTunnelRuntimeException ex = assertThrows(
            SeaTunnelRuntimeException.class,
            () -> proxy.createDir(dirPath)
        );
        
        // Verify error message contains useful diagnostic information
        assertTrue(ex.getMessage().contains("Parent directory does not exist") ||
                   ex.getMessage().contains("Hadoop error"));
    }
    
    @Test
    void testGetOutputStreamWhenSuccess() throws IOException {
        // Given: Parent directory exists
        String filePath = tempDir.resolve("test-file.txt").toString();
        
        // When: Create output stream
        // Then: Should not throw exception
        assertDoesNotThrow(() -> proxy.getOutputStream(filePath));
    }
    
    @Test
    void testGetOutputStreamWhenParentNotExists() {
        // Given: Parent directory does not exist
        String filePath = tempDir.resolve("non-existent-parent/file.txt").toString();
        
        // When & Then: Should throw exception with parent directory information
        SeaTunnelRuntimeException ex = assertThrows(
            SeaTunnelRuntimeException.class,
            () -> proxy.getOutputStream(filePath)
        );
        
        // Verify error message contains useful diagnostic information
        String message = ex.getMessage();
        assertTrue(message.contains("Parent directory") || 
                   message.contains("Hadoop error") ||
                   message.contains("Permission denied"));
    }
    
    @Test
    void testGetOutputStreamWithRelativePath() throws IOException {
        // Given: Relative path
        String relativePath = "test-relative.txt";
        
        // When: Create output stream
        // Then: Should not throw NPE
        assertDoesNotThrow(() -> proxy.getOutputStream(relativePath));
    }
    
    @Test
    void testGetOutputStreamWithRootPath() {
        // Given: Root path (Edge Case)
        String rootPath = "/";
        
        // When & Then: Should throw exception (but not NPE)
        Exception ex = assertThrows(Exception.class, () -> {
            proxy.getOutputStream(rootPath);
        });
        
        // Verify it's not NPE
        assertFalse(ex instanceof NullPointerException);
    }
}

Rationale:

  1. Apache project standards: all code should have tests
  2. Cover key scenarios: directory already exists, parent directory does not exist, relative paths, etc.
  3. Prevent regression: ensure future modifications won't break this improvement
  4. Verify the fix: confirm that the "directory already exists" scenario genuinely no longer throws an exception

Issue 4: fs.exists() and fs.getFileStatus() in enhanceMkdirsException() may throw new exceptions or swallow error details

Location: HadoopFileSystemProxy.java:296-316

Modified code:

if (!fs.exists(path)) {
    Path parent = path.getParent();
    if (parent != null && !fs.exists(parent)) {
        reason.append("Parent directory does not exist: ").append(parent).append(". ");
    } else {
        reason.append("Directory does not exist and creation failed: ")
                .append(path)
                .append(". ");
    }

    try {
        fs.getFileStatus(path);  // A new IOException may be thrown here
    } catch (IOException e) {
        if (e.getMessage() != null) {
            if (e.getMessage().contains("Permission denied")) {
                reason.append("Permission denied. ");
            } else {
                reason.append("Hadoop error: ").append(e.getMessage()).append(". ");
            }
        }
    }
}

Related context:

  • Callers: createDir() (L149), getOutputStream() (L209)
  • Hadoop FileSystem API: both exists() and getFileStatus() are remote calls

Problem description:
In enhanceMkdirsException(), both fs.exists() and fs.getFileStatus() are Hadoop RPC calls that may throw new IOException (network failure, NameNode downtime, etc.). The current code's handling of these exceptions is inconsistent:

  1. fs.exists() calls (L296, L298): exceptions are not caught and propagate directly, replacing the original error (see the sketch below)
  2. fs.getFileStatus() call (L307): exceptions are caught, but if e.getMessage() is null, all error information is lost
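
A standalone sketch (hypothetical names, not SeaTunnel code) of the first point: an unguarded call inside the diagnostic path throws its own exception, which replaces the original failure unless the original is explicitly attached:

import java.io.IOException;

public class MaskedCauseDemo {
    // Stand-in for a diagnostic RPC such as fs.exists() failing during a network outage
    static boolean flakyExists() throws IOException {
        throw new IOException("NameNode unreachable");
    }

    public static void main(String[] args) {
        try {
            try {
                throw new IOException("mkdirs failed"); // the original failure
            } catch (IOException original) {
                // Unguarded diagnostic call: its exception propagates out of the catch
                // block and the original message is lost
                flakyExists();
            }
        } catch (IOException surfaced) {
            // Prints "NameNode unreachable" -- the caller never sees "mkdirs failed"
            System.out.println("Caller only sees: " + surfaced.getMessage());
        }
    }
}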

Potential risks:

  1. Risk 1: If a network failure occurs during exception enhancement, fs.exists() will throw a new exception, causing the original error message to be lost
  2. Risk 2: For getFileStatus()'s IOException, if getMessage() returns null, error information will be lost
  3. Risk 3: Nested RPC calls increase failure risk: original failure + diagnostic call failure

Impact scope:

  • Direct impact: reliability of error diagnostics
  • Indirect impact: debugging difficulty (incomplete error messages)
  • Affected surface: all scenarios using createDir() and getOutputStream()

Severity: MINOR (low-medium)

Improvement suggestion:

private IOException enhanceMkdirsException(
        FileSystem fs, Path path, String operation, IOException cause) throws IOException {
    StringBuilder reason = new StringBuilder();
    
    try {
        if (!fs.exists(path)) {
            Path parent = path.getParent();
            if (parent != null && !fs.exists(parent)) {
                reason.append("Parent directory does not exist: ").append(parent).append(". ");
            } else if (parent == null) {
                reason.append("Path is in current directory. ");
            } else {
                reason.append("Directory does not exist and creation failed: ")
                        .append(path)
                        .append(". ");
            }

            // Safely attempt to get detailed error
            try {
                fs.getFileStatus(path);
            } catch (IOException e) {
                String errorMsg = e.getMessage();
                if (errorMsg != null && !errorMsg.isEmpty()) {
                    if (errorMsg.contains("Permission denied")) {
                        reason.append("Permission denied. ");
                    } else {
                        reason.append("Hadoop error: ").append(errorMsg).append(". ");
                    }
                } else {
                    reason.append("Hadoop error: (no detailed message). ");
                }
            }
        } else {
            reason.append("Path exists but may be inaccessible: ").append(path).append(". ");
        }
    } catch (IOException diagnosticEx) {
        // If diagnostic checks fail, fall back to basic information
        reason.append("Failed to diagnose path: ")
              .append(path)
              .append(". Diagnostic error: ")
              .append(diagnosticEx.getMessage())
              .append(". ");
    }
    
    reason.append("Operation: ")
            .append(operation)
            .append(". ")
            .append("Current working directory: ");
    
    // Safely get working directory (FileSystem.getWorkingDirectory() does not declare
    // IOException, so only guard against unexpected runtime failures)
    try {
        reason.append(fs.getWorkingDirectory());
    } catch (RuntimeException e) {
        reason.append("(unknown: ").append(e.getMessage()).append(")");
    }
    
    IOException enhanced = new IOException(reason.toString());
    if (cause != null) {
        enhanced.addSuppressed(cause);
    }
    return enhanced;
}

Rationale:

  1. Defensive programming: provide basic error information when diagnostic calls fail
  2. Preserve original exception: retain original cause through addSuppressed
  3. More robust: avoid throwing new uncaught exceptions during error handling

Issue 5: Log warning level in renameFile() may be inappropriate

Location: HadoopFileSystemProxy.java:106-111

Modified code:

if (!fileExist(oldPath.toString())) {
    log.warn(
            "rename file:[{}] to [{}] already finished in the last commit, skip. "
                    + "WARNING: In cluster mode with LocalFile without shared storage, "
                    + "the file may not be actually synced successfully, but the status shows success.",
            oldPath,
            newPath);
    return Void.class;
}

Related context:

  • Caller: FileSinkAggregatedCommitter.commit() (L59)
  • Scenario: duplicate submission after checkpoint recovery

Problem description:
In renameFile(), if the source file does not exist (possibly because the previous commit already completed), the message is logged at WARN level. However, in the following scenarios this is normal behavior rather than an exceptional situation:

  1. Checkpoint recovery: after a job failure and restart, commit is re-executed, and the file was already successfully renamed in the previous attempt
  2. Idempotency: FileSinkAggregatedCommitter.commit() should be idempotent, so a missing source file is expected

Using the WARN level may lead to:

  1. Log noise: normal checkpoint recovery generates large amounts of warnings
  2. False positives: operations personnel may mistakenly believe there are serious problems

Potential risks:

  1. Risk 1: Log level misuse, masking real warnings
  2. Risk 2: In large-scale clusters, frequent checkpoint recovery will generate large volumes of WARN logs
  3. Risk 3: Log aggregation systems (e.g., ELK) may falsely report as anomalies

Impact scope:

  • Direct impact: log readability and monitoring alerts
  • Indirect impact: operational efficiency
  • Affected surface: all jobs using File Sink

Severity: MINOR (low)

Improvement suggestion:

Option 1: Change to INFO level (recommended)

if (!fileExist(oldPath.toString())) {
    log.info(
            "rename file:[{}] to [{}] already finished in the last commit, skip. "
                    + "INFO: In cluster mode with LocalFile without shared storage, "
                    + "the file may not be actually synced successfully, but the status shows success.",
            oldPath,
            newPath);
    return Void.class;
}

Option 2: Distinguish log levels based on scenario

if (!fileExist(oldPath.toString())) {
    // Check if in checkpoint recovery scenario (can be determined from context)
    if (isCheckpointRecovery()) {
        log.debug("File [{}] already renamed to [{}], skipping", oldPath, newPath);
    } else {
        log.warn(
                "rename file:[{}] to [{}] already finished in the last commit, skip. "
                        + "WARNING: In cluster mode with LocalFile without shared storage, "
                        + "the file may not be actually synced successfully, but the status shows success.",
                oldPath,
                newPath);
    }
    return Void.class;
}

Option 3: Keep WARN, but clearly mark as "expected behavior"

if (!fileExist(oldPath.toString())) {
    log.warn(
            "rename file:[{}] to [{}] already finished in the last commit, skip. "
                    + "Note: This is expected during checkpoint recovery. "
                    + "However, in cluster mode with LocalFile without shared storage, "
                    + "the file may not be actually synced successfully, but the status shows success.",
            oldPath,
            newPath);
    return Void.class;
}

Rationale:

  1. Option 1 is simplest: INFO level is more appropriate for "duplicate skips in idempotent operations"
  2. Option 2 is most precise: distinguish between normal recovery and real exceptions
  3. Option 3 has minimal changes: keep WARN, but indicate it's expected behavior

Collaborator

@chl-wxp chl-wxp left a comment


LGTM

Collaborator

@LiJie20190102 LiJie20190102 left a comment


LGTM
