Skip to content

Commit ac06756

Browse files
feat: DH-19475 predicate pushdown - parquet dictionary support (#7062)
Co-authored-by: Larry Booker <lbooker42@gmail.com>
1 parent 358dfac commit ac06756

17 files changed

Lines changed: 1036 additions & 120 deletions

File tree

engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -484,14 +484,17 @@ private void filterStatelessCollection(
484484
try {
485485
for (int ii = 0; ii < filters.size(); ii++) {
486486
final WhereFilter filter = filters.get(ii);
487-
487+
// Only consider column sources that are actually present in the source table, because filters may refer
488+
// to columns like "i" or "ii" that are not actually in the table.
489+
final Map<String, ColumnSource<?>> columnSourceMap = sourceTable.getColumnSourceMap();
488490
final List<ColumnSource<?>> filterSources = filter.getColumns().stream()
489-
.map(sourceTable::getColumnSource).collect(Collectors.toList());
491+
.filter(columnSourceMap::containsKey)
492+
.map(sourceTable::getColumnSource)
493+
.collect(Collectors.toList());
490494
final PushdownFilterMatcher executor =
491495
PushdownFilterMatcher.getPushdownFilterMatcher(filter, filterSources);
492496
if (executor != null) {
493-
final PushdownFilterContext context = executor.makePushdownFilterContext(filter, filter.getColumns()
494-
.stream().map(sourceTable::getColumnSource).collect(Collectors.toList()));
497+
final PushdownFilterContext context = executor.makePushdownFilterContext(filter, filterSources);
495498
statelessFilters[ii] = new StatelessFilter(ii, filter, executor, context, barrierDependencies);
496499
} else {
497500
statelessFilters[ii] = new StatelessFilter(ii, filter, null, null, barrierDependencies);

engine/table/src/main/java/io/deephaven/engine/table/impl/BasePushdownFilterContext.java

Lines changed: 274 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,107 @@
33
//
44
package io.deephaven.engine.table.impl;
55

6+
import io.deephaven.api.Strings;
7+
import io.deephaven.chunk.Chunk;
8+
import io.deephaven.chunk.LongChunk;
9+
import io.deephaven.chunk.WritableLongChunk;
10+
import io.deephaven.chunk.attributes.Values;
11+
import io.deephaven.engine.liveness.LivenessScopeStack;
12+
import io.deephaven.engine.rowset.RowSet;
13+
import io.deephaven.engine.rowset.TrackingRowSet;
14+
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
15+
import io.deephaven.engine.table.ColumnSource;
16+
import io.deephaven.engine.table.Table;
17+
import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter;
18+
import io.deephaven.engine.table.impl.select.*;
19+
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
20+
import io.deephaven.engine.util.TableTools;
21+
import io.deephaven.util.SafeCloseable;
22+
import org.jetbrains.annotations.MustBeInvokedByOverriders;
23+
24+
import java.util.List;
625
import java.util.Map;
726

827
/**
928
* Base class for {@link PushdownFilterContext} to help with execution cost tracking.
1029
*/
11-
public abstract class BasePushdownFilterContext implements PushdownFilterContext {
12-
protected long executedFilterCost;
30+
public class BasePushdownFilterContext implements PushdownFilterContext {
31+
32+
/**
33+
* Enum for the behavior of a filter when applied to null values.
34+
*/
35+
public enum FilterNullBehavior {
36+
/**
37+
* The filter includes nulls in its results, like {@code x == null}.
38+
*/
39+
INCLUDES_NULLS,
40+
41+
/**
42+
* The filter does not include nulls in its results, like {@code x > 5}.
43+
*/
44+
EXCLUDES_NULLS,
45+
46+
/**
47+
* The filter throws an exception when applied to nulls, like {@code x.beginsWith("A")}.
48+
*/
49+
FAILS_ON_NULLS
50+
}
51+
52+
protected final WhereFilter filter;
53+
private final List<ColumnSource<?>> columnSources;
54+
55+
private final boolean isRangeFilter;
56+
private final boolean isMatchFilter;
57+
private final boolean supportsChunkFilter;
58+
private final boolean filterSupportsPushdown;
59+
60+
private long executedFilterCost;
61+
62+
/**
63+
* The behavior of this filter when applied to null values. This is lazily computed on first access. Should be
64+
* accessed via {@link #filterNullBehavior()}.
65+
*/
66+
private volatile FilterNullBehavior filterNullBehavior;
67+
68+
/**
69+
* A dummy table to use for initializing {@link ConditionFilter}. This is lazily computed on first access. Should be
70+
* accessed via {@link #conditionalFilterInitTable()}.
71+
*/
72+
private volatile Table conditionalFilterInitTable;
73+
74+
/**
75+
* Interface for a unified chunk filter that can be used to apply a filter to a chunk of data, whether the
76+
underlying filter is an {@link ExposesChunkFilter} or a {@link ConditionFilter}.
77+
*/
78+
public interface UnifiedChunkFilter extends SafeCloseable {
79+
LongChunk<OrderedRowKeys> filter(Chunk<? extends Values> values, LongChunk<OrderedRowKeys> keys);
80+
}
81+
82+
public BasePushdownFilterContext(
83+
final WhereFilter filter,
84+
final List<ColumnSource<?>> columnSources) {
85+
this.filter = filter;
86+
this.columnSources = columnSources;
1387

14-
public BasePushdownFilterContext() {
1588
executedFilterCost = 0;
89+
90+
isRangeFilter = filter instanceof RangeFilter
91+
&& ((RangeFilter) filter).getRealFilter() instanceof AbstractRangeFilter;
92+
isMatchFilter = filter instanceof MatchFilter &&
93+
((MatchFilter) filter).getFailoverFilterIfCached() == null;
94+
final boolean isConditionFilter = filter instanceof ConditionFilter;
95+
96+
// TODO (DH-19666): Multi column filters are not supported yet
97+
filterSupportsPushdown = isRangeFilter || isMatchFilter ||
98+
(isConditionFilter && ((ConditionFilter) filter).getNumInputsUsed() == 1);
99+
// Do not use columnSources.size(), multiple logical columns may alias (rename) the same physical column,
100+
// yielding a single entry.
101+
102+
supportsChunkFilter = filterSupportsPushdown &&
103+
((filter instanceof ExposesChunkFilter && ((ExposesChunkFilter) filter).chunkFilter().isPresent())
104+
|| isConditionFilter);
105+
106+
filterNullBehavior = null; // lazily initialized
16107
}
17108

18109
@Override
@@ -26,13 +117,189 @@ public void updateExecutedFilterCost(long executedFilterCost) {
26117
}
27118

28119
/**
29-
* The mapping from filter column names to column names from the table definition if they differ. User should use
30-
* this mapping as {@code renameMap().getOrDefault(colNameFromFilter, colNameFromFilter)}
120+
* Get the column sources this filter will use.
31121
*/
32-
public abstract Map<String, String> renameMap();
122+
public List<ColumnSource<?>> columnSources() {
123+
return columnSources;
124+
}
125+
126+
/**
127+
* Whether this is a simple range filter, not implemented by a ConditionFilter.
128+
*/
129+
public boolean isRangeFilter() {
130+
return isRangeFilter;
131+
}
132+
133+
/**
134+
* Whether this is a MatchFilter.
135+
*/
136+
public boolean isMatchFilter() {
137+
return isMatchFilter;
138+
}
139+
140+
/**
141+
* Whether this filter supports pushdown-based filtering. This includes simple range filters, match filters, and
142+
* ConditionFilters with exactly one column.
143+
*/
144+
public boolean filterSupportsPushdown() {
145+
return filterSupportsPushdown;
146+
}
147+
148+
/**
149+
* Whether this filter supports direct chunk filtering, i.e., it can be applied to a chunk of data rather than a
150+
table. This includes any filter that implements {@link ExposesChunkFilter} or {@link ConditionFilter} with
151+
* exactly one column.
152+
*/
153+
public boolean supportsChunkFilter() {
154+
return supportsChunkFilter;
155+
}
156+
157+
/**
158+
* Get the behavior of this filter when applied to null values. This is lazily computed on first access.
159+
*/
160+
public FilterNullBehavior filterNullBehavior() {
161+
FilterNullBehavior local = filterNullBehavior;
162+
if (local == null) {
163+
synchronized (this) {
164+
local = filterNullBehavior;
165+
if (local == null) {
166+
local = computeFilterNullBehavior();
167+
filterNullBehavior = local;
168+
}
169+
}
170+
}
171+
return local;
172+
}
173+
174+
private FilterNullBehavior computeFilterNullBehavior() {
175+
// Create a dummy table with a single row and a single column holding a `null` entry, and apply the filter to see
176+
// if the filter includes nulls.
177+
final ColumnSource<?> columnSource = columnSources.get(0);
178+
final NullValueColumnSource<?> nullValueColumnSource =
179+
NullValueColumnSource.getInstance(columnSource.getType(), columnSource.getComponentType());
180+
final Map<String, ColumnSource<?>> columnSourceMap =
181+
Map.of(filter.getColumns().get(0), nullValueColumnSource);
182+
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
183+
final Table nullTestDummyTable = TableTools.newTable(1, columnSourceMap);
184+
final TrackingRowSet rowSet = nullTestDummyTable.getRowSet();
185+
try (final RowSet result = filter.filter(rowSet, rowSet, nullTestDummyTable, false)) {
186+
return result.isEmpty() ? FilterNullBehavior.EXCLUDES_NULLS : FilterNullBehavior.INCLUDES_NULLS;
187+
} catch (final Exception e) {
188+
return FilterNullBehavior.FAILS_ON_NULLS;
189+
}
190+
}
191+
}
192+
193+
/**
194+
* Create a {@link UnifiedChunkFilter} for the {@link WhereFilter} that efficiently filters chunks of data. Every
195+
* thread that uses this should create its own instance and must close it after use.
196+
*
197+
* @param maxChunkSize the maximum size of the chunk that will be filtered
198+
* @return the initialized {@link UnifiedChunkFilter}
199+
*/
200+
public final UnifiedChunkFilter createChunkFilter(final int maxChunkSize) {
201+
if (!supportsChunkFilter) {
202+
throw new UnsupportedOperationException("Filter does not support chunk filtering: " + Strings.of(filter));
203+
}
204+
if (filter instanceof ExposesChunkFilter) {
205+
final ChunkFilter chunkFilter = ((ExposesChunkFilter) filter).chunkFilter()
206+
.orElseThrow(() -> new IllegalStateException("ExposesChunkFilter#chunkFilter() returned null."));
207+
return new DirectChunkFilter(chunkFilter, maxChunkSize);
208+
} else if (filter instanceof ConditionFilter) {
209+
// Create a dummy table with no rows and single column of the correct type and name as the filter. This is
210+
// used to extract a chunk filter kernel from the conditional filter and bind it to the correct name and
211+
// type without capturing references to the actual table or its column sources.
212+
final Table initTable = conditionalFilterInitTable();
213+
try {
214+
final ConditionFilter conditionFilter = (ConditionFilter) filter;
215+
final AbstractConditionFilter.Filter acfFilter =
216+
conditionFilter.getFilter(initTable, initTable.getRowSet());
217+
return new ConditionKernelChunkFilter(acfFilter, maxChunkSize);
218+
} catch (final Exception e) {
219+
throw new IllegalArgumentException("Error creating condition filter in BasePushdownFilterContext", e);
220+
}
221+
} else {
222+
throw new UnsupportedOperationException(
223+
"Filter does not support chunk filtering: " + Strings.of(filter));
224+
}
225+
}
226+
227+
private Table conditionalFilterInitTable() {
228+
Table local = conditionalFilterInitTable;
229+
if (local == null) {
230+
synchronized (this) {
231+
local = conditionalFilterInitTable;
232+
if (local == null) {
233+
final Map<String, ColumnSource<?>> columnSourceMap = Map.of(filter.getColumns().get(0),
234+
NullValueColumnSource.getInstance(
235+
columnSources.get(0).getType(),
236+
columnSources.get(0).getComponentType()));
237+
local = TableTools.newTable(0, columnSourceMap);
238+
conditionalFilterInitTable = local;
239+
}
240+
}
241+
}
242+
return local;
243+
}
244+
245+
246+
/**
247+
* A {@link UnifiedChunkFilter} that wraps a {@link ChunkFilter} directly.
248+
*/
249+
private static final class DirectChunkFilter implements UnifiedChunkFilter {
250+
private final ChunkFilter chunkFilter;
251+
private final WritableLongChunk<OrderedRowKeys> resultChunk;
252+
253+
private DirectChunkFilter(final ChunkFilter chunkFilter, final int maxChunkSize) {
254+
this.chunkFilter = chunkFilter;
255+
// We need to create a WritableLongChunk to hold the results of the chunk filter.
256+
this.resultChunk = WritableLongChunk.makeWritableChunk(maxChunkSize);
257+
}
258+
259+
@Override
260+
public LongChunk<OrderedRowKeys> filter(Chunk<? extends Values> values, LongChunk<OrderedRowKeys> keys) {
261+
chunkFilter.filter(values, keys, resultChunk);
262+
return resultChunk;
263+
}
264+
265+
@Override
266+
public void close() {
267+
resultChunk.close();
268+
}
269+
}
270+
271+
/**
272+
* A {@link UnifiedChunkFilter} that wraps a {@link ConditionFilter} by extracting its kernel and context.
273+
*/
274+
private static final class ConditionKernelChunkFilter implements UnifiedChunkFilter {
275+
private final AbstractConditionFilter.Filter acfFilter;
276+
private final ConditionFilter.FilterKernel.Context conditionFilterContext;
277+
278+
private ConditionKernelChunkFilter(
279+
final AbstractConditionFilter.Filter acfFilter,
280+
final int maxChunkSize) {
281+
this.acfFilter = acfFilter;
282+
// Create the context for the ConditionFilter, which will be used to filter chunks.
283+
this.conditionFilterContext = acfFilter.getContext(maxChunkSize);
284+
}
285+
286+
@Override
287+
public LongChunk<OrderedRowKeys> filter(
288+
Chunk<? extends Values> values,
289+
LongChunk<OrderedRowKeys> keys) {
290+
// noinspection unchecked
291+
return acfFilter.filter(conditionFilterContext, keys, new Chunk[] {values});
292+
}
293+
294+
@Override
295+
public void close() {
296+
conditionFilterContext.close();
297+
}
298+
}
33299

300+
@MustBeInvokedByOverriders
34301
@Override
35302
public void close() {
36-
// No-op
303+
conditionalFilterInitTable = null;
37304
}
38305
}

engine/table/src/main/java/io/deephaven/engine/table/impl/PushdownResult.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public final class PushdownResult implements SafeCloseable {
4040
* Requires querying an in-memory index structure
4141
*/
4242
public static final long IN_MEMORY_DATA_INDEX_COST = 30_000L;
43+
/**
44+
* Requires reading a dictionary to determine matches
45+
*/
46+
public static final long DICTIONARY_DATA_COST = 35_000L;
4347
/**
4448
* Requires using binary search on sorted data
4549
*/

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,13 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
271271
public static boolean DISABLE_WHERE_PUSHDOWN_DATA_INDEX =
272272
Configuration.getInstance().getBooleanWithDefault("QueryTable.disableWherePushdownDataIndex", false);
273273

274+
/**
275+
* Disable the usage of parquet row group dictionaries during push-down filtering.
276+
*/
277+
public static boolean DISABLE_WHERE_PUSHDOWN_PARQUET_DICTIONARY =
278+
Configuration.getInstance().getBooleanWithDefault("QueryTable.disableWherePushdownParquetDictionary",
279+
false);
280+
274281
/**
275282
* You can choose to enable or disable the column parallel select and update.
276283
*/
@@ -303,7 +310,8 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
303310

304311
/**
305312
* If set to true, then the default behavior of condition filters is to be stateless. Stateless filters are allowed
306-
* to be processed in parallel by the engine.
313+
* to be processed in parallel by the engine. Also, enabling this setting allows the engine to push down filters to
314+
* the data source when possible, like in case of parquet files.
307315
*/
308316
public static boolean STATELESS_FILTERS_BY_DEFAULT =
309317
Configuration.getInstance().getBooleanWithDefault("QueryTable.statelessFiltersByDefault", false);

engine/table/src/main/java/io/deephaven/engine/table/impl/select/ConditionFilter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,26 @@ public static WhereFilter createConditionFilter(@NotNull String formula) {
8080
return createConditionFilter(formula, FormulaParserConfiguration.parser);
8181
}
8282

83+
public static WhereFilter createStateless(@NotNull String formula) {
84+
return new ConditionFilter(formula) {
85+
@Override
86+
public boolean permitParallelization() {
87+
return true;
88+
}
89+
};
90+
}
91+
8392
String getClassBodyStr() {
8493
return classBody;
8594
}
8695

96+
/**
97+
* Get the number of inputs (columns and special variables) used by this filter.
98+
*/
99+
public int getNumInputsUsed() {
100+
return usedInputs.size();
101+
}
102+
87103
public interface FilterKernel<CONTEXT extends FilterKernel.Context> {
88104
class Context implements io.deephaven.engine.table.Context {
89105
public final WritableLongChunk<OrderedRowKeys> resultChunk;

0 commit comments

Comments
 (0)