Merged
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -41,101 +40,10 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GzipCodec extends DefaultCodec {
  /**
   * A bridge that wraps around a DeflaterOutputStream to make it
   * a CompressionOutputStream.
   */
  @InterfaceStability.Evolving
  protected static class GzipOutputStream extends CompressorStream {

    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
      /**
       * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
       * details.
       */
      private static final byte[] GZIP_HEADER = new byte[] {
          0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };

      private boolean reset = false;

      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
        super(out);
      }

      public synchronized void resetState() throws IOException {
        reset = true;
      }

      @Override
      public synchronized void write(byte[] buf, int off, int len)
          throws IOException {
        if (reset) {
          def.reset();
          crc.reset();
          out.write(GZIP_HEADER);
          reset = false;
        }
        super.write(buf, off, len);
      }

      @Override
      public synchronized void close() throws IOException {
        reset = false;
        super.close();
      }

    }

    public GzipOutputStream(OutputStream out) throws IOException {
      super(new ResetableGZIPOutputStream(out));
    }

    /**
     * Allow children types to put a different type in here.
     * @param out the Deflater stream to use
     */
    protected GzipOutputStream(CompressorStream out) {
      super(out);
    }

    @Override
    public void close() throws IOException {
      out.close();
    }

    @Override
    public void flush() throws IOException {
      out.flush();
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
    }

    @Override
    public void write(byte[] data, int offset, int length)
        throws IOException {
      out.write(data, offset, length);
    }

    @Override
    public void finish() throws IOException {
      ((ResetableGZIPOutputStream) out).finish();
    }

    @Override
    public void resetState() throws IOException {
      ((ResetableGZIPOutputStream) out).resetState();
    }
  }

  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
      return new GzipOutputStream(out);
    }
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }
@@ -86,10 +86,6 @@ public int compress(byte[] b, int off, int len) throws IOException {

    int compressedBytesWritten = 0;

    if (currentBufLen <= 0) {
      return compressedBytesWritten;
    }

Comment on lines -89 to -92
Member Author

Also, I found this bug: if we pass an empty input to the compress stream, it causes an endless loop.

Member

Can we add a test case for this?

Member Author

@viirya, Sep 3, 2021

If this condition is kept, the existing test times out once GzipOutputStream is removed.

Member Author

@viirya, Sep 3, 2021

Do we still need another test for it? We currently have test coverage for it.

Member

Which test fails if the condition is not removed? So it passes with the wrapper class but fails once the wrapper is removed?

Also, we can remove the currentBufLen variable now since it is no longer used anywhere else.

Member Author

@viirya, Sep 3, 2021

testGzipCodec will time out after removing GzipOutputStream.

Because of this line:

codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");

It writes an empty input to the compress stream. Because of this currentBufLen check, compress returns 0 on every call, so the loop never ends.
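
To make the failure mode concrete, here is a minimal sketch of the loop described above. It is not the Hadoop code: `GuardedCompressor` and `EmptyInputLoopDemo` are hypothetical names, the compressor is modelled on top of the JDK's `java.util.zip.Deflater`, and the caller's loop is capped at `maxCalls` so the demo terminates instead of hanging.

```java
import java.util.zip.Deflater;

/** Hypothetical stand-in for a compressor with an early-return guard. */
class GuardedCompressor {
  // nowrap=true gives a raw deflate stream, as used inside a gzip member.
  private final Deflater deflater =
      new Deflater(Deflater.DEFAULT_COMPRESSION, true);
  private final boolean guardEnabled;
  private int currentBufLen = 0;

  GuardedCompressor(boolean guardEnabled) {
    this.guardEnabled = guardEnabled;
  }

  void setInput(byte[] b, int off, int len) {
    deflater.setInput(b, off, len);
    currentBufLen = len;
  }

  void finish() {
    deflater.finish();
  }

  boolean finished() {
    return deflater.finished();
  }

  int compress(byte[] b, int off, int len) {
    if (guardEnabled && currentBufLen <= 0) {
      // Early return: the deflater is never asked to emit its final
      // block, so finished() can never become true for empty input.
      return 0;
    }
    return deflater.deflate(b, off, len);
  }

  void end() {
    deflater.end();
  }
}

class EmptyInputLoopDemo {
  /**
   * Drains a compressor that was given empty input.
   * Returns the number of compress() calls needed to finish,
   * or -1 if it is still unfinished after maxCalls calls.
   */
  static int callsUntilFinished(boolean guardEnabled, int maxCalls) {
    GuardedCompressor c = new GuardedCompressor(guardEnabled);
    byte[] buf = new byte[64];
    c.setInput(new byte[0], 0, 0);
    c.finish();
    int calls = 0;
    // Same shape as a stream's finish loop, plus a safety cap.
    while (!c.finished() && calls < maxCalls) {
      c.compress(buf, 0, buf.length);
      calls++;
    }
    boolean done = c.finished();
    c.end();
    return done ? calls : -1;
  }

  public static void main(String[] args) {
    System.out.println("with guard:    " + callsUntilFinished(true, 1000));
    System.out.println("without guard: " + callsUntilFinished(false, 1000));
  }
}
```

With the guard enabled the capped loop gives up and returns -1; without it the deflater emits its final block for the empty input and the loop exits.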

Member

I think it's still good to have a dedicated test for this edge case. We can use @Test(timeout=<value>) to check the timeout.
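
A dedicated empty-input check could look roughly like the sketch below. In the project itself this would presumably be a JUnit test annotated with `@Test(timeout=<value>)` that goes through the Hadoop codec; to stay self-contained, this version uses the JDK's `GZIPOutputStream` and `GZIPInputStream` as a stand-in and enforces the time limit with an executor. The names `EmptyInputTimeoutCheck`, `roundTripEmpty`, and `completesWithin` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class EmptyInputTimeoutCheck {

  /** Gzips zero bytes, reads them back, and returns the decompressed length. */
  static int roundTripEmpty() throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
      gz.write(new byte[0], 0, 0); // empty input: the edge case under test
    }
    try (GZIPInputStream in =
        new GZIPInputStream(new ByteArrayInputStream(sink.toByteArray()))) {
      byte[] buf = new byte[16];
      int total = 0;
      int n;
      while ((n = in.read(buf)) != -1) {
        total += n;
      }
      return total;
    }
  }

  /**
   * Runs the task on a daemon thread and reports whether it completed
   * within the limit. A hang shows up as false instead of blocking the caller.
   */
  static boolean completesWithin(Callable<?> task, long millis) {
    ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
    });
    try {
      pool.submit(task).get(millis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean ok = completesWithin(() -> {
      if (roundTripEmpty() != 0) {
        throw new IllegalStateException("expected zero decompressed bytes");
      }
      return null;
    }, 5000);
    System.out.println(ok ? "empty input round-trips in time" : "timed out");
  }
}
```

The point of the time limit is that a regression of this kind shows up as a failed check rather than a build that never finishes.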

Member Author

I see. Let me add one then.

Member

Thanks!

Member Author

Added a test for the empty-input case.

    // If we are not within uncompressed data yet, output the header.
    if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) {
      int outputHeaderSize = writeHeader(b, off, len);