Skip to content
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ API Changes

* GITHUB#15295 : Switched to a fixed CFS threshold (Shubham Sharma)

* GITHUB#15627 : Deferred lambda in TermStates.java according to prefetch (Shubham Sharma)

New Features
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ void loadNextFloorBlock() throws IOException {
loadBlock();
}

void prefetchBlock() throws IOException {
boolean prefetchBlock() throws IOException {
if (nextEnt != -1) {
// Already loaded
return;
return false;
}

// Clone the IndexInput lazily, so that consumers
Expand All @@ -145,7 +145,7 @@ void prefetchBlock() throws IOException {
ste.initIndexInput();

// TODO: Could we know the number of bytes to prefetch?
ste.in.prefetch(fp, 1);
return ste.in.prefetch(fp, 1);
}

/* Does initial decode of next block of terms; this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,27 +434,7 @@ private IOBooleanSupplier prepareSeekExact(BytesRef target, boolean prefetch) th
return null;
}

if (prefetch) {
currentFrame.prefetchBlock();
}

return () -> {
currentFrame.loadBlock();

final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got " + result + "; return NOT_FOUND term=" +
// ToStringUtils.bytesRefToString(term));
// }
return false;
}
};
return getIoBooleanSupplier(target, prefetch);
} else {
// Follow this node
node = nextNode;
Expand Down Expand Up @@ -491,26 +471,42 @@ private IOBooleanSupplier prepareSeekExact(BytesRef target, boolean prefetch) th
return null;
}

return getIoBooleanSupplier(target, prefetch);
}

private IOBooleanSupplier getIoBooleanSupplier(BytesRef target, boolean prefetch)
throws IOException {
boolean doDefer;
if (prefetch) {
currentFrame.prefetchBlock();
doDefer = currentFrame.prefetchBlock();
} else {
doDefer = false;
}

return () -> {
currentFrame.loadBlock();
return new IOBooleanSupplier() {
@Override
public boolean get() throws IOException {
currentFrame.loadBlock();

final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got result " + result + "; return NOT_FOUND term=" +
// term.utf8ToString());
// }
final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got result " + result + "; return NOT_FOUND term=" +
// term.utf8ToString());
// }

return false;
}
}

return false;
@Override
public boolean doDefer() {
return doDefer;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ void loadNextFloorBlock() throws IOException {
loadBlock();
}

void prefetchBlock() throws IOException {
boolean prefetchBlock() throws IOException {
if (nextEnt != -1) {
// Already loaded
return;
return false;
}

// Clone the IndexInput lazily, so that consumers
Expand All @@ -147,7 +147,7 @@ void prefetchBlock() throws IOException {
ste.initIndexInput();

// TODO: Could we know the number of bytes to prefetch?
ste.in.prefetch(fp, 1);
return ste.in.prefetch(fp, 1);
}

/* Does initial decode of next block of terms; this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,9 @@ public void close() throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
// Not delegating to the wrapped instance on purpose. This is only used for merging.
return false;
}
};
}
Expand Down
39 changes: 18 additions & 21 deletions lucene/core/src/java/org/apache/lucene/index/TermStates.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,25 @@ public IOSupplier<TermState> get(LeafReaderContext ctx) throws IOException {
this.states[ctx.ord] = EMPTY_TERMSTATE;
return null;
}
return () -> {
if (this.states[ctx.ord] == null) {
TermState state = null;
if (termExistsSupplier.get()) {
state = termsEnum.termState();
this.states[ctx.ord] = state;
} else {
this.states[ctx.ord] = EMPTY_TERMSTATE;
}
}
TermState state = this.states[ctx.ord];
if (state == EMPTY_TERMSTATE) {
return null;
}
return state;
};
}
TermState state = this.states[ctx.ord];
if (state == EMPTY_TERMSTATE) {
return null;
IOSupplier<TermState> stateSupplier =
() -> {
if (this.states[ctx.ord] == null) {
if (termExistsSupplier.get()) {
this.states[ctx.ord] = termsEnum.termState();
} else {
this.states[ctx.ord] = EMPTY_TERMSTATE;
}
}
return this.states[ctx.ord] == EMPTY_TERMSTATE ? null : this.states[ctx.ord];
};
if (termExistsSupplier.doDefer()) {
return stateSupplier;
} else {
stateSupplier.get();
return this.states[ctx.ord] == EMPTY_TERMSTATE ? null : () -> this.states[ctx.ord];
}
}
return () -> state;
return this.states[ctx.ord] == EMPTY_TERMSTATE ? null : () -> this.states[ctx.ord];
}

/**
Expand Down
10 changes: 7 additions & 3 deletions lucene/core/src/java/org/apache/lucene/store/IndexInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ public long readLong(long pos) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
slice.prefetch(offset, length);
public boolean prefetch(long offset, long length) throws IOException {
return slice.prefetch(offset, length);
}

@Override
Expand All @@ -223,8 +223,12 @@ public String toString() {
*
* @param offset start offset
* @param length the number of bytes to prefetch
* @return true if prefetch actually prefetched something, hence user can benefit from deferring
* reading the prefetched memory block.
*/
public void prefetch(long offset, long length) throws IOException {}
public boolean prefetch(long offset, long length) throws IOException {
return false;
}

/**
* Optional method: Updates the {@code IOContext} to specify a new read access pattern. IndexInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOConsumer;

/**
* Base IndexInput implementation that uses an array of MemorySegments to represent a file.
Expand Down Expand Up @@ -328,9 +327,9 @@ public void seek(long pos) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
if (NATIVE_ACCESS.isEmpty()) {
return;
return false;
}

ensureOpen();
Expand All @@ -340,19 +339,21 @@ public void prefetch(long offset, long length) throws IOException {
// power of two. There is a good chance that a good chunk of this index input is cached in
// physical memory. Let's skip the overhead of the madvise system call, we'll be trying again
// on the next power of two of the counter.
return;
return false;
}

final NativeAccess nativeAccess = NATIVE_ACCESS.get();
advise(
return advise(
offset,
length,
segment -> {
if (segment.isLoaded() == false) {
// We have a cache miss on at least one page, let's reset the counter.
sharedPrefetchCounter.set(0);
nativeAccess.madviseWillNeed(segment);
return true;
}
return false;
});
}

Expand All @@ -369,14 +370,25 @@ private void updateReadAdvice(ReadAdvice readAdvice) throws IOException {

long offset = 0;
for (MemorySegment seg : segments) {
advise(offset, seg.byteSize(), segment -> nativeAccess.madvise(segment, readAdvice));
advise(
offset,
seg.byteSize(),
segment -> {
nativeAccess.madvise(segment, readAdvice);
return true;
});
offset += seg.byteSize();
}
}

void advise(long offset, long length, IOConsumer<MemorySegment> advice) throws IOException {
@FunctionalInterface
interface ReadAdviceConsumer {
boolean accept(MemorySegment input) throws IOException;
}

boolean advise(long offset, long length, ReadAdviceConsumer advice) throws IOException {
if (NATIVE_ACCESS.isEmpty()) {
return;
return false;
}

ensureOpen();
Expand Down Expand Up @@ -404,12 +416,12 @@ void advise(long offset, long length, IOConsumer<MemorySegment> advice) throws I
length -= nativeAccess.getPageSize();
if (length <= 0) {
// This segment has no data beyond the first page.
return;
return false;
}
}

final MemorySegment advisedSlice = segment.asSlice(offset, length);
advice.accept(advisedSlice);
return advice.accept(advisedSlice);
} catch (IndexOutOfBoundsException _) {
throw new EOFException("Read past EOF: " + this);
} catch (NullPointerException | IllegalStateException e) {
Expand Down Expand Up @@ -615,6 +627,7 @@ public final MemorySegmentIndexInput slice(
slice.length,
segment -> {
nativeAccess.madvise(segment, advice);
return true;
});
}
}
Expand Down Expand Up @@ -804,9 +817,9 @@ public MemorySegment segmentSliceOrNull(long pos, long len) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
Objects.checkFromIndexSize(offset, length, this.length);
super.prefetch(offset, length);
return super.prefetch(offset, length);
}
}

Expand Down Expand Up @@ -904,9 +917,9 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long ofs, long lengt
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
Objects.checkFromIndexSize(offset, length, this.length);
super.prefetch(this.offset + offset, length);
return super.prefetch(this.offset + offset, length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ default void readBytes(long pos, byte[] bytes, int offset, int length) throws IO
*
* @see IndexInput#prefetch
*/
default void prefetch(long offset, long length) throws IOException {}
default boolean prefetch(long offset, long length) throws IOException {
return false;
}

/**
* Returns a hint whether all the contents of this input are resident in physical memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public interface IOBooleanSupplier {
* @throws IOException if supplying the result throws an {@link IOException}
*/
boolean get() throws IOException;

default boolean doDefer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
in.prefetch(offset, length);
counter.incrementAndGet();
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
in.prefetch(offset, length);
counter.incrementAndGet();
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ public void seek(long pos) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
ensureOpen();
ensureAccessible();
in.prefetch(offset, length);
return in.prefetch(offset, length);
}

@Override
Expand Down
Loading