Skip to content

Commit 7ffc783

Browse files
committed
TEZ-4295: Could not decompress data. Buffer length is too small.
1 parent 0af54df commit 7ffc783

3 files changed

Lines changed: 189 additions & 12 deletions

File tree

tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS
363363
void setupOutputStream(CompressionCodec codec) throws IOException {
364364
this.checksumOut = new IFileOutputStream(this.rawOut);
365365
if (codec != null) {
366-
this.compressor = CodecPool.getCompressor(codec);
366+
this.compressor = CodecUtils.getCompressor(codec);
367367
if (this.compressor != null) {
368368
this.compressor.reset();
369369
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
@@ -773,7 +773,7 @@ public Reader(InputStream in, long length,
773773
checksumIn = new IFileInputStream(in, length, readAhead,
774774
readAheadLength/* , isCompressed */);
775775
if (isCompressed && codec != null) {
776-
decompressor = CodecPool.getDecompressor(codec);
776+
decompressor = CodecUtils.getDecompressor(codec);
777777
if (decompressor != null) {
778778
this.in = codec.createInputStream(checksumIn, decompressor);
779779
} else {
@@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen
818818
in = checksumIn;
819819
Decompressor decompressor = null;
820820
if (isCompressed && codec != null) {
821-
decompressor = CodecPool.getDecompressor(codec);
821+
decompressor = CodecUtils.getDecompressor(codec);
822822
if (decompressor != null) {
823823
decompressor.reset();
824824
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,

tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.hadoop.conf.Configurable;
2525
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.io.compress.CodecPool;
2627
import org.apache.hadoop.io.compress.CompressionCodec;
2728
import org.apache.hadoop.io.compress.CompressionInputStream;
2829
import org.apache.hadoop.io.compress.Compressor;
@@ -40,7 +41,7 @@
4041
public final class CodecUtils {
4142

4243
private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
43-
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
44+
private static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
4445

4546
private CodecUtils() {
4647
}
@@ -77,19 +78,19 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
7778
IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
7879
throws IOException {
7980
String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
80-
Configurable configurableCodec = (Configurable) codec;
81-
int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
82-
configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
83-
8481
CompressionInputStream in = null;
8582

8683
if (bufferSizeProp != null) {
84+
Configurable configurableCodec = (Configurable) codec;
8785
Configuration conf = configurableCodec.getConf();
88-
int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
89-
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
90-
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
9186

92-
synchronized (codec) {
87+
synchronized (conf) {
88+
int originalSize = configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
89+
90+
int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
91+
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
92+
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
93+
9394
conf.setInt(bufferSizeProp, newBufSize);
9495

9596
in = codec.createInputStream(checksumIn, decompressor);
@@ -125,4 +126,12 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
125126

126127
return in;
127128
}
129+
130+
public static Compressor getCompressor(CompressionCodec codec) {
131+
return CodecPool.getCompressor(codec);
132+
}
133+
134+
public static Decompressor getDecompressor(CompressionCodec codec) {
135+
return CodecPool.getDecompressor(codec);
136+
}
128137
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.tez.runtime.library.utils;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.Future;
25+
26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.CommonConfigurationKeys;
28+
import org.apache.hadoop.io.compress.CodecPool;
29+
import org.apache.hadoop.io.compress.CompressionCodec;
30+
import org.apache.hadoop.io.compress.Compressor;
31+
import org.apache.hadoop.io.compress.Decompressor;
32+
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
33+
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
34+
import org.junit.Assert;
35+
import org.junit.Test;
36+
import org.mockito.Mockito;
37+
38+
public class TestCodecUtils {
39+
40+
@Test
41+
public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception {
42+
int modifiedBufferSize = 1000;
43+
int numberOfThreads = 1000;
44+
45+
ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
46+
47+
Configuration conf = new Configuration();
48+
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
49+
CompressionCodec codec = CodecUtils.getCodec(conf);
50+
51+
Future<?>[] futures = new Future[numberOfThreads];
52+
final CountDownLatch latch = new CountDownLatch(1);
53+
54+
for (int i = 0; i < numberOfThreads; i++) {
55+
futures[i] = service.submit(() -> {
56+
try {
57+
waitForLatch(latch);
58+
59+
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
60+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
61+
62+
Decompressor decompressor = CodecUtils.getDecompressor(codec);
63+
CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
64+
Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
65+
66+
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
67+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
68+
69+
CodecPool.returnDecompressor(decompressor);
70+
} catch (IOException e) {
71+
throw new RuntimeException(e);
72+
}
73+
});
74+
}
75+
latch.countDown();
76+
77+
for (Future<?> f : futures) {
78+
f.get();
79+
}
80+
}
81+
82+
@Test
83+
public void testConcurrentCompressorDecompressorCreation() throws Exception {
84+
int modifiedBufferSize = 1000;
85+
int numberOfThreads = 1000;
86+
87+
ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
88+
89+
Configuration conf = new Configuration();
90+
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
91+
CompressionCodec codec = CodecUtils.getCodec(conf);
92+
93+
Future<?>[] futures = new Future[numberOfThreads];
94+
final CountDownLatch latch = new CountDownLatch(1);
95+
96+
for (int i = 0; i < numberOfThreads; i++) {
97+
// let's "randomly" choose from scenarios and test them concurrently
98+
// 1. getDecompressedInputStreamWithBufferSize
99+
if (i % 3 == 0) {
100+
futures[i] = service.submit(() -> {
101+
try {
102+
waitForLatch(latch);
103+
104+
Assert.assertEquals(
105+
Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
106+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
107+
108+
Decompressor decompressor = CodecUtils.getDecompressor(codec);
109+
CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
110+
Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
111+
112+
Assert.assertEquals(
113+
Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
114+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
115+
116+
CodecPool.returnDecompressor(decompressor);
117+
} catch (IOException e) {
118+
throw new RuntimeException(e);
119+
}
120+
});
121+
// 2. getCompressor
122+
} else if (i % 3 == 1) {
123+
futures[i] = service.submit(() -> {
124+
waitForLatch(latch);
125+
126+
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
127+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
128+
129+
Compressor compressor = CodecUtils.getCompressor(codec);
130+
131+
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
132+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
133+
134+
CodecPool.returnCompressor(compressor);
135+
136+
});
137+
// 3. getDecompressor
138+
} else if (i % 3 == 2) {
139+
futures[i] = service.submit(() -> {
140+
waitForLatch(latch);
141+
142+
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
143+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
144+
145+
Decompressor decompressor = CodecUtils.getDecompressor(codec);
146+
147+
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
148+
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
149+
150+
CodecPool.returnDecompressor(decompressor);
151+
});
152+
}
153+
}
154+
latch.countDown();
155+
156+
for (Future<?> f : futures) {
157+
f.get();
158+
}
159+
}
160+
161+
private void waitForLatch(CountDownLatch latch) {
162+
try {
163+
latch.await();
164+
} catch (InterruptedException e) {
165+
throw new RuntimeException(e);
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)