diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java index 1f4c02c261b84..9136fa9b927f8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.zip.GZIPOutputStream; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -41,101 +40,10 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public class GzipCodec extends DefaultCodec { - /** - * A bridge that wraps around a DeflaterOutputStream to make it - * a CompressionOutputStream. - */ - @InterfaceStability.Evolving - protected static class GzipOutputStream extends CompressorStream { - - private static class ResetableGZIPOutputStream extends GZIPOutputStream { - /** - * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for - * details. - */ - private static final byte[] GZIP_HEADER = new byte[] { - 0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; - - private boolean reset = false; - - public ResetableGZIPOutputStream(OutputStream out) throws IOException { - super(out); - } - - public synchronized void resetState() throws IOException { - reset = true; - } - - @Override - public synchronized void write(byte[] buf, int off, int len) - throws IOException { - if (reset) { - def.reset(); - crc.reset(); - out.write(GZIP_HEADER); - reset = false; - } - super.write(buf, off, len); - } - - @Override - public synchronized void close() throws IOException { - reset = false; - super.close(); - } - - } - - public GzipOutputStream(OutputStream out) throws IOException { - super(new ResetableGZIPOutputStream(out)); - } - - /** - * Allow children types to put a different type in here. - * @param out the Deflater stream to use - */ - protected GzipOutputStream(CompressorStream out) { - super(out); - } - - @Override - public void close() throws IOException { - out.close(); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte[] data, int offset, int length) - throws IOException { - out.write(data, offset, length); - } - - @Override - public void finish() throws IOException { - ((ResetableGZIPOutputStream) out).finish(); - } - - @Override - public void resetState() throws IOException { - ((ResetableGZIPOutputStream) out).resetState(); - } - } @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - if (!ZlibFactory.isNativeZlibLoaded(conf)) { - return new GzipOutputStream(out); - } return CompressionCodec.Util. createOutputStreamWithCodecPool(this, conf, out); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java index 61f7c12541e07..fcb431dce86ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java @@ -56,7 +56,6 @@ public class BuiltInGzipCompressor implements Compressor { private int numExtraBytesWritten = 0; - private int currentBufLen = 0; private int accuBufLen = 0; private final Checksum crc = DataChecksum.newCrc32(); @@ -86,10 +85,6 @@ public int compress(byte[] b, int off, int len) throws IOException { int compressedBytesWritten = 0; - if (currentBufLen <= 0) { - return compressedBytesWritten; - } - // If we are not within uncompressed data yet, output the header. if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) { int outputHeaderSize = writeHeader(b, off, len); @@ -166,7 +161,6 @@ private void init(Configuration conf) { public void reinit(Configuration conf) { init(conf); numExtraBytesWritten = 0; - currentBufLen = 0; headerOff = 0; trailerOff = 0; crc.reset(); @@ -178,7 +172,6 @@ public void reset() { deflater.reset(); state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC; numExtraBytesWritten = 0; - currentBufLen = 0; headerOff = 0; trailerOff = 0; crc.reset(); @@ -201,8 +194,7 @@ public void setInput(byte[] b, int off, int len) { deflater.setInput(b, off, len); crc.update(b, off, len); // CRC-32 is on uncompressed data - currentBufLen = len; - accuBufLen += currentBufLen; + accuBufLen += len; } private int writeHeader(byte[] b, int off, int len) { @@ -251,7 +243,6 @@ private int writeTrailer(byte[] b, int off, int len) { if (trailerOff == gzipTrailerLen) { state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED; - currentBufLen = 0; headerOff = 0; trailerOff = 0; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index e3afefd1b730f..26867eed91a97 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.compress; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1051,4 +1052,45 @@ public void testCodecPoolAndGzipDecompressor() { } } } + + @Test(timeout=20000) + public void testGzipCompressorWithEmptyInput() throws IOException { + // don't use native libs + ZlibFactory.setNativeZlibLoaded(false); + Configuration conf = new Configuration(); + CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf); + + Compressor compressor = codec.createCompressor(); + assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor") + .isInstanceOf(BuiltInGzipCompressor.class); + + byte[] b = new byte[0]; + compressor.setInput(b, 0, b.length); + compressor.finish(); + + byte[] output = new byte[100]; + int outputOff = 0; + + while (!compressor.finished()) { + byte[] buf = new byte[100]; + int compressed = compressor.compress(buf, 0, buf.length); + System.arraycopy(buf, 0, output, outputOff, compressed); + outputOff += compressed; + } + + DataInputBuffer gzbuf = new DataInputBuffer(); + gzbuf.reset(output, outputOff); + + Decompressor decom = codec.createDecompressor(); + assertThat(decom).as("decompressor should not be null").isNotNull(); + assertThat(decom).withFailMessage("should be BuiltInGzipDecompressor") + .isInstanceOf(BuiltInGzipDecompressor.class); + try (InputStream gzin = codec.createInputStream(gzbuf, decom); + DataOutputBuffer dflbuf = new DataOutputBuffer()) { + dflbuf.reset(); + IOUtils.copyBytes(gzin, dflbuf, 4096); + final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength()); + assertThat(b).as("check decompressed output").isEqualTo(dflchk); + } + } }