Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions comps/dataprep/src/integrations/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def search_by_id(self, client, doc_id):
logger.info(f"[ search by id ] fail to search docs of {doc_id}: {e}")
return None

def drop_index(self, client, index_name):
def drop_index(self, client: OpenSearchVectorSearch, index_name: str):
if logflag:
logger.info(f"[ drop index ] dropping index {index_name}")
try:
Expand Down Expand Up @@ -457,16 +457,15 @@ async def get_files(self):

while True:
response = self.search_all_documents(Config.KEY_INDEX_NAME, offset, Config.SEARCH_BATCH_SIZE)
# no doc retrieved
if len(response) < 2:
if response is None:
break

def format_opensearch_results(response, file_list):
for document in response["documents"]:
file_id = document["_id"]
file_list.append({"name": file_id, "id": file_id, "type": "File", "parent": ""})

file_list = format_opensearch_results(response, file_list)
format_opensearch_results(response, file_list)
offset += Config.SEARCH_BATCH_SIZE
# last batch
if (len(response) - 1) // 2 < Config.SEARCH_BATCH_SIZE:
Expand All @@ -490,7 +489,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
# drop index KEY_INDEX_NAME
if self.check_index_existence(self.opensearch_client, Config.KEY_INDEX_NAME):
try:
assert self.drop_index(index_name=Config.KEY_INDEX_NAME)
assert self.drop_index(client=self.opensearch_client, index_name=Config.KEY_INDEX_NAME)
except Exception as e:
if logflag:
logger.info(f"[ delete ] {e}. Fail to drop index {Config.KEY_INDEX_NAME}.")
Expand All @@ -501,7 +500,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
# drop index INDEX_NAME
if self.check_index_existence(self.opensearch_client, Config.INDEX_NAME):
try:
assert self.drop_index(index_name=Config.INDEX_NAME)
assert self.drop_index(client=self.opensearch_client, index_name=Config.INDEX_NAME)
except Exception as e:
if logflag:
logger.info(f"[ delete ] {e}. Fail to drop index {Config.INDEX_NAME}.")
Expand Down
Loading