diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 7ecc350d353..18ae760ae9a 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -426,30 +426,59 @@ class Lz4HadoopCodec : public Lz4Codec {
 
   int64_t MaxCompressedLen(int64_t input_len,
                            const uint8_t* ARROW_ARG_UNUSED(input)) override {
-    return kPrefixLength + Lz4Codec::MaxCompressedLen(input_len, nullptr);
+    // Each block gets its own 8-byte prefix and is compressed independently,
+    // so we sum LZ4_compressBound per block (not for the whole input at once,
+    // since LZ4_compressBound has a small per-call overhead).
+    const int64_t num_full_blocks = input_len / kBlockSize;
+    const int64_t tail = input_len % kBlockSize;
+    int64_t max_len = num_full_blocks *
+                      (kPrefixLength + Lz4Codec::MaxCompressedLen(kBlockSize, nullptr));
+    if (tail > 0) {
+      max_len += kPrefixLength + Lz4Codec::MaxCompressedLen(tail, nullptr);
+    }
+    return max_len;
   }
 
   Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                            int64_t output_buffer_len, uint8_t* output_buffer) override {
-    if (output_buffer_len < kPrefixLength) {
-      return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
+    // Hadoop's BlockCompressorStream splits data into blocks of at most
+    // kBlockSize uncompressed bytes, each prefixed with [decompressed_size,
+    // compressed_size] in big-endian uint32.  Hadoop's Lz4Decompressor
+    // allocates a fixed output buffer of the same size, so a single block
+    // that decompresses to more than kBlockSize will overflow that buffer
+    // and fail.  We must split large inputs the same way Hadoop does.
+    int64_t total_output_len = 0;
+
+    while (input_len > 0) {
+      const int64_t block_input_len = input_len < kBlockSize ? input_len : kBlockSize;
+
+      if (output_buffer_len < kPrefixLength) {
+        return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(
+          int64_t block_compressed_len,
+          Lz4Codec::Compress(block_input_len, input, output_buffer_len - kPrefixLength,
+                             output_buffer + kPrefixLength));
+
+      // Prepend decompressed size in bytes and compressed size in bytes
+      // to be compatible with Hadoop Lz4Codec
+      const uint32_t decompressed_size =
+          bit_util::ToBigEndian(static_cast<uint32_t>(block_input_len));
+      const uint32_t compressed_size =
+          bit_util::ToBigEndian(static_cast<uint32_t>(block_compressed_len));
+      SafeStore(output_buffer, decompressed_size);
+      SafeStore(output_buffer + sizeof(uint32_t), compressed_size);
+
+      const int64_t block_total_len = kPrefixLength + block_compressed_len;
+      total_output_len += block_total_len;
+      output_buffer += block_total_len;
+      output_buffer_len -= block_total_len;
+      input += block_input_len;
+      input_len -= block_input_len;
     }
 
-    ARROW_ASSIGN_OR_RAISE(
-        int64_t output_len,
-        Lz4Codec::Compress(input_len, input, output_buffer_len - kPrefixLength,
-                           output_buffer + kPrefixLength));
-
-    // Prepend decompressed size in bytes and compressed size in bytes
-    // to be compatible with Hadoop Lz4Codec
-    const uint32_t decompressed_size =
-        bit_util::ToBigEndian(static_cast<uint32_t>(input_len));
-    const uint32_t compressed_size =
-        bit_util::ToBigEndian(static_cast<uint32_t>(output_len));
-    SafeStore(output_buffer, decompressed_size);
-    SafeStore(output_buffer + sizeof(uint32_t), compressed_size);
-
-    return kPrefixLength + output_len;
+    return total_output_len;
   }
 
   Result<std::shared_ptr<Compressor>> MakeCompressor() override {
@@ -470,6 +499,17 @@ class Lz4HadoopCodec : public Lz4Codec {
   // Offset starting at which page data can be read/written
   static const int64_t kPrefixLength = sizeof(uint32_t) * 2;
 
+  // Maximum uncompressed block size per Hadoop-framed LZ4 block.
+  // Hadoop's IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE is configurable at
+  // runtime (default 256 KiB).  Both its BlockCompressorStream and
+  // Lz4Decompressor use that value: the compressor splits data into
+  // blocks of this size, and the decompressor allocates a fixed output
+  // buffer of the same size.  We use the default here since we cannot
+  // read Hadoop's runtime configuration from C++.
+  static constexpr int64_t kBlockSize = 256 * 1024;
+  static_assert(kBlockSize <= std::numeric_limits<uint32_t>::max(),
+                "kBlockSize must fit in uint32_t for Hadoop framing prefix");
+
   static const int64_t kNotHadoop = -1;
 
   int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc
index 5ba93cc291d..df66bfbf9c8 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -871,6 +871,66 @@ TEST(TestCodecLZ4Hadoop, Compatibility) {
   std::vector<uint8_t> data = MakeRandomData(100);
   CheckCodecRoundtrip(c1, c2, data, /*check_reverse=*/false);
 }
+
+TEST(TestCodecLZ4Hadoop, MultiBlockRoundtrip) {
+  // Verify multi-block Hadoop-framed LZ4 compression:
+  // 1. MaxCompressedLen provides a sufficient buffer (even for incompressible data)
+  // 2. Compress -> Decompress round-trips correctly
+  // 3. No block exceeds Hadoop's 256 KiB decompressed limit
+  //    (Hadoop's Lz4Decompressor allocates a fixed output buffer of that size;
+  //    exceeding it causes LZ4Exception on JVM readers.)
+  //
+  // Check (3) fails without the multi-block splitting fix.
+  ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(Compression::LZ4_HADOOP));
+  constexpr int kHadoopBlockSize = 256 * 1024;
+
+  for (int data_size :
+       {0, 1, 10000, 256 * 1024, 256 * 1024 + 1, 320000, 512 * 1024, 1024 * 1024}) {
+    ARROW_SCOPED_TRACE("data_size = ", data_size);
+    std::vector<uint8_t> data = MakeRandomData(data_size);
+
+    // (1) MaxCompressedLen must be sufficient — Compress fails if not.
+    int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
+    std::vector<uint8_t> compressed(max_compressed_len);
+
+    ASSERT_OK_AND_ASSIGN(
+        int64_t compressed_len,
+        codec->Compress(data.size(), data.data(), max_compressed_len, compressed.data()));
+    ASSERT_LE(compressed_len, max_compressed_len);
+
+    // (2) Round-trip: decompress and verify data integrity.
+    std::vector<uint8_t> decompressed(data.size());
+    ASSERT_OK_AND_ASSIGN(int64_t decompressed_len,
+                         codec->Decompress(compressed_len, compressed.data(), data.size(),
+                                           decompressed.data()));
+    ASSERT_EQ(decompressed_len, static_cast<int64_t>(data.size()));
+    ASSERT_EQ(data, decompressed);
+
+    // (3) Walk Hadoop-framed blocks and verify each decompressed_size <= 256 KiB.
+    const uint8_t* ptr = compressed.data();
+    int64_t remaining = compressed_len;
+    int block_count = 0;
+    while (remaining > 0) {
+      ASSERT_GE(remaining, 8) << "truncated block header";
+      uint32_t block_decompressed_size =
+          static_cast<uint32_t>(ptr[0]) << 24 | static_cast<uint32_t>(ptr[1]) << 16 |
+          static_cast<uint32_t>(ptr[2]) << 8 | static_cast<uint32_t>(ptr[3]);
+      uint32_t block_compressed_size =
+          static_cast<uint32_t>(ptr[4]) << 24 | static_cast<uint32_t>(ptr[5]) << 16 |
+          static_cast<uint32_t>(ptr[6]) << 8 | static_cast<uint32_t>(ptr[7]);
+      ASSERT_LE(static_cast<int>(block_decompressed_size), kHadoopBlockSize)
+          << "block " << block_count << " exceeds Hadoop's 256 KiB limit";
+      ASSERT_GE(remaining, 8 + static_cast<int64_t>(block_compressed_size));
+      ptr += 8 + block_compressed_size;
+      remaining -= 8 + block_compressed_size;
+      ++block_count;
+    }
+    ASSERT_EQ(remaining, 0);
+    if (data_size > kHadoopBlockSize) {
+      ASSERT_GE(block_count, 2) << "expected multiple blocks for data > 256 KiB";
+    }
+  }
+}
 #endif
 
 }  // namespace util