From 39c8c5cdaf05bde5f73d2a816d1b2f9402dc10b3 Mon Sep 17 00:00:00 2001 From: Shifani Rajabose Date: Sat, 8 Mar 2025 06:07:34 +0200 Subject: [PATCH 1/5] [Bug: 1378] Added Multimodal support for Milvus for dataprep component Fixes #1378 Co-authored-by: Jaini, Pallavi Signed-off-by: Ghosh, Soumyadip Signed-off-by: Piroozan, Nariman Signed-off-by: Kavulya, Soila Signed-off-by: Rajabose, Shifani Signed-off-by: Shifani Rajabose --- .../deployment/docker_compose/compose.yaml | 24 + .../src/integrations/milvus_multimodal.py | 865 ++++++++++++++++++ .../opea_dataprep_multimodal_microservice.py | 1 + 3 files changed, 890 insertions(+) create mode 100644 comps/dataprep/src/integrations/milvus_multimodal.py diff --git a/comps/dataprep/deployment/docker_compose/compose.yaml b/comps/dataprep/deployment/docker_compose/compose.yaml index 46eb503b6f..5063dcff4f 100644 --- a/comps/dataprep/deployment/docker_compose/compose.yaml +++ b/comps/dataprep/deployment/docker_compose/compose.yaml @@ -62,6 +62,30 @@ services: minio: condition: service_healthy + dataprep-multimodal-milvus: + image: ${REGISTRY:-opea}/dataprep:${TAG:-latest} + container_name: dataprep-multimodal-milvus-server + ports: + - "${DATAPREP_PORT:-11102}:5000" + depends_on: + standalone: + condition: service_healthy + etcd: + condition: service_healthy + minio: + condition: service_healthy + ipc: host + environment: + no_proxy: ${no_proxy} + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + MULTIMODAL_DATAPREP: true + DATAPREP_COMPONENT_NAME: "OPEA_DATAPREP_MULTIMODALMILVUS" + MILVUS_HOST: ${MILVUS_HOST} + LVM_ENDPOINT: ${LVM_ENDPOINT} + HUGGINGFACEHUB_API_TOKEN: ${HF_TOKEN} + restart: unless-stopped + dataprep-neo4j-llamaindex: image: ${REGISTRY:-opea}/dataprep:${TAG:-latest} container_name: dataprep-neo4j-llamaindex diff --git a/comps/dataprep/src/integrations/milvus_multimodal.py b/comps/dataprep/src/integrations/milvus_multimodal.py new file mode 100644 index 0000000000..6a02a10c0b --- /dev/null +++ b/comps/dataprep/src/integrations/milvus_multimodal.py @@ -0,0 +1,865 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +# for test + +import base64 +import json +import os +import shutil +import time +import uuid +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Type, Union + +import pymupdf +from fastapi import Body, File, Form, HTTPException, UploadFile +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain_core.embeddings import Embeddings +from langchain_core.documents import Document +from langchain_milvus.vectorstores import Milvus +from langchain_core.utils import get_from_dict_or_env +from PIL import Image + +from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.third_parties.bridgetower.src.bridgetower_embedding import BridgeTowerEmbedding + +from .utils.multimodal import ( + clear_upload_folder, + create_upload_folder, + convert_video_to_audio, + delete_audio_file, + extract_frames_and_annotations_from_transcripts, + extract_frames_and_generate_captions, + extract_transcript_from_audio, + generate_annotations_from_transcript, + generate_id, + load_json_file, + load_whisper_model, + write_vtt, +) + +logger = CustomLogger("opea_dataprep_milvus_multimodal") +logflag = os.getenv("LOGFLAG", False) +partition_field_name = "filename" +upload_folder = "./uploaded_files/" + +# Models +EMBED_MODEL = os.getenv("EMBEDDING_MODEL_ID", + "BridgeTower/bridgetower-large-itm-mlm-itc") +WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small") + +# Lvm Microservice Information +LVM_ENDPOINT = os.getenv("LVM_ENDPOINT", "http://localhost:9399/v1/lvm") + +# MILVUS configuration +MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost") +MILVUS_PORT = int(os.getenv("MILVUS_PORT", 19530)) +MILVUS_URI = f"http://{MILVUS_HOST}:{MILVUS_PORT}" +INDEX_PARAMS = {"index_type": "COMPOSITE", "metric_type": "IP", "params": {}} +COLLECTION_NAME = os.getenv("COLLECTION_NAME", "mm_rag_milvus") + +MILVUS_CONNECTION = { + "uri": MILVUS_URI, +} + +class MultimodalMilvus(Milvus): + """Milvus vector database to process multimodal data.""" + + @classmethod + def from_text_image_pairs_return_keys( + cls: Type[Milvus], + texts: List[str], + embedding: Embeddings = BridgeTowerEmbedding, + images: List[str] = None, + metadatas: Optional[List[dict]] = None, + collection_name: Optional[str] = None, + vector_schema: Optional[Dict[str, Union[str, int]]] = None, + **kwargs: Any, + ): + """ + Args: + texts (List[str]): List of texts to add to the vectorstore. + images (List[str]): Optional list of path-to-images to add to the vectorstore. If provided, the length of + the list of images must match the length of the list of text strings. + metadatas (Optional[List[dict]], optional): Optional list of metadata + dicts to add to the vectorstore. Defaults to None. + collection_name (Optional[str], optional): Optional name of the index to + create or add to. Defaults to None. + vector_schema (Optional[Dict[str, Union[str, int]]], optional): Optional + vector schema to use. Defaults to None. + **kwargs (Any): Additional keyword arguments to pass to the Milvus client. + Returns: + Tuple[Milvus, List[str]]: Tuple of the Milvus instance and the keys of + the newly created documents. + Raises: + ValueError: If the number of texts does not equal the number of images. + ValueError: If the number of metadatas does not match the number of texts. + """ + # If images are provided, the length of texts must be equal to the length of images + if images and len(texts) != len(images): + raise ValueError( + f"the len of captions {len(texts)} does not equal the len of images {len(images)}" + ) + + milvus_uri = get_from_dict_or_env(kwargs, "milvus_uri", "MILVUS_URI") + + if "milvus_uri" in kwargs: + kwargs.pop("milvus_uri") + + # flag to use generated schema + if "generate" in kwargs: + kwargs.pop("generate") + + # see if the user specified keys + keys = None + if "keys" in kwargs: + keys = kwargs.pop("keys") + + # Name of the search index if not given + if not collection_name: + collection_name = uuid.uuid4().hex + + # type check for metadata + if metadatas: + if isinstance(metadatas, list) and len(metadatas) != len( + texts): # type: ignore # noqa: E501 + raise ValueError( + "Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) + and isinstance(metadatas[0], dict)): + raise ValueError("Metadatas must be a list of dicts") + #generated_schema = _prepare_metadata_fields(metadatas[0]) + + + # Create instance + metadatas_test = metadatas + instance = cls( + collection_name, + vector_schema=vector_schema, + connection_args=MILVUS_CONNECTION, + auto_id=True, + partition_key_field=partition_field_name, + **kwargs, + ) + keys = (instance.add_text_image_pairs( + texts, images, embedding, metadatas=metadatas_test, keys=keys) + if images else instance.add_text(texts, metadatas=metadatas_test, keys=keys)) + return instance, keys + + def add_text_image_pairs( + self, + texts: Iterable[str], + images: Iterable[str], + embedding: Embeddings = BridgeTowerEmbedding, + metadatas: Optional[List[dict]] = None, + embeddings: Optional[List[List[float]]] = None, + batch_size: int = 2, + clean_metadata: bool = True, + **kwargs: Any, + ) -> List[str]: + ids = [] + # Get keys or ids from kwargs + # Other vectorstores use ids + keys_or_ids = kwargs.get("keys", kwargs.get("ids")) + logger.info(f"Testing 2.0 {type(metadatas)}") + # type check for metadata + logger.info(f"test 2 {isinstance(metadatas, list)}") + logger.info(metadatas[0]) + if metadatas: + if isinstance(metadatas, list) and len(metadatas) != len( + texts): # type: ignore # noqa: E501 + raise ValueError( + "Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) + and isinstance(metadatas[0], dict)): + raise ValueError("Metadatas must be a list of dicts") + + pil_imgs = [Image.open(img) for img in images] + if not embeddings: + embeddings = embedding.embed_image_text_pairs( + list(texts), pil_imgs, batch_size=batch_size) + for metadata in metadatas: + metadata["pk"] = str(uuid.uuid4().hex) + for key, value in metadata.items(): + if isinstance(value, str) and len(value) > 65535: + metadata[key] = value[:65535] + return self.add_embeddings(list(texts), embeddings, metadatas, + batch_size) + + def add_text( + self, + texts: Iterable[str], + embedding: Embeddings = BridgeTowerEmbedding, + metadatas: Optional[List[dict]] = None, + embeddings: Optional[List[List[float]]] = None, + clean_metadata: bool = True, + **kwargs: Any, + ) -> List[str]: + + ids = [] + # Get keys or ids from kwargs + # Other vectorstores use ids + keys_or_ids = kwargs.get("keys", kwargs.get("ids")) + + # type check for metadata + if metadatas: + if isinstance(metadatas, list) and len(metadatas) != len( + texts): # type: ignore # noqa: E501 + raise ValueError( + "Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) + and isinstance(metadatas[0], dict)): + raise ValueError("Metadatas must be a list of dicts") + + if not embeddings: + embedding = BridgeTowerEmbedding() + embeddings = embedding.embed_documents(texts=list(texts)) + + batch_size = 2 + for metadata in metadatas: + metadata["pk"] = str(uuid.uuid4().hex) + for key, value in metadata.items(): + if isinstance(value, str) and len(value) > 65535: + metadata[key] = value[:65535] + + return self.add_embeddings(list(texts), embeddings, metadatas, + batch_size) + + +def search_by_file(collection, file_name): + query = f"{partition_field_name} == '{file_name}'" + results = collection.query( + expr=query, + output_fields=[partition_field_name, "pk"], + ) + if logflag: + logger.info(f"[ search by file ] searched by {file_name}") + logger.info(f"[ search by file ] {len(results)} results: {results}") + return results + + +def search_all(collection): + results = collection.query(expr="pk >= 0", + output_fields=[partition_field_name, "pk"]) + if logflag: + logger.info(f"[ search all ] {len(results)} results: {results}") + return results + + +def delete_all_data(my_milvus): + if logflag: + logger.info("[ delete all ] deleting all data in milvus") + if my_milvus.col: + my_milvus.col.drop() + if logflag: + logger.info("[ delete all ] delete success: all data") + + +def delete_by_partition_field(my_milvus, partition_field): + if logflag: + logger.info( + f"[ delete partition ] deleting {partition_field_name} {partition_field}" + ) + pks = my_milvus.get_pks(f'{partition_field_name} == "{partition_field}"') + if logflag: + logger.info(f"[ delete partition ] target pks: {pks}") + res = my_milvus.delete(pks) + my_milvus.col.flush() + if logflag: + logger.info(f"[ delete partition ] delete success: {res}") + + +@OpeaComponentRegistry.register("OPEA_DATAPREP_MULTIMODALMILVUS") +class OpeaMultimodalMilvusDataprep(OpeaComponent): + """A specialized dataprep component derived from OpeaComponent for milvus dataprep services. + Dataprep component for Multimodal Milvus ingestion and search services. + + Attributes: + client (Milvus): An instance of the milvus client for vector database operations. + """ + + def __init__(self, name: str, description: str, config: dict = None): + super().__init__(name, ServiceType.DATAPREP.name.lower(), description, + config) + self.device = "cpu" + # Load embeddings model + logger.info("Initializing BridgeTower model as embedder...") + self.upload_folder = "./uploaded_files/" + self.embeddings = BridgeTowerEmbedding(model_name=EMBED_MODEL, + device=self.device) + logger.info("Done initialization of embedder!") + + health_status = self.check_health() + if not health_status: + logger.error("OpeaMilvusDataprep health check failed.") + + def check_health(self) -> bool: + if logflag: + logger.info("[ health check ] start to check health of milvus") + try: + if self.embeddings is None: + logger.error("Multimodal Milvus is not initialized.") + return False + + client = Milvus( + embedding_function=self.embeddings, + collection_name=COLLECTION_NAME, + connection_args={"uri": MILVUS_URI}, + index_params=INDEX_PARAMS, + auto_id=True, + ) + _ = client.client.list_collections() + if logflag: + logger.info( + "[ health check ] Successfully connected to Milvus!") + return True + except Exception as e: + logger.info(f"[ health check ] Failed to connect to Milvus: {e}") + return False + + def invoke(self, *args, **kwargs): + pass + + def prepare_data_and_metadata_from_annotation( + self, + annotation, + path_to_frames, + title, + num_transcript_concat_for_ingesting=2, + num_transcript_concat_for_inference=7, + ): + text_list = [] + image_list = [] + metadatas = [] + for i, frame in enumerate(annotation): + frame_index = frame["sub_video_id"] + path_to_frame = os.path.join(path_to_frames, + f"frame_{frame_index}.png") + # augment this frame's transcript with a reasonable number of neighboring frames' transcripts helps semantic retrieval + lb_ingesting = max(0, i - num_transcript_concat_for_ingesting) + ub_ingesting = min(len(annotation), + i + num_transcript_concat_for_ingesting + 1) + caption_for_ingesting = " ".join([ + annotation[j]["caption"] + for j in range(lb_ingesting, ub_ingesting) + ]) + + # augment this frame's transcript with more neighboring frames' transcript to provide more context to LVM for question answering + lb_inference = max(0, i - num_transcript_concat_for_inference) + ub_inference = min(len(annotation), + i + num_transcript_concat_for_inference + 1) + caption_for_inference = " ".join([ + annotation[j]["caption"] + for j in range(lb_inference, ub_inference) + ]) + + video_id = frame["video_id"] + b64_img_str = frame["b64_img_str"] + time_of_frame = frame["time"] + embedding_type = "pair" if b64_img_str else "text" + source_video = frame["video_name"] + + text_list.append(caption_for_ingesting) + + if b64_img_str: + image_list.append(path_to_frame) + + metadatas.append({ + "content": caption_for_ingesting, + "b64_img_str": b64_img_str, + "video_id": video_id, + "source_video": source_video, + "time_of_frame_ms": float(time_of_frame), + "embedding_type": embedding_type, + "title": title, + "transcript_for_inference": caption_for_inference, + }) + + return text_list, image_list, metadatas + + def prepare_pdf_data_from_annotation(self, annotation, path_to_files, + title): + """PDF data processing has some key differences from videos and images. + + 1. Neighboring transcripts are not currently considered relevant. + We are only taking the text located on the same page as the image. + 2. The images within PDFs are indexed by page and image-within-page + indices, as opposed to a single frame index. + 3. Instead of time of frame in ms, we return the PDF page index through + the pre-existing time_of_frame_ms metadata key to maintain compatibility. + """ + text_list = [] + image_list = [] + metadatas = [] + for item in annotation: + page_index = item["frame_no"] + image_index = item["sub_video_id"] + path_to_image = os.path.join( + path_to_files, f"page{page_index}_image{image_index}.png") + caption_for_ingesting = item["caption"] + caption_for_inference = item["caption"] + + pdf_id = item["video_id"] + b64_img_str = item["b64_img_str"] + embedding_type = "pair" if b64_img_str else "text" + source = item["video_name"] + + text_list.append(caption_for_ingesting) + + if b64_img_str: + image_list.append(path_to_image) + + metadatas.append({ + "content": caption_for_ingesting, + "b64_img_str": b64_img_str, + "video_id": pdf_id, + "source_video": source, + "time_of_frame_ms": + page_index, # For PDFs save the page number + "embedding_type": embedding_type, + "title": title, + "transcript_for_inference": caption_for_inference, + }) + + return text_list, image_list, metadatas + + def ingest_multimodal(self, + filename, + data_folder, + embeddings, + is_pdf=False): + """Ingest text image pairs to Milvus from the data/ directory that consists of frames and annotations.""" + data_folder = os.path.abspath(data_folder) + annotation_file_path = os.path.join(data_folder, "annotations.json") + path_to_frames = os.path.join(data_folder, "frames") + + annotation = load_json_file(annotation_file_path) + + # prepare data to ingest + if is_pdf: + text_list, image_list, metadatas = self.prepare_pdf_data_from_annotation( + annotation, path_to_frames, filename) + else: + text_list, image_list, metadatas = self.prepare_data_and_metadata_from_annotation( + annotation, path_to_frames, filename) + + MultimodalMilvus.from_text_image_pairs_return_keys( + texts=[f"From {filename}. " + text for text in text_list], + images=image_list, + embedding=embeddings, + metadatas=metadatas, + collection_name=COLLECTION_NAME, + milvus_uri=MILVUS_URI, + ) + + async def ingest_generate_transcripts( + self, files: List[UploadFile] = File(None)): + """Upload videos or audio files with speech, generate transcripts using whisper and ingest into milvus.""" + + if files: + files_to_ingest = [] + uploaded_files_map = {} + for file in files: + if os.path.splitext(file.filename)[1] in [".mp4", ".wav"]: + files_to_ingest.append(file) + else: + raise HTTPException( + status_code=400, + detail= + f"File {file.filename} is not an mp4 file. Please upload mp4 files only.", + ) + + for file_to_ingest in files_to_ingest: + st = time.time() + file_extension = os.path.splitext(file_to_ingest.filename)[1] + is_video = file_extension == ".mp4" + file_type_str = "video" if is_video else "audio file" + logger.info( + f"Processing {file_type_str} {file_to_ingest.filename}") + + # Assign unique identifier to video + file_id = generate_id() + + # Create video file name by appending identifier + base_file_name = os.path.splitext(file_to_ingest.filename)[0] + file_name_with_id = f"{base_file_name}_{file_id}{file_extension}" + dir_name = os.path.splitext(file_name_with_id)[0] + + # Save file in upload_directory + with open(os.path.join(self.upload_folder, file_name_with_id), + "wb") as f: + shutil.copyfileobj(file_to_ingest.file, f) + + uploaded_files_map[base_file_name] = file_name_with_id + + if is_video: + # Extract temporary audio wav file from video mp4 + audio_file = dir_name + ".wav" + logger.info(f"Extracting {audio_file}") + convert_video_to_audio( + os.path.join(self.upload_folder, file_name_with_id), + os.path.join(self.upload_folder, audio_file), + ) + logger.info(f"Done extracting {audio_file}") + else: + # We already have an audio file + audio_file = file_name_with_id + + # Load whisper model + logger.info("Loading whisper model....") + whisper_model = load_whisper_model(model_name=WHISPER_MODEL) + logger.info("Done loading whisper!") + + # Extract transcript from audio + logger.info("Extracting transcript from audio") + transcripts = extract_transcript_from_audio( + whisper_model, os.path.join(self.upload_folder, + audio_file)) + + # Save transcript as vtt file and delete audio file + vtt_file = dir_name + ".vtt" + write_vtt(transcripts, + os.path.join(self.upload_folder, vtt_file)) + if is_video: + delete_audio_file( + os.path.join(self.upload_folder, audio_file)) + logger.info("Done extracting transcript.") + + if is_video: + # Store frames and caption annotations in a new directory + logger.info("Extracting frames and generating annotation") + extract_frames_and_annotations_from_transcripts( + file_id, + os.path.join(self.upload_folder, file_name_with_id), + os.path.join(self.upload_folder, vtt_file), + os.path.join(self.upload_folder, dir_name), + ) + else: + # Generate annotations based on the transcript + logger.info("Generating annotations for the transcription") + generate_annotations_from_transcript( + file_id, + os.path.join(self.upload_folder, file_name_with_id), + os.path.join(self.upload_folder, vtt_file), + os.path.join(self.upload_folder, dir_name), + ) + + logger.info("Done extracting frames and generating annotation") + # Delete temporary vtt file + os.remove(os.path.join(self.upload_folder, vtt_file)) + + # Ingest multimodal data into milvus + logger.info("Ingesting data to milvus vector store") + self.ingest_multimodal( + base_file_name, os.path.join(self.upload_folder, dir_name), + self.embeddings) + + # Delete temporary video directory containing frames and annotations + shutil.rmtree(os.path.join(self.upload_folder, dir_name)) + + logger.info(f"Processed file {file_to_ingest.filename}") + end = time.time() + logger.info(str(end - st)) + + return { + "status": 200, + "message": "Data preparation succeeded", + "file_id_maps": uploaded_files_map, + } + + raise HTTPException( + status_code=400, + detail= + "Must provide at least one video (.mp4) or audio (.wav) file.") + + async def ingest_generate_captions(self, + files: List[UploadFile] = File(None)): + """Upload images and videos without speech (only background music or no audio), generate captions using lvm microservice and ingest into milvus.""" + + if files: + file_paths = [] + uploaded_files_saved_files_map = {} + for file in files: + if os.path.splitext(file.filename)[1] in [ + ".mp4", ".png", ".jpg", ".jpeg", ".gif" + ]: + file_paths.append(file) + else: + raise HTTPException( + status_code=400, + detail= + f"File {file.filename} is not a supported file type. Please upload mp4, png, jpg, jpeg, and gif files only.", + ) + + for file in file_paths: + logger.info(f"Processing file {file.filename}") + + # Assign unique identifier to file + id = generate_id() + + # Create file name by appending identifier + name, ext = os.path.splitext(file.filename) + file_name = f"{name}_{id}{ext}" + dir_name = os.path.splitext(file_name)[0] + + # Save file in upload_directory + with open(os.path.join(self.upload_folder, file_name), + "wb") as f: + shutil.copyfileobj(file.file, f) + uploaded_files_saved_files_map[name] = file_name + + # Store frames and caption annotations in a new directory + await extract_frames_and_generate_captions( + id, + os.path.join(self.upload_folder, file_name), + LVM_ENDPOINT, + os.path.join(self.upload_folder, dir_name), + ) + + # Ingest multimodal data into milvus + self.ingest_multimodal( + name, os.path.join(self.upload_folder, dir_name), + self.embeddings) + + # Delete temporary directory containing frames and annotations + # shutil.rmtree(os.path.join(upload_folder, dir_name)) + + logger.info(f"Processed file {file.filename}") + + return { + "status": 200, + "message": "Data preparation succeeded", + "file_id_maps": uploaded_files_saved_files_map, + } + + raise HTTPException(status_code=400, + detail="Must provide at least one file.") + + async def ingest_files( + self, + files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): + + if logflag: + logger.info(f"[ milvus ingest ] files:{files}") + + if files: + accepted_media_formats = [ + ".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf" + ] + # Create a lookup dictionary containing all media files + matched_files = { + f.filename: [f] + for f in files + if os.path.splitext(f.filename)[1] in accepted_media_formats + } + uploaded_files_map = {} + + for file in files: + file_base, file_extension = os.path.splitext(file.filename) + if file_extension == ".vtt": + if "{}.mp4".format(file_base) in matched_files: + matched_files["{}.mp4".format(file_base)].append(file) + else: + logger.info( + f"No video was found for caption file {file.filename}." + ) + elif file_extension == ".txt": + if "{}.png".format(file_base) in matched_files: + matched_files["{}.png".format(file_base)].append(file) + elif "{}.jpg".format(file_base) in matched_files: + matched_files["{}.jpg".format(file_base)].append(file) + elif "{}.jpeg".format(file_base) in matched_files: + matched_files["{}.jpeg".format(file_base)].append(file) + elif "{}.gif".format(file_base) in matched_files: + matched_files["{}.gif".format(file_base)].append(file) + else: + logger.info( + f"No image was found for caption file {file.filename}." + ) + elif file_extension not in accepted_media_formats: + logger.info( + f"Skipping file {file.filename} because of unsupported format." + ) + print("Pallavi 3") + for media_file_name, file_list in matched_files.items(): + if len(file_list) != 2 and os.path.splitext( + media_file_name)[1] != ".pdf": + raise HTTPException( + status_code=400, + detail=f"No caption file found for {media_file_name}") + + if len(matched_files.keys()) == 0: + return HTTPException( + status_code=400, + detail= + "The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt) or one .pdf file", + ) + + for media_file in matched_files: + logger.info(f"Processing file {media_file}") + file_name, file_extension = os.path.splitext(media_file) + + # Assign unique identifier to file + file_id = generate_id() + + # Create file name by appending identifier + media_file_name = f"{file_name}_{file_id}{file_extension}" + media_dir_name = os.path.splitext(media_file_name)[0] + + # Save file in upload_directory + with open(os.path.join(self.upload_folder, media_file_name), + "wb") as f: + shutil.copyfileobj(matched_files[media_file][0].file, f) + uploaded_files_map[file_name] = media_file_name + + if file_extension == ".pdf": + # Set up location to store pdf images and text, reusing "frames" and "annotations" from video + output_dir = os.path.join(self.upload_folder, + media_dir_name) + os.makedirs(output_dir, exist_ok=True) + os.makedirs(os.path.join(output_dir, "frames"), + exist_ok=True) + doc = pymupdf.open( + os.path.join(self.upload_folder, media_file_name)) + annotations = [] + for page_idx, page in enumerate(doc, start=1): + text = page.get_text() + images = page.get_images() + for image_idx, image in enumerate(images, start=1): + # Write image and caption file for each image found in pdf + img_fname = f"page{page_idx}_image{image_idx}" + img_fpath = os.path.join(output_dir, "frames", + img_fname + ".png") + pix = pymupdf.Pixmap(doc, + image[0]) # create pixmap + + if pix.n - pix.alpha > 3: # if CMYK, convert to RGB first + pix = pymupdf.Pixmap(pymupdf.csRGB, pix) + + pix.save(img_fpath) # pixmap to png + pix = None + + # Convert image to base64 encoded string + with open(img_fpath, "rb") as image2str: + encoded_string = base64.b64encode( + image2str.read()) # png to bytes + + decoded_string = encoded_string.decode( + ) # bytes to string + + # Create annotations file, reusing metadata keys from video + annotations.append({ + "video_id": + file_id, + "video_name": + os.path.basename( + os.path.join(self.upload_folder, + media_file_name)), + "b64_img_str": + decoded_string, + "caption": + text, + "time": + 0.0, + "frame_no": + page_idx, + "sub_video_id": + image_idx, + }) + else: + # Save caption file in upload directory + caption_file_extension = os.path.splitext( + matched_files[media_file][1].filename)[1] + caption_file = f"{media_dir_name}{caption_file_extension}" + with open( + os.path.join(self.upload_folder, caption_file), + "wb") as f: + shutil.copyfileobj( + matched_files[media_file][1].file, f) + + # Store frames and caption annotations in a new directory + extract_frames_and_annotations_from_transcripts( + file_id, + os.path.join(self.upload_folder, media_file_name), + os.path.join(self.upload_folder, caption_file), + os.path.join(self.upload_folder, media_dir_name), + ) + + # Delete temporary caption file + os.remove( + os.path.join(self.upload_folder, caption_file)) + + # Ingest multimodal data into milvus + self.ingest_multimodal( + file_name, + os.path.join(self.upload_folder, media_dir_name), + self.embeddings) + + # Delete temporary media directory containing frames and annotations + shutil.rmtree( + os.path.join(self.upload_folder, media_dir_name)) + + logger.info(f"Processed file {media_file}") + + return { + "status": 200, + "message": "Data preparation succeeded", + "file_id_maps": uploaded_files_map, + } + + raise HTTPException( + status_code=400, + detail= + "Must provide at least one pair consisting of video (.mp4) and captions (.vtt) or image (.png, .jpg, .jpeg, .gif) with caption (.txt)", + ) + + async def get_files(self): + """Returns list of names of uploaded videos saved on the server.""" + + if not Path(upload_folder).exists(): + logger.info("No file uploaded, return empty list.") + return [] + + uploaded_videos = os.listdir(upload_folder) + return uploaded_videos + + async def delete_files(self, file_path: str = Body(..., embed=True)): + if logflag: + logger.info(f"[ milvus delete ] delete files: {file_path}") + + my_milvus = Milvus( + embedding_function=self.embeddings, + collection_name=COLLECTION_NAME, + connection_args={"uri": MILVUS_URI}, + index_params=INDEX_PARAMS, + auto_id=True, + ) + + # delete all uploaded files + delete_all_data(my_milvus) + + # delete files on local disk + try: + clear_upload_folder(upload_folder) + except Exception as e: + if logflag: + logger.info( + f"[ milvus delete ] {e}. Fail to delete {upload_folder}.") + raise HTTPException(status_code=500, + detail=f"Fail to delete {upload_folder}: {e}") + if logflag: + logger.info("[ milvus delete ] successfully delete all files.") + + create_upload_folder(upload_folder) + if logflag: + logger.info("[ milvus delete ] new upload folder created.") + return {"status": True} + + async def ingest_videos(self, files: List[UploadFile] = File(None)): + pass + + async def get_videos(self): + pass + + async def get_one_file(self, filename: str): + pass diff --git a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py index 9fbb562a17..c5ae57d5d8 100644 --- a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py +++ b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py @@ -9,6 +9,7 @@ from fastapi import Body, File, UploadFile from integrations.redis_multimodal import OpeaMultimodalRedisDataprep from integrations.vdms_multimodal import OpeaMultimodalVdmsDataprep +from integrations.milvus_multimodal import OpeaMultimodalMilvusDataprep from opea_dataprep_loader import OpeaDataprepMultiModalLoader from comps import ( From 90647f60fdb04201e72affb6f1e0eb9fd8c1fc76 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 8 Mar 2025 04:14:06 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Signed-off-by: Shifani Rajabose --- .../src/integrations/milvus_multimodal.py | 327 +++++++----------- .../opea_dataprep_multimodal_microservice.py | 2 +- 2 files changed, 118 insertions(+), 211 deletions(-) diff --git a/comps/dataprep/src/integrations/milvus_multimodal.py b/comps/dataprep/src/integrations/milvus_multimodal.py index 6a02a10c0b..5f07fe5f15 100644 --- a/comps/dataprep/src/integrations/milvus_multimodal.py +++ b/comps/dataprep/src/integrations/milvus_multimodal.py @@ -14,10 +14,10 @@ import pymupdf from fastapi import Body, File, Form, HTTPException, UploadFile from langchain.text_splitter import RecursiveCharacterTextSplitter -from langchain_core.embeddings import Embeddings from langchain_core.documents import Document -from langchain_milvus.vectorstores import Milvus +from langchain_core.embeddings import Embeddings from langchain_core.utils import get_from_dict_or_env +from langchain_milvus.vectorstores import Milvus from PIL import Image from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType @@ -25,8 +25,8 @@ from .utils.multimodal import ( clear_upload_folder, - create_upload_folder, convert_video_to_audio, + create_upload_folder, delete_audio_file, extract_frames_and_annotations_from_transcripts, extract_frames_and_generate_captions, @@ -44,8 +44,7 @@ upload_folder = "./uploaded_files/" # Models -EMBED_MODEL = os.getenv("EMBEDDING_MODEL_ID", - "BridgeTower/bridgetower-large-itm-mlm-itc") +EMBED_MODEL = os.getenv("EMBEDDING_MODEL_ID", "BridgeTower/bridgetower-large-itm-mlm-itc") WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small") # Lvm Microservice Information @@ -62,6 +61,7 @@ "uri": MILVUS_URI, } + class MultimodalMilvus(Milvus): """Milvus vector database to process multimodal data.""" @@ -97,9 +97,7 @@ def from_text_image_pairs_return_keys( """ # If images are provided, the length of texts must be equal to the length of images if images and len(texts) != len(images): - raise ValueError( - f"the len of captions {len(texts)} does not equal the len of images {len(images)}" - ) + raise ValueError(f"the len of captions {len(texts)} does not equal the len of images {len(images)}") milvus_uri = get_from_dict_or_env(kwargs, "milvus_uri", "MILVUS_URI") @@ -121,15 +119,11 @@ def from_text_image_pairs_return_keys( # type check for metadata if metadatas: - if isinstance(metadatas, list) and len(metadatas) != len( - texts): # type: ignore # noqa: E501 - raise ValueError( - "Number of metadatas must match number of texts") - if not (isinstance(metadatas, list) - and isinstance(metadatas[0], dict)): + if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore + raise ValueError("Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): raise ValueError("Metadatas must be a list of dicts") - #generated_schema = _prepare_metadata_fields(metadatas[0]) - + # generated_schema = _prepare_metadata_fields(metadatas[0]) # Create instance metadatas_test = metadatas @@ -141,9 +135,11 @@ def from_text_image_pairs_return_keys( partition_key_field=partition_field_name, **kwargs, ) - keys = (instance.add_text_image_pairs( - texts, images, embedding, metadatas=metadatas_test, keys=keys) - if images else instance.add_text(texts, metadatas=metadatas_test, keys=keys)) + keys = ( + instance.add_text_image_pairs(texts, images, embedding, metadatas=metadatas_test, keys=keys) + if images + else instance.add_text(texts, metadatas=metadatas_test, keys=keys) + ) return instance, keys def add_text_image_pairs( @@ -166,25 +162,20 @@ def add_text_image_pairs( logger.info(f"test 2 {isinstance(metadatas, list)}") logger.info(metadatas[0]) if metadatas: - if isinstance(metadatas, list) and len(metadatas) != len( - texts): # type: ignore # noqa: E501 - raise ValueError( - "Number of metadatas must match number of texts") - if not (isinstance(metadatas, list) - and isinstance(metadatas[0], dict)): + if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore + raise ValueError("Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): raise ValueError("Metadatas must be a list of dicts") pil_imgs = [Image.open(img) for img in images] if not embeddings: - embeddings = embedding.embed_image_text_pairs( - list(texts), pil_imgs, batch_size=batch_size) + embeddings = embedding.embed_image_text_pairs(list(texts), pil_imgs, batch_size=batch_size) for metadata in metadatas: metadata["pk"] = str(uuid.uuid4().hex) for key, value in metadata.items(): if isinstance(value, str) and len(value) > 65535: metadata[key] = value[:65535] - return self.add_embeddings(list(texts), embeddings, metadatas, - batch_size) + return self.add_embeddings(list(texts), embeddings, metadatas, batch_size) def add_text( self, @@ -203,12 +194,9 @@ def add_text( # type check for metadata if metadatas: - if isinstance(metadatas, list) and len(metadatas) != len( - texts): # type: ignore # noqa: E501 - raise ValueError( - "Number of metadatas must match number of texts") - if not (isinstance(metadatas, list) - and isinstance(metadatas[0], dict)): + if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore + raise ValueError("Number of metadatas must match number of texts") + if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): raise ValueError("Metadatas must be a list of dicts") if not embeddings: @@ -222,8 +210,7 @@ def add_text( if isinstance(value, str) and len(value) > 65535: metadata[key] = value[:65535] - return self.add_embeddings(list(texts), embeddings, metadatas, - batch_size) + return self.add_embeddings(list(texts), embeddings, metadatas, batch_size) def search_by_file(collection, file_name): @@ -239,8 +226,7 @@ def search_by_file(collection, file_name): def search_all(collection): - results = collection.query(expr="pk >= 0", - output_fields=[partition_field_name, "pk"]) + results = collection.query(expr="pk >= 0", output_fields=[partition_field_name, "pk"]) if logflag: logger.info(f"[ search all ] {len(results)} results: {results}") return results @@ -257,9 +243,7 @@ def delete_all_data(my_milvus): def delete_by_partition_field(my_milvus, partition_field): if logflag: - logger.info( - f"[ delete partition ] deleting {partition_field_name} {partition_field}" - ) + logger.info(f"[ delete partition ] deleting {partition_field_name} {partition_field}") pks = my_milvus.get_pks(f'{partition_field_name} == "{partition_field}"') if logflag: logger.info(f"[ delete partition ] target pks: {pks}") @@ -279,14 +263,12 @@ class OpeaMultimodalMilvusDataprep(OpeaComponent): """ def __init__(self, name: str, description: str, config: dict = None): - super().__init__(name, ServiceType.DATAPREP.name.lower(), description, - config) + super().__init__(name, ServiceType.DATAPREP.name.lower(), description, config) self.device = "cpu" # Load embeddings model logger.info("Initializing BridgeTower model as embedder...") self.upload_folder = "./uploaded_files/" - self.embeddings = BridgeTowerEmbedding(model_name=EMBED_MODEL, - device=self.device) + self.embeddings = BridgeTowerEmbedding(model_name=EMBED_MODEL, device=self.device) logger.info("Done initialization of embedder!") health_status = self.check_health() @@ -310,8 +292,7 @@ def check_health(self) -> bool: ) _ = client.client.list_collections() if logflag: - logger.info( - "[ health check ] Successfully connected to Milvus!") + logger.info("[ health check ] Successfully connected to Milvus!") return True except Exception as e: logger.info(f"[ health check ] Failed to connect to Milvus: {e}") @@ -333,25 +314,16 @@ def prepare_data_and_metadata_from_annotation( metadatas = [] for i, frame in enumerate(annotation): frame_index = frame["sub_video_id"] - path_to_frame = os.path.join(path_to_frames, - f"frame_{frame_index}.png") + path_to_frame = os.path.join(path_to_frames, f"frame_{frame_index}.png") # augment this frame's transcript with a reasonable number of neighboring frames' transcripts helps semantic retrieval lb_ingesting = max(0, i - num_transcript_concat_for_ingesting) - ub_ingesting = min(len(annotation), - i + num_transcript_concat_for_ingesting + 1) - caption_for_ingesting = " ".join([ - annotation[j]["caption"] - for j in range(lb_ingesting, ub_ingesting) - ]) + ub_ingesting = min(len(annotation), i + num_transcript_concat_for_ingesting + 1) + caption_for_ingesting = " ".join([annotation[j]["caption"] for j in range(lb_ingesting, ub_ingesting)]) # augment this frame's transcript with more neighboring frames' transcript to provide more context to LVM for question answering lb_inference = max(0, i - num_transcript_concat_for_inference) - ub_inference = min(len(annotation), - i + num_transcript_concat_for_inference + 1) - caption_for_inference = " ".join([ - annotation[j]["caption"] - for j in range(lb_inference, ub_inference) - ]) + ub_inference = min(len(annotation), i + num_transcript_concat_for_inference + 1) + caption_for_inference = " ".join([annotation[j]["caption"] for j in range(lb_inference, ub_inference)]) video_id = frame["video_id"] b64_img_str = frame["b64_img_str"] @@ -364,21 +336,22 @@ def prepare_data_and_metadata_from_annotation( if b64_img_str: image_list.append(path_to_frame) - metadatas.append({ - "content": caption_for_ingesting, - "b64_img_str": b64_img_str, - "video_id": video_id, - "source_video": source_video, - "time_of_frame_ms": float(time_of_frame), - "embedding_type": embedding_type, - "title": title, - "transcript_for_inference": caption_for_inference, - }) + metadatas.append( + { + "content": caption_for_ingesting, + "b64_img_str": b64_img_str, + "video_id": video_id, + "source_video": source_video, + "time_of_frame_ms": float(time_of_frame), + "embedding_type": embedding_type, + "title": title, + "transcript_for_inference": caption_for_inference, + } + ) return text_list, image_list, metadatas - def prepare_pdf_data_from_annotation(self, annotation, path_to_files, - title): + def prepare_pdf_data_from_annotation(self, annotation, path_to_files, title): """PDF data processing has some key differences from videos and images. 1. Neighboring transcripts are not currently considered relevant. @@ -394,8 +367,7 @@ def prepare_pdf_data_from_annotation(self, annotation, path_to_files, for item in annotation: page_index = item["frame_no"] image_index = item["sub_video_id"] - path_to_image = os.path.join( - path_to_files, f"page{page_index}_image{image_index}.png") + path_to_image = os.path.join(path_to_files, f"page{page_index}_image{image_index}.png") caption_for_ingesting = item["caption"] caption_for_inference = item["caption"] @@ -409,25 +381,22 @@ def prepare_pdf_data_from_annotation(self, annotation, path_to_files, if b64_img_str: image_list.append(path_to_image) - metadatas.append({ - "content": caption_for_ingesting, - "b64_img_str": b64_img_str, - "video_id": pdf_id, - "source_video": source, - "time_of_frame_ms": - page_index, # For PDFs save the page number - "embedding_type": embedding_type, - "title": title, - "transcript_for_inference": caption_for_inference, - }) + metadatas.append( + { + "content": caption_for_ingesting, + "b64_img_str": b64_img_str, + "video_id": pdf_id, + "source_video": source, + "time_of_frame_ms": page_index, # For PDFs save the page number + "embedding_type": embedding_type, + "title": title, + "transcript_for_inference": caption_for_inference, + } + ) return text_list, image_list, metadatas - def ingest_multimodal(self, - filename, - data_folder, - embeddings, - is_pdf=False): + def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): """Ingest text image pairs to Milvus from the data/ directory that consists of frames and annotations.""" data_folder = os.path.abspath(data_folder) annotation_file_path = os.path.join(data_folder, "annotations.json") @@ -438,10 +407,12 @@ def ingest_multimodal(self, # prepare data to ingest if is_pdf: text_list, image_list, metadatas = self.prepare_pdf_data_from_annotation( - annotation, path_to_frames, filename) + annotation, path_to_frames, filename + ) else: text_list, image_list, metadatas = self.prepare_data_and_metadata_from_annotation( - annotation, path_to_frames, filename) + annotation, path_to_frames, filename + ) MultimodalMilvus.from_text_image_pairs_return_keys( texts=[f"From {filename}. " + text for text in text_list], @@ -452,8 +423,7 @@ def ingest_multimodal(self, milvus_uri=MILVUS_URI, ) - async def ingest_generate_transcripts( - self, files: List[UploadFile] = File(None)): + async def ingest_generate_transcripts(self, files: List[UploadFile] = File(None)): """Upload videos or audio files with speech, generate transcripts using whisper and ingest into milvus.""" if files: @@ -465,8 +435,7 @@ async def ingest_generate_transcripts( else: raise HTTPException( status_code=400, - detail= - f"File {file.filename} is not an mp4 file. Please upload mp4 files only.", + detail=f"File {file.filename} is not an mp4 file. Please upload mp4 files only.", ) for file_to_ingest in files_to_ingest: @@ -474,8 +443,7 @@ async def ingest_generate_transcripts( file_extension = os.path.splitext(file_to_ingest.filename)[1] is_video = file_extension == ".mp4" file_type_str = "video" if is_video else "audio file" - logger.info( - f"Processing {file_type_str} {file_to_ingest.filename}") + logger.info(f"Processing {file_type_str} {file_to_ingest.filename}") # Assign unique identifier to video file_id = generate_id() @@ -486,8 +454,7 @@ async def ingest_generate_transcripts( dir_name = os.path.splitext(file_name_with_id)[0] # Save file in upload_directory - with open(os.path.join(self.upload_folder, file_name_with_id), - "wb") as f: + with open(os.path.join(self.upload_folder, file_name_with_id), "wb") as f: shutil.copyfileobj(file_to_ingest.file, f) uploaded_files_map[base_file_name] = file_name_with_id @@ -512,17 +479,13 @@ async def ingest_generate_transcripts( # Extract transcript from audio logger.info("Extracting transcript from audio") - transcripts = extract_transcript_from_audio( - whisper_model, os.path.join(self.upload_folder, - audio_file)) + transcripts = extract_transcript_from_audio(whisper_model, os.path.join(self.upload_folder, audio_file)) # Save transcript as vtt file and delete audio file vtt_file = dir_name + ".vtt" - write_vtt(transcripts, - os.path.join(self.upload_folder, vtt_file)) + write_vtt(transcripts, os.path.join(self.upload_folder, vtt_file)) if is_video: - delete_audio_file( - os.path.join(self.upload_folder, audio_file)) + delete_audio_file(os.path.join(self.upload_folder, audio_file)) logger.info("Done extracting transcript.") if is_video: @@ -550,9 +513,7 @@ async def ingest_generate_transcripts( # Ingest multimodal data into milvus logger.info("Ingesting data to milvus vector store") - self.ingest_multimodal( - base_file_name, os.path.join(self.upload_folder, dir_name), - self.embeddings) + self.ingest_multimodal(base_file_name, os.path.join(self.upload_folder, dir_name), self.embeddings) # Delete temporary video directory containing frames and annotations shutil.rmtree(os.path.join(self.upload_folder, dir_name)) @@ -567,28 +528,21 @@ async def ingest_generate_transcripts( "file_id_maps": uploaded_files_map, } - raise HTTPException( - status_code=400, - detail= - "Must provide at least one video (.mp4) or audio (.wav) file.") + raise HTTPException(status_code=400, detail="Must provide at least one video (.mp4) or audio (.wav) file.") - async def ingest_generate_captions(self, - files: List[UploadFile] = File(None)): + async def ingest_generate_captions(self, files: List[UploadFile] = File(None)): """Upload images and videos without speech (only background music or no audio), generate captions using lvm microservice and ingest into milvus.""" if files: file_paths = [] uploaded_files_saved_files_map = {} for file in files: - if os.path.splitext(file.filename)[1] in [ - ".mp4", ".png", ".jpg", ".jpeg", ".gif" - ]: + if os.path.splitext(file.filename)[1] in [".mp4", ".png", ".jpg", ".jpeg", ".gif"]: file_paths.append(file) else: raise HTTPException( status_code=400, - detail= - f"File {file.filename} is not a supported file type. Please upload mp4, png, jpg, jpeg, and gif files only.", + detail=f"File {file.filename} is not a supported file type. Please upload mp4, png, jpg, jpeg, and gif files only.", ) for file in file_paths: @@ -603,8 +557,7 @@ async def ingest_generate_captions(self, dir_name = os.path.splitext(file_name)[0] # Save file in upload_directory - with open(os.path.join(self.upload_folder, file_name), - "wb") as f: + with open(os.path.join(self.upload_folder, file_name), "wb") as f: shutil.copyfileobj(file.file, f) uploaded_files_saved_files_map[name] = file_name @@ -617,9 +570,7 @@ async def ingest_generate_captions(self, ) # Ingest multimodal data into milvus - self.ingest_multimodal( - name, os.path.join(self.upload_folder, dir_name), - self.embeddings) + self.ingest_multimodal(name, os.path.join(self.upload_folder, dir_name), self.embeddings) # Delete temporary directory containing frames and annotations # shutil.rmtree(os.path.join(upload_folder, dir_name)) @@ -632,25 +583,18 @@ async def ingest_generate_captions(self, "file_id_maps": uploaded_files_saved_files_map, } - raise HTTPException(status_code=400, - detail="Must provide at least one file.") + raise HTTPException(status_code=400, detail="Must provide at least one file.") - async def ingest_files( - self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): + async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]]] = File(None)): if logflag: logger.info(f"[ milvus ingest ] files:{files}") if files: - accepted_media_formats = [ - ".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf" - ] + accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf"] # Create a lookup dictionary containing all media files matched_files = { - f.filename: [f] - for f in files - if os.path.splitext(f.filename)[1] in accepted_media_formats + f.filename: [f] for f in files if os.path.splitext(f.filename)[1] in accepted_media_formats } uploaded_files_map = {} @@ -660,9 +604,7 @@ async def ingest_files( if "{}.mp4".format(file_base) in matched_files: matched_files["{}.mp4".format(file_base)].append(file) else: - logger.info( - f"No video was found for caption file {file.filename}." - ) + logger.info(f"No video was found for caption file {file.filename}.") elif file_extension == ".txt": if "{}.png".format(file_base) in matched_files: matched_files["{}.png".format(file_base)].append(file) @@ -673,26 +615,18 @@ async def ingest_files( elif "{}.gif".format(file_base) in matched_files: matched_files["{}.gif".format(file_base)].append(file) else: - logger.info( - f"No image was found for caption file {file.filename}." - ) + logger.info(f"No image was found for caption file {file.filename}.") elif file_extension not in accepted_media_formats: - logger.info( - f"Skipping file {file.filename} because of unsupported format." - ) + logger.info(f"Skipping file {file.filename} because of unsupported format.") print("Pallavi 3") for media_file_name, file_list in matched_files.items(): - if len(file_list) != 2 and os.path.splitext( - media_file_name)[1] != ".pdf": - raise HTTPException( - status_code=400, - detail=f"No caption file found for {media_file_name}") + if len(file_list) != 2 and os.path.splitext(media_file_name)[1] != ".pdf": + raise HTTPException(status_code=400, detail=f"No caption file found for {media_file_name}") if len(matched_files.keys()) == 0: return HTTPException( status_code=400, - detail= - "The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt) or one .pdf file", + detail="The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt) or one .pdf file", ) for media_file in matched_files: @@ -707,20 +641,16 @@ async def ingest_files( media_dir_name = os.path.splitext(media_file_name)[0] # Save file in upload_directory - with open(os.path.join(self.upload_folder, media_file_name), - "wb") as f: + with open(os.path.join(self.upload_folder, media_file_name), "wb") as f: shutil.copyfileobj(matched_files[media_file][0].file, f) uploaded_files_map[file_name] = media_file_name if file_extension == ".pdf": # Set up location to store pdf images and text, reusing "frames" and "annotations" from video - output_dir = os.path.join(self.upload_folder, - media_dir_name) + output_dir = os.path.join(self.upload_folder, media_dir_name) os.makedirs(output_dir, exist_ok=True) - os.makedirs(os.path.join(output_dir, "frames"), - exist_ok=True) - doc = pymupdf.open( - os.path.join(self.upload_folder, media_file_name)) + os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True) + doc = pymupdf.open(os.path.join(self.upload_folder, media_file_name)) annotations = [] for page_idx, page in enumerate(doc, start=1): text = page.get_text() @@ -728,10 +658,8 @@ async def ingest_files( for image_idx, image in enumerate(images, start=1): # Write image and caption file for each image found in pdf img_fname = f"page{page_idx}_image{image_idx}" - img_fpath = os.path.join(output_dir, "frames", - img_fname + ".png") - pix = pymupdf.Pixmap(doc, - image[0]) # create pixmap + img_fpath = os.path.join(output_dir, "frames", img_fname + ".png") + pix = pymupdf.Pixmap(doc, image[0]) # create pixmap if pix.n - pix.alpha > 3: # if CMYK, convert to RGB first pix = pymupdf.Pixmap(pymupdf.csRGB, pix) @@ -741,41 +669,28 @@ async def ingest_files( # Convert image to base64 encoded string with open(img_fpath, "rb") as image2str: - encoded_string = base64.b64encode( - image2str.read()) # png to bytes + encoded_string = base64.b64encode(image2str.read()) # png to bytes - decoded_string = encoded_string.decode( - ) # bytes to string + decoded_string = encoded_string.decode() # bytes to string # Create annotations file, reusing metadata keys from video - annotations.append({ - "video_id": - file_id, - "video_name": - os.path.basename( - os.path.join(self.upload_folder, - media_file_name)), - "b64_img_str": - decoded_string, - "caption": - text, - "time": - 0.0, - "frame_no": - page_idx, - "sub_video_id": - image_idx, - }) + annotations.append( + { + "video_id": file_id, + "video_name": os.path.basename(os.path.join(self.upload_folder, media_file_name)), + "b64_img_str": decoded_string, + "caption": text, + "time": 0.0, + "frame_no": page_idx, + "sub_video_id": image_idx, + } + ) else: # Save caption file in upload directory - caption_file_extension = os.path.splitext( - matched_files[media_file][1].filename)[1] + caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1] caption_file = f"{media_dir_name}{caption_file_extension}" - with open( - os.path.join(self.upload_folder, caption_file), - "wb") as f: - shutil.copyfileobj( - matched_files[media_file][1].file, f) + with open(os.path.join(self.upload_folder, caption_file), "wb") as f: + shutil.copyfileobj(matched_files[media_file][1].file, f) # Store frames and caption annotations in a new directory extract_frames_and_annotations_from_transcripts( @@ -786,18 +701,13 @@ async def ingest_files( ) # Delete temporary caption file - os.remove( - os.path.join(self.upload_folder, caption_file)) + os.remove(os.path.join(self.upload_folder, caption_file)) # Ingest multimodal data into milvus - self.ingest_multimodal( - file_name, - os.path.join(self.upload_folder, media_dir_name), - self.embeddings) + self.ingest_multimodal(file_name, os.path.join(self.upload_folder, media_dir_name), self.embeddings) # Delete temporary media directory containing frames and annotations - shutil.rmtree( - os.path.join(self.upload_folder, media_dir_name)) + shutil.rmtree(os.path.join(self.upload_folder, media_dir_name)) logger.info(f"Processed file {media_file}") @@ -809,8 +719,7 @@ async def ingest_files( raise HTTPException( status_code=400, - detail= - "Must provide at least one pair consisting of video (.mp4) and captions (.vtt) or image (.png, .jpg, .jpeg, .gif) with caption (.txt)", + detail="Must provide at least one pair consisting of video (.mp4) and captions (.vtt) or image (.png, .jpg, .jpeg, .gif) with caption (.txt)", ) async def get_files(self): @@ -843,10 +752,8 @@ async def delete_files(self, file_path: str = Body(..., embed=True)): clear_upload_folder(upload_folder) except Exception as e: if logflag: - logger.info( - f"[ milvus delete ] {e}. Fail to delete {upload_folder}.") - raise HTTPException(status_code=500, - detail=f"Fail to delete {upload_folder}: {e}") + logger.info(f"[ milvus delete ] {e}. Fail to delete {upload_folder}.") + raise HTTPException(status_code=500, detail=f"Fail to delete {upload_folder}: {e}") if logflag: logger.info("[ milvus delete ] successfully delete all files.") diff --git a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py index c5ae57d5d8..9539bc9663 100644 --- a/comps/dataprep/src/opea_dataprep_multimodal_microservice.py +++ b/comps/dataprep/src/opea_dataprep_multimodal_microservice.py @@ -7,9 +7,9 @@ from typing import List, Optional, Union from fastapi import Body, File, UploadFile +from integrations.milvus_multimodal import OpeaMultimodalMilvusDataprep from integrations.redis_multimodal import OpeaMultimodalRedisDataprep from integrations.vdms_multimodal import OpeaMultimodalVdmsDataprep -from integrations.milvus_multimodal import OpeaMultimodalMilvusDataprep from opea_dataprep_loader import OpeaDataprepMultiModalLoader from comps import ( From fea2579ee4e29b3d2fec3f707332a09a3fd4fd20 Mon Sep 17 00:00:00 2001 From: "pallavi.jaini" Date: Sat, 8 Mar 2025 09:19:33 +0000 Subject: [PATCH 3/5] Added the partition field details Signed-off-by: pallavi.jaini --- .../src/integrations/milvus_multimodal.py | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/comps/dataprep/src/integrations/milvus_multimodal.py b/comps/dataprep/src/integrations/milvus_multimodal.py index 5f07fe5f15..f94a016a15 100644 --- a/comps/dataprep/src/integrations/milvus_multimodal.py +++ b/comps/dataprep/src/integrations/milvus_multimodal.py @@ -113,6 +113,10 @@ def from_text_image_pairs_return_keys( if "keys" in kwargs: keys = kwargs.pop("keys") + filename = "" + if "filename" in kwargs: + filename = kwargs.pop("filename") + # Name of the search index if not given if not collection_name: collection_name = uuid.uuid4().hex @@ -124,7 +128,6 @@ def from_text_image_pairs_return_keys( if not (isinstance(metadatas, list) and isinstance(metadatas[0], dict)): raise ValueError("Metadatas must be a list of dicts") # generated_schema = _prepare_metadata_fields(metadatas[0]) - # Create instance metadatas_test = metadatas instance = cls( @@ -136,9 +139,9 @@ def from_text_image_pairs_return_keys( **kwargs, ) keys = ( - instance.add_text_image_pairs(texts, images, embedding, metadatas=metadatas_test, keys=keys) + instance.add_text_image_pairs(texts, images, filename, embedding, metadatas=metadatas_test, keys=keys) if images - else instance.add_text(texts, metadatas=metadatas_test, keys=keys) + else instance.add_text(texts, filename, metadatas=metadatas_test, keys=keys) ) return instance, keys @@ -146,6 +149,7 @@ def add_text_image_pairs( self, texts: Iterable[str], images: Iterable[str], + filename: str, embedding: Embeddings = BridgeTowerEmbedding, metadatas: Optional[List[dict]] = None, embeddings: Optional[List[List[float]]] = None, @@ -157,10 +161,6 @@ def add_text_image_pairs( # Get keys or ids from kwargs # Other vectorstores use ids keys_or_ids = kwargs.get("keys", kwargs.get("ids")) - logger.info(f"Testing 2.0 {type(metadatas)}") - # type check for metadata - logger.info(f"test 2 {isinstance(metadatas, list)}") - logger.info(metadatas[0]) if metadatas: if isinstance(metadatas, list) and len(metadatas) != len(texts): # type: ignore raise ValueError("Number of metadatas must match number of texts") @@ -171,15 +171,18 @@ def add_text_image_pairs( if not embeddings: embeddings = embedding.embed_image_text_pairs(list(texts), pil_imgs, batch_size=batch_size) for metadata in metadatas: - metadata["pk"] = str(uuid.uuid4().hex) + metadata["filename"] = filename for key, value in metadata.items(): if isinstance(value, str) and len(value) > 65535: metadata[key] = value[:65535] - return self.add_embeddings(list(texts), embeddings, metadatas, batch_size) + db_ids = self.add_embeddings(list(texts), embeddings, metadatas, batch_size) + logger.info(db_ids) + return db_ids def add_text( self, texts: Iterable[str], + filename: str, embedding: Embeddings = BridgeTowerEmbedding, metadatas: Optional[List[dict]] = None, embeddings: Optional[List[List[float]]] = None, @@ -205,12 +208,15 @@ def add_text( batch_size = 2 for metadata in metadatas: - metadata["pk"] = str(uuid.uuid4().hex) + metadata["filename"] = filename for key, value in metadata.items(): if isinstance(value, str) and len(value) > 65535: metadata[key] = value[:65535] - return self.add_embeddings(list(texts), embeddings, metadatas, batch_size) + + db_ids = self.add_embeddings(list(texts), embeddings, metadatas, batch_size) + logger.info(db_ids) + return db_ids def search_by_file(collection, file_name): @@ -421,6 +427,7 @@ def ingest_multimodal(self, filename, data_folder, embeddings, is_pdf=False): metadatas=metadatas, collection_name=COLLECTION_NAME, milvus_uri=MILVUS_URI, + filename=filename, ) async def ingest_generate_transcripts(self, files: List[UploadFile] = File(None)): @@ -618,7 +625,6 @@ async def ingest_files(self, files: Optional[Union[UploadFile, List[UploadFile]] logger.info(f"No image was found for caption file {file.filename}.") elif file_extension not in accepted_media_formats: logger.info(f"Skipping file {file.filename} because of unsupported format.") - print("Pallavi 3") for media_file_name, file_list in matched_files.items(): if len(file_list) != 2 and os.path.splitext(media_file_name)[1] != ".pdf": raise HTTPException(status_code=400, detail=f"No caption file found for {media_file_name}") From 15fafe1d41398ef03e01366852e74c72c2bfa621 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 8 Mar 2025 09:20:38 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- comps/dataprep/src/integrations/milvus_multimodal.py | 1 - 1 file changed, 1 deletion(-) diff --git a/comps/dataprep/src/integrations/milvus_multimodal.py b/comps/dataprep/src/integrations/milvus_multimodal.py index f94a016a15..cd8f40e93a 100644 --- a/comps/dataprep/src/integrations/milvus_multimodal.py +++ b/comps/dataprep/src/integrations/milvus_multimodal.py @@ -213,7 +213,6 @@ def add_text( if isinstance(value, str) and len(value) > 65535: metadata[key] = value[:65535] - db_ids = self.add_embeddings(list(texts), embeddings, metadatas, batch_size) logger.info(db_ids) return db_ids From 53998b7180409133dfc2669b8705826b1ef2b2f0 Mon Sep 17 00:00:00 2001 From: "pallavi.jaini" Date: Mon, 10 Mar 2025 21:39:24 +0000 Subject: [PATCH 5/5] Added the test for dataprep_multimodal_milvus Signed-off-by: pallavi.jaini --- .../test_dataprep_milvus_multimodal.sh | 415 ++++++++++++++++++ 1 file changed, 415 insertions(+) create mode 100644 tests/dataprep/test_dataprep_milvus_multimodal.sh diff --git a/tests/dataprep/test_dataprep_milvus_multimodal.sh b/tests/dataprep/test_dataprep_milvus_multimodal.sh new file mode 100644 index 0000000000..371dc85a74 --- /dev/null +++ b/tests/dataprep/test_dataprep_milvus_multimodal.sh @@ -0,0 +1,415 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') +LVM_PORT=5028 +LVM_ENDPOINT="http://${ip_address}:${LVM_PORT}/v1/lvm" +WHISPER_MODEL="base" +INDEX_NAME="dataprep" +tmp_dir=$(mktemp -d) +video_name="WeAreGoingOnBullrun" +transcript_fn="${tmp_dir}/${video_name}.vtt" +video_fn="${tmp_dir}/${video_name}.mp4" +audio_name="AudioSample" +audio_fn="${tmp_dir}/${audio_name}.wav" +image_name="apple" +image_fn="${tmp_dir}/${image_name}.png" +caption_fn="${tmp_dir}/${image_name}.txt" +pdf_name="nke-10k-2023" +pdf_fn="${tmp_dir}/${pdf_name}.pdf" +DATAPREP_PORT="11102" + +function build_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/dataprep:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/src/Dockerfile . + + if [ $? -ne 0 ]; then + echo "opea/dataprep built fail" + exit 1 + else + echo "opea/dataprep built successful" + fi +} + +function build_lvm_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build --no-cache -t opea/lvm-llava:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/lvms/src/integrations/dependency/llava/Dockerfile . + if [ $? -ne 0 ]; then + echo "opea/lvm-llava built fail" + exit 1 + else + echo "opea/lvm-llava built successful" + fi + docker build --no-cache -t opea/lvm:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/lvms/src/Dockerfile . + if [ $? -ne 0 ]; then + echo "opea/lvm built fail" + exit 1 + else + echo "opea/lvm built successful" + fi +} + +function start_lvm_service() { + unset http_proxy + docker run -d --name="test-comps-lvm-llava" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p 5029:8399 --ipc=host opea/lvm-llava:comps + sleep 4m + docker run -d --name="test-comps-lvm-llava-svc" -e LVM_ENDPOINT=http://$ip_address:5029 -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p ${LVM_PORT}:9399 --ipc=host opea/lvm:comps + sleep 1m +} + +function start_lvm() { + cd $WORKPATH + echo $(pwd) + echo "Building LVM Docker Images" + build_lvm_docker_images + echo "Starting LVM Services" + start_lvm_service + +} + +function start_service() { + export host_ip=${ip_address} + export MILVUS_HOST=$ip_address + export LVM_PORT=5028 + export LVM_ENDPOINT="http://${ip_address}:${LVM_PORT}/v1/lvm" + export INDEX_NAME="dataprep" + export TAG="comps" + service_name="etcd minio standalone dataprep-multimodal-milvus" + cd $WORKPATH/comps/dataprep/deployment/docker_compose/ + docker compose up ${service_name} -d + sleep 1m +} + +function prepare_data() { + echo "Prepare Transcript .vtt" + cd ${LOG_PATH} + echo $(pwd) + echo """WEBVTT + +00:00:00.000 --> 00:00:03.400 +Last year the smoking tire went on the bull run live rally in the + +00:00:03.400 --> 00:00:09.760 +2010 Ford SBT Raptor. I liked it so much. I bought one. Here it is. We're going back + +00:00:09.760 --> 00:00:12.920 +to bull run this year of course we'll help from our friends at Black Magic and + +00:00:12.920 --> 00:00:19.560 +we're so serious about it. We got two Valentine one radar detectors. Oh yeah. + +00:00:19.560 --> 00:00:23.760 +So we're all set up and the reason we got two is because we're going to be going + +00:00:23.760 --> 00:00:29.920 +a little bit faster. We got a 2011 Shelby GT500. The 550 horsepower + +00:00:29.920 --> 00:00:34.560 +all-luminum V8. We are going to be right in the action bringing you guys a video + +00:00:34.560 --> 00:00:40.120 +every single day live from the bull run rally July 9th to 16th and the only + +00:00:40.120 --> 00:00:45.240 +place to watch it is on BlackmagicShine.com. We're right here on the smoking + +00:00:45.240 --> 00:00:47.440 +tire.""" > ${transcript_fn} + + echo "This is an apple." > ${caption_fn} + + echo "Downloading Image" + wget https://github.com/docarray/docarray/blob/main/tests/toydata/image-data/apple.png?raw=true -O ${image_fn} + + echo "Downloading Video" + wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4 -O ${video_fn} + + echo "Downloading Audio" + wget https://github.com/intel/intel-extension-for-transformers/raw/main/intel_extension_for_transformers/neural_chat/assets/audio/sample.wav -O ${audio_fn} + + echo "Downloading PDF" + wget https://raw.githubusercontent.com/opea-project/GenAIComps/v1.1/comps/retrievers/redis/data/nke-10k-2023.pdf -O ${pdf_fn} + +} + +function validate_microservice() { + cd $LOG_PATH + + # test v1/generate_transcripts upload file + echo "Testing generate_transcripts API" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/generate_transcripts" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$video_fn" -F "files=@$audio_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test ingest upload video file + echo "Testing ingest API with video+transcripts" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/ingest" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$video_fn" -F "files=@$transcript_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test ingest upload image file + echo "Testing ingest API with image+caption" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/ingest" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$image_fn" -F "files=@$caption_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test ingest with video and image + echo "Testing ingest API with both video+transcript and image+caption" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/ingest" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$image_fn" -F "files=@$caption_fn" -F "files=@$video_fn" -F "files=@$transcript_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test ingest with invalid input (.png image with .vtt transcript) + echo "Testing ingest API with invalid input (.png and .vtt)" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/ingest" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$image_fn" -F "files=@$transcript_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "400" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 400. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 400. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"No caption file found for $image_name"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test ingest with a PDF file + echo "Testing ingest API with a PDF file" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/ingest" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$pdf_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test generate_captions upload video file + echo "Testing generate_captions API with video" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/generate_captions" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$video_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test v1/generate_captions upload image file + echo "Testing generate_captions API with image" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/generate_captions" + + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$image_fn" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - upload - file" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test /v1/dataprep/get_files + echo "Testing get_files API" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/get" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - get" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *${image_name}* || "$RESPONSE_BODY" != *${video_name}* || "$RESPONSE_BODY" != *${audio_name}* || "$RESPONSE_BODY" != *${pdf_name}* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test /v1/dataprep/delete + echo "Testing delete API" + URL="http://${ip_address}:$DATAPREP_PORT/v1/dataprep/delete" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="dataprep - del" + + # check response status + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_del.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + # check response body + if [[ "$RESPONSE_BODY" != *'{"status":true}'* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs dataprep-multimodal-milvus-server >> ${LOG_PATH}/dataprep_del.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=dataprep-multimodal-milvus-server*" --filter "name=milvus-*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + cid=$(docker ps -aq --filter "name=test-comps-lvm*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + +} + +function delete_data() { + cd ${LOG_PATH} + rm -rf ${tmp_dir} + sleep 1s +} + +function main() { + + stop_docker + build_docker_images + prepare_data + + start_lvm + start_service + + validate_microservice + delete_data + stop_docker + echo y | docker system prune + +} + +main