Skip to content

Commit c2e30ce

Browse files
authored
feat: Deletion vectors (#19)
1 parent 41358ad commit c2e30ce

File tree

3 files changed

+58
-5
lines changed

3 files changed

+58
-5
lines changed

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,10 @@ def spark():
226226

227227
At the beginning of your test method call `reinit_local_metastore` function
228228
from the testing package to initialize the metastore with the tables from
229-
your json folder (`JSON_TABLES_DIR`). If the method is called while the
230-
metastore already exists, it will delete all the existing tables before
231-
initializing the new ones.
229+
your json folder (`JSON_TABLES_DIR`). You can also choose to enable or disable
230+
deletion vectors for Delta tables (default: enabled). If the method is called
231+
while the metastore already exists, it will delete all the existing tables
232+
before initializing the new ones.
232233

233234
*Alternatively, you can call this method only once per testing module,
234235
but then individual testing methods might affect each other by modifying
@@ -242,7 +243,7 @@ from pyspark.testing import assertDataFrameEqual
242243
def test_process_data(
243244
spark: SparkSession,
244245
):
245-
reinit_local_metastore(spark, JSON_TABLES_DIR)
246+
reinit_local_metastore(spark, JSON_TABLES_DIR, deletion_vectors=True)
246247

247248
process_data(
248249
spark=spark,

pysparkdt/metastore.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
def reinit_local_metastore(
99
spark: SparkSession,
1010
json_tables_dir: str,
11+
deletion_vectors: bool = True,
1112
) -> None:
1213
"""Re-initializes dynamic metastore acting as Databricks data catalog
1314
using provided input delta table data in json format.
@@ -51,6 +52,9 @@ def reinit_local_metastore(
5152
Local Spark session.
5253
json_tables_dir
5354
Directory where the delta tables and their schemas are located.
55+
deletion_vectors
56+
Whether to enable deletion vectors for the delta tables.
57+
Defaults to True.
5458
"""
5559
# Clear all existing tables (must be done through SQL, not by clearing the
5660
# folder)
@@ -87,4 +91,14 @@ def reinit_local_metastore(
8791
query = query.option('inferSchema', True)
8892
df = query.load(data_path)
8993

90-
df.write.format('delta').saveAsTable(table_name)
94+
write_query = df.write.format('delta')
95+
96+
if deletion_vectors:
97+
write_query = write_query.option(
98+
'delta.enableDeletionVectors', 'true'
99+
)
100+
else:
101+
write_query = write_query.option(
102+
'delta.enableDeletionVectors', 'false'
103+
)
104+
write_query.saveAsTable(table_name)

tests/test_deletion_vectors.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import os
2+
3+
from pyspark.sql import SparkSession
4+
from pytest import fixture
5+
6+
from pysparkdt import reinit_local_metastore, spark_base
7+
8+
DATA_DIR = f'{os.path.dirname(__file__)}/data'
9+
JSON_TABLES_DIR = f'{DATA_DIR}/tables'
10+
TMP_DIR = f'{DATA_DIR}/tmp'
11+
METASTORE_DIR = f'{TMP_DIR}/metastore'
12+
13+
14+
@fixture(scope='module')
def spark():
    """Module-scoped local Spark session backed by a temporary metastore.

    Delegates to ``spark_base`` so session setup/teardown is shared with the
    other test modules; ``yield from`` forwards the session and lets pytest
    drive the generator's teardown phase.
    """
    yield from spark_base(METASTORE_DIR)
17+
18+
19+
def test_deletion_vectors_disabled(spark: SparkSession):
    """Tables created with ``deletion_vectors=False`` must carry the
    delta property set to the string ``'false'``."""
    reinit_local_metastore(spark, JSON_TABLES_DIR, deletion_vectors=False)

    # DESCRIBE DETAIL returns a single row per table; its `properties`
    # column holds the delta table-property map we want to inspect.
    detail_row = spark.sql('DESCRIBE DETAIL example_input').first()

    assert detail_row.properties.get('delta.enableDeletionVectors') == 'false'
28+
29+
30+
def test_deletion_vectors_enabled(spark: SparkSession):
    """Tables created with ``deletion_vectors=True`` must carry the
    delta property set to the string ``'true'``."""
    reinit_local_metastore(spark, JSON_TABLES_DIR, deletion_vectors=True)

    # DESCRIBE DETAIL returns a single row per table; its `properties`
    # column holds the delta table-property map we want to inspect.
    detail_row = spark.sql('DESCRIBE DETAIL example_input').first()

    assert detail_row.properties.get('delta.enableDeletionVectors') == 'true'

0 commit comments

Comments
 (0)