Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand All @@ -41,101 +40,10 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GzipCodec extends DefaultCodec {
/**
* A bridge that wraps around a DeflaterOutputStream to make it
* a CompressionOutputStream.
*
* Pure-Java fallback used when native zlib is not loaded. Wraps the
* JDK's {@link GZIPOutputStream} and adds the ability to reset the
* stream state so one codec instance can produce several concatenated
* gzip members on the same underlying stream.
*/
@InterfaceStability.Evolving
protected static class GzipOutputStream extends CompressorStream {

// GZIPOutputStream with support for restarting a fresh gzip member
// without reopening the underlying stream.
private static class ResetableGZIPOutputStream extends GZIPOutputStream {
/**
* Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
* details.
*/
private static final byte[] GZIP_HEADER = new byte[] {
0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };

// Set by resetState(); the actual reset is deferred until the next
// write so no header is emitted for a reset that is never written to.
private boolean reset = false;

public ResetableGZIPOutputStream(OutputStream out) throws IOException {
super(out);
}

// Request a reset; takes effect lazily on the next write().
public synchronized void resetState() throws IOException {
reset = true;
}

@Override
public synchronized void write(byte[] buf, int off, int len)
throws IOException {
if (reset) {
// Start a new gzip member: clear deflater and CRC inherited from
// GZIPOutputStream, then re-emit the fixed header before any data.
def.reset();
crc.reset();
out.write(GZIP_HEADER);
reset = false;
}
super.write(buf, off, len);
}

@Override
public synchronized void close() throws IOException {
// Drop any pending reset so close() finishes the current member only.
reset = false;
super.close();
}

}

public GzipOutputStream(OutputStream out) throws IOException {
super(new ResetableGZIPOutputStream(out));
}

/**
* Allow children types to put a different type in here.
* @param out the Deflater stream to use
*/
protected GzipOutputStream(CompressorStream out) {
super(out);
}

@Override
public void close() throws IOException {
out.close();
}

@Override
public void flush() throws IOException {
out.flush();
}

@Override
public void write(int b) throws IOException {
out.write(b);
}

@Override
public void write(byte[] data, int offset, int length)
throws IOException {
out.write(data, offset, length);
}

// NOTE(review): the casts below assume 'out' is a
// ResetableGZIPOutputStream; a subclass using the protected
// CompressorStream constructor with a different stream type would get a
// ClassCastException here — confirm intended for subclasses.
@Override
public void finish() throws IOException {
((ResetableGZIPOutputStream) out).finish();
}

@Override
public void resetState() throws IOException {
((ResetableGZIPOutputStream) out).resetState();
}
}

@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  // Prefer the pooled native-zlib stream when available; otherwise fall
  // back to the pure-Java gzip implementation.
  if (ZlibFactory.isNativeZlibLoaded(conf)) {
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }
  return new GzipOutputStream(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class BuiltInGzipCompressor implements Compressor {

private int numExtraBytesWritten = 0;

private int currentBufLen = 0;
private int accuBufLen = 0;

private final Checksum crc = DataChecksum.newCrc32();
Expand Down Expand Up @@ -86,10 +85,6 @@ public int compress(byte[] b, int off, int len) throws IOException {

int compressedBytesWritten = 0;

if (currentBufLen <= 0) {
return compressedBytesWritten;
}

Comment on lines -89 to -92
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I found this bug: if we set an empty input on the compress stream, it causes an endless loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test case for this?

Copy link
Member Author

@viirya viirya Sep 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without removing the condition, the current test will time out after removing GzipOutputStream.

Copy link
Member Author

@viirya viirya Sep 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need another test for it? We currently have test coverage for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which test is failing before removing the condition? so it passes with the wrapper class but fails after?

Also, we can remove the currentBufLen variable now since it is no longer used anywhere else.

Copy link
Member Author

@viirya viirya Sep 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testGzipCodec will time out after removing GzipOutputStream.

Because of this line:

codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");

It writes an empty input to the compress stream. Due to this currentBufLen check, compress will return 0 endlessly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's still good to have a dedicated test for this edge case. We can use @Test(timeout=<value>) to check the timeout.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see. let me add one then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added one test for empty input case.

// If we are not within uncompressed data yet, output the header.
if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) {
int outputHeaderSize = writeHeader(b, off, len);
Expand Down Expand Up @@ -166,7 +161,6 @@ private void init(Configuration conf) {
public void reinit(Configuration conf) {
init(conf);
numExtraBytesWritten = 0;
currentBufLen = 0;
headerOff = 0;
trailerOff = 0;
crc.reset();
Expand All @@ -178,7 +172,6 @@ public void reset() {
deflater.reset();
state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC;
numExtraBytesWritten = 0;
currentBufLen = 0;
headerOff = 0;
trailerOff = 0;
crc.reset();
Expand All @@ -201,8 +194,7 @@ public void setInput(byte[] b, int off, int len) {

deflater.setInput(b, off, len);
crc.update(b, off, len); // CRC-32 is on uncompressed data
currentBufLen = len;
accuBufLen += currentBufLen;
accuBufLen += len;
}

private int writeHeader(byte[] b, int off, int len) {
Expand Down Expand Up @@ -251,7 +243,6 @@ private int writeTrailer(byte[] b, int off, int len) {

if (trailerOff == gzipTrailerLen) {
state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED;
currentBufLen = 0;
headerOff = 0;
trailerOff = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.io.compress;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -1051,4 +1052,45 @@ public void testCodecPoolAndGzipDecompressor() {
}
}
}

@Test(timeout=20000)
public void testGzipCompressorWithEmptyInput() throws IOException {
Comment on lines +1056 to +1057
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In current trunk, this test will cause:

org.junit.runners.model.TestTimedOutException: test timed out after 20000 milliseconds         
        at org.apache.hadoop.io.compress.TestCodec.testGzipCompressorWithEmptyInput(TestCodec.java:1076)                                                                                       

// don't use native libs
ZlibFactory.setNativeZlibLoaded(false);
Configuration conf = new Configuration();
CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

Compressor compressor = codec.createCompressor();
assertThat(compressor).withFailMessage("should be BuiltInGzipCompressor")
.isInstanceOf(BuiltInGzipCompressor.class);

byte[] b = new byte[0];
compressor.setInput(b, 0, b.length);
compressor.finish();

byte[] output = new byte[100];
int outputOff = 0;

while (!compressor.finished()) {
byte[] buf = new byte[100];
int compressed = compressor.compress(buf, 0, buf.length);
System.arraycopy(buf, 0, output, outputOff, compressed);
outputOff += compressed;
}

DataInputBuffer gzbuf = new DataInputBuffer();
gzbuf.reset(output, outputOff);

Decompressor decom = codec.createDecompressor();
assertThat(decom).as("decompressor should not be null").isNotNull();
assertThat(decom).withFailMessage("should be BuiltInGzipDecompressor")
.isInstanceOf(BuiltInGzipDecompressor.class);
try (InputStream gzin = codec.createInputStream(gzbuf, decom);
DataOutputBuffer dflbuf = new DataOutputBuffer()) {
dflbuf.reset();
IOUtils.copyBytes(gzin, dflbuf, 4096);
final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
assertThat(b).as("check decompressed output").isEqualTo(dflchk);
}
}
}