Skip to content

Commit e65a8c6

Browse files
committed
vdk-core: Verify payload after pre-processing it
Ingestion allows pre-processing payloads. If the payload is curated during pre-processing, currently, verification and subsequently job execution would fail. Verify payload after pre-processing it, since this pre-processing might be responsible for making it serializable. Add payload verification functional tests since it can no longer be tested in send_object_for_ingestion async function. Tested by: locally run ingestion job which curates json during pre-processing; functional test added. Signed-off-by: Yana Zhivkova <[email protected]>
1 parent b9552cb commit e65a8c6

File tree

4 files changed

+103
-35
lines changed

4 files changed

+103
-35
lines changed

projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ def send_object_for_ingestion(
112112
"""
113113
See parent doc
114114
"""
115-
self.__verify_payload_format(payload_dict=payload)
116-
117115
if collection_id is None:
118116
collection_id = "{data_job_name}|{execution_id}".format(
119117
data_job_name=self._data_job_name, execution_id=self._op_id
@@ -435,6 +433,11 @@ def _payload_poster_thread(self):
435433
metadata=ingestion_metadata,
436434
)
437435

436+
# Verify payload after pre-processing it, since this preprocessing might be responsible for
437+
# making it serializable
438+
for payload_dict in payload_obj:
439+
self.__verify_payload_format(payload_dict=payload_dict)
440+
438441
try:
439442
ingestion_metadata = self._ingester.ingest_payload(
440443
payload=payload_obj,
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2021 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import logging
4+
from datetime import datetime
5+
6+
from vdk.api.job_input import IJobInput
7+
8+
log = logging.getLogger(__name__)
9+
10+
11+
def run(job_input: IJobInput):
12+
payload_str = job_input.get_arguments()["payload"]
13+
14+
log.info("Trying to ingest payload: " + payload_str)
15+
payload = payload_str
16+
if payload_str == "None":
17+
payload = None
18+
elif payload_str == "date":
19+
payload = {"key1": datetime.utcnow()}
20+
21+
job_input.send_object_for_ingestion(
22+
payload=payload, destination_table="object_table", method="memory"
23+
)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2021 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""
4+
Functional test aiming at verifying that payload is verified
5+
"""
6+
import logging
7+
8+
from click.testing import Result
9+
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
10+
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
11+
from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory
12+
from vdk.plugin.test_utils.util_plugins import IngestIntoMemoryPlugin
13+
14+
15+
log = logging.getLogger(__name__)
16+
17+
18+
def test_payload_verification_none():
19+
ingest_plugin = IngestIntoMemoryPlugin()
20+
runner = CliEntryBasedTestRunner(ingest_plugin)
21+
22+
# Use a sample data job, in which bad payload None is passed to
23+
# ingestion method call, thus causing a User Error to be raised.
24+
result: Result = runner.invoke(
25+
[
26+
"run",
27+
jobs_path_from_caller_directory("test-ingest-bad-payload-job"),
28+
"--arguments",
29+
'{"payload": "None"}',
30+
]
31+
)
32+
33+
cli_assert_equal(1, result)
34+
assert "Payload given to ingestion method should not be empty." in result.stdout
35+
36+
37+
def test_payload_verification_bad_type():
38+
ingest_plugin = IngestIntoMemoryPlugin()
39+
runner = CliEntryBasedTestRunner(ingest_plugin)
40+
41+
# Use a sample data job, in which bad payload type string is passed to
42+
# ingestion method call, thus causing a User Error to be raised.
43+
result: Result = runner.invoke(
44+
[
45+
"run",
46+
jobs_path_from_caller_directory("test-ingest-bad-payload-job"),
47+
"--arguments",
48+
'{"payload": "wrong_type_string"}',
49+
]
50+
)
51+
52+
cli_assert_equal(1, result)
53+
assert (
54+
"Payload given to ingestion method should be a dictionary, but it is not"
55+
in result.stdout
56+
)
57+
58+
59+
def test_payload_verification_unserializable():
60+
ingest_plugin = IngestIntoMemoryPlugin()
61+
runner = CliEntryBasedTestRunner(ingest_plugin)
62+
63+
# Use a sample data job, in which unserializable payload is passed to
64+
# ingestion method call, thus causing a User Error to be raised.
65+
result: Result = runner.invoke(
66+
[
67+
"run",
68+
jobs_path_from_caller_directory("test-ingest-bad-payload-job"),
69+
"--arguments",
70+
'{"payload": "date"}',
71+
]
72+
)
73+
74+
cli_assert_equal(1, result)
75+
assert "JSON Serialization Error. Payload is not json serializable" in result.stdout

projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# Copyright 2021 VMware, Inc.
22
# SPDX-License-Identifier: Apache-2.0
3-
from datetime import datetime
43
from unittest.mock import call
54
from unittest.mock import MagicMock
65

@@ -49,38 +48,6 @@ def create_ingester_base(kwargs=None, config_dict=None, ingester=None) -> Ingest
4948
)
5049

5150

52-
def test_send_object_for_ingestion():
53-
test_unserializable_payload = {"key1": 42, "key2": datetime.utcnow()}
54-
ingester_base = create_ingester_base()
55-
56-
with pytest.raises(errors.UserCodeError) as exc_info:
57-
ingester_base.send_object_for_ingestion(
58-
payload=None,
59-
destination_table=shared_test_values.get("destination_table1"),
60-
method=shared_test_values.get("method"),
61-
target=shared_test_values.get("target"),
62-
)
63-
assert exc_info.type == errors.UserCodeError
64-
65-
with pytest.raises(errors.UserCodeError) as exc_info:
66-
ingester_base.send_object_for_ingestion(
67-
payload="wrong_payload_type",
68-
destination_table=None,
69-
method=shared_test_values.get("method"),
70-
target=shared_test_values.get("target"),
71-
)
72-
assert exc_info.type == errors.UserCodeError
73-
74-
with pytest.raises(errors.UserCodeError) as exc_info:
75-
ingester_base.send_object_for_ingestion(
76-
payload=test_unserializable_payload,
77-
destination_table=shared_test_values.get("destination_table1"),
78-
method=shared_test_values.get("method"),
79-
target=shared_test_values.get("target"),
80-
)
81-
assert exc_info.type == errors.UserCodeError
82-
83-
8451
def test_send_object_for_ingestion_send_to_wait():
8552
ingester_base = create_ingester_base(
8653
config_dict={"ingester_wait_to_finish_after_every_send": True}

0 commit comments

Comments
 (0)