Skip to content

Commit eff292b

Browse files
authored
HADOOP-18383. Codecs with @DoNotPool annotation are not closed causing memory leak (#4739)
1 parent 9776361 commit eff292b

4 files changed

Lines changed: 85 additions & 1 deletion

File tree

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.io.compress;
20+
21+
import java.io.IOException;
22+
23+
/**
24+
* An exception class for when a closed compressor/decopressor is being used.
25+
* {@link org.apache.hadoop.io.compress.Compressor}
26+
* {@link org.apache.hadoop.io.compress.Decompressor}
27+
*/
28+
public class AlreadyClosedException extends IOException {
29+
30+
public AlreadyClosedException(String message) {
31+
super(message);
32+
}
33+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) {
205205
}
206206
// if the compressor can't be reused, don't pool it.
207207
if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
208+
compressor.end();
208209
return;
209210
}
210211
compressor.reset();
@@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) {
225226
}
226227
// if the decompressor can't be reused, don't pool it.
227228
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
229+
decompressor.end();
228230
return;
229231
}
230232
decompressor.reset();

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.zip.DataFormatException;
2424
import java.util.zip.Inflater;
2525

26+
import org.apache.hadoop.io.compress.AlreadyClosedException;
2627
import org.apache.hadoop.io.compress.Decompressor;
2728
import org.apache.hadoop.io.compress.DoNotPool;
2829
import org.apache.hadoop.util.DataChecksum;
@@ -105,7 +106,11 @@ private enum GzipStateLabel {
105106
* Immediately after the trailer (and potentially prior to the next gzip
106107
* member/substream header), without reset() having been called.
107108
*/
108-
FINISHED;
109+
FINISHED,
110+
/**
111+
* Immediately after end() has been called.
112+
*/
113+
ENDED;
109114
}
110115

111116
/**
@@ -182,6 +187,10 @@ public synchronized int decompress(byte[] b, int off, int len)
182187
throws IOException {
183188
int numAvailBytes = 0;
184189

190+
if (state == GzipStateLabel.ENDED) {
191+
throw new AlreadyClosedException("decompress called on closed decompressor");
192+
}
193+
185194
if (state != GzipStateLabel.DEFLATE_STREAM) {
186195
executeHeaderState();
187196

@@ -472,6 +481,8 @@ public synchronized void reset() {
472481
@Override
473482
public synchronized void end() {
474483
inflater.end();
484+
485+
state = GzipStateLabel.ENDED;
475486
}
476487

477488
/**

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,19 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121

22+
import java.io.ByteArrayInputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.OutputStream;
25+
import java.util.Random;
2226
import java.util.concurrent.Callable;
2327
import java.util.concurrent.ExecutorService;
2428
import java.util.concurrent.Executors;
2529
import java.util.concurrent.LinkedBlockingDeque;
2630
import java.util.concurrent.TimeUnit;
2731

2832
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
34+
import org.apache.hadoop.test.LambdaTestUtils;
2935
import org.junit.Before;
3036
import org.junit.Test;
3137

@@ -189,4 +195,36 @@ public void testDecompressorNotReturnSameInstance() {
189195
CodecPool.returnDecompressor(decompressor);
190196
}
191197
}
198+
199+
@Test(timeout = 10000)
200+
public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {
201+
202+
final GzipCodec gzipCodec = new GzipCodec();
203+
gzipCodec.setConf(new Configuration());
204+
205+
final Random random = new Random();
206+
final byte[] bytes = new byte[1024];
207+
random.nextBytes(bytes);
208+
209+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
210+
try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
211+
outputStream.write(bytes);
212+
}
213+
214+
final byte[] gzipBytes = baos.toByteArray();
215+
final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);
216+
217+
// BuiltInGzipDecompressor is an explicit example of a Decompressor
218+
// with the @DoNotPool annotation
219+
final Decompressor decompressor = new BuiltInGzipDecompressor();
220+
CodecPool.returnDecompressor(decompressor);
221+
222+
final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor);
223+
LambdaTestUtils.intercept(
224+
AlreadyClosedException.class,
225+
"decompress called on closed decompressor",
226+
"Decompressor from Codec with @DoNotPool should not be " +
227+
"useable after returning to CodecPool",
228+
() -> inputStream.read());
229+
}
192230
}

0 commit comments

Comments
 (0)