Skip to content
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ API Changes

* GITHUB#15295 : Switched to a fixed CFS threshold (Shubham Sharma)

* GITHUB#15627 : Deferred lambda in TermStates.java according to prefetch (Shubham Sharma)

New Features
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ void loadNextFloorBlock() throws IOException {
loadBlock();
}

void prefetchBlock() throws IOException {
boolean prefetchBlock() throws IOException {
if (nextEnt != -1) {
// Already loaded
return;
return false;
}

// Clone the IndexInput lazily, so that consumers
Expand All @@ -145,7 +145,7 @@ void prefetchBlock() throws IOException {
ste.initIndexInput();

// TODO: Could we know the number of bytes to prefetch?
ste.in.prefetch(fp, 1);
return ste.in.prefetch(fp, 1);
}

/* Does initial decode of next block of terms; this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,26 +476,37 @@ private IOBooleanSupplier prepareSeekExact(BytesRef target, boolean prefetch) th

private IOBooleanSupplier getIOBooleanSupplier(BytesRef target, boolean prefetch)
throws IOException {
boolean doDefer;
if (prefetch) {
currentFrame.prefetchBlock();
doDefer = currentFrame.prefetchBlock();
} else {
doDefer = false;
}

return () -> {
currentFrame.loadBlock();
return new IOBooleanSupplier() {
@Override
public boolean get() throws IOException {
currentFrame.loadBlock();

final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got result " + result + "; return NOT_FOUND term=" +
// term.utf8ToString());
// }
final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got result " + result + "; return NOT_FOUND term=" +
// term.utf8ToString());
// }

return false;
}
}

return false;
@Override
public boolean doDefer() {
return doDefer;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ void loadNextFloorBlock() throws IOException {
loadBlock();
}

void prefetchBlock() throws IOException {
boolean prefetchBlock() throws IOException {
if (nextEnt != -1) {
// Already loaded
return;
return false;
}

// Clone the IndexInput lazily, so that consumers
Expand All @@ -147,7 +147,7 @@ void prefetchBlock() throws IOException {
ste.initIndexInput();

// TODO: Could we know the number of bytes to prefetch?
ste.in.prefetch(fp, 1);
return ste.in.prefetch(fp, 1);
}

/* Does initial decode of next block of terms; this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,9 @@ public void close() throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
// Not delegating to the wrapped instance on purpose. This is only used for merging.
return false;
}
};
}
Expand Down
42 changes: 21 additions & 21 deletions lucene/core/src/java/org/apache/lucene/index/TermStates.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,28 @@ public IOSupplier<TermState> get(LeafReaderContext ctx) throws IOException {
this.states[ctx.ord] = EMPTY_TERMSTATE;
return null;
}
return () -> {
if (this.states[ctx.ord] == null) {
TermState state = null;
if (termExistsSupplier.get()) {
state = termsEnum.termState();
this.states[ctx.ord] = state;
} else {
this.states[ctx.ord] = EMPTY_TERMSTATE;
}
}
TermState state = this.states[ctx.ord];
if (state == EMPTY_TERMSTATE) {
return null;
}
return state;
};
}
TermState state = this.states[ctx.ord];
if (state == EMPTY_TERMSTATE) {
return null;
IOSupplier<TermState> stateSupplier =
() -> {
if (this.states[ctx.ord] == null) {
if (termExistsSupplier.get()) {
this.states[ctx.ord] = termsEnum.termState();
} else {
this.states[ctx.ord] = EMPTY_TERMSTATE;
}
}
TermState termState = this.states[ctx.ord];
return termState == EMPTY_TERMSTATE ? null : termState;
};
if (termExistsSupplier.doDefer()) {
return stateSupplier;
} else {
stateSupplier.get();
TermState termState = this.states[ctx.ord];
return termState == EMPTY_TERMSTATE ? null : () -> termState;
}
}
return () -> state;
TermState termState = this.states[ctx.ord];
return termState == EMPTY_TERMSTATE ? null : () -> termState;
}

/**
Expand Down
10 changes: 7 additions & 3 deletions lucene/core/src/java/org/apache/lucene/store/IndexInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ public long readLong(long pos) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
slice.prefetch(offset, length);
public boolean prefetch(long offset, long length) throws IOException {
return slice.prefetch(offset, length);
}

@Override
Expand All @@ -223,8 +223,12 @@ public String toString() {
*
* @param offset start offset
* @param length the number of bytes to prefetch
* @return true if prefetch actually prefetched something, hence user can benefit from deferring
* reading the prefetched memory block.
*/
public void prefetch(long offset, long length) throws IOException {}
public boolean prefetch(long offset, long length) throws IOException {
return false;
}

/**
* Optional method: Updates the {@code IOContext} to specify a new read access pattern. IndexInput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOConsumer;
import org.apache.lucene.util.IOFunction;

/**
* Base IndexInput implementation that uses an array of MemorySegments to represent a file.
Expand Down Expand Up @@ -328,9 +328,9 @@ public void seek(long pos) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
if (NATIVE_ACCESS.isEmpty()) {
return;
return false;
}

ensureOpen();
Expand All @@ -340,19 +340,21 @@ public void prefetch(long offset, long length) throws IOException {
// power of two. There is a good chance that a good chunk of this index input is cached in
// physical memory. Let's skip the overhead of the madvise system call, we'll be trying again
// on the next power of two of the counter.
return;
return false;
}

final NativeAccess nativeAccess = NATIVE_ACCESS.get();
advise(
return advise(
offset,
length,
segment -> {
if (segment.isLoaded() == false) {
// We have a cache miss on at least one page, let's reset the counter.
sharedPrefetchCounter.set(0);
nativeAccess.madviseWillNeed(segment);
return true;
}
return false;
});
}

Expand All @@ -369,14 +371,21 @@ private void updateReadAdvice(ReadAdvice readAdvice) throws IOException {

long offset = 0;
for (MemorySegment seg : segments) {
advise(offset, seg.byteSize(), segment -> nativeAccess.madvise(segment, readAdvice));
advise(
offset,
seg.byteSize(),
segment -> {
nativeAccess.madvise(segment, readAdvice);
return true;
});
offset += seg.byteSize();
}
}

void advise(long offset, long length, IOConsumer<MemorySegment> advice) throws IOException {
boolean advise(long offset, long length, IOFunction<MemorySegment, Boolean> advice)
throws IOException {
if (NATIVE_ACCESS.isEmpty()) {
return;
return false;
}

ensureOpen();
Expand Down Expand Up @@ -404,12 +413,12 @@ void advise(long offset, long length, IOConsumer<MemorySegment> advice) throws I
length -= nativeAccess.getPageSize();
if (length <= 0) {
// This segment has no data beyond the first page.
return;
return false;
}
}

final MemorySegment advisedSlice = segment.asSlice(offset, length);
advice.accept(advisedSlice);
return advice.apply(advisedSlice);
} catch (IndexOutOfBoundsException _) {
throw new EOFException("Read past EOF: " + this);
} catch (NullPointerException | IllegalStateException e) {
Expand Down Expand Up @@ -615,6 +624,7 @@ public final MemorySegmentIndexInput slice(
slice.length,
segment -> {
nativeAccess.madvise(segment, advice);
return true;
});
}
}
Expand Down Expand Up @@ -804,9 +814,9 @@ public MemorySegment segmentSliceOrNull(long pos, long len) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
Objects.checkFromIndexSize(offset, length, this.length);
super.prefetch(offset, length);
return super.prefetch(offset, length);
}
}

Expand Down Expand Up @@ -904,9 +914,9 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long ofs, long lengt
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
Objects.checkFromIndexSize(offset, length, this.length);
super.prefetch(this.offset + offset, length);
return super.prefetch(this.offset + offset, length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ default void readBytes(long pos, byte[] bytes, int offset, int length) throws IO
*
* @see IndexInput#prefetch
*/
default void prefetch(long offset, long length) throws IOException {}
default boolean prefetch(long offset, long length) throws IOException {
return false;
}

/**
* Returns a hint whether all the contents of this input are resident in physical memory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,13 @@ public interface IOBooleanSupplier {
* @throws IOException if supplying the result throws an {@link IOException}
*/
boolean get() throws IOException;

/**
* Returns whether the TermState get should be deferred.
*
* @return {@code true} if the TermState get should be deferred, {@code false} otherwise
*/
default boolean doDefer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
in.prefetch(offset, length);
counter.incrementAndGet();
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ public CountingPrefetchIndexInput(IndexInput input, AtomicInteger counter) {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
in.prefetch(offset, length);
counter.incrementAndGet();
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ public void seek(long pos) throws IOException {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
ensureOpen();
ensureAccessible();
in.prefetch(offset, length);
return in.prefetch(offset, length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void onRead(long offset, int len) {
}

@Override
public void prefetch(long offset, long length) throws IOException {
public boolean prefetch(long offset, long length) throws IOException {
final long firstPage = (sliceOffset + offset) >> PAGE_SHIFT;
final long lastPage = (sliceOffset + offset + length - 1) >> PAGE_SHIFT;

Expand All @@ -152,6 +152,8 @@ public void prefetch(long offset, long length) throws IOException {
for (long page = firstPage; page <= lastPage; ++page) {
pendingPages.add(page);
}

return true;
}

@Override
Expand Down