diff --git a/comps/embeddings/deployment/docker_compose/compose.yaml b/comps/embeddings/deployment/docker_compose/compose.yaml
index 44780348af..6a96230e00 100644
--- a/comps/embeddings/deployment/docker_compose/compose.yaml
+++ b/comps/embeddings/deployment/docker_compose/compose.yaml
@@ -4,6 +4,7 @@
 include:
   - ../../../third_parties/tei/deployment/docker_compose/compose.yaml
   - ../../../third_parties/bridgetower/deployment/docker_compose/compose.yaml
+  - ../../../third_parties/clip/deployment/docker_compose/compose_intel_cpu.yaml
 
 x-multimodal-bridgetower-embedding-config: &multimodal-bridgetower-embedding-config
   image: ${REGISTRY:-opea}/embedding:${TAG:-latest}
@@ -64,7 +65,11 @@ services:
       no_proxy: ${no_proxy}
       http_proxy: ${http_proxy}
       https_proxy: ${https_proxy}
+      CLIP_EMBEDDING_ENDPOINT: ${CLIP_EMBEDDING_ENDPOINT}
       EMBEDDING_COMPONENT_NAME: "OPEA_CLIP_EMBEDDING"
+    depends_on:
+      multimodal-clip-embedding:
+        condition: service_healthy
     restart: unless-stopped
 
   multimodal-bridgetower-embedding-server:
diff --git a/comps/embeddings/src/integrations/clip.py b/comps/embeddings/src/integrations/clip.py
index e216c257df..4c7d037eef 100644
--- a/comps/embeddings/src/integrations/clip.py
+++ b/comps/embeddings/src/integrations/clip.py
@@ -1,13 +1,10 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
+import asyncio
 import os
-from typing import List, Union
 
-import torch
-import torch.nn as nn
-from einops import rearrange
-from transformers import AutoProcessor, AutoTokenizer, CLIPModel
+import requests
 
 from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry, ServiceType
 from comps.cores.proto.api_protocol import EmbeddingRequest, EmbeddingResponse, EmbeddingResponseData
@@ -16,51 +13,6 @@
 logflag = os.getenv("LOGFLAG", False)
 
-model_name = "openai/clip-vit-base-patch32"
-
-clip = CLIPModel.from_pretrained(model_name)
-processor = AutoProcessor.from_pretrained(model_name)
-tokenizer = AutoTokenizer.from_pretrained(model_name)
-
-
-class vCLIP(nn.Module):
-    def __init__(self, cfg):
-        super().__init__()
-
-        self.num_frm = cfg["num_frm"]
-        self.model_name = cfg["model_name"]
-
-    def embed_query(self, texts):
-        """Input is list of texts."""
-        text_inputs = tokenizer(texts, padding=True, return_tensors="pt")
-        text_features = clip.get_text_features(**text_inputs)
-        return text_features
-
-    def get_embedding_length(self):
-        text_features = self.embed_query("sample_text")
-        return text_features.shape[1]
-
-    def get_image_embeddings(self, images):
-        """Input is list of images."""
-        image_inputs = processor(images=images, return_tensors="pt")
-        image_features = clip.get_image_features(**image_inputs)
-        return image_features
-
-    def get_video_embeddings(self, frames_batch):
-        """Input is list of list of frames in video."""
-        self.batch_size = len(frames_batch)
-        vid_embs = []
-        for frames in frames_batch:
-            frame_embeddings = self.get_image_embeddings(frames)
-            frame_embeddings = rearrange(frame_embeddings, "(b n) d -> b n d", b=len(frames_batch))
-            # Normalize, mean aggregate and return normalized video_embeddings
-            frame_embeddings = frame_embeddings / frame_embeddings.norm(dim=-1, keepdim=True)
-            video_embeddings = frame_embeddings.mean(dim=1)
-            video_embeddings = video_embeddings / video_embeddings.norm(dim=-1, keepdim=True)
-            vid_embs.append(video_embeddings)
-        return torch.cat(vid_embs, dim=0)
-
-
 @OpeaComponentRegistry.register("OPEA_CLIP_EMBEDDING")
 class OpeaClipEmbedding(OpeaComponent):
     """A specialized embedding component derived from OpeaComponent for CLIP embedding services.
 
@@ -74,7 +26,7 @@ class OpeaClipEmbedding(OpeaComponent):
 
     def __init__(self, name: str, description: str, config: dict = None):
         super().__init__(name, ServiceType.EMBEDDING.name.lower(), description, config)
-        self.embeddings = vCLIP({"model_name": "openai/clip-vit-base-patch32", "num_frm": 4})
+        self.base_url = os.getenv("CLIP_EMBEDDING_ENDPOINT", "http://localhost:6990")
 
         health_status = self.check_health()
         if not health_status:
@@ -89,25 +41,24 @@ async def invoke(self, input: EmbeddingRequest) -> EmbeddingResponse:
         Returns:
             EmbeddingResponse: The response in OpenAI embedding format, including embeddings, model, and usage information.
         """
-        # Parse input according to the EmbeddingRequest format
-        if isinstance(input.input, str):
-            texts = [input.input.replace("\n", " ")]
-        elif isinstance(input.input, list):
-            if all(isinstance(item, str) for item in input.input):
-                texts = [text.replace("\n", " ") for text in input.input]
-            else:
-                raise ValueError("Invalid input format: Only string or list of strings are supported.")
-        else:
-            raise TypeError("Unsupported input type: input must be a string or list of strings.")
-        embed_vector = self.get_embeddings(texts)
-        if input.dimensions is not None:
-            embed_vector = [embed_vector[i][: input.dimensions] for i in range(len(embed_vector))]
-
-        # for standard openai embedding format
-        res = EmbeddingResponse(
-            data=[EmbeddingResponseData(index=i, embedding=embed_vector[i]) for i in range(len(embed_vector))]
-        )
-        return res
+        json_payload = input.model_dump()
+        try:
+            response = await asyncio.to_thread(
+                requests.post,
+                f"{self.base_url}/v1/embeddings",
+                headers={"Content-Type": "application/json"},
+                json=json_payload,
+            )
+            response.raise_for_status()
+            response_json = response.json()
+
+            return EmbeddingResponse(
+                data=[EmbeddingResponseData(**item) for item in response_json.get("data", [])],
+                model=response_json.get("model", input.model),
+                usage=response_json.get("usage", {}),
+            )
+        except requests.RequestException as e:
+            raise RuntimeError(f"Failed to invoke embedding service: {str(e)}")
 
     def check_health(self) -> bool:
         """Checks if the embedding model is healthy.
@@ -115,20 +66,13 @@ def check_health(self) -> bool:
         Returns:
             bool: True if the embedding model is initialized, False otherwise.
         """
-        if self.embeddings:
+        try:
+            _ = requests.post(
+                f"{self.base_url}/v1/embeddings",
+                headers={"Content-Type": "application/json"},
+                json={"input": "health check"},
+            )
+
             return True
-        else:
+        except requests.RequestException as e:
             return False
-
-    def get_embeddings(self, text: Union[str, List[str]]) -> List[List[float]]:
-        """Generates embeddings for input text.
-
-        Args:
-            text (Union[str, List[str]]): Input text or list of texts.
-
-        Returns:
-            List[List[float]]: List of embedding vectors.
-        """
-        texts = [text] if isinstance(text, str) else text
-        embed_vector = self.embeddings.embed_query(texts).tolist()
-        return embed_vector
diff --git a/comps/embeddings/src/requirements.txt b/comps/embeddings/src/requirements.txt
index 277fd59578..80cb078280 100644
--- a/comps/embeddings/src/requirements.txt
+++ b/comps/embeddings/src/requirements.txt
@@ -1,9 +1,7 @@
 aiohttp
 docarray
-einops
 fastapi
 huggingface_hub
-open-clip-torch
 openai
 opentelemetry-api
 opentelemetry-exporter-otlp
@@ -13,5 +11,4 @@ predictionguard==2.2.1
 prometheus-fastapi-instrumentator
 PyYAML
 shortuuid
-transformers
 uvicorn
diff --git a/comps/third_parties/clip/deployment/docker_compose/compose_intel_cpu.yaml b/comps/third_parties/clip/deployment/docker_compose/compose_intel_cpu.yaml
index 112804aa42..58c9dd3fa9 100644
--- a/comps/third_parties/clip/deployment/docker_compose/compose_intel_cpu.yaml
+++ b/comps/third_parties/clip/deployment/docker_compose/compose_intel_cpu.yaml
@@ -1,19 +1,22 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
-version: "3.8"
-
 services:
-  embedding:
+  multimodal-clip-embedding:
     image: opea/embedding-multimodal-clip:latest
-    container_name: embedding-multimodal-server
+    container_name: multimodal-clip-embedding-server
     ports:
-      - "6000:6000"
+      - "${MULTIMODAL_CLIP_EMBEDDER_PORT:-6990}:6990"
     ipc: host
     environment:
       no_proxy: ${no_proxy}
       http_proxy: ${http_proxy}
       https_proxy: ${https_proxy}
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 30 && exit 0"]
+      interval: 1s
+      timeout: 35s
+      retries: 1
     restart: unless-stopped
 
 networks:
diff --git a/tests/embeddings/test_embeddings_clip.sh b/tests/embeddings/test_embeddings_clip.sh
index a90d99b4f6..5a2668da5d 100644
--- a/tests/embeddings/test_embeddings_clip.sh
+++ b/tests/embeddings/test_embeddings_clip.sh
@@ -17,12 +17,25 @@ function build_docker_images() {
     else
         echo "opea/embedding built successful"
     fi
+
+    cd $WORKPATH
+    echo $(pwd)
+    docker build --no-cache -t opea/embedding-multimodal-clip:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/third_parties/clip/src/Dockerfile .
+
+    if [ $? -ne 0 ]; then
+        echo "opea/embedding-multimodal-clip built fail"
+        exit 1
+    else
+        echo "opea/embedding-multimodal-clip built successful"
+    fi
 }
 
 function start_service() {
     export TAG=comps
     export host_ip=${ip_address}
     export EMBEDDER_PORT=10203
+    export MULTIMODAL_CLIP_EMBEDDER_PORT=10204
+    export CLIP_EMBEDDING_ENDPOINT=http://${host_ip}:${MULTIMODAL_CLIP_EMBEDDER_PORT}
     service_name="clip-embedding-server"
     cd $WORKPATH
     cd comps/embeddings/deployment/docker_compose/
@@ -42,6 +55,7 @@ function validate_service() {
     else
         echo "Result wrong. Received was $result"
         docker logs clip-embedding-server
+        docker logs multimodal-clip-embedding-server
         exit 1
     fi
 }
@@ -57,7 +71,7 @@ function validate_microservice() {
 }
 
 function stop_docker() {
-    cid=$(docker ps -aq --filter "name=clip-embedding-server*")
+    cid=$(docker ps -aq --filter "name=clip-embedding-server*" --filter "name=multimodal-clip-embedding-server*")
     if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi
 }