@@ -182,7 +182,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] conten

// TODO abstract this w/in HoodieDataBlock
@Override
- protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
+ protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> sortedKeys, boolean fullKey) throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();

// NOTE: It's important to extend Hadoop configuration here to make sure configuration
@@ -195,11 +195,6 @@ protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> keys,
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());

- // HFile read will be efficient if keys are sorted, since on storage records are sorted by key.
- // This will avoid unnecessary seeks.
- List<String> sortedKeys = new ArrayList<>(keys);
- Collections.sort(sortedKeys);
-
final HoodieAvroHFileReader reader =
new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf),
Option.of(getSchemaFromHeader()));
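The sorting removed from this block now has to happen once at the call site, so the same sorted list is reused across every log block instead of being rebuilt per block. A minimal sketch of the assumed caller-side convention (class and method names are illustrative, not taken from the PR):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SortedKeyLookupHelper {
  // Sort the lookup keys once at the call site; every HFile-backed log block
  // can then scan strictly forward instead of re-sorting per block.
  static List<String> toSortedKeys(List<String> keys) {
    List<String> sortedKeys = new ArrayList<>(keys);
    Collections.sort(sortedKeys);
    return sortedKeys;
  }
}
```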
@@ -120,20 +120,14 @@ public HoodieAvroHFileReader(Path path, HFile.Reader reader, Option<Schema> sche
.orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader)));
}

- @Override
- public Option<HoodieRecord<IndexedRecord>> getRecordByKey(String key, Schema readerSchema) throws IOException {
-   synchronized (sharedScannerLock) {
-     return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema)
-         .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-   }
- }

@Override
public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
// We're caching blocks for this scanner to minimize amount of traffic
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
+ scanner.seekTo(); // places the cursor at the beginning of the first data block.
ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(scanner, keys, getSchema(), schema);
return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
}
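The added `scanner.seekTo()` matters because `reseekTo()` only ever moves the cursor forward: an unpositioned scanner must first be placed at the beginning of the first data block. A hedged sketch of the resulting lookup pattern, assuming the HBase `HFileScanner` semantics described in the comments further down:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

class ForwardOnlyKeyScan {
  // Fetch a batch of pre-sorted keys in a single forward pass over the file.
  static List<Cell> lookupSorted(HFileScanner scanner, List<String> sortedKeys) throws IOException {
    List<Cell> found = new ArrayList<>();
    if (!scanner.seekTo()) {             // cursor on the first entry; false means the file is empty
      return found;
    }
    for (String key : sortedKeys) {
      KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
      if (scanner.reseekTo(kv) == 0) {   // 0 means an exact match under the cursor
        found.add(scanner.getCell());
      }
      // On a miss the cursor stays where the forward scan stopped, which is
      // exactly where the lookup for the next (larger) key should resume.
    }
    return found;
  }
}
```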
@@ -216,6 +210,7 @@ protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<S
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
+ scanner.seekTo(); // places the cursor at the beginning of the first data block.
return new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
}

@@ -225,6 +220,7 @@ protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(L
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
+ scanner.seekTo(); // places the cursor at the beginning of the first data block.
return new RecordByKeyPrefixIterator(scanner, keyPrefixes, getSchema(), readerSchema);
}

@@ -267,14 +263,14 @@ private static Iterator<IndexedRecord> getRecordByKeyPrefixIteratorInternal(HFil
//
// Consider entries w/ the following keys in HFile: [key01, key02, key03, key04,..., key20];
// In case looked up key-prefix is
// - "key", `seekTo()` will return -1 and place the cursor just before "key01",
// - "key", `reseekTo()` will return -1 and place the cursor just before "key01",
// `getCell()` will return "key01" entry
// - "key03", `seekTo()` will return 0 (exact match) and place the cursor just before "key03",
// - "key03", `reseekTo()` will return 0 (exact match) and place the cursor just before "key03",
// `getCell()` will return "key03" entry
// - "key1", `seekTo()` will return 1 (first not lower than) and place the cursor just before
// "key10" (i.e. on "key09");
// - "key1", `reseekTo()` will return 1 (first not lower than) and leave the cursor wherever it was just before calling
// reseekTo();
//
- int val = scanner.seekTo(kv);
+ int val = scanner.reseekTo(kv);
if (val == 1) {
// Try moving to next entry, matching the prefix key; if we're at the EOF,
// `next()` will return false
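The same forward-only property is what forces the key prefixes to arrive pre-sorted: once `reseekTo()` has advanced past a region of the file, an out-of-order prefix can no longer be served. A sketch of the prefix scan under the same assumptions (helper names are illustrative):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

class ForwardOnlyPrefixScan {
  // Collect all row keys matching each prefix, in one forward pass.
  static List<String> matchSortedPrefixes(HFileScanner scanner, List<String> sortedPrefixes) throws IOException {
    List<String> matched = new ArrayList<>();
    if (!scanner.seekTo()) {                 // empty file
      return matched;
    }
    outer:
    for (String prefix : sortedPrefixes) {
      KeyValue kv = new KeyValue(prefix.getBytes(), null, null, null);
      // A return value of 1 means the first candidate is the entry after the cursor.
      if (scanner.reseekTo(kv) == 1 && !scanner.next()) {
        break;                               // prefix sorts past the last entry
      }
      while (true) {
        String rowKey = new String(CellUtil.cloneRow(scanner.getCell()));
        if (!rowKey.startsWith(prefix)) {
          break;                             // past this prefix; reseek for the next one
        }
        matched.add(rowKey);
        if (!scanner.next()) {
          break outer;                       // EOF: later prefixes cannot match either
        }
      }
    }
    return matched;
  }
}
```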
@@ -333,10 +329,13 @@ public IndexedRecord next() {

private static Option<IndexedRecord> fetchRecordByKeyInternal(HFileScanner scanner, String key, Schema writerSchema, Schema readerSchema) throws IOException {
KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
- if (scanner.seekTo(kv) != 0) {
+ int returnVal = scanner.reseekTo(kv);
+ if (returnVal != 0) {
+   // key is not found.
return Option.empty();
}

+ // key is found and the cursor is left where the key is found
Cell c = scanner.getCell();
byte[] valueBytes = copyValueFromCell(c);
GenericRecord record = deserialize(key.getBytes(), valueBytes, writerSchema, readerSchema);
@@ -29,14 +29,6 @@

public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {

- default Option<HoodieRecord<T>> getRecordByKey(String key, Schema readerSchema) throws IOException {
-   throw new UnsupportedOperationException();
- }
-
- default Option<HoodieRecord<T>> getRecordByKey(String key) throws IOException {
-   return getRecordByKey(key, getSchema());
- }
-
default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
throw new UnsupportedOperationException();
}
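With the single-key defaults gone from the interface, a point lookup is served through the remaining batch API. A hedged sketch (the helper and the import paths are assumed from the surrounding Hudi module, not confirmed by the PR):

```java
import java.io.IOException;
import java.util.Collections;

// Import paths below are assumed, not verbatim from the PR.
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;

class SingleKeyLookup {
  // Express the removed getRecordByKey(...) as a one-element batch read.
  static <T> Option<HoodieRecord<T>> lookup(HoodieSeekingFileReader<T> reader, String key) throws IOException {
    try (ClosableIterator<HoodieRecord<T>> it =
             reader.getRecordsByKeysIterator(Collections.singletonList(key))) {
      return it.hasNext() ? Option.of(it.next()) : Option.empty();
    }
  }
}
```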
@@ -217,7 +217,6 @@ protected Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List
return Collections.emptyMap();
}

- // sort
Map<String, HoodieRecord<HoodieMetadataPayload>> result;

// Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys.
@@ -274,16 +273,18 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysFromFileSlice
return Collections.emptyMap();
}

+ List<String> sortedKeys = new ArrayList<>(keys);
+ Collections.sort(sortedKeys);
boolean fullKeys = true;
- Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = readLogRecords(logRecordScanner, keys, fullKeys, timings);
- return readFromBaseAndMergeWithLogRecords(baseFileReader, keys, fullKeys, logRecords, timings, partitionName);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = readLogRecords(logRecordScanner, sortedKeys, fullKeys, timings);
+ return readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeys, fullKeys, logRecords, timings, partitionName);
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
}
}

private Map<String, HoodieRecord<HoodieMetadataPayload>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
- List<String> keys,
+ List<String> sortedKeys,
boolean fullKey,
List<Long> timings) {
HoodieTimer timer = HoodieTimer.start();
@@ -294,14 +295,14 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> readLogRecords(HoodieMe
}

try {
- return fullKey ? logRecordReader.getRecordsByKeys(keys) : logRecordReader.getRecordsByKeyPrefixes(keys);
+ return fullKey ? logRecordReader.getRecordsByKeys(sortedKeys) : logRecordReader.getRecordsByKeyPrefixes(sortedKeys);
} finally {
timings.add(timer.endTimer());
}
}

private Map<String, HoodieRecord<HoodieMetadataPayload>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
- List<String> keys,
+ List<String> sortedKeys,
boolean fullKeys,
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords,
List<Long> timings,
@@ -317,7 +318,7 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> readFromBaseAndMergeWit
HoodieTimer readTimer = HoodieTimer.start();

Map<String, HoodieRecord<HoodieMetadataPayload>> records =
- fetchBaseFileRecordsByKeys(reader, keys, fullKeys, partitionName);
+ fetchBaseFileRecordsByKeys(reader, sortedKeys, fullKeys, partitionName);

metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));

@@ -336,12 +337,12 @@

@SuppressWarnings("unchecked")
private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
- List<String> keys,
+ List<String> sortedKeys,
boolean fullKeys,
String partitionName) throws IOException {
ClosableIterator<HoodieRecord<?>> records = fullKeys
- ? reader.getRecordsByKeysIterator(keys)
- : reader.getRecordsByKeyPrefixIterator(keys);
+ ? reader.getRecordsByKeysIterator(sortedKeys)
+ : reader.getRecordsByKeyPrefixIterator(sortedKeys);

return toStream(records)
.map(record -> {
@@ -74,16 +74,16 @@ public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
}

@SuppressWarnings("unchecked")
- public Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
-   if (keyPrefixes.isEmpty()) {
+ public Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> sortedKeyPrefixes) {
+   if (sortedKeyPrefixes.isEmpty()) {
return Collections.emptyMap();
}

// NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
// materialized state, to make sure there's no concurrent access
synchronized (this) {
- logRecordScanner.scanByKeyPrefixes(keyPrefixes);
- Predicate<String> p = createPrefixMatchingPredicate(keyPrefixes);
+ logRecordScanner.scanByKeyPrefixes(sortedKeyPrefixes);
+ Predicate<String> p = createPrefixMatchingPredicate(sortedKeyPrefixes);
return logRecordScanner.getRecords().entrySet()
.stream()
.filter(r -> r != null && p.test(r.getKey()))
@@ -97,17 +97,17 @@ public Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(
* the delta-log blocks
*/
@SuppressWarnings("unchecked")
- public Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys) {
-   if (keys.isEmpty()) {
+ public Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> sortedKeys) {
+   if (sortedKeys.isEmpty()) {
return Collections.emptyMap();
}

// NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]
// materialized state, to make sure there's no concurrent access
synchronized (this) {
- logRecordScanner.scanByFullKeys(keys);
+ logRecordScanner.scanByFullKeys(sortedKeys);
Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
- return keys.stream()
+ return sortedKeys.stream()
.map(key -> (HoodieRecord<HoodieMetadataPayload>) allRecords.get(key))
.filter(Objects::nonNull)
.collect(Collectors.toMap(HoodieRecord::getRecordKey, r -> r));