Skip to content

Commit 857c37b

Browse files
committed
chore: BlockAccessors Hardening & Improved Error Handling
* BlockAccessors are now Autocloseable * BlockAccessors now create hard links for the duration of their lifespan * This allows us to ensure data is accessible for the duration of the accessor's life * Tests are updated and migrated * Deleted redundant configuration tests * Added tests: * Tests for subsequent reads to ensure that closing an accessor does not affect the data or the ability for a new accessor to access it Signed-off-by: Atanas Atanasov <[email protected]>
1 parent b7b995a commit 857c37b

File tree

27 files changed

+819
-609
lines changed

27 files changed

+819
-609
lines changed

block-node/app/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ tasks.withType<JavaExec>().configureEach {
1515
modularity.inferModulePath = true
1616
val serverDataDir = layout.buildDirectory.get().dir("block-node-storage")
1717
environment("FILES_HISTORIC_ROOT_PATH", "${serverDataDir}/files-historic")
18+
environment("FILES_HISTORIC_LINKS_ROOT_PATH", "${serverDataDir}/files-historic-links")
1819
environment("FILES_RECENT_LIVE_ROOT_PATH", "${serverDataDir}/files-live")
20+
environment("FILES_RECENT_LINKS_ROOT_PATH", "${serverDataDir}/files-live-links")
1921
environment("FILES_RECENT_UNVERIFIED_ROOT_PATH", "${serverDataDir}/files-unverified")
2022
}
2123

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# SPDX-License-Identifier: Apache-2.0
2-
32
# Test Properties File
43
prometheus.endpointEnabled=false
54
mediator.ringBufferSize=1024
65
notifier.ringBufferSize=1024
76
files.recent.liveRootPath=build/tmp/data/liveRootPath
7+
files.recent.linksRootPath=build/tmp/data/liveLinks
88
files.historic.rootPath=build/tmp/data/historic
9+
files.historic.linksRootPath=build/tmp/data/historicLinks

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/blocks/InMemoryBlockAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,9 @@ public Bytes blockBytes(final Format format) {
8585
private Bytes zstdCompressBytes(final Bytes bytes) {
8686
return Bytes.wrap(Zstd.compress(bytes.toByteArray()));
8787
}
88+
89+
@Override
90+
public void close() {
91+
// do nothing
92+
}
8893
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/blocks/MinimalBlockAccessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@ public Bytes blockBytes(final Format format) {
3232
private Bytes zstdCompressBytes(final Bytes bytes) {
3333
return Bytes.wrap(Zstd.compress(bytes.toByteArray()));
3434
}
35+
36+
@Override
37+
public void close() {
38+
// do nothing
39+
}
3540
}

block-node/base/src/main/java/org/hiero/block/node/base/tar/TaredBlockIterator.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.node.base.tar;
33

4+
import edu.umd.cs.findbugs.annotations.NonNull;
45
import java.nio.charset.StandardCharsets;
56
import java.text.DecimalFormat;
67
import java.text.NumberFormat;
78
import java.util.Arrays;
89
import java.util.Iterator;
910
import java.util.NoSuchElementException;
11+
import java.util.Objects;
1012
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
1113
import org.hiero.block.node.spi.historicalblocks.BlockAccessor.Format;
1214

@@ -59,13 +61,14 @@ enum State {
5961
* @param format the format of the block data files written to the tar file
6062
* @param blockIterator the iterator over the blocks to be written into the tar file
6163
*/
62-
public TaredBlockIterator(Format format, Iterator<BlockAccessor> blockIterator) {
63-
this.format = format;
64+
public TaredBlockIterator(@NonNull final Format format, @NonNull final Iterator<BlockAccessor> blockIterator) {
65+
this.format = Objects.requireNonNull(format);
6466
this.extension = switch (format) {
6567
case Format.JSON -> ".json";
6668
case Format.PROTOBUF -> ".blk";
67-
case Format.ZSTD_PROTOBUF -> ".blk.zstd";};
68-
this.blockIterator = blockIterator;
69+
case Format.ZSTD_PROTOBUF -> ".blk.zstd";
70+
};
71+
this.blockIterator = Objects.requireNonNull(blockIterator);
6972
this.currentBuffer = new byte[CHUNK_SIZE];
7073
this.bufferPosition = 0;
7174
}
@@ -98,9 +101,13 @@ public byte[] next() {
98101
case NEXT_FILE:
99102
if (blockIterator.hasNext()) {
100103
// get the next block from the iterator
101-
final BlockAccessor currentBlock = blockIterator.next();
102-
currentBlockBytes = currentBlock.blockBytes(format).toByteArray();
103-
currentBlockName = BLOCK_NUMBER_FORMAT.format(currentBlock.blockNumber()) + extension;
104+
final BlockAccessor currentBlockAccessor = blockIterator.next();
105+
currentBlockName = BLOCK_NUMBER_FORMAT.format(currentBlockAccessor.blockNumber()) + extension;
106+
currentBlockBytes =
107+
currentBlockAccessor.blockBytes(format).toByteArray();
108+
// close the accessor as we are done with it and we need
109+
// to free resources
110+
currentBlockAccessor.close();
104111
filePosition = 0;
105112
state = State.WRITE_HEADER;
106113
} else {

block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static java.lang.System.Logger.Level.INFO;
66
import static java.lang.System.Logger.Level.TRACE;
77

8-
import com.hedera.hapi.block.stream.Block;
98
import com.swirlds.metrics.api.Counter;
109
import org.hiero.block.api.BlockAccessServiceInterface;
1110
import org.hiero.block.api.BlockRequest;
@@ -14,6 +13,7 @@
1413
import org.hiero.block.node.spi.BlockNodeContext;
1514
import org.hiero.block.node.spi.BlockNodePlugin;
1615
import org.hiero.block.node.spi.ServiceBuilder;
16+
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
1717
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;
1818

1919
/**
@@ -44,12 +44,10 @@ public class BlockAccessServicePlugin implements BlockNodePlugin, BlockAccessSer
4444
*/
4545
public BlockResponse getBlock(BlockRequest request) {
4646
requestCounter.increment();
47-
4847
try {
4948
long blockNumberToRetrieve;
5049
// The block number and retrieve latest are mutually exclusive in
5150
// the proto definition, so no need to check for that here.
52-
5351
// if retrieveLatest is set, or the request is for the largest possible block number, get the latest block.
5452
if (request.hasBlockNumber() && request.blockNumber() >= 0) {
5553
blockNumberToRetrieve = request.blockNumber();
@@ -63,7 +61,6 @@ public BlockResponse getBlock(BlockRequest request) {
6361
LOGGER.log(INFO, "Invalid request, 'retrieve_latest' or a valid 'block number' is required.");
6462
return new BlockResponse(Code.INVALID_REQUEST, null);
6563
}
66-
6764
// Check if block is within the available range
6865
if (!blockProvider.availableBlocks().contains(blockNumberToRetrieve)) {
6966
long lowestBlockNumber = blockProvider.availableBlocks().min();
@@ -77,14 +74,21 @@ public BlockResponse getBlock(BlockRequest request) {
7774
responseCounterNotAvailable.increment();
7875
return new BlockResponse(Code.NOT_AVAILABLE, null);
7976
}
80-
8177
// Retrieve the block
82-
Block block = blockProvider.block(blockNumberToRetrieve).block();
83-
responseCounterSuccess.increment();
84-
return new BlockResponse(Code.SUCCESS, block);
85-
86-
} catch (RuntimeException e) {
87-
String message = "Failed to retrieve block number %d.".formatted(request.blockNumber());
78+
try (final BlockAccessor accessor = blockProvider.block(blockNumberToRetrieve)) {
79+
if (accessor != null) {
80+
// even though we have checked for the existence of the block
81+
// it may not be available anymore, so we have to ensure the accessor
82+
// still exists
83+
final BlockResponse response = new BlockResponse(Code.SUCCESS, accessor.block());
84+
responseCounterSuccess.increment();
85+
return response;
86+
} else {
87+
return new BlockResponse(Code.NOT_FOUND, null);
88+
}
89+
}
90+
} catch (final RuntimeException e) {
91+
final String message = "Failed to retrieve block number %d.".formatted(request.blockNumber());
8892
LOGGER.log(ERROR, message, e);
8993
responseCounterNotFound.increment(); // Should this be a failure counter?
9094
return new BlockResponse(Code.NOT_FOUND, null);

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/BlockFileHistoricPlugin.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
import java.nio.file.Files;
1616
import java.nio.file.Path;
1717
import java.util.ArrayList;
18+
import java.util.Comparator;
1819
import java.util.List;
1920
import java.util.Objects;
2021
import java.util.concurrent.CopyOnWriteArrayList;
2122
import java.util.concurrent.ExecutorService;
2223
import java.util.concurrent.atomic.AtomicLong;
24+
import java.util.stream.Stream;
2325
import org.hiero.block.node.base.ranges.ConcurrentLongRangeSet;
2426
import org.hiero.block.node.spi.BlockNodeContext;
2527
import org.hiero.block.node.spi.ServiceBuilder;
@@ -91,9 +93,22 @@ public void init(final BlockNodeContext context, final ServiceBuilder serviceBui
9193
blockRetentionThreshold = config.blockRetentionThreshold();
9294
// Initialize metrics
9395
initMetrics(context.metrics());
94-
// create plugin data root directory if it does not exist
96+
final Path linksRoot = config.linksRootPath();
97+
// attempt to clear any existing links root directory
98+
if (Files.exists(linksRoot)) {
99+
try (final Stream<Path> paths = Files.walk(linksRoot)) {
100+
for (final Path path : paths.sorted(Comparator.reverseOrder()).toList()) {
101+
Files.delete(path);
102+
}
103+
} catch (final IOException e) {
104+
LOGGER.log(ERROR, "Could not clear links root directory", e);
105+
context.serverHealth().shutdown(name(), "Could not clear links root directory");
106+
}
107+
}
108+
// create plugin data and links root directories if they do not exist
95109
try {
96110
Files.createDirectories(config.rootPath());
111+
Files.createDirectories(linksRoot);
97112
} catch (IOException e) {
98113
LOGGER.log(ERROR, "Could not create root directory", e);
99114
context.serverHealth().shutdown(name(), "Could not create root directory");

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/FilesHistoricConfig.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
import com.swirlds.config.api.validation.annotation.Max;
77
import com.swirlds.config.api.validation.annotation.Min;
88
import java.nio.file.Path;
9-
import java.util.Objects;
10-
import org.hiero.block.common.utils.Preconditions;
119
import org.hiero.block.node.base.CompressionType;
1210
import org.hiero.block.node.base.Loggable;
1311

1412
/**
1513
* Use this configuration across the files recent plugin.
1614
*
1715
* @param rootPath provides the root path for saving historic blocks
16+
* @param linksRootPath provides the root path for saving hard links when issuing
17+
* accessors
1818
* @param compression compression type to use for the storage. It is assumed this never changes while a node is running
1919
* and has existing files.
2020
* @param powersOfTenPerZipFileContents the number files in a zip file specified in powers of ten. Can can be one of
@@ -30,16 +30,7 @@
3030
@ConfigData("files.historic")
3131
public record FilesHistoricConfig(
3232
@Loggable @ConfigProperty(defaultValue = "/opt/hiero/block-node/data/historic") Path rootPath,
33+
@Loggable @ConfigProperty(defaultValue = "/opt/hiero/block-node/data/historic-links") Path linksRootPath,
3334
@Loggable @ConfigProperty(defaultValue = "ZSTD") CompressionType compression,
3435
@Loggable @ConfigProperty(defaultValue = "4") @Min(1) @Max(6) int powersOfTenPerZipFileContents,
35-
@Loggable @ConfigProperty(defaultValue = "0") long blockRetentionThreshold) {
36-
/**
37-
* Constructor.
38-
*/
39-
public FilesHistoricConfig {
40-
Objects.requireNonNull(rootPath);
41-
Objects.requireNonNull(compression);
42-
Preconditions.requireInRange(powersOfTenPerZipFileContents, 1, 6);
43-
Preconditions.requireGreaterOrEqual(blockRetentionThreshold, 0L);
44-
}
45-
}
36+
@Loggable @ConfigProperty(defaultValue = "0") long blockRetentionThreshold) {}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/ZipBlockAccessor.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.node.blocks.files.historic;
33

4+
import static java.lang.System.Logger.Level.INFO;
45
import static java.lang.System.Logger.Level.WARNING;
56

67
import com.hedera.hapi.block.stream.Block;
@@ -15,6 +16,7 @@
1516
import java.nio.file.Files;
1617
import java.nio.file.Path;
1718
import java.util.Objects;
19+
import java.util.UUID;
1820
import org.hiero.block.node.base.CompressionType;
1921
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
2022

@@ -28,25 +30,44 @@ final class ZipBlockAccessor implements BlockAccessor {
2830
private static final String FAILED_TO_PARSE_MESSAGE = "Failed to parse block from file %s.";
2931
/** Message logged when data cannot be read from a block file */
3032
private static final String FAILED_TO_READ_MESSAGE = "Failed to read block from file %s.";
31-
/** The block path. */
32-
private final BlockPath blockPath;
33+
/** Message logged when the provided path to a zip file is not a regular file or does not exist. */
34+
private static final String INVALID_ZIP_FILE_PATH_MESSAGE =
35+
"Provided path to zip file is not a regular file or does not exist: %s";
36+
/** The block number. */
37+
private final long blockNumber;
38+
/** The compression type. */
39+
private final CompressionType compressionType;
40+
/** The zip entry name. */
41+
private final String zipEntryName;
42+
/** The zip path. */
43+
private final Path zipFileLink;
3344

3445
/**
3546
* Constructs a ZipBlockAccessor with the specified block path.
3647
*
3748
* @param blockPath the block path
49+
* @param config the configuration for the block provider
3850
*/
39-
ZipBlockAccessor(@NonNull final BlockPath blockPath) {
40-
this.blockPath = Objects.requireNonNull(blockPath);
51+
ZipBlockAccessor(@NonNull final BlockPath blockPath, @NonNull final FilesHistoricConfig config) throws IOException {
52+
this.blockNumber = Long.parseLong(blockPath.blockNumStr());
53+
this.compressionType = blockPath.compressionType();
54+
this.zipEntryName = blockPath.blockFileName();
55+
final Path zipFilePath = blockPath.zipFilePath();
56+
if (!Files.isRegularFile(zipFilePath)) {
57+
final String msg = INVALID_ZIP_FILE_PATH_MESSAGE.formatted(zipFilePath);
58+
throw new IOException(msg);
59+
}
60+
// create a hard link to the block file for the duration of the accessor's life
61+
final Path link = config.linksRootPath().resolve(UUID.randomUUID().toString());
62+
this.zipFileLink = Files.createLink(link, zipFilePath);
4163
}
4264

4365
/**
4466
* {@inheritDoc}
4567
*/
4668
@Override
4769
public long blockNumber() {
48-
// TODO maybe there is nice option here than having to parse the string
49-
return Long.parseLong(blockPath.blockNumStr());
70+
return blockNumber;
5071
}
5172

5273
/**
@@ -55,12 +76,11 @@ public long blockNumber() {
5576
@Override
5677
public Bytes blockBytes(@NonNull final Format format) {
5778
Objects.requireNonNull(format);
58-
CompressionType compressionType = blockPath.compressionType();
59-
try (final FileSystem zipFs = FileSystems.newFileSystem(blockPath.zipFilePath())) {
60-
final Path entry = zipFs.getPath(blockPath.blockFileName());
79+
try (final FileSystem zipFs = FileSystems.newFileSystem(zipFileLink)) {
80+
final Path entry = zipFs.getPath(zipEntryName);
6181
return getBytesFromPath(format, entry, compressionType);
6282
} catch (final UncheckedIOException | IOException e) {
63-
LOGGER.log(WARNING, FAILED_TO_READ_MESSAGE.formatted(blockPath), e);
83+
LOGGER.log(WARNING, FAILED_TO_READ_MESSAGE.formatted(zipFileLink), e);
6484
return null;
6585
}
6686
}
@@ -112,12 +132,24 @@ private Bytes getJsonBytesFromProtobufBytes(final Bytes sourceData) {
112132
try {
113133
return Block.JSON.toBytes(Block.PROTOBUF.parse(sourceData));
114134
} catch (final UncheckedIOException | ParseException e) {
115-
final String message = FAILED_TO_PARSE_MESSAGE.formatted(blockPath);
135+
final String message = FAILED_TO_PARSE_MESSAGE.formatted(zipFileLink);
116136
LOGGER.log(WARNING, message, e);
117137
return null;
118138
}
119139
} else {
120140
return null;
121141
}
122142
}
143+
144+
/**
145+
* This method deletes the link to the zip file.
146+
*/
147+
@Override
148+
public void close() {
149+
try {
150+
Files.delete(zipFileLink);
151+
} catch (final IOException e) {
152+
LOGGER.log(INFO, "Failed to delete accessor link for block: {0}, path: {0}", blockNumber, zipFileLink);
153+
}
154+
}
123155
}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/ZipBlockArchive.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package org.hiero.block.node.blocks.files.historic;
33

44
import static java.lang.System.Logger.Level.ERROR;
5+
import static java.lang.System.Logger.Level.INFO;
56
import static java.lang.System.Logger.Level.WARNING;
67
import static org.hiero.block.node.base.BlockFile.blockNumberFromFile;
78
import static org.hiero.block.node.blocks.files.historic.BlockPath.computeBlockPath;
@@ -86,6 +87,9 @@ long writeNewZipFile(List<BlockAccessor> batch) throws IOException {
8687
// it is here possible that the accessor will no longer be able to access the block
8788
// because it is possible that the block has been deleted or has been moved
8889
final Bytes bytes = blockAccessor.blockBytes(format);
90+
// close the accessor as we are done with it and we need to free
91+
// resources
92+
blockAccessor.close();
8993
// calculate CRC-32 checksum
9094
final CRC32 crc = new CRC32();
9195
crc.update(bytes.toByteArray());
@@ -121,11 +125,12 @@ long writeNewZipFile(List<BlockAccessor> batch) throws IOException {
121125
*/
122126
BlockAccessor blockAccessor(long blockNumber) {
123127
try {
124-
// get existing block path or null if we cannot find it
128+
// get existing block path or null if we cannot find it or create accessor for
125129
final BlockPath blockPath = computeExistingBlockPath(config, blockNumber);
126-
return blockPath == null ? null : new ZipBlockAccessor(blockPath);
130+
return blockPath == null ? null : new ZipBlockAccessor(blockPath, config);
127131
} catch (final IOException e) {
128-
throw new UncheckedIOException(e);
132+
LOGGER.log(INFO, "Could not create zip block accessor", e);
133+
return null;
129134
}
130135
}
131136

0 commit comments

Comments
 (0)