Skip to content

Commit d92a5f9

Browse files
authored
fix: DH-20516: Add implicit barriers to serial Selectables. (#7252) (#7255)
1 parent d687f04 commit d92a5f9

File tree

5 files changed

+147
-8
lines changed

5 files changed

+147
-8
lines changed

docs/groovy/conceptual/query-engine/parallelization.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ The [`ConcurrencyControl`](https://docs.deephaven.io/core/javadoc/io/deephaven/a
8585
To explicitly mark a `Selectable` or `Filter` as stateful, use the `withSerial` method.
8686

8787
- A serial `Filter` cannot be reordered with respect to other `Filter`s. Every input row to a stateful `Filter` is evaluated in order.
88-
- When a `Selectable` is serial, then every row for that column is evaluated in order. For `Selectable`s, no additional ordering between expressions is imposed. As with every `select` or `update` call, if column B references column A, then the necessary inputs to column B from column A are evaluated before column B is evaluated. To impose further ordering constraints, use barriers.
88+
- When a `Selectable` is serial, then every row for that column is evaluated in order.
89+
- For `Selectable`s, additional ordering constraints are controlled by the value of the `QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS`. This is set by the property `QueryTable.serialSelectImplicitBarriers` (defaulting to the value of `QueryTable.statelessSelectByDefault`).
90+
- If `QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS` is false, no additional ordering between expressions is imposed. As with every `select` or `update` call, if column B references column A, then the necessary inputs to column B from column A are evaluated before column B is evaluated. To impose further ordering constraints, use barriers.
91+
- If `QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS` is true, then a serial `Selectable` is an absolute barrier with respect to all other serial `Selectable`s. This prohibits serial `Selectable`s from being evaluated concurrently, permitting them to access global state. `Selectable`s that are not serial may be reordered with respect to a serial selectable.
8992

9093
`Filter`s and `Selectable`s may declare a _barrier_. A barrier is an opaque object (compared using reference equality) that is used to mark a particular `Filter` or `Selectable`. Subsequent `Filter`s or `Selectable`s may respect a previously declared barrier. If a `Filter` respects a barrier, that `Filter` cannot begin evaluation until the `Filter` that declares the barrier has been completely evaluated. Similarly, if a `Selectable` respects a barrier, it cannot begin evaluation until the `Selectable` that declared the barrier has been completely evaluated.
9194

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,14 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
317317
public static boolean STATELESS_SELECT_BY_DEFAULT =
318318
Configuration.getInstance().getBooleanWithDefault("QueryTable.statelessSelectByDefault", false);
319319

320+
/**
321+
* If set to true, then stateful SelectColumns form implicit barriers. If set to false, then StatefulSelectColumns
322+
* do not form implicit barriers.
323+
*/
324+
public static boolean SERIAL_SELECT_IMPLICIT_BARRIERS =
325+
Configuration.getInstance().getBooleanWithDefault("QueryTable.serialSelectImplicitBarriers",
326+
STATELESS_SELECT_BY_DEFAULT);
327+
320328
private static final AtomicReferenceFieldUpdater<QueryTable, ModifiedColumnSet> MODIFIED_COLUMN_SET_UPDATER =
321329
AtomicReferenceFieldUpdater.newUpdater(QueryTable.class, ModifiedColumnSet.class, "modifiedColumnSet");
322330
private static final AtomicReferenceFieldUpdater<QueryTable, Map> CACHED_OPERATIONS_UPDATER =

engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,12 @@ public static AnalyzerContext createContext(
158158
compilationProcessor.compile();
159159

160160
final Map<Object, Integer> barrierToLayerIndex = new IdentityHashMap<>();
161+
final List<Object> implicitSerialBarriers = new ArrayList<>();
161162

162163
// Second pass builds the analyzer and destination columns
163164
final HashMap<String, ColumnSource<?>> resultAlias = new HashMap<>();
164165
for (int columnIndex = 0; columnIndex < context.processedCols.size(); ++columnIndex) {
165-
final SelectColumn sc = context.processedCols.get(columnIndex);
166+
SelectColumn sc = context.processedCols.get(columnIndex);
166167

167168
// if this select column depends on result column then its updates must happen in result-key-space
168169
// note: if flatResult is true then we are not preserving any parent columns
@@ -172,6 +173,17 @@ public static AnalyzerContext createContext(
172173

173174
sc.initInputs(rowSet, useResultKeySpace ? context.allSourcesInResultKeySpace : context.allSources);
174175

176+
if (QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS && !sc.isStateless()) {
177+
final Object implicitBarrier = new ImplicitBarrier(sc.getName(), columnIndex);
178+
if (!implicitSerialBarriers.isEmpty()) {
179+
sc = sc.withRespectedBarriers(implicitSerialBarriers.toArray())
180+
.withDeclaredBarriers(implicitBarrier);
181+
} else {
182+
sc = sc.withDeclaredBarriers(implicitBarrier);
183+
}
184+
implicitSerialBarriers.add(implicitBarrier);
185+
}
186+
175187
// TODO (deephaven-core#5760): If layers may define more than one column, we'll need to fix resultAlias.
176188
// new columns shadow known aliases
177189
resultAlias.remove(sc.getName());
@@ -882,6 +894,24 @@ public void applyUpdate(
882894
scheduler.tryToKickOffWork();
883895
}
884896

897+
/**
898+
* A class used as an implicit barrier for serial selectables, with a useful toString.
899+
*/
900+
private static class ImplicitBarrier {
901+
private final String name;
902+
private final int fci;
903+
904+
public ImplicitBarrier(String name, int fci) {
905+
this.name = name;
906+
this.fci = fci;
907+
}
908+
909+
@Override
910+
public String toString() {
911+
return "Implicit barrier for Serial column " + name + " (index " + fci + ")";
912+
}
913+
}
914+
885915
private class UpdateScheduler {
886916
private final ReentrantLock runLock = new ReentrantLock();
887917

engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectBarrierTest.java

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.deephaven.engine.util.TableTools;
2020
import io.deephaven.util.SafeCloseable;
2121
import io.deephaven.util.thread.ThreadInitializationFactory;
22-
import org.junit.Ignore;
2322
import org.junit.Rule;
2423
import org.junit.Test;
2524

@@ -73,6 +72,7 @@ private void testBarrierSelectColumn() {
7372
final SafeCloseable ignored3 = threadPool::shutdown) {
7473
QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE = true;
7574
QueryTable.STATELESS_SELECT_BY_DEFAULT = false;
75+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = false;
7676

7777
final SelectColumn sa =
7878
SelectColumnFactory.getExpression("A=slow_a.getAndIncrementSlow(0, new int[]{ 0, 1 })");
@@ -156,6 +156,7 @@ private void testBarrierSelectable() {
156156
final SafeCloseable ignored3 = threadPool::shutdown) {
157157
QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE = true;
158158
QueryTable.STATELESS_SELECT_BY_DEFAULT = true;
159+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = false;
159160

160161
final Selectable sa =
161162
Selectable.of(ColumnName.of("A"), Method.of(ColumnName.of("AtomicInt"), "getAndIncrementSlow",
@@ -227,6 +228,79 @@ private void testBarrierSelectable() {
227228
}
228229
}
229230

231+
// if you would like to convince yourself that we are reliably producing out-of-order conditions as appropriate
232+
@Test
233+
public void testRepeatedImplicitBarriers() {
234+
for (int ii = 0; ii < REPEATS_FOR_CONFIDENCE; ++ii) {
235+
System.out.println("Repetition " + ii);
236+
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
237+
testImplicitBarriers();
238+
}
239+
}
240+
}
241+
242+
private void testImplicitBarriers() {
243+
final int segments = 10;
244+
final int size = Math.toIntExact(QueryTable.MINIMUM_PARALLEL_SELECT_ROWS * segments);
245+
final SlowedAtomicInteger slow_a = new SlowedAtomicInteger(0, 2);
246+
QueryScope.addParam("slow_a", slow_a);
247+
QueryScope.addParam("size", size);
248+
final Table x = TableTools.emptyTable(size)
249+
.updateView("AtomicInt=slow_a")
250+
.updateView("SecondChecks=new int[]{0}")
251+
.updateView("FirstChecks=new int[]{1}");
252+
253+
// use segments times the number of columns we are evaluating threads, so we can start off each of the
254+
// individual blocks of data at once to maximize "chaos" in
255+
// terms of the atomic integers being mixed up
256+
final OperationInitializationThreadPool threadPool =
257+
new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP, segments * 4);
258+
final ExecutionContext executionContext = ExecutionContext.getContext().withOperationInitializer(threadPool);
259+
try (final SafeCloseable ignored = executionContext.open();
260+
final SafeCloseable ignored2 = new SaveQueryTableOptions();
261+
final SafeCloseable ignored3 = threadPool::shutdown) {
262+
QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE = true;
263+
QueryTable.STATELESS_SELECT_BY_DEFAULT = true;
264+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = true;
265+
266+
final Selectable sa =
267+
Selectable.of(ColumnName.of("A"), Method.of(ColumnName.of("AtomicInt"), "getAndIncrementSlow",
268+
Literal.of(0), ColumnName.of("FirstChecks")));
269+
final Selectable sb =
270+
Selectable.of(ColumnName.of("B"), Method.of(ColumnName.of("AtomicInt"), "getAndIncrementSlow",
271+
Literal.of(1), ColumnName.of("SecondChecks")));
272+
273+
final Selectable sc = Selectable.of(ColumnName.of("C"), Literal.of(1));
274+
275+
slow_a.reset(0, -1);
276+
System.out.println(Instant.now() + ": serial");
277+
final Table y = x.update(List.of(sa.withSerial(), sb.withSerial()));
278+
279+
final Table expected =
280+
x.updateView(List.of(SelectColumn.ofStateless(SelectColumnFactory.getExpression("A=i"),
281+
SelectColumnFactory.getExpression("B=size + i"))));
282+
assertTableEquals(expected, y);
283+
284+
// now do the same thing with an extra column that is not stateless (so we certainly parallelize)
285+
slow_a.reset(0, -1);
286+
System.out.println(Instant.now() + ": serial + const");
287+
final Table y2 = x.update(List.of(sa.withSerial(), sb.withSerial(), sc));
288+
assertTableEquals(expected.update("C=1"), y2);
289+
290+
// now turn off the implicit barriers
291+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = false;
292+
((QueryTable) x).clearMemoizedResults();
293+
slow_a.reset(0, 1);
294+
slow_a.setMinThreads(1);
295+
final Table y3 = x.update(List.of(sa.withSerial(), sb.withSerial(), sc));
296+
checkTotalSum(size, y3);
297+
checkMixedColumns(y3, true);
298+
assertFalse(isOutOfOrder(y3, "A"));
299+
assertFalse(isOutOfOrder(y3, "B"));
300+
System.out.println(Instant.now() + ": no barrier");
301+
}
302+
}
303+
230304
// if you would like to convince yourself that we are reliably producing out-of-order conditions as appropriate
231305
@Test
232306
public void testRepeatedBarrierAcrossShift() {
@@ -254,6 +328,7 @@ private void testBarrierAcrossShift() {
254328
final SafeCloseable ignored3 = threadPool::shutdown) {
255329
QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE = true;
256330
QueryTable.STATELESS_SELECT_BY_DEFAULT = false;
331+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = false;
257332

258333
final SelectColumn sa =
259334
SelectColumnFactory.getExpression("A=slow_a.getAndIncrementSlow(0, new int[] { 0, 1, 0, 1 })");
@@ -363,6 +438,7 @@ private void testBarrierAliases() {
363438
final SafeCloseable ignored3 = threadPool::shutdown) {
364439
QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE = true;
365440
QueryTable.STATELESS_SELECT_BY_DEFAULT = false;
441+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = false;
366442

367443
final SelectColumn sa = SelectColumnFactory
368444
.getExpression("A=slow_a.getAndIncrementSlow(0, new int[] { 0, 1 })");
@@ -654,11 +730,13 @@ private static boolean isOutOfOrder(Table z, final String column) {
654730
private static class SaveQueryTableOptions implements SafeCloseable {
655731
final boolean oldForceParallel = QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE;
656732
final boolean oldStateless = QueryTable.STATELESS_SELECT_BY_DEFAULT;
733+
final boolean oldSerialBarriers = QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS;
657734

658735
@Override
659736
public void close() {
660737
QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE = oldForceParallel;
661738
QueryTable.STATELESS_SELECT_BY_DEFAULT = oldStateless;
739+
QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS = oldSerialBarriers;
662740
}
663741
}
664742

@@ -680,6 +758,7 @@ public static class SlowedAtomicInteger {
680758
private final List<Set<Thread>> columnThreads;
681759
int minChecks;
682760
int maxChecks;
761+
int minThreads = 2;
683762

684763
/**
685764
*
@@ -702,7 +781,7 @@ public static class SlowedAtomicInteger {
702781
/**
703782
* Do the get and increment on the underlying atomic integer, possibly sleeping to cause data to be interspersed
704783
* if there are not declared barriers.
705-
*
784+
*
706785
* @param columnNumber the column number we are updating
707786
* @param checkThreads an array of column numbers that we should verify had at least two threads access them. -1
708787
* indicates that we are not checking this position. The {@link #reset(int, int)} call enables us to
@@ -733,7 +812,7 @@ private void mabyeWaitABit(int columnNumber, int[] checkThreads) {
733812
if (checkThreads[ii] < 0) {
734813
continue;
735814
}
736-
sleepRequired = sleepRequired || columnThreads.get(checkThread).size() < 2;
815+
sleepRequired = sleepRequired || columnThreads.get(checkThread).size() < minThreads;
737816
}
738817

739818
if (sleepRequired) {
@@ -751,6 +830,10 @@ void reset(final int minChecks, final int maxChecks) {
751830
this.maxChecks = maxChecks;
752831
columnThreads.forEach(Set::clear);
753832
}
833+
834+
void setMinThreads(int minThreads) {
835+
this.minThreads = minThreads;
836+
}
754837
}
755838
}
756839

table-api/src/main/java/io/deephaven/api/ConcurrencyControl.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,24 @@ public interface ConcurrencyControl<T> {
3737
* parts of a filter are executed out of order relative to this serial wrapper.
3838
* </p>
3939
* <p>
40-
* For selectables, no additional ordering between expressions is imposed. As with every select or update call, if
41-
* column B references column A, then the necessary inputs from column A are evaluated before column B is evaluated.
42-
* To impose further ordering constraints, use barriers.
40+
* For selectables, additional ordering constraints are controlled by the value of the
41+
* {@code QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS}. This is set by the property
42+
* {@code QueryTable.serialSelectImplicitBarriers} (defaulting to the value of
43+
* {@code QueryTable.statelessSelectByDefault}).
44+
* </p>
45+
*
46+
* <p>
47+
* If {@code QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS} is false, then no additional ordering between selectable
48+
* expressions is imposed. As with every select or update call, if column B references column A, then the necessary
49+
* inputs from column A are evaluated before column B is evaluated. To impose further ordering constraints, use
50+
* barriers.
51+
* </p>
52+
*
53+
* <p>
54+
* If {@code QueryTable.SERIAL_SELECT_IMPLICIT_BARRIERS} is true, then a serial selectable is an absolute barrier
55+
* with respect to all other serial selectables. This prohibits serial selectables from being evaluated
56+
* concurrently, permitting them to access global state. Selectables that are not serial may be reordered with
57+
* respect to a serial selectable.
4358
* </p>
4459
* </li>
4560
* </ul>

0 commit comments

Comments
 (0)