Skip to content

Commit 4d44d21

Browse files
authored
fix: DH-21128: Always call Java Consumer's accept method regardless of whether partition value is provided (#7480)
Two new tests are added to cover the no-partition cases. However, the ticking table test seems to reveal a dead-lock situation, which could be a problem of the test data service.
1 parent f301fed commit 4d44d21

File tree

2 files changed

+63
-21
lines changed

2 files changed

+63
-21
lines changed

py/server/deephaven/experimental/table_data_service.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -401,8 +401,9 @@ def location_cb_proxy(
401401
):
402402
j_tbl_location_key = _JTableLocationKeyImpl(pt_location_key)
403403
if pt_table is None or pt_table.to_batches() is None:
404-
location_cb.apply(
405-
j_tbl_location_key, jpy.array("java.nio.ByteBuffer", [])
404+
location_cb.accept(
405+
j_tbl_location_key,
406+
jpy.array("java.nio.ByteBuffer", []),
406407
)
407408
else:
408409
if pt_table.num_rows != 1:
@@ -456,8 +457,9 @@ def location_cb_proxy(
456457
):
457458
j_tbl_location_key = _JTableLocationKeyImpl(pt_location_key)
458459
if pt_table is None:
459-
location_cb.apply(
460-
j_tbl_location_key, jpy.array("java.nio.ByteBuffer", [])
460+
location_cb.accept(
461+
j_tbl_location_key,
462+
jpy.array("java.nio.ByteBuffer", []),
461463
)
462464
else:
463465
if pt_table.num_rows != 1:

py/server/tests/test_table_data_service.py

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ def __init__(
7777
self.sub_new_partition_cancelled: bool = False
7878
self.sub_new_partition_fail_test: bool = False
7979
self.sub_partition_size_fail_test: bool = False
80+
self.table_location_no_partition_test: bool = False
81+
self.sub_table_location_no_partition_test: bool = False
8082
self.partitions: dict[TableLocationKey, pa.Table] = {}
8183
self.partitions_size_subscriptions: dict[TableLocationKey, bool] = {}
8284
self.existing_partitions_called: int = 0
@@ -115,13 +117,16 @@ def table_locations(
115117
partition_key = TableLocationKeyImpl(f"{ticker}/NYSE")
116118
self.partitions[partition_key] = pa_table
117119

118-
expr = (pc.field("Ticker") == f"{ticker}") & (
119-
pc.field("Exchange") == "NYSE"
120-
)
121-
location_cb(
122-
partition_key,
123-
pa_table.filter(expr).select(["Ticker", "Exchange"]).slice(0, 1),
124-
)
120+
if self.table_location_no_partition_test:
121+
location_cb(partition_key, None)
122+
else:
123+
expr = (pc.field("Ticker") == f"{ticker}") & (
124+
pc.field("Exchange") == "NYSE"
125+
)
126+
location_cb(
127+
partition_key,
128+
pa_table.filter(expr).select(["Ticker", "Exchange"]).slice(0, 1),
129+
)
125130
self.existing_partitions_called += 1
126131

127132
# indicate that we've finished notifying existing table locations
@@ -185,10 +190,13 @@ def _th_new_partitions(
185190
expr = (pc.field("Ticker") == f"{ticker}") & (
186191
pc.field("Exchange") == "NYSE"
187192
)
188-
location_cb(
189-
partition_key,
190-
pa_table.filter(expr).select(["Ticker", "Exchange"]).slice(0, 1),
191-
)
193+
if self.sub_table_location_no_partition_test:
194+
location_cb(partition_key, None)
195+
else:
196+
location_cb(
197+
partition_key,
198+
pa_table.filter(expr).select(["Ticker", "Exchange"]).slice(0, 1),
199+
)
192200
if self.sub_new_partition_fail_test:
193201
failure_cb(Exception("table location subscription failure"))
194202
return
@@ -217,10 +225,14 @@ def subscribe_to_table_locations(
217225
expr = (pc.field("Ticker") == f"{ticker}") & (
218226
pc.field("Exchange") == "NYSE"
219227
)
220-
location_cb(
221-
partition_key,
222-
pa_table.filter(expr).select(["Ticker", "Exchange"]).slice(0, 1),
223-
)
228+
if self.sub_table_location_no_partition_test:
229+
location_cb(partition_key, None)
230+
else:
231+
location_cb(
232+
partition_key,
233+
pa_table.filter(expr).select(["Ticker", "Exchange"]).slice(0, 1),
234+
)
235+
success_cb()
224236

225237
exec_ctx = get_exec_ctx()
226238
th = threading.Thread(
@@ -233,7 +245,6 @@ def _cancellation_callback():
233245
self.sub_new_partition_cancelled = True
234246
self.location_cb = None
235247

236-
success_cb()
237248
return _cancellation_callback
238249

239250
def _th_partition_size_changes(
@@ -296,6 +307,7 @@ def subscribe_to_table_location_size(
296307

297308
# need to initial size
298309
size_cb(self.partitions[table_location_key].num_rows)
310+
success_cb()
299311

300312
self.partitions_size_subscriptions[table_location_key] = True
301313
th = threading.Thread(
@@ -307,7 +319,6 @@ def subscribe_to_table_location_size(
307319
def _cancellation_callback():
308320
self.partitions_size_subscriptions[table_location_key] = False
309321

310-
success_cb()
311322
return _cancellation_callback
312323

313324

@@ -608,6 +619,35 @@ def test_partition_size_sub_failure(self):
608619

609620
self.assertTrue(table.is_failed)
610621

622+
def test_make_static_table_no_partition(self):
623+
backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema)
624+
backend.table_location_no_partition_test = True
625+
data_service = TableDataService(backend)
626+
table = data_service.make_table(TableKeyImpl("test"), refreshing=False)
627+
self.assertIsNotNone(table)
628+
self.assertFalse(table.columns[0].column_type == ColumnType.PARTITIONING)
629+
self.assertFalse(table.columns[1].column_type == ColumnType.PARTITIONING)
630+
self.assertEqual(table.columns[2:], self.test_table.columns[2:])
631+
self.assertEqual(table.size, 2)
632+
self.assertEqual(backend.existing_partitions_called, 1)
633+
self.assertEqual(backend.partition_size_called, 1)
634+
635+
def test_make_live_table_with_no_partition(self):
636+
backend = TestBackend(self.gen_pa_table(), pt_schema=self.pa_table.schema)
637+
data_service = TableDataService(backend)
638+
backend.sub_table_location_no_partition_test = True
639+
table = data_service.make_table(TableKeyImpl("test"), refreshing=True)
640+
self.assertIsNotNone(table)
641+
self.assertFalse(table.columns[0].column_type == ColumnType.PARTITIONING)
642+
self.assertFalse(table.columns[1].column_type == ColumnType.PARTITIONING)
643+
self.assertEqual(table.columns[2:], self.test_table.columns[2:])
644+
645+
self.wait_ticking_table_update(table, 20, 5)
646+
647+
self.assertGreaterEqual(table.size, 20)
648+
self.assertEqual(backend.existing_partitions_called, 0)
649+
self.assertEqual(backend.partition_size_called, 0)
650+
611651

612652
if __name__ == "__main__":
613653
unittest.main()

0 commit comments

Comments
 (0)