@@ -1,76 +1,33 @@
 # Copyright 2023-2024 Broadcom
 # SPDX-License-Identifier: Apache-2.0
-import json
 import logging
 import os
-import pathlib
 from datetime import datetime
 
-from common.database_storage import DatabaseStorage
 from confluence_document import ConfluenceDocument
 from langchain_community.document_loaders import ConfluenceLoader
 from vdk.api.job_input import IJobInput
-
+from vdk.plugin.storage.database_storage import DatabaseStorage
 
 log = logging.getLogger(__name__)
 
 
-def read_json_file(file_path):
-    try:
-        with open(file_path) as file:
-            return json.load(file)
-    except (FileNotFoundError, json.JSONDecodeError) as e:
-        log.error(f"Error reading JSON file: {e}")
-        return None
-
-
-def write_json_file(file_path, data):
-    try:
-        with open(file_path, "w") as file:
-            json.dump(data, file, indent=4)
-    except OSError as e:
-        log.error(f"Error writing JSON file: {e}")
-
-
-def update_saved_documents(file_path, new_docs):
-    existing_docs = read_json_file(file_path) or []
-
-    if (
-        isinstance(existing_docs, list)
-        and existing_docs
-        and isinstance(existing_docs[0], dict)
-    ):
-        existing_docs = [
-            ConfluenceDocument(
-                doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
-            )
-            for doc in existing_docs
-        ]
-
-    existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}
-
-    for doc in new_docs:
-        existing_docs_dict[doc.metadata["id"]] = doc
+def merge_docs(existing_docs, new_docs) -> list:
+    if existing_docs:
+        existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}
 
-    updated_docs_list = list(existing_docs_dict.values())
+        for doc in new_docs:
+            existing_docs_dict[doc.metadata["id"]] = doc
+        return list(existing_docs_dict.values())
+    else:
+        return new_docs
 
-    serialized_docs = [doc.serialize() for doc in updated_docs_list]
-    write_json_file(file_path, serialized_docs)
 
-
-def flag_deleted_pages(file_path, current_confluence_documents):
-    existing_docs = read_json_file(file_path)
+def flag_deleted_pages(existing_docs, current_confluence_documents):
     if existing_docs is None:
         log.error("Existing documents not found. Exiting.")
         return
 
-    existing_docs = [
-        ConfluenceDocument(
-            doc["metadata"], doc["data"], doc["metadata"].get("deleted", False)
-        )
-        for doc in existing_docs
-    ]
-
     current_page_ids = {doc.metadata["id"] for doc in current_confluence_documents}
 
     num_deleted = 0
@@ -80,9 +37,6 @@ def flag_deleted_pages(file_path, current_confluence_documents):
             num_deleted += 1
     log.info(f"Found {num_deleted} deleted pages.")
 
-    serialized_docs = [doc.serialize() for doc in existing_docs]
-    write_json_file(file_path, serialized_docs)
-
 
 class ConfluenceDataSource:
     """
@@ -170,34 +124,33 @@ def run(job_input: IJobInput):
         .setdefault(parent_page_id, {})
         .get("last_date", "1900-01-01 12:00")
     )
-    data_file = os.path.join(
-        job_input.get_temporary_write_directory(), "confluence_data.json"
-    )
     storage_name = get_value(job_input, "storage_name", "confluence_data")
     storage = DatabaseStorage(get_value(job_input, "storage_connection_string"))
     # TODO: this is not optimal. We just care about the IDs, we should not need to retrieve everything
-    data = storage.retrieve(storage_name)
-    pathlib.Path(data_file).write_text(data if data else "[]")
+    existing_docs = storage.retrieve(storage_name)
+    if existing_docs:
+        existing_docs = [ConfluenceDocument(**doc) for doc in existing_docs]
 
     confluence_reader = ConfluenceDataSource(confluence_url, token, space_key)
-
     updated_docs = confluence_reader.fetch_updated_pages_in_confluence_space(
         last_date, parent_page_id
     )
     log.info(f"Found {len(updated_docs)} updated pages")
-    update_saved_documents(data_file, updated_docs)
+    all_docs = merge_docs(existing_docs, updated_docs)
 
     # This is buggy, it doesn't account for server timezone and local timezone
     # But also assumes that server clock and local clock are synchronized (which they are likely not)
     # The ts should be the one of the latest processed page.
     set_property(job_input, "last_date", datetime.now().strftime("%Y-%m-%d %H:%M"))
 
     flag_deleted_pages(
-        data_file,
+        all_docs,
         confluence_reader.fetch_all_pages_in_confluence_space(parent_page_id),
     )
 
     # TODO: it would be better to save each page in separate row.
     # But that's quick solution for now to pass the data to the next job
 
-    storage.store(storage_name, pathlib.Path(data_file).read_text())
+    log.info(f"Store {len(all_docs)} documents in {storage_name}")
+    # TODO: why not use job_input.send_object_for_ingestion ... it's our ingestion interface
+    storage.store(storage_name, [doc.serialize() for doc in all_docs])
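
For context only (not part of the change): a minimal, standalone sketch of the merge-by-page-id behavior that merge_docs introduces above. DummyDoc is a hypothetical stand-in for ConfluenceDocument, modeling only the metadata["id"] field that merge_docs actually reads; pages present in new_docs overwrite their stored versions, while untouched pages are kept.

# Standalone illustration of merge_docs; DummyDoc is a stand-in, not the real ConfluenceDocument.
class DummyDoc:
    def __init__(self, page_id, data):
        self.metadata = {"id": page_id}
        self.data = data


def merge_docs(existing_docs, new_docs) -> list:
    # Same logic as in the diff: index existing docs by id, let new docs overwrite them.
    if existing_docs:
        existing_docs_dict = {doc.metadata["id"]: doc for doc in existing_docs}

        for doc in new_docs:
            existing_docs_dict[doc.metadata["id"]] = doc
        return list(existing_docs_dict.values())
    else:
        return new_docs


existing = [DummyDoc("1", "old body"), DummyDoc("2", "unchanged body")]
updated = [DummyDoc("1", "new body"), DummyDoc("3", "brand new page")]

merged = merge_docs(existing, updated)
print({doc.metadata["id"]: doc.data for doc in merged})
# {'1': 'new body', '2': 'unchanged body', '3': 'brand new page'}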