diff --git a/.gitignore b/.gitignore index 1d1e0a3899..875e50beef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ __pycache__ *.egg-info/ .DS_Store +.idea/ +.venv/ +build/ \ No newline at end of file diff --git a/comps/dataprep/elasticsearch/langchain/Dockerfile b/comps/dataprep/elasticsearch/langchain/Dockerfile new file mode 100644 index 0000000000..3880025d24 --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/Dockerfile @@ -0,0 +1,35 @@ +FROM python:3.11-slim + +ENV LANG=C.UTF-8 + +ARG ARCH="cpu" + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + build-essential \ + default-jre \ + libgl1-mesa-glx \ + libjemalloc-dev + +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +USER user + +COPY comps /home/user/comps + +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + if [ ${ARCH} = "cpu" ]; then pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \ + pip install --no-cache-dir -r /home/user/comps/dataprep/elasticsearch/langchain/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +USER root + +RUN mkdir -p /home/user/comps/dataprep/elasticsearch/langchain/uploaded_files && chown -R user /home/user/comps/dataprep/elasticsearch/langchain/uploaded_files + +USER user + +WORKDIR /home/user/comps/dataprep/elasticsearch/langchain + +ENTRYPOINT ["python", "prepare_doc_elasticsearch.py"] diff --git a/comps/dataprep/elasticsearch/langchain/README.md b/comps/dataprep/elasticsearch/langchain/README.md new file mode 100644 index 0000000000..2b05a3959c --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/README.md @@ -0,0 +1,130 @@ +# Dataprep Microservice with Elasticsearch + +## 🚀1. 
Start Microservice with Python (Option 1) + +### 1.1 Install Requirements + +```bash +pip install -r requirements.txt +``` + +### 1.2 Setup Environment Variables + +```bash +export ES_CONNECTION_STRING=http://localhost:9200 +export INDEX_NAME=${your_index_name} +``` + +### 1.3 Start Elasticsearch + +Please refer to this [readme](../../../vectorstores/elasticsearch/README.md). + +### 1.4 Start Document Preparation Microservice for Elasticsearch with Python Script + +Start the document preparation microservice for Elasticsearch with the command below. + +```bash +python prepare_doc_elasticsearch.py +``` + +## 🚀2. Start Microservice with Docker (Option 2) + +### 2.1 Start Elasticsearch + +Please refer to this [readme](../../../vectorstores/elasticsearch/README.md). + +### 2.2 Setup Environment Variables + +```bash +export ES_CONNECTION_STRING=http://localhost:9200 +export INDEX_NAME=${your_index_name} +``` + +### 2.3 Build Docker Image + +```bash +cd GenAIComps +docker build -t opea/dataprep-elasticsearch:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/elasticsearch/langchain/Dockerfile . +``` + +### 2.4 Run Docker with CLI (Option A) + +```bash +docker run --name="dataprep-elasticsearch" -p 6011:6011 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-elasticsearch:latest +``` + +### 2.5 Run with Docker Compose (Option B) + +```bash +cd comps/dataprep/elasticsearch/langchain +docker compose -f docker-compose.yaml up -d +``` + +## 🚀3. Consume Microservice + +### 3.1 Consume Upload API + +Once the document preparation microservice for Elasticsearch is started, use the command below to invoke the +microservice to convert a document to embeddings and save them to the database. 
+ +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@/path/to/document" \ + http://localhost:6011/v1/dataprep +``` + +### 3.2 Consume get_file API + +To get uploaded file structures, use the following command: + +```bash +curl -X POST \ + -H "Content-Type: application/json" \ + http://localhost:6011/v1/dataprep/get_file +``` + +Then you will get the response JSON like this: + +```json +[ + { + "name": "uploaded_file_1.txt", + "id": "uploaded_file_1.txt", + "type": "File", + "parent": "" + }, + { + "name": "uploaded_file_2.txt", + "id": "uploaded_file_2.txt", + "type": "File", + "parent": "" + } +] +``` + +### 3.3 Consume delete_file API + +To delete an uploaded file/link, use the following command. + +The `file_path` here should be the `id` returned by the `/v1/dataprep/get_file` API. + +```bash +# delete link +curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"file_path": "https://www.ces.tech/.txt"}' \ + http://localhost:6011/v1/dataprep/delete_file + +# delete file +curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"file_path": "uploaded_file_1.txt"}' \ + http://localhost:6011/v1/dataprep/delete_file + +# delete all files and links +curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"file_path": "all"}' \ + http://localhost:6011/v1/dataprep/delete_file +``` diff --git a/comps/dataprep/elasticsearch/langchain/__init__.py b/comps/dataprep/elasticsearch/langchain/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/comps/dataprep/elasticsearch/langchain/config.py b/comps/dataprep/elasticsearch/langchain/config.py new file mode 100644 index 0000000000..3cf111cea5 --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/config.py @@ -0,0 +1,20 @@ +import os + +ES_CONNECTION_STRING = os.getenv("ES_CONNECTION_STRING", "http://localhost:9200") +UPLOADED_FILES_PATH = os.getenv("UPLOADED_FILES_PATH", "./uploaded_files/") + +# Embedding model +EMBED_MODEL = os.getenv("EMBED_MODEL", 
"BAAI/bge-base-en-v1.5") + +# TEI Embedding endpoints +TEI_ENDPOINT = os.getenv("TEI_ENDPOINT", "") + +# Vector Index Configuration +INDEX_NAME = os.getenv("INDEX_NAME", "rag-elastic") + +# chunk parameters +CHUNK_SIZE = os.getenv("CHUNK_SIZE", 1500) +CHUNK_OVERLAP = os.getenv("CHUNK_OVERLAP", 100) + +# Logging enabled +LOG_FLAG = os.getenv("LOGFLAG", False) diff --git a/comps/dataprep/elasticsearch/langchain/docker-compose.yaml b/comps/dataprep/elasticsearch/langchain/docker-compose.yaml new file mode 100644 index 0000000000..6a7fa64e6b --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/docker-compose.yaml @@ -0,0 +1,38 @@ +version: "3" +services: + elasticsearch-vector-db: + hostname: db + container_name: elasticsearch-vector-db + image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + ports: + - "9200:9200" + - "9300:9300" + restart: always + ipc: host + environment: + - ES_JAVA_OPTS=-Xms1g -Xmx1g + - discovery.type=single-node + - xpack.security.enabled=false + - bootstrap.memory_lock=false + - no_proxy= ${no_proxy} + - http_proxy= ${http_proxy} + - https_proxy= ${https_proxy} + + dataprep-elasticsearch: + image: opea/dataprep-elasticsearch:latest + container_name: dataprep-elasticsearch + ports: + - "6011:6011" + ipc: host + environment: + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + ES_CONNECTION_STRING: ${ES_CONNECTION_STRING} + INDEX_NAME: ${INDEX_NAME} + TEI_ENDPOINT: ${TEI_ENDPOINT} + HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py b/comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py new file mode 100644 index 0000000000..e6a46447ae --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py @@ -0,0 +1,370 @@ +import json +import os +from pathlib import Path +from typing import List, Optional, Union + +from comps import CustomLogger, 
from comps import CustomLogger, DocPath, opea_microservices, register_microservice
from comps.dataprep.utils import (
    create_upload_folder,
    document_loader,
    encode_filename,
    get_file_structure,
    get_separators,
    get_tables_result,
    parse_html,
    remove_folder_with_ignore,
    save_content_to_local_disk,
)
from elasticsearch import Elasticsearch
from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import HTMLHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings

from config import (
    CHUNK_OVERLAP,
    CHUNK_SIZE,
    EMBED_MODEL,
    ES_CONNECTION_STRING,
    INDEX_NAME,
    LOG_FLAG,
    TEI_ENDPOINT,
    UPLOADED_FILES_PATH,
)

logger = CustomLogger(__name__)

# NOTE: `es_client` and `es_store` used below are module globals that are
# assigned in this module's `if __name__ == "__main__":` block before the
# service starts.


def create_index() -> None:
    """Create the `INDEX_NAME` index if it does not exist in Elasticsearch yet."""
    if not es_client.indices.exists(index=INDEX_NAME):
        es_client.indices.create(index=INDEX_NAME)


def get_embedder() -> Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]:
    """Return the embedder selected by configuration.

    Uses the remote endpoint embedder when `TEI_ENDPOINT` is non-empty,
    otherwise a local `HuggingFaceBgeEmbeddings` built from `EMBED_MODEL`.
    """
    if TEI_ENDPOINT:
        return HuggingFaceEndpointEmbeddings(model=TEI_ENDPOINT)
    return HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)


def get_elastic_store(embedder: Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]) -> ElasticsearchStore:
    """Build the `ElasticsearchStore` for `INDEX_NAME` on top of `es_client`."""
    return ElasticsearchStore(index_name=INDEX_NAME, embedding=embedder, es_connection=es_client)


def _doc_name_query(doc_name: str) -> dict:
    """Build the match query selecting every chunk whose metadata.doc_name is `doc_name`."""
    return {"query": {"match": {"metadata.doc_name": {"query": doc_name, "operator": "AND"}}}}


def delete_embeddings(doc_name: str) -> bool:
    """Delete the stored chunks of one document, or of every document.

    Args:
        doc_name: value of `metadata.doc_name` to delete, or the literal
            "all" to delete every document in the index.

    Returns:
        True when the delete-by-query call completed, False when it raised.
        Failures are deliberately reported through the return value so that
        callers can decide how to respond.
    """
    try:
        if doc_name == "all":
            if LOG_FLAG:
                logger.info("Deleting all documents from vectorstore")
            query = {"query": {"match_all": {}}}
        else:
            if LOG_FLAG:
                logger.info(f"Deleting {doc_name} from vectorstore")
            query = _doc_name_query(doc_name)

        es_client.delete_by_query(index=INDEX_NAME, body=query)
        return True

    except Exception as e:
        if LOG_FLAG:
            logger.info(f"An unexpected error occurred: {e}")
        return False


def search_by_filename(file_name: str) -> bool:
    """Return True when at least one stored chunk has `file_name` as its doc_name."""
    results = es_client.search(index=INDEX_NAME, body=_doc_name_query(file_name))
    total = results["hits"]["total"]["value"]

    if LOG_FLAG:
        logger.info(f"[ search by file ] searched by {file_name}")
        # Report the hit total; len(results["hits"]) would only count the
        # keys of the "hits" mapping, not the matching documents.
        logger.info(f"[ search by file ] {total} results: {results}")

    return total > 0


def _index_chunks(chunks: list, doc_name: str, batch_size: int = 32) -> None:
    """Store `chunks` in `es_store` in batches, tagging each chunk with `doc_name`."""
    num_chunks = len(chunks)
    # Document.metadata must be a mapping, not a list wrapping one.
    metadata = {"doc_name": doc_name}

    for start in range(0, num_chunks, batch_size):
        batch_texts = chunks[start : start + batch_size]
        documents = [Document(page_content=text, metadata=metadata) for text in batch_texts]
        es_store.add_documents(documents=documents)

        if LOG_FLAG:
            logger.info(f"Processed batch {start // batch_size + 1}/{(num_chunks - 1) // batch_size + 1}")


def ingest_doc_to_elastic(doc_path: DocPath) -> None:
    """Load, chunk and index one local document.

    Args:
        doc_path: carries the file `path` plus the chunking options
            (`chunk_size`, `chunk_overlap`, `process_table`, `table_strategy`).
    """
    path = doc_path.path
    file_name = path.split("/")[-1]
    if LOG_FLAG:
        logger.info(f"Parsing document {path}, file name: {file_name}.")

    if path.endswith(".html"):
        headers_to_split_on = [
            ("h1", "Header 1"),
            ("h2", "Header 2"),
            ("h3", "Header 3"),
        ]
        text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=doc_path.chunk_size,
            chunk_overlap=doc_path.chunk_overlap,
            add_start_index=True,
            separators=get_separators(),
        )

    content = document_loader(path)

    # os.path.splitext keeps the leading dot, so every entry needs one
    # (the previous "jsonl" entry could never match).
    structured_types = (".xlsx", ".csv", ".json", ".jsonl")
    _, ext = os.path.splitext(path)

    # Structured files are used as returned by the loader, without splitting.
    if ext in structured_types:
        chunks = content
    else:
        chunks = text_splitter.split_text(content)

    if doc_path.process_table and path.endswith(".pdf"):
        table_chunks = get_tables_result(path, doc_path.table_strategy)
        chunks = chunks + table_chunks

    if LOG_FLAG:
        logger.info(f"Done preprocessing. Created {len(chunks)} chunks of the original file.")

    _index_chunks(chunks, str(file_name))


async def ingest_link_to_elastic(link_list: List[str]) -> None:
    """Scrape each link, save its text under the upload folder and index it.

    Args:
        link_list: URLs to fetch with `parse_html`.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        # config reads these with os.getenv, which yields a string whenever
        # the environment variable is set; the splitter needs integers.
        chunk_size=int(CHUNK_SIZE),
        chunk_overlap=int(CHUNK_OVERLAP),
        add_start_index=True,
        separators=get_separators(),
    )

    for link in link_list:
        content = parse_html([link])[0][0]
        if LOG_FLAG:
            logger.info(f"[ ingest link ] link: {link} content: {content}")

        encoded_link = encode_filename(link)
        save_path = UPLOADED_FILES_PATH + encoded_link + ".txt"
        doc_path = UPLOADED_FILES_PATH + link + ".txt"
        if LOG_FLAG:
            logger.info(f"[ ingest link ] save_path: {save_path}")

        await save_content_to_local_disk(save_path, content)

        chunks = text_splitter.split_text(content)
        _index_chunks(chunks, str(doc_path))


@register_microservice(name="opea_service@prepare_doc_elastic", endpoint="/v1/dataprep", host="0.0.0.0", port=6011)
async def ingest_documents(
    files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
    link_list: Optional[str] = Form(None),
    chunk_size: int = Form(1500),
    chunk_overlap: int = Form(100),
    process_table: bool = Form(False),
    table_strategy: str = Form("fast"),
):
    """Ingest uploaded files or a JSON list of links for RAG.

    Exactly one of `files` / `link_list` must be supplied.

    Raises:
        HTTPException: 400 when both or neither input is given, when a file
            with the same name is already indexed, or when `link_list` is
            not a JSON list; 500 when the duplicate check against
            Elasticsearch fails.
    """
    if LOG_FLAG:
        logger.info(f"files:{files}")
        logger.info(f"link_list:{link_list}")

    if files and link_list:
        raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.")

    if files:
        if not isinstance(files, list):
            files = [files]

        if not os.path.exists(UPLOADED_FILES_PATH):
            Path(UPLOADED_FILES_PATH).mkdir(parents=True, exist_ok=True)

        for file in files:
            encode_file = encode_filename(file.filename)
            save_path = UPLOADED_FILES_PATH + encode_file
            filename = save_path.split("/")[-1]

            try:
                exists = search_by_filename(filename)
            except Exception as e:
                # Chain the cause so the server-side traceback shows the real failure.
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed when searching in Elasticsearch for file {file.filename}.",
                ) from e

            if exists:
                if LOG_FLAG:
                    logger.info(f"[ upload ] File {file.filename} already exists.")

                raise HTTPException(
                    status_code=400,
                    detail=f"Uploaded file {file.filename} already exists. Please change file name.",
                )

            await save_content_to_local_disk(save_path, file)

            ingest_doc_to_elastic(
                DocPath(
                    path=save_path,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    process_table=process_table,
                    table_strategy=table_strategy,
                )
            )
            if LOG_FLAG:
                logger.info(f"Successfully saved file {save_path}")

        result = {"status": 200, "message": "Data preparation succeeded"}

        if LOG_FLAG:
            logger.info(result)
        return result

    if link_list:
        try:
            link_list = json.loads(link_list)  # Parse JSON string to list
        except json.JSONDecodeError as e:
            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") from e

        if not isinstance(link_list, list):
            raise HTTPException(status_code=400, detail="link_list should be a list.")

        await ingest_link_to_elastic(link_list)

        if LOG_FLAG:
            logger.info(f"Successfully saved link list {link_list}")

        result = {"status": 200, "message": "Data preparation succeeded"}

        if LOG_FLAG:
            logger.info(result)
        return result

    raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")


@register_microservice(
    name="opea_service@prepare_doc_elastic",
    endpoint="/v1/dataprep/get_file",
    host="0.0.0.0",
    port=6011,
)
async def rag_get_file_structure():
    """Return the structure of the upload folder, or an empty list if it does not exist."""
    if LOG_FLAG:
        logger.info("[ dataprep - get file ] start to get file structure")

    if not Path(UPLOADED_FILES_PATH).exists():
        if LOG_FLAG:
            logger.info("No file uploaded, return empty list.")
        return []

    file_content = get_file_structure(UPLOADED_FILES_PATH)

    if LOG_FLAG:
        logger.info(file_content)

    return file_content


@register_microservice(
    name="opea_service@prepare_doc_elastic",
    endpoint="/v1/dataprep/delete_file",
    host="0.0.0.0",
    port=6011,
)
async def delete_single_file(file_path: str = Body(..., embed=True)):
    """Delete file according to `file_path`.

    `file_path`:
        - specific file path (e.g. /path/to/file.txt)
        - folder path (e.g. /path/to/folder)
        - "all": delete all files uploaded

    Returns:
        {"status": True} on success, {"status": False} when deleting a
        single file fails or a folder is requested (not supported).

    Raises:
        HTTPException: 404 when the path does not exist under the upload
            folder; 500 when "all" is requested and the embeddings cannot
            be deleted.
    """
    if file_path == "all":
        if LOG_FLAG:
            logger.info("[dataprep - del] delete all files")
        remove_folder_with_ignore(UPLOADED_FILES_PATH)
        # Explicit check instead of `assert`: assert statements are removed
        # under `python -O`, which would skip the deletion call altogether.
        if not delete_embeddings(file_path):
            raise HTTPException(status_code=500, detail="Failed to delete embeddings from Elasticsearch.")
        if LOG_FLAG:
            logger.info("[dataprep - del] successfully delete all files.")
        create_upload_folder(UPLOADED_FILES_PATH)
        if LOG_FLAG:
            logger.info({"status": True})
        return {"status": True}

    delete_path = Path(UPLOADED_FILES_PATH + "/" + encode_filename(file_path))

    if LOG_FLAG:
        logger.info(f"[dataprep - del] delete_path: {delete_path}")

    if not delete_path.exists():
        raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")

    if not delete_path.is_file():
        # delete folder
        if LOG_FLAG:
            logger.info("[dataprep - del] delete folder is not supported for now.")
            logger.info({"status": False})
        return {"status": False}

    # delete file: drop the embeddings first, then the file on disk
    try:
        if not delete_embeddings(file_path):
            raise RuntimeError(f"could not delete embeddings for {file_path}")
        delete_path.unlink()
    except Exception as e:
        if LOG_FLAG:
            logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}")
            logger.info({"status": False})
        return {"status": False}

    if LOG_FLAG:
        logger.info({"status": True})
    return {"status": True}
Please check del_path.") + + +if __name__ == "__main__": + es_client = Elasticsearch(hosts=ES_CONNECTION_STRING) + es_store = get_elastic_store(get_embedder()) + create_upload_folder(UPLOADED_FILES_PATH) + create_index() + opea_microservices["opea_service@prepare_doc_elastic"].start() diff --git a/comps/dataprep/elasticsearch/langchain/requirements.txt b/comps/dataprep/elasticsearch/langchain/requirements.txt new file mode 100644 index 0000000000..6afaec6145 --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/requirements.txt @@ -0,0 +1,30 @@ +beautifulsoup4 +cairosvg +docarray[full] +docx2txt +easyocr +elasticsearch +fastapi +huggingface_hub +langchain +langchain-community +langchain-elasticsearch +langchain-text-splitters +langchain-huggingface +markdown +numpy +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +pandas +Pillow +prometheus-fastapi-instrumentator +pymupdf +pytesseract +python-bidi +python-docx +python-pptx +sentence_transformers +shortuuid +unstructured[all-docs] +uvicorn diff --git a/comps/retrievers/elasticsearch/langchain/Dockerfile b/comps/retrievers/elasticsearch/langchain/Dockerfile new file mode 100644 index 0000000000..26c4974a91 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.11-slim + +ARG ARCH="cpu" + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + libgl1-mesa-glx \ + libjemalloc-dev + +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +COPY comps /home/user/comps + +USER user + +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + if [ ${ARCH} = "cpu" ]; then pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \ + pip install --no-cache-dir -r /home/user/comps/retrievers/elasticsearch/langchain/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +WORKDIR /home/user/comps/retrievers/elasticsearch/langchain + 
+ENTRYPOINT ["python", "retriever_elasticsearch.py"] diff --git a/comps/retrievers/elasticsearch/langchain/README.md b/comps/retrievers/elasticsearch/langchain/README.md new file mode 100644 index 0000000000..455f8c7837 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/README.md @@ -0,0 +1,122 @@ +# Retriever Microservice + +This retriever microservice is a highly efficient search service designed for handling and retrieving embedding vectors. +It operates by receiving an embedding vector as input and conducting a similarity search against vectors stored in a +vector database. Users must specify the vector database's URL and the index name, and the service searches within that index +to find documents with the highest similarity to the input vector. + +The service primarily utilizes similarity measures in vector space to rapidly retrieve semantically similar documents. +The vector-based retrieval approach is particularly suited for handling large datasets, offering fast and accurate +search results that significantly enhance the efficiency and quality of information retrieval. + +Overall, this microservice provides robust backend support for applications requiring efficient similarity searches, +playing a vital role in scenarios such as recommendation systems, information retrieval, or any other context where +precise measurement of document similarity is crucial. + +## 🚀1. Start Microservice with Python (Option 1) + +To start the retriever microservice, you must first install the required Python packages. 
+ +### 1.1 Install Requirements + +```bash +pip install -r requirements.txt +``` + +### 1.2 Start TEI Service + +```bash +model=BAAI/bge-base-en-v1.5 +volume=$PWD/data +docker run -d -p 6060:80 -v $volume:/data -e http_proxy=$http_proxy -e https_proxy=$https_proxy --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.5 --model-id $model +``` + +### 1.3 Verify the TEI Service + +Health check the embedding service with: + +```bash +curl 127.0.0.1:6060/embed \ + -X POST \ + -d '{"inputs":"What is Deep Learning?"}' \ + -H 'Content-Type: application/json' +``` + +### 1.4 Setup VectorDB Service + +You need to set up your own VectorDB service (Elasticsearch in this example), and ingest your knowledge documents into +the vector database. + +As for Elasticsearch, you could start a docker container using the following command. +Remember to ingest data into it manually. + +```bash +docker run -d --name vectorstore-elasticsearch -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p 9200:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0 +``` + +### 1.5 Start Retriever Service + +```bash +export TEI_ENDPOINT="http://${your_ip}:6060" +python retriever_elasticsearch.py +``` + +## 🚀2. Start Microservice with Docker (Option 2) + +### 2.1 Setup Environment Variables + +```bash +export EMBED_MODEL="BAAI/bge-base-en-v1.5" +export ES_CONNECTION_STRING="http://localhost:9200" +export INDEX_NAME=${your_index_name} +export TEI_ENDPOINT="http://${your_ip}:6060" +``` + +### 2.2 Build Docker Image + +```bash +cd GenAIComps +docker build -t opea/retriever-elasticsearch:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/elasticsearch/langchain/Dockerfile . +``` + +To start a docker container, you have two options: + +- A. Run Docker with CLI +- B. Run Docker with Docker Compose + +You can choose one as needed. 
+ +### 2.3 Run Docker with CLI (Option A) + +```bash +docker run -d --name="retriever-elasticsearch" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/retriever-elasticsearch:latest +``` + +### 2.4 Run Docker with Docker Compose (Option B) + +```bash +cd comps/retrievers/elasticsearch/langchain +docker compose -f docker_compose_retriever.yaml up -d +``` + +## 🚀3. Consume Retriever Service + +### 3.1 Check Service Status + +```bash +curl http://localhost:7000/v1/health_check \ + -X GET \ + -H 'Content-Type: application/json' +``` + +### 3.2 Consume Retriever Service + +To consume the Retriever Microservice, you can generate a mock embedding vector of length 768 with Python. + +```bash +export your_embedding=$(python -c "import random; embedding = [random.uniform(-1, 1) for _ in range(768)]; print(embedding)") +curl http://${your_ip}:7000/v1/retrieval \ + -X POST \ + -d "{\"text\":\"What is the revenue of Nike in 2023?\",\"embedding\":${your_embedding}}" \ + -H 'Content-Type: application/json' +``` diff --git a/comps/retrievers/elasticsearch/langchain/config.py b/comps/retrievers/elasticsearch/langchain/config.py new file mode 100644 index 0000000000..e2d1aee125 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/config.py @@ -0,0 +1,15 @@ +import os + +# Embedding model +EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5") + +ES_CONNECTION_STRING = os.getenv("ES_CONNECTION_STRING", "http://localhost:9200") + +# TEI Embedding endpoints +TEI_ENDPOINT = os.getenv("TEI_ENDPOINT", "") + +# Vector Index Configuration +INDEX_NAME = os.getenv("INDEX_NAME", "rag-elastic") + +# Logging enabled +LOG_FLAG = os.getenv("LOGFLAG", False) diff --git a/comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml b/comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml new file mode 100644 index 
0000000000..33853dbead --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml @@ -0,0 +1,30 @@ +version: "3.8" + +services: + tei_xeon_service: + image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.2 + container_name: tei-xeon-server + ports: + - "6060:80" + volumes: + - "./data:/data" + shm_size: 1g + command: --model-id ${RETRIEVE_MODEL_ID} + retriever: + image: opea/retriever-elasticsearch:latest + container_name: retriever-elasticsearch + ports: + - "7000:7000" + ipc: host + environment: + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + INDEX_NAME: ${INDEX_NAME} + TEI_ENDPOINT: ${TEI_ENDPOINT} + ES_CONNECTION_STRING: ${ES_CONNECTION_STRING} + HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/retrievers/elasticsearch/langchain/requirements.txt b/comps/retrievers/elasticsearch/langchain/requirements.txt new file mode 100644 index 0000000000..9ce5afcb6d --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/requirements.txt @@ -0,0 +1,14 @@ +docarray[full] +easyocr +fastapi +langchain-community +langchain-elasticsearch +langchain-huggingface +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +prometheus-fastapi-instrumentator==7.0.0 +pymupdf +sentence_transformers +shortuuid +uvicorn diff --git a/comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py b/comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py new file mode 100644 index 0000000000..03c660dfc6 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py @@ -0,0 +1,102 @@ +import time +from typing import Union + +from elasticsearch import Elasticsearch +from langchain_community.embeddings import HuggingFaceBgeEmbeddings +from langchain_elasticsearch import ElasticsearchStore +from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings + +from comps import ( + CustomLogger, + 
from comps import (
    CustomLogger,
    EmbedDoc,
    SearchedDoc,
    ServiceType,
    TextDoc,
    opea_microservices,
    register_microservice,
    register_statistics,
    statistics_dict,
)
from config import EMBED_MODEL, TEI_ENDPOINT, INDEX_NAME, ES_CONNECTION_STRING, LOG_FLAG

logger = CustomLogger(__name__)

# NOTE: `es_client` and `vector_db` used below are module globals assigned in
# the `if __name__ == "__main__":` block at the bottom of this module.


def create_index() -> None:
    """Create the `INDEX_NAME` index if it does not exist in Elasticsearch yet."""
    if not es_client.indices.exists(index=INDEX_NAME):
        es_client.indices.create(index=INDEX_NAME)


def get_embedder() -> Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]:
    """Return the embedder selected by configuration.

    Uses the remote endpoint embedder when `TEI_ENDPOINT` is non-empty,
    otherwise a local `HuggingFaceBgeEmbeddings` built from `EMBED_MODEL`.
    """
    if TEI_ENDPOINT:
        return HuggingFaceEndpointEmbeddings(model=TEI_ENDPOINT)
    return HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)


def get_elastic_store(embedder: Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]) -> ElasticsearchStore:
    """Build the `ElasticsearchStore` for `INDEX_NAME` on top of `es_client`."""
    return ElasticsearchStore(index_name=INDEX_NAME, embedding=embedder, es_connection=es_client)


def _scored_vector_search(input: EmbedDoc) -> list:
    """Search `vector_db` with the request's embedding; yields (document, score) pairs."""
    return vector_db.similarity_search_by_vector_with_relevance_scores(embedding=input.embedding, k=input.k)


@register_microservice(
    name="opea_service@retriever_elasticsearch",
    service_type=ServiceType.RETRIEVER,
    endpoint="/v1/retrieval",
    host="0.0.0.0",
    port=7000,
)
@register_statistics(names=["opea_service@retriever_elasticsearch"])
async def retrieve(input: EmbedDoc) -> SearchedDoc:
    """Retrieve documents according to `input.search_type`.

    Supported search types:
        - "similarity": top-`k` documents for the embedding.
        - "similarity_distance_threshold": as above, keeping documents whose
          score is greater than `input.distance_threshold` (required).
        - "similarity_score_threshold": as above, keeping documents whose
          score is greater than `input.score_threshold`.
        - "mmr": max-marginal-relevance search on `input.text`.

    Returns:
        A `SearchedDoc` holding the retrieved texts and the initial query.

    Raises:
        ValueError: for an unknown search type, or when
            "similarity_distance_threshold" is requested without a threshold.
    """
    if LOG_FLAG:
        logger.info(input)
    start = time.time()

    search_type = input.search_type

    if search_type == "similarity":
        search_res = [doc for doc, _ in _scored_vector_search(input)]

    elif search_type == "similarity_distance_threshold":
        if input.distance_threshold is None:
            raise ValueError("distance_threshold must be provided for similarity_distance_threshold retriever")
        search_res = [doc for doc, score in _scored_vector_search(input) if score > input.distance_threshold]

    elif search_type == "similarity_score_threshold":
        # The by-vector search needs the `embedding` argument; calling it
        # with `query=` (as before) raised a TypeError for this search type.
        search_res = [doc for doc, score in _scored_vector_search(input) if score > input.score_threshold]

    elif search_type == "mmr":
        search_res = vector_db.max_marginal_relevance_search(
            query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
        )

    else:
        raise ValueError(f"{input.search_type} not valid")

    searched_docs = [TextDoc(text=doc.page_content) for doc in search_res]
    result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text)

    # Record how long this request took in the service statistics.
    statistics_dict["opea_service@retriever_elasticsearch"].append_latency(time.time() - start, None)

    if LOG_FLAG:
        logger.info(result)

    return result


if __name__ == "__main__":
    # The client must exist before the store and the index check use it.
    es_client = Elasticsearch(hosts=ES_CONNECTION_STRING)
    vector_db = get_elastic_store(get_embedder())
    create_index()
    opea_microservices["opea_service@retriever_elasticsearch"].start()
Run Elasticsearch service + +```bash +docker run -p 9200:9200 -p 9300:9300 -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.16.0 +``` diff --git a/comps/vectorstores/elasticsearch/__init__.py b/comps/vectorstores/elasticsearch/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/comps/vectorstores/elasticsearch/docker-compose.yml b/comps/vectorstores/elasticsearch/docker-compose.yml new file mode 100644 index 0000000000..cf249ba2bd --- /dev/null +++ b/comps/vectorstores/elasticsearch/docker-compose.yml @@ -0,0 +1,13 @@ +version: "3" +services: + elasticsearch-vector-db: + image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + container_name: elasticsearch-vector-db + ports: + - "9200:9200" + - "9300:9300" + environment: + - ES_JAVA_OPTS=-Xms1g -Xmx1g + - discovery.type=single-node + - xpack.security.enabled=false + - bootstrap.memory_lock=false diff --git a/tests/dataprep/test_dataprep_elasticsearch_langchain.sh b/tests/dataprep/test_dataprep_elasticsearch_langchain.sh new file mode 100644 index 0000000000..a68bbac9f5 --- /dev/null +++ b/tests/dataprep/test_dataprep_elasticsearch_langchain.sh @@ -0,0 +1,137 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') +dataprep_service_port=6011 + +function build_docker_images() { + cd $WORKPATH + echo $WORKPATH + # pull elasticsearch image + docker pull docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + + # build dataprep image for elasticsearch + docker build --no-cache -t opea/dataprep-elasticsearch:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f $WORKPATH/comps/dataprep/elasticsearch/langchain/Dockerfile . + if [ $? 
-ne 0 ]; then
+        echo "opea/dataprep-elasticsearch built fail"
+        exit 1
+    else
+        echo "opea/dataprep-elasticsearch built successful"
+    fi
+}
+
+function start_service() {
+    # elasticsearch: single-node instance with security disabled, published on the host IP
+    elasticsearch_port=9200
+    docker run -d --name test-comps-vectorstore-elasticsearch -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p $elasticsearch_port:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0
+    export ES_CONNECTION_STRING="http://${ip_address}:${elasticsearch_port}"
+    sleep 10s
+
+    # data-prep: the service under test, pointed at the Elasticsearch container started above
+    INDEX_NAME="test-elasticsearch"
+    docker run -d --name="test-comps-dataprep-elasticsearch" -p $dataprep_service_port:6011 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME opea/dataprep-elasticsearch:comps
+    sleep 15s
+
+    bash ./tests/utils/wait-for-it.sh $ip_address:$dataprep_service_port -s -t 100 -- echo "Dataprep service up"
+    DATAPREP_UP=$?
+    if [ ${DATAPREP_UP} -ne 0 ]; then
+        echo "Could not start Dataprep service."
+        return 1
+    fi
+
+    sleep 5s # probe once more below to catch a service that came up and then died
+    bash ./tests/utils/wait-for-it.sh ${ip_address}:$dataprep_service_port -s -t 1 -- echo "Dataprep service still up"
+    DATAPREP_UP=$?
+    if [ ${DATAPREP_UP} -ne 0 ]; then
+        echo "Dataprep service crashed."
+        return 1
+    fi
+}
+
+function validate_microservice() {
+    cd $LOG_PATH
+
+    # test /v1/dataprep: upload a small text file, expect HTTP 200 and a success message in the body
+    URL="http://${ip_address}:$dataprep_service_port/v1/dataprep"
+    echo "Deep learning is a subset of machine learning that utilizes neural networks with multiple layers to analyze various levels of abstract data representations. It enables computers to identify patterns and make decisions with minimal human intervention by learning from large amounts of data." > $LOG_PATH/dataprep_file.txt
+
+    HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -F 'files=@./dataprep_file.txt' -H 'Content-Type: multipart/form-data' "$URL")
+    if [ "$HTTP_STATUS" -eq 200 ]; then
+        echo "[ dataprep ] HTTP status is 200. Checking content..."
+        cp ./dataprep_file.txt ./dataprep_file2.txt
+        local CONTENT=$(curl -s -X POST -F 'files=@./dataprep_file2.txt' -H 'Content-Type: multipart/form-data' "$URL" | tee ${LOG_PATH}/dataprep.log)
+
+        if echo "$CONTENT" | grep -q "Data preparation succeeded"; then
+            echo "[ dataprep ] Content is as expected."
+        else
+            echo "[ dataprep ] Content does not match the expected result: $CONTENT"
+            docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep.log
+            exit 1
+        fi
+    else
+        echo "[ dataprep ] HTTP status is not 200. Received status was $HTTP_STATUS"
+        docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep.log
+        exit 1
+    fi
+
+    # test /v1/dataprep/get_file: expect HTTP 200 and a JSON body containing a "name" key
+    URL="http://${ip_address}:$dataprep_service_port/v1/dataprep/get_file"
+    HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -H 'Content-Type: application/json' "$URL")
+    if [ "$HTTP_STATUS" -eq 200 ]; then
+        echo "[ dataprep - file ] HTTP status is 200. Checking content..."
+        local CONTENT=$(curl -s -X POST -H 'Content-Type: application/json' "$URL" | tee ${LOG_PATH}/dataprep_file.log)
+
+        if echo "$CONTENT" | grep -q '{"name":'; then
+            echo "[ dataprep - file ] Content is as expected."
+        else
+            echo "[ dataprep - file ] Content does not match the expected result: $CONTENT"
+            docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_file.log
+            exit 1
+        fi
+    else
+        echo "[ dataprep - file ] HTTP status is not 200. Received status was $HTTP_STATUS"
+        docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_file.log
+        exit 1
+    fi
+
+    # test /v1/dataprep/delete_file: delete the first uploaded file and expect HTTP 200
+    URL="http://${ip_address}:$dataprep_service_port/v1/dataprep/delete_file"
+    HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL")
+    if [ "$HTTP_STATUS" -eq 200 ]; then
+        echo "[ dataprep - del ] HTTP status is 200."
+        docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_del.log
+    else
+        echo "[ dataprep - del ] HTTP status is not 200. Received status was $HTTP_STATUS"
+        docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_del.log
+        exit 1
+    fi
+}
+
+function stop_docker() {
+    cid=$(docker ps -aq --filter "name=test-comps-vectorstore-elasticsearch*")
+    if [[ -n "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi
+
+    cid=$(docker ps -aq --filter "name=test-comps-dataprep-elasticsearch*")
+    if [[ -n "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi
+}
+
+function main() {
+
+    stop_docker
+
+    build_docker_images
+    start_service || { docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep.log; exit 1; }
+
+    validate_microservice
+
+    stop_docker
+    docker system prune -f
+
+}
+
+main
diff --git a/tests/retrievers/test_retrievers_elasticsearch_langchain.sh b/tests/retrievers/test_retrievers_elasticsearch_langchain.sh
new file mode 100644
index 0000000000..53db592b59
--- /dev/null
+++ b/tests/retrievers/test_retrievers_elasticsearch_langchain.sh
@@ -0,0 +1,93 @@
+#!/bin/bash
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+set -x
+
+WORKPATH=$(dirname "$PWD")
+LOG_PATH="$WORKPATH/tests"
+ip_address=$(hostname -I | awk '{print $1}')
+
+function build_docker_images() {
+    cd $WORKPATH
+    docker build --no-cache -t opea/retriever-elasticsearch:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/elasticsearch/langchain/Dockerfile .
+    if [ $?
-ne 0 ]; then
+        echo "opea/retriever-elasticsearch built fail"
+        exit 1
+    else
+        echo "opea/retriever-elasticsearch built successful"
+    fi
+}
+
+function start_service() {
+    # elasticsearch: single-node instance with security disabled, published on the host IP
+    elasticsearch_port=9200
+    docker run -d --name "test-comps-retriever-elasticsearch-vectorstore" -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p $elasticsearch_port:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0
+    export ES_CONNECTION_STRING="http://${ip_address}:${elasticsearch_port}"
+    sleep 10s
+
+    # elasticsearch retriever: the service under test, pointed at the Elasticsearch container started above
+    INDEX_NAME="test-elasticsearch"
+    retriever_port=7000
+    docker run -d --name="test-comps-retriever-elasticsearch-ms" -p $retriever_port:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME opea/retriever-elasticsearch:comps
+    sleep 15s
+
+    bash ./tests/utils/wait-for-it.sh ${ip_address}:$retriever_port -s -t 100 -- echo "Retriever up"
+    RETRIEVER_UP=$?
+    if [ ${RETRIEVER_UP} -ne 0 ]; then
+        echo "Could not start Retriever."
+        return 1
+    fi
+
+    sleep 5s # probe once more below to catch a service that came up and then died
+    bash ./tests/utils/wait-for-it.sh ${ip_address}:$retriever_port -s -t 1 -- echo "Retriever still up"
+    RETRIEVER_UP=$?
+    if [ ${RETRIEVER_UP} -ne 0 ]; then
+        echo "Retriever crashed."
+        return 1
+    fi
+}
+
+function validate_microservice() {
+    retriever_port=7000
+    test_embedding=$(python3 -c "import random; embedding = [random.uniform(-1, 1) for _ in range(768)]; print(embedding)")
+
+    # The line continuation keeps http_proxy='' as an environment prefix of curl, clearing the proxy for this request only.
+    result=$(http_proxy='' \
+        curl http://${ip_address}:$retriever_port/v1/retrieval \
+        -X POST \
+        -d "{\"text\":\"test\",\"embedding\":${test_embedding}}" \
+        -H 'Content-Type: application/json')
+    if [[ $result == *"retrieved_docs"* ]]; then
+        echo "Result correct."
+    else
+        echo "Result wrong. Received was $result"
+        # Only the two containers launched by start_service exist; this test starts no TEI endpoint container.
+        docker logs test-comps-retriever-elasticsearch-vectorstore >> ${LOG_PATH}/vectorstore.log
+        docker logs test-comps-retriever-elasticsearch-ms >> ${LOG_PATH}/retriever-elasticsearch.log
+        exit 1
+    fi
+}
+
+function stop_docker() {
+    cid_retrievers=$(docker ps -aq --filter "name=test-comps-retriever-elasticsearch*")
+    if [[ -n "$cid_retrievers" ]]; then
+        docker stop $cid_retrievers && docker rm $cid_retrievers && sleep 1s
+    fi
+}
+
+function main() {
+
+    stop_docker
+
+    build_docker_images
+    start_service || { docker logs test-comps-retriever-elasticsearch-ms >> ${LOG_PATH}/retriever-elasticsearch.log; exit 1; }
+
+    validate_microservice
+
+    stop_docker
+    docker system prune -f
+
+}
+
+main