Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -255,10 +256,7 @@ public BZip2CompressionOutputStream(OutputStream out)

private void writeStreamHeader() throws IOException {
if (super.out != null) {
// The compressed bzip2 stream should start with the
// identifying characters BZ. Caller of CBZip2OutputStream
// i.e. this class must write these characters.
out.write(HEADER.getBytes(StandardCharsets.UTF_8));
writeHeader(out);
}
}

Expand Down Expand Up @@ -547,4 +545,11 @@ private void updatePos(boolean shouldAddOn) {

}// end of BZip2CompressionInputStream

@VisibleForTesting
public static void writeHeader(OutputStream out) throws IOException {
// The compressed bzip2 stream should start with the
// identifying characters BZ. Caller of CBZip2OutputStream
// i.e. this class must write these characters.
out.write(HEADER.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.IOException;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;


Expand Down Expand Up @@ -312,13 +313,24 @@ private CBZip2InputStream(final InputStream in, READ_MODE readMode, boolean skip
}
} else if (readMode == READ_MODE.BYBLOCK) {
this.currentState = STATE.NO_PROCESS_STATE;
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
skipResult = skipToNextBlockMarker();
if(!skipDecompression){
changeStateToProcessABlock();
}
}
}

/**
* Skips bytes in the stream until the start marker of a block is reached
* or end of stream is reached. Used for testing purposes to identify the
* start offsets of blocks.
*/
@VisibleForTesting
boolean skipToNextBlockMarker() throws IOException {
return skipToNextMarker(
CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
}

/**
* Returns the number of bytes between the current stream position
* and the immediate next BZip2 block marker.
Expand Down Expand Up @@ -428,7 +440,7 @@ public int read(final byte[] dest, final int offs, final int len)
//report 'end of block' or 'end of stream'
result = b;

skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
skipResult = skipToNextBlockMarker();

changeStateToProcessABlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.OutputStream;
import java.io.IOException;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.io.IOUtils;

/**
Expand Down Expand Up @@ -781,8 +782,7 @@ private void initBlock() {
inUse[i] = false;
}

/* 20 is just a paranoia constant */
this.allowableBlockSize = (this.blockSize100k * BZip2Constants.baseBlockSize) - 20;
this.allowableBlockSize = getAllowableBlockSize(this.blockSize100k);
}

private void endBlock() throws IOException {
Expand Down Expand Up @@ -2093,4 +2093,9 @@ private static final class Data extends Object {

}

@VisibleForTesting
static int getAllowableBlockSize(int blockSize100k) {
/* 20 is just a paranoia constant */
return (blockSize100k * BZip2Constants.baseBlockSize) - 20;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hadoop.io.compress.bzip2;

import static org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream.MIN_BLOCKSIZE;
import static org.apache.hadoop.util.Preconditions.checkArgument;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;

/**
* A writer that simplifies creating BZip2 compressed text data for testing
* purposes.
*/
public final class BZip2TextFileWriter implements Closeable {

// Use minimum block size to reduce amount of data to require to be written
// to CBZip2OutputStream before a new block is created.
private static final int BLOCK_SIZE_100K = MIN_BLOCKSIZE;

/**
* The amount of bytes of run-length encoded data that needs to be written
* to this writer in order for the next byte written starts a new BZip2 block.
*/
public static final int BLOCK_SIZE =
// The + 1 is needed because of how CBZip2OutputStream checks whether the
// last offset written is less than allowable block size. Because the last
// offset is one less of the amount of bytes written to the block, we need
// to write an extra byte to trigger writing a new block.
CBZip2OutputStream.getAllowableBlockSize(BLOCK_SIZE_100K) + 1;

private final CBZip2OutputStream out;

public BZip2TextFileWriter(Path path, Configuration conf) throws IOException {
this(path.getFileSystem(conf).create(path));
}

public BZip2TextFileWriter(OutputStream rawOut) throws IOException {
try {
BZip2Codec.writeHeader(rawOut);
out = new CBZip2OutputStream(rawOut, BLOCK_SIZE_100K);
} catch (Throwable e) {
rawOut.close();
throw e;
}
}

public void writeManyRecords(int totalSize, int numRecords, byte[] delimiter)
throws IOException {
checkArgument(numRecords > 0);
checkArgument(delimiter.length > 0);

int minRecordSize = totalSize / numRecords;
checkArgument(minRecordSize >= delimiter.length);

int lastRecordExtraSize = totalSize % numRecords;

for (int i = 0; i < numRecords - 1; i++) {
writeRecord(minRecordSize, delimiter);
}
writeRecord(minRecordSize + lastRecordExtraSize, delimiter);
}

public void writeRecord(int totalSize, byte[] delimiter) throws IOException {
checkArgument(delimiter.length > 0);
checkArgument(totalSize >= delimiter.length);

int contentSize = totalSize - delimiter.length;
for (int i = 0; i < contentSize; i++) {
// Alternate between characters so that internals of CBZip2OutputStream
// cannot condensed the written bytes using run-length encoding. This
// allows the caller to use #BLOCK_SIZE in order to know whether the next
// write will end just before the end of the current block, or exceed it,
// and by how much.
out.write(i % 2 == 0 ? 'a' : 'b');
}
write(delimiter);
}

public void write(String bytes) throws IOException {
write(bytes.getBytes(StandardCharsets.UTF_8));
}

public void write(byte[] bytes) throws IOException {
out.write(bytes);
}

@Override
public void close() throws IOException {
out.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.io.compress.bzip2;

import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class BZip2Utils {

private BZip2Utils() {
}

/**
* Returns the start offsets of blocks that follow the first block in the
* BZip2 compressed file at the given path. The first offset corresponds to
* the first byte containing the BZip2 block marker of the second block. The
* i-th offset corresponds to the block marker of the (i + 1)-th block.
*/
public static List<Long> getNextBlockMarkerOffsets(
Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
try (InputStream fileIn = fs.open(path)) {
return getNextBlockMarkerOffsets(fileIn);
}
}

/**
* Returns the start offsets of blocks that follow the first block in the
* BZip2 compressed input stream. The first offset corresponds to
* the first byte containing the BZip2 block marker of the second block. The
* i-th offset corresponds to the block marker of the (i + 1)-th block.
*/
public static List<Long> getNextBlockMarkerOffsets(InputStream rawIn)
throws IOException {
try (CBZip2InputStream in = new CBZip2InputStream(rawIn, BYBLOCK)) {
ArrayList<Long> offsets = new ArrayList<>();
while (in.skipToNextBlockMarker()) {
offsets.add(in.getProcessedByteCount());
}
return offsets;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.bzip2;

import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit late, but the imports are completely out of sync with the normal hadoop rules. check your ide settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that file puts statics at the bottom. at least it should. if it doesn't that's a bug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steveloughran - I will file a JIRA and fix the imports.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steveloughran - Created PR to sync imports - #4694

Sorry for being little late. Was busy in some other stuff. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no worries. if you aren't behind on lots of things then you aren't a full time software engineer....

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public final class TestBZip2TextFileWriter {

private static final byte[] DELIMITER = new byte[] {'\0'};

private ByteArrayOutputStream rawOut;
private BZip2TextFileWriter writer;

@Before
public void setUp() throws Exception {
rawOut = new ByteArrayOutputStream();
writer = new BZip2TextFileWriter(rawOut);
}

@After
public void tearDown() throws Exception {
rawOut = null;
writer.close();
}

@Test
public void writingSingleBlockSizeOfData() throws Exception {
writer.writeRecord(BLOCK_SIZE, DELIMITER);
writer.close();

List<Long> nextBlocks = getNextBlockMarkerOffsets();
assertEquals(0, nextBlocks.size());
}

@Test
public void justExceedingBeyondBlockSize() throws Exception {
writer.writeRecord(BLOCK_SIZE + 1, DELIMITER);
writer.close();

List<Long> nextBlocks = getNextBlockMarkerOffsets();
assertEquals(1, nextBlocks.size());
}

@Test
public void writingTwoBlockSizesOfData() throws Exception {
writer.writeRecord(2 * BLOCK_SIZE, DELIMITER);
writer.close();

List<Long> nextBlocks = getNextBlockMarkerOffsets();
assertEquals(1, nextBlocks.size());
}

@Test
public void justExceedingBeyondTwoBlocks() throws Exception {
writer.writeRecord(2 * BLOCK_SIZE + 1, DELIMITER);
writer.close();

List<Long> nextBlocks = getNextBlockMarkerOffsets();
assertEquals(2, nextBlocks.size());
}

private List<Long> getNextBlockMarkerOffsets() throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(rawOut.toByteArray());
return BZip2Utils.getNextBlockMarkerOffsets(in);
}
}
Loading