Skip to content

Commit 0910997

Browse files
authored
HDDS-12636. Reduce code duplication for tarball creation (apache#8121)
1 parent cbafa02 commit 0910997

7 files changed

Lines changed: 188 additions & 247 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ public static void validatePath(Path path, Path ancestor) {
632632
"Ancestor should not be null");
633633
Preconditions.checkArgument(
634634
path.normalize().startsWith(ancestor.normalize()),
635-
"Path should be a descendant of %s", ancestor);
635+
"Path %s should be a descendant of %s", path, ancestor);
636636
}
637637

638638
public static File createDir(String dirPath) {

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java

Lines changed: 6 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.hadoop.ozone.container.keyvalue;
1919

20-
import static java.util.stream.Collectors.toList;
2120
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
21+
import static org.apache.hadoop.hdds.utils.Archiver.extractEntry;
22+
import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
23+
import static org.apache.hadoop.hdds.utils.Archiver.includePath;
24+
import static org.apache.hadoop.hdds.utils.Archiver.readEntry;
25+
import static org.apache.hadoop.hdds.utils.Archiver.tar;
26+
import static org.apache.hadoop.hdds.utils.Archiver.untar;
2227
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
2328

24-
import com.google.common.annotations.VisibleForTesting;
25-
import java.io.BufferedOutputStream;
26-
import java.io.ByteArrayOutputStream;
2729
import java.io.File;
2830
import java.io.IOException;
2931
import java.io.InputStream;
@@ -32,16 +34,11 @@
3234
import java.nio.file.Path;
3335
import java.nio.file.Paths;
3436
import java.nio.file.StandardCopyOption;
35-
import java.util.stream.Stream;
3637
import org.apache.commons.compress.archivers.ArchiveEntry;
3738
import org.apache.commons.compress.archivers.ArchiveInputStream;
3839
import org.apache.commons.compress.archivers.ArchiveOutputStream;
3940
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
40-
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
41-
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
4241
import org.apache.commons.io.FileUtils;
43-
import org.apache.commons.io.IOUtils;
44-
import org.apache.hadoop.hdds.HddsUtils;
4542
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
4643
import org.apache.hadoop.ozone.OzoneConsts;
4744
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -108,37 +105,6 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
108105
return descriptorFileContent;
109106
}
110107

111-
private void extractEntry(ArchiveEntry entry, InputStream input, long size,
112-
Path ancestor, Path path) throws IOException {
113-
HddsUtils.validatePath(path, ancestor);
114-
115-
if (entry.isDirectory()) {
116-
Files.createDirectories(path);
117-
} else {
118-
Path parent = path.getParent();
119-
if (parent != null) {
120-
Files.createDirectories(parent);
121-
}
122-
123-
try (OutputStream fileOutput = Files.newOutputStream(path);
124-
OutputStream output = new BufferedOutputStream(fileOutput)) {
125-
int bufferSize = 1024;
126-
byte[] buffer = new byte[bufferSize + 1];
127-
long remaining = size;
128-
while (remaining > 0) {
129-
int len = (int) Math.min(remaining, bufferSize);
130-
int read = input.read(buffer, 0, len);
131-
if (read >= 0) {
132-
remaining -= read;
133-
output.write(buffer, 0, read);
134-
} else {
135-
remaining = 0;
136-
}
137-
}
138-
}
139-
}
140-
}
141-
142108
/**
143109
* Given a containerData include all the required container data/metadata
144110
* in a tar file.
@@ -218,65 +184,10 @@ public static Path getChunkPath(Path baseDir,
218184
return KeyValueContainerLocationUtil.getChunksLocationPath(baseDir.toString()).toPath();
219185
}
220186

221-
private byte[] readEntry(InputStream input, final long size)
222-
throws IOException {
223-
ByteArrayOutputStream output = new ByteArrayOutputStream();
224-
int bufferSize = 1024;
225-
byte[] buffer = new byte[bufferSize + 1];
226-
long remaining = size;
227-
while (remaining > 0) {
228-
int len = (int) Math.min(remaining, bufferSize);
229-
int read = input.read(buffer, 0, len);
230-
remaining -= read;
231-
output.write(buffer, 0, read);
232-
}
233-
return output.toByteArray();
234-
}
235-
236-
private void includePath(Path dir, String subdir,
237-
ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
238-
239-
// Add a directory entry before adding files, in case the directory is
240-
// empty.
241-
TarArchiveEntry entry = archiveOutput.createArchiveEntry(dir.toFile(), subdir);
242-
archiveOutput.putArchiveEntry(entry);
243-
archiveOutput.closeArchiveEntry();
244-
245-
// Add files in the directory.
246-
try (Stream<Path> dirEntries = Files.list(dir)) {
247-
for (Path path : dirEntries.collect(toList())) {
248-
String entryName = subdir + "/" + path.getFileName();
249-
includeFile(path.toFile(), entryName, archiveOutput);
250-
}
251-
}
252-
}
253-
254-
static void includeFile(File file, String entryName,
255-
ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
256-
TarArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
257-
archiveOutput.putArchiveEntry(entry);
258-
try (InputStream input = Files.newInputStream(file.toPath())) {
259-
IOUtils.copy(input, archiveOutput);
260-
}
261-
archiveOutput.closeArchiveEntry();
262-
}
263-
264-
private static ArchiveInputStream<TarArchiveEntry> untar(InputStream input) {
265-
return new TarArchiveInputStream(input);
266-
}
267-
268-
private static ArchiveOutputStream<TarArchiveEntry> tar(OutputStream output) {
269-
TarArchiveOutputStream os = new TarArchiveOutputStream(output);
270-
os.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
271-
return os;
272-
}
273-
274-
@VisibleForTesting
275187
InputStream decompress(InputStream input) throws IOException {
276188
return compression.wrap(input);
277189
}
278190

279-
@VisibleForTesting
280191
OutputStream compress(OutputStream output) throws IOException {
281192
return compression.wrap(output);
282193
}

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.commons.io.FileUtils;
4949
import org.apache.commons.io.IOUtils;
5050
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
51+
import org.apache.hadoop.hdds.utils.Archiver;
5152
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
5253
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
5354
import org.apache.ozone.test.SpyInputStream;
@@ -391,12 +392,10 @@ private File writeSingleFile(Path parentPath, String fileName,
391392
private File packContainerWithSingleFile(File file, String entryName)
392393
throws Exception {
393394
File targetFile = TEMP_DIR.resolve("container.tar").toFile();
394-
try (OutputStream output = newOutputStream(targetFile.toPath());
395-
OutputStream compressed = packer.compress(output);
396-
TarArchiveOutputStream archive =
397-
new TarArchiveOutputStream(compressed)) {
395+
Path path = targetFile.toPath();
396+
try (TarArchiveOutputStream archive = new TarArchiveOutputStream(packer.compress(newOutputStream(path)))) {
398397
archive.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
399-
TarContainerPacker.includeFile(file, entryName, archive);
398+
Archiver.includeFile(file, entryName, archive);
400399
}
401400
return targetFile;
402401
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.utils;
19+
20+
import static java.util.stream.Collectors.toList;
21+
22+
import java.io.BufferedOutputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.io.InputStream;
27+
import java.io.OutputStream;
28+
import java.nio.file.Files;
29+
import java.nio.file.Path;
30+
import java.nio.file.Paths;
31+
import java.util.stream.Stream;
32+
import org.apache.commons.compress.archivers.ArchiveEntry;
33+
import org.apache.commons.compress.archivers.ArchiveInputStream;
34+
import org.apache.commons.compress.archivers.ArchiveOutputStream;
35+
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
36+
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
37+
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
38+
import org.apache.commons.io.IOUtils;
39+
import org.apache.hadoop.hdds.HddsUtils;
40+
41+
/** Create and extract archives. */
42+
public final class Archiver {
43+
44+
private Archiver() {
45+
// no instances (for now)
46+
}
47+
48+
/** Create tarball including contents of {@code from}. */
49+
public static void create(File tarFile, Path from) throws IOException {
50+
try (ArchiveOutputStream<TarArchiveEntry> out = tar(Files.newOutputStream(tarFile.toPath()))) {
51+
includePath(from, "", out);
52+
}
53+
}
54+
55+
/** Extract {@code tarFile} to {@code dir}. */
56+
public static void extract(File tarFile, Path dir) throws IOException {
57+
Files.createDirectories(dir);
58+
String parent = dir.toString();
59+
try (ArchiveInputStream<TarArchiveEntry> in = untar(Files.newInputStream(tarFile.toPath()))) {
60+
TarArchiveEntry entry;
61+
while ((entry = in.getNextEntry()) != null) {
62+
Path path = Paths.get(parent, entry.getName());
63+
extractEntry(entry, in, entry.getSize(), dir, path);
64+
}
65+
}
66+
}
67+
68+
public static byte[] readEntry(InputStream input, final long size)
69+
throws IOException {
70+
ByteArrayOutputStream output = new ByteArrayOutputStream();
71+
int bufferSize = 1024;
72+
byte[] buffer = new byte[bufferSize + 1];
73+
long remaining = size;
74+
while (remaining > 0) {
75+
int len = (int) Math.min(remaining, bufferSize);
76+
int read = input.read(buffer, 0, len);
77+
remaining -= read;
78+
output.write(buffer, 0, read);
79+
}
80+
return output.toByteArray();
81+
}
82+
83+
public static void includePath(Path dir, String subdir,
84+
ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
85+
86+
// Add a directory entry before adding files, in case the directory is
87+
// empty.
88+
TarArchiveEntry entry = archiveOutput.createArchiveEntry(dir.toFile(), subdir);
89+
archiveOutput.putArchiveEntry(entry);
90+
archiveOutput.closeArchiveEntry();
91+
92+
// Add files in the directory.
93+
try (Stream<Path> dirEntries = Files.list(dir)) {
94+
for (Path path : dirEntries.collect(toList())) {
95+
File file = path.toFile();
96+
String entryName = subdir + "/" + path.getFileName();
97+
if (file.isDirectory()) {
98+
includePath(path, entryName, archiveOutput);
99+
} else {
100+
includeFile(file, entryName, archiveOutput);
101+
}
102+
}
103+
}
104+
}
105+
106+
public static long includeFile(File file, String entryName,
107+
ArchiveOutputStream<TarArchiveEntry> archiveOutput) throws IOException {
108+
final long bytes;
109+
TarArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
110+
archiveOutput.putArchiveEntry(entry);
111+
try (InputStream input = Files.newInputStream(file.toPath())) {
112+
bytes = IOUtils.copyLarge(input, archiveOutput);
113+
}
114+
archiveOutput.closeArchiveEntry();
115+
return bytes;
116+
}
117+
118+
public static void extractEntry(ArchiveEntry entry, InputStream input, long size,
119+
Path ancestor, Path path) throws IOException {
120+
HddsUtils.validatePath(path, ancestor);
121+
122+
if (entry.isDirectory()) {
123+
Files.createDirectories(path);
124+
} else {
125+
Path parent = path.getParent();
126+
if (parent != null) {
127+
Files.createDirectories(parent);
128+
}
129+
130+
try (OutputStream fileOutput = Files.newOutputStream(path);
131+
OutputStream output = new BufferedOutputStream(fileOutput)) {
132+
int bufferSize = 1024;
133+
byte[] buffer = new byte[bufferSize + 1];
134+
long remaining = size;
135+
while (remaining > 0) {
136+
int len = (int) Math.min(remaining, bufferSize);
137+
int read = input.read(buffer, 0, len);
138+
if (read >= 0) {
139+
remaining -= read;
140+
output.write(buffer, 0, read);
141+
} else {
142+
remaining = 0;
143+
}
144+
}
145+
}
146+
}
147+
}
148+
149+
public static ArchiveInputStream<TarArchiveEntry> untar(InputStream input) {
150+
return new TarArchiveInputStream(input);
151+
}
152+
153+
public static ArchiveOutputStream<TarArchiveEntry> tar(OutputStream output) {
154+
TarArchiveOutputStream os = new TarArchiveOutputStream(output);
155+
os.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
156+
os.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
157+
return os;
158+
}
159+
160+
}

0 commit comments

Comments
 (0)