
Commit 39e911e

Authored by antoniivanov, with co-authors pre-commit-ci[bot] and yonitoo
examples: refactor a bit the examples (#3127)

- moved the common storage code to the vdk-storage library
- refactored confluence-reader to use it directly instead of going through an intermediate file

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Yoan Salambashev <[email protected]>
1 parent 6d21c29 commit 39e911e

File tree: 20 files changed (+238, -83 lines)
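The refactor described in the commit message standardizes the examples on the DatabaseStorage class from the new vdk-storage dependency instead of passing a local JSON file between steps. Below is a minimal sketch of the round-trip the diffs rely on; the connection string, storage name, and document shape are placeholders, and the retrieve/store semantics are inferred from the diffs rather than from vdk-storage documentation:

# Sketch of the storage round-trip used by the refactored example jobs.
# The connection string and stored document are illustrative placeholders.
from vdk.plugin.storage.database_storage import DatabaseStorage

storage = DatabaseStorage("postgresql://user:password@localhost:5432/vdk")  # placeholder
storage_name = "confluence_data"

# confluence-reader: load whatever a previous run stored (may be empty on the first run)
existing_docs = storage.retrieve(storage_name)

# ... fetch and merge Confluence pages, then persist them for the downstream jobs ...
docs_to_store = [{"metadata": {"id": "123", "deleted": False}, "data": "page text"}]
storage.store(storage_name, docs_to_store)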


examples/confluence-reader/README.md

Lines changed: 2 additions & 2 deletions
@@ -13,9 +13,9 @@ The `ConfluenceDataSource` class is the heart of this data job. It provides a se

 These methods make use of the last_modification.txt file to determine the last modification date and track changes in the Confluence space, allowing for efficient data retrieval and management.

-## JSON Data Format
+## Output Data Format

-The resulting JSON data (confluence_data.json) is generated using the `ConfluenceDocument` class (see confluence_document.py).
+The resulting data is generated using the `ConfluenceDocument` class (see confluence_document.py).
 It follows this structured format:

 ```json

examples/confluence-reader/confluence_document.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@


 class ConfluenceDocument:
-    def __init__(self, metadata, data, deleted=False):
+    def __init__(self, metadata: dict, data: str, deleted=False):
         """
         Initializes a ConfluenceDocument instance.

examples/confluence-reader/fetch_confluence_space.py

Lines changed: 18 additions & 65 deletions
@@ -1,76 +1,33 @@
 # Copyright 2023-2024 Broadcom
 # SPDX-License-Identifier: Apache-2.0
-import json
 import logging
 import os
-import pathlib
 from datetime import datetime

-from common.database_storage import DatabaseStorage
 from confluence_document import ConfluenceDocument
 from langchain_community.document_loaders import ConfluenceLoader
 from vdk.api.job_input import IJobInput
-
+from vdk.plugin.storage.database_storage import DatabaseStorage

 log = logging.getLogger(__name__)


-def read_json_file(file_path):
-    try:
-        with open(file_path) as file:
-            return json.load(file)
-    except (FileNotFoundError, json.JSONDecodeError) as e:
-        log.error(f"Error reading JSON file: {e}")
-        return None
-
-
-def write_json_file(file_path, data):
-    try:
-        with open(file_path, "w") as file:
-            json.dump(data, file, indent=4)
-    except OSError as e:
-        log.error(f"Error writing JSON file: {e}")
-
-
-def update_saved_documents(file_path, new_docs):
-    existing_docs = read_json_file(file_path) or []
-
-    if (
-        isinstance(existing_docs, list)
-        and existing_docs
-        and isinstance(existing_docs[0], dict)
-    ):
-        existing_docs = [
-            ConfluenceDocument(
-                doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
-            )
-            for doc in existing_docs
-        ]
-
-    existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}
-
-    for doc in new_docs:
-        existing_docs_dict[doc.metadata["id"]] = doc
+def merge_docs(existing_docs, new_docs) -> list:
+    if existing_docs:
+        existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

-    updated_docs_list = list(existing_docs_dict.values())
+        for doc in new_docs:
+            existing_docs_dict[doc.metadata["id"]] = doc
+        return list(existing_docs_dict.values())
+    else:
+        return new_docs

-    serialized_docs = [doc.serialize() for doc in updated_docs_list]
-    write_json_file(file_path, serialized_docs)

-
-def flag_deleted_pages(file_path, current_confluence_documents):
-    existing_docs = read_json_file(file_path)
+def flag_deleted_pages(existing_docs, current_confluence_documents):
     if existing_docs is None:
         log.error("Existing documents not found. Exiting.")
         return

-    existing_docs = [
-        ConfluenceDocument(
-            doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
-        )
-        for doc in existing_docs
-    ]
-
     current_page_ids = {doc.metadata["id"] for doc in current_confluence_documents}

     num_deleted = 0
@@ -80,9 +37,6 @@ def flag_deleted_pages(file_path, current_confluence_documents):
             num_deleted += 1
     log.info(f"Found {num_deleted} deleted pages.")

-    serialized_docs = [doc.serialize() for doc in existing_docs]
-    write_json_file(file_path, serialized_docs)
-

 class ConfluenceDataSource:
     """
@@ -170,34 +124,33 @@ def run(job_input: IJobInput):
         .setdefault(parent_page_id, {})
         .get("last_date", "1900-01-01 12:00")
     )
-    data_file = os.path.join(
-        job_input.get_temporary_write_directory(), "confluence_data.json"
-    )
     storage_name = get_value(job_input, "storage_name", "confluence_data")
     storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
     # TODO: this is not optimal . We just care about the IDs, we should not need to retrieve everything
-    data = storage.retrieve(storage_name)
-    pathlib.Path(data_file).write_text(data if data else "[]")
+    existing_docs = storage.retrieve(storage_name)
+    if existing_docs:
+        existing_docs = [ConfluenceDocument(**doc) for doc in existing_docs]

     confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)
-
     updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
         last_date, parent_page_id
     )
     log.info(f"Found {len(updated_docs)} updated pages")
-    update_saved_documents(data_file, updated_docs)
+    all_docs = merge_docs(existing_docs, updated_docs)

     # This is buggy , it doesn't account for server timezone and local timezone
     # But also assumes that server clock and local clock are synchronized (which they are likely not)
     # The ts should be the one of the latest processed page.
     set_property(job_input, "last_date", datetime.now().strftime("%Y-%m-%d %H:%M"))

     flag_deleted_pages(
-        data_file,
+        all_docs,
         confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
     )

     # TODO: it would be better to save each page in separate row.
     # But that's quick solution for now to pass the data to the next job

-    storage.store(storage_name, pathlib.Path(data_file).read_text())
+    log.info(f"Store {len(all_docs)} documents in {storage_name}")
+    # TODO: why not use job_input.send_object_for_ingestion ... it's our ingestion interface
+    storage.store(storage_name, [doc.serialize() for doc in all_docs])
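For reference, here is a self-contained sketch of the merge_docs semantics introduced above: updated pages replace existing ones with the same metadata["id"], and new pages are appended. The _Doc dataclass is a hypothetical stand-in for ConfluenceDocument, used only so the snippet runs on its own:

# Sketch of the merge semantics; _Doc is a hypothetical stand-in for ConfluenceDocument.
from dataclasses import dataclass


@dataclass
class _Doc:
    metadata: dict
    data: str = ""


def merge_docs(existing_docs, new_docs) -> list:
    if existing_docs:
        existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

        for doc in new_docs:
            existing_docs_dict[doc.metadata["id"]] = doc
        return list(existing_docs_dict.values())
    else:
        return new_docs


existing = [_Doc({"id": "1"}, "old"), _Doc({"id": "2"}, "kept")]
updated = [_Doc({"id": "1"}, "new"), _Doc({"id": "3"}, "added")]
merged = merge_docs(existing, updated)
print(sorted((d.metadata["id"], d.data) for d in merged))
# [('1', 'new'), ('2', 'kept'), ('3', 'added')]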

examples/confluence-reader/requirements.txt

Lines changed: 2 additions & 1 deletion
@@ -3,7 +3,8 @@
 # The file is optional and can be deleted if no extra library dependencies are necessary.

 atlassian-python-api
-langchain_community
+langchain-community
 lxml
 psycopg2-binary
 sqlalchemy
+vdk-storage

examples/pgvector-embedder/00_properties.py

Lines changed: 4 additions & 0 deletions
@@ -20,3 +20,7 @@ def run(job_input: IJobInput):
         )
     )
     job_input.set_all_properties(properties)
+
+    hf_home = job_input.get_temporary_write_directory() / "hf"
+    hf_home.mkdir(parents=True, exist_ok=True)
+    os.environ["HF_HOME"] = str(hf_home)
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+# Copyright 2023-2024 Broadcom
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2021-2024 VMware, Inc.
+# SPDX-License-Identifier: Apache-2.0
+import os.path
+import pathlib
+
+from vdk.api.job_input import IJobInput
+
+
+def run(job_input: IJobInput):
+    # HF uses temporary directories in the process of its work
+    # So make sure to use only allowed ones
+    hf_home = job_input.get_temporary_write_directory() / "hf"
+    hf_home.mkdir(parents=True, exist_ok=True)
+    os.environ["HF_HOME"] = str(hf_home)

examples/pgvector-embedder/20_embed_data.py

Lines changed: 2 additions & 7 deletions
@@ -5,19 +5,14 @@
 import json
 import logging

-from common.database_storage import DatabaseStorage
 from config import get_value
 from sentence_transformers import SentenceTransformer
 from vdk.api.job_input import IJobInput
+from vdk.plugin.storage.database_storage import DatabaseStorage

 log = logging.getLogger(__name__)


-def load_documents(json_file_path):
-    with open(json_file_path, encoding="utf-8") as file:
-        return json.load(file)
-
-
 def embed_documents_in_batches(documents):
     # the model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
     model = SentenceTransformer("all-mpnet-base-v2")
@@ -41,7 +36,7 @@ def run(job_input: IJobInput):
     storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
     storage_name = get_value(job_input, "storage_name", "confluence_data")

-    documents = load_documents(storage.retrieve(storage_name))
+    documents = storage.retrieve(storage_name)
     if documents:
         log.info(f"{len(documents)} chunks loaded and cleaned for embedding.")
         embeddings = embed_documents_in_batches(documents)
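The embedding step itself is unchanged by this commit; only the document loading moved to DatabaseStorage. As a rough, self-contained sketch of what embed_documents_in_batches boils down to (the sample documents and batch size are illustrative, and the "data" field mirrors the confluence-reader serialization):

# Sketch of the embedding step with sentence-transformers.
# model card: https://huggingface.co/sentence-transformers/all-mpnet-base-v2
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-mpnet-base-v2")

# Illustrative documents shaped like the confluence-reader output.
documents = [
    {"metadata": {"id": "123"}, "data": "How to configure the VPN."},
    {"metadata": {"id": "456"}, "data": "Release process for the data platform."},
]

texts = [doc["data"] for doc in documents]
embeddings = model.encode(texts, batch_size=32)  # one 768-dimensional vector per document
print(len(embeddings), embeddings[0].shape)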

examples/pgvector-embedder/30_create_schema.sql

Lines changed: 7 additions & 1 deletion
@@ -4,9 +4,15 @@ DROP TABLE IF EXISTS public.{destination_metadata_table} CASCADE;
 DROP TABLE IF EXISTS public.{destination_embeddings_table} CASCADE;

 -- TODO (missing vdk feature): we need to create the tables as the postgres plugin doesn't support automatic schema inference
+CREATE TABLE IF NOT EXISTS public.{destination_embeddings_table}
+(
+    id TEXT PRIMARY KEY,
+    embedding public.vector
+);
+
 CREATE TABLE IF NOT EXISTS public.{destination_metadata_table}
 (
-    id VARCHAR PRIMARY KEY,
+    id TEXT PRIMARY KEY,
     title TEXT,
     source TEXT,
     data TEXT,

examples/pgvector-embedder/40_ingest_embeddings.py

Lines changed: 2 additions & 3 deletions
@@ -1,13 +1,12 @@
 # Copyright 2023-2024 Broadcom
 # SPDX-License-Identifier: Apache-2.0
-import json
 import logging
 import pickle

 import numpy as np
-from common.database_storage import DatabaseStorage
 from config import get_value
 from vdk.api.job_input import IJobInput
+from vdk.plugin.storage.database_storage import DatabaseStorage

 log = logging.getLogger(__name__)

@@ -21,7 +20,7 @@ def run(job_input: IJobInput):
         embeddings = pickle.load(file)
     storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
     storage_name = get_value(job_input, "storage_name", "confluence_data")
-    documents = json.loads(storage.retrieve(storage_name))
+    documents = storage.retrieve(storage_name)

     # TODO: our postgres plugin doesn't support updates (upserts) so updating with same ID fails.

examples/pgvector-embedder/requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -4,3 +4,5 @@ psycopg2-binary
 sentence-transformers
 sqlalchemy
 vdk-postgres
+
+vdk-storage
