Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions comps/dataprep/multimodal/redis/langchain/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This `dataprep` microservice accepts the following from the user and ingests the
- Videos (mp4 files) and their transcripts (optional)
- Images (gif, jpg, jpeg, and png files) and their captions (optional)
- Audio (wav files)
- PDFs (with text and images)

## 🚀1. Start Microservice with Python(Option 1)

Expand Down Expand Up @@ -111,18 +112,19 @@ docker container logs -f dataprep-multimodal-redis

## 🚀4. Consume Microservice

Once this dataprep microservice is started, user can use the below commands to invoke the microservice to convert images and videos and their transcripts (optional) to embeddings and save to the Redis vector store.
Once this dataprep microservice is started, user can use the below commands to invoke the microservice to convert images, videos, text, and PDF files to embeddings and save to the Redis vector store.

This microservice provides 3 different ways for users to ingest files into Redis vector store corresponding to the 3 use cases.

### 4.1 Consume _ingest_with_text_ API

**Use case:** This API is used when videos are accompanied by transcript files (`.vtt` format) or images are accompanied by text caption files (`.txt` format).
**Use case:** This API is used for videos accompanied by transcript files (`.vtt` format), images accompanied by text caption files (`.txt` format), and PDF files containing a mix of text and images.

**Important notes:**

- Make sure the file paths after `files=@` are correct.
- Every transcript or caption file's name must be identical to its corresponding video or image file's name (except their extension - .vtt goes with .mp4 and .txt goes with .jpg, .jpeg, .png, or .gif). For example, `video1.mp4` and `video1.vtt`. Otherwise, if `video1.vtt` is not included correctly in the API call, the microservice will return an error `No captions file video1.vtt found for video1.mp4`.
- It is assumed that PDFs will contain at least one image. Each image in the file will be embedded along with the text that appears on the same page as the image.

#### Single video-transcript pair upload

Expand Down Expand Up @@ -157,6 +159,7 @@ curl -X POST \
-F "files=@./image1.txt" \
-F "files=@./image2.jpg" \
-F "files=@./image2.txt" \
-F "files=@./example.pdf" \
http://localhost:6007/v1/ingest_with_text
```

Expand Down
149 changes: 124 additions & 25 deletions comps/dataprep/multimodal/redis/langchain/prepare_videodoc_redis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import base64
import json
import os
import shutil
import time
Expand Down Expand Up @@ -30,6 +32,7 @@
write_vtt,
)
from PIL import Image
import pymupdf

from comps import opea_microservices, register_microservice
from comps.embeddings.multimodal.bridgetower.bridgetower_embedding import BridgeTowerEmbedding
Expand Down Expand Up @@ -301,7 +304,53 @@ def prepare_data_and_metadata_from_annotation(
return text_list, image_list, metadatas


def ingest_multimodal(videoname, data_folder, embeddings):
def prepare_pdf_data_from_annotation(annotation, path_to_frames, title):
"""PDF data processing has some key differences from videos and images.

1. Neighboring frames' transcripts are not currently considered relevant.
We are only taking the text located on the same page as the image.
2. The images/frames are indexed differently, by page and image-within-page
indices, as opposed to a single frame index.
3. Instead of time of frame in ms, we return the PDF page index through
the pre-existing time_of_frame_ms metadata key to maintain compatibility.
"""
text_list = []
image_list = []
metadatas = []
for frame in annotation:
page_index = frame["frame_no"]
image_index = frame["sub_video_id"]
path_to_frame = os.path.join(path_to_frames, f"page{page_index}_image{image_index}.png")
caption_for_ingesting = frame["caption"]
caption_for_inference = frame["caption"]

video_id = frame["video_id"]
b64_img_str = frame["b64_img_str"]
embedding_type = "pair" if b64_img_str else "text"
source_video = frame["video_name"]

text_list.append(caption_for_ingesting)

if b64_img_str:
image_list.append(path_to_frame)

metadatas.append(
{
"content": caption_for_ingesting,
"b64_img_str": b64_img_str,
"video_id": video_id,
"source_video": source_video,
"time_of_frame_ms": page_index, # For PDFs save the page number
"embedding_type": embedding_type,
"title": title,
"transcript_for_inference": caption_for_inference,
}
)

return text_list, image_list, metadatas


def ingest_multimodal(filename, data_folder, embeddings, is_pdf=False):
"""Ingest text image pairs to Redis from the data/ directory that consists of frames and annotations."""
data_folder = os.path.abspath(data_folder)
annotation_file_path = os.path.join(data_folder, "annotations.json")
Expand All @@ -310,10 +359,13 @@ def ingest_multimodal(videoname, data_folder, embeddings):
annotation = load_json_file(annotation_file_path)

# prepare data to ingest
text_list, image_list, metadatas = prepare_data_and_metadata_from_annotation(annotation, path_to_frames, videoname)
if is_pdf:
text_list, image_list, metadatas = prepare_pdf_data_from_annotation(annotation, path_to_frames, filename)
else:
text_list, image_list, metadatas = prepare_data_and_metadata_from_annotation(annotation, path_to_frames, filename)

MultimodalRedis.from_text_image_pairs_return_keys(
texts=[f"From {videoname}. " + text for text in text_list],
texts=[f"From {filename}. " + text for text in text_list],
images=image_list,
embedding=embeddings,
metadatas=metadatas,
Expand Down Expand Up @@ -510,7 +562,7 @@ async def ingest_generate_caption(files: List[UploadFile] = File(None)):
)
async def ingest_with_text(files: List[UploadFile] = File(None)):
if files:
accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif"]
accepted_media_formats = [".mp4", ".png", ".jpg", ".jpeg", ".gif", ".pdf"]
# Create a lookup dictionary containing all media files
matched_files = {f.filename: [f] for f in files if os.path.splitext(f.filename)[1] in accepted_media_formats}
uploaded_files_map = {}
Expand All @@ -537,25 +589,25 @@ async def ingest_with_text(files: List[UploadFile] = File(None)):
elif file_extension not in accepted_media_formats:
print(f"Skipping file {file.filename} because of unsupported format.")

# Check if every media file has a caption file
for media_file_name, file_pair in matched_files.items():
if len(file_pair) != 2:
# Check that every media file that is not a pdf has a caption file
for media_file_name, file_list in matched_files.items():
if len(file_list) != 2 and os.path.splitext(media_file_name)[1] != ".pdf":
raise HTTPException(status_code=400, detail=f"No caption file found for {media_file_name}")

if len(matched_files.keys()) == 0:
return HTTPException(
status_code=400,
detail="The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt)",
detail="The uploaded files have unsupported formats. Please upload at least one video file (.mp4) with captions (.vtt) or one image (.png, .jpg, .jpeg, or .gif) with caption (.txt) or one .pdf file",
)

for media_file in matched_files:
print(f"Processing file {media_file}")
file_name, file_extension = os.path.splitext(media_file)

# Assign unique identifier to file
file_id = generate_id()

# Create file name by appending identifier
file_name, file_extension = os.path.splitext(media_file)
media_file_name = f"{file_name}_{file_id}{file_extension}"
media_dir_name = os.path.splitext(media_file_name)[0]

Expand All @@ -564,25 +616,72 @@ async def ingest_with_text(files: List[UploadFile] = File(None)):
shutil.copyfileobj(matched_files[media_file][0].file, f)
uploaded_files_map[file_name] = media_file_name

# Save caption file in upload directory
caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1]
caption_file = f"{media_dir_name}{caption_file_extension}"
with open(os.path.join(upload_folder, caption_file), "wb") as f:
shutil.copyfileobj(matched_files[media_file][1].file, f)
if file_extension == ".pdf":
# Set up location to store pdf images and text, reusing "frames" and "annotations" from video
output_dir = os.path.join(upload_folder, media_dir_name)
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)
doc = pymupdf.open(os.path.join(upload_folder, media_file_name))
annotations = []
for page_idx, page in enumerate(doc, start=1):
text = page.get_text()
images = page.get_images()
for image_idx, image in enumerate(images, start=1):
# Write image and caption file for each image found in pdf
img_fname = f"page{page_idx}_image{image_idx}"
img_fpath = os.path.join(output_dir, "frames", img_fname + ".png")
pix = pymupdf.Pixmap(doc, image[0]) # create pixmap

if pix.n - pix.alpha > 3: # if CMYK, convert to RGB first
pix = pymupdf.Pixmap(pymupdf.csRGB, pix)

pix.save(img_fpath) # pixmap to png
pix = None

# Convert image to base64 encoded string
with open(img_fpath, "rb") as image2str:
encoded_string = base64.b64encode(image2str.read()) # png to bytes

decoded_string = encoded_string.decode() # bytes to string

# Create annotations file, reusing metadata keys from video
annotations.append(
{
"video_id": file_id,
"video_name": os.path.basename(os.path.join(upload_folder, media_file_name)),
"b64_img_str": decoded_string,
"caption": text,
"time": 0.0,
"frame_no": page_idx,
"sub_video_id": image_idx,
}
)

with open(os.path.join(output_dir, "annotations.json"), "w") as f:
json.dump(annotations, f)

# Ingest multimodal data into redis
ingest_multimodal(file_name, os.path.join(upload_folder, media_dir_name), embeddings, is_pdf=True)
else:
# Save caption file in upload directory
caption_file_extension = os.path.splitext(matched_files[media_file][1].filename)[1]
caption_file = f"{media_dir_name}{caption_file_extension}"
with open(os.path.join(upload_folder, caption_file), "wb") as f:
shutil.copyfileobj(matched_files[media_file][1].file, f)

# Store frames and caption annotations in a new directory
extract_frames_and_annotations_from_transcripts(
file_id,
os.path.join(upload_folder, media_file_name),
os.path.join(upload_folder, caption_file),
os.path.join(upload_folder, media_dir_name),
)
# Store frames and caption annotations in a new directory
extract_frames_and_annotations_from_transcripts(
file_id,
os.path.join(upload_folder, media_file_name),
os.path.join(upload_folder, caption_file),
os.path.join(upload_folder, media_dir_name),
)

# Delete temporary caption file
os.remove(os.path.join(upload_folder, caption_file))
# Delete temporary caption file
os.remove(os.path.join(upload_folder, caption_file))

# Ingest multimodal data into redis
ingest_multimodal(file_name, os.path.join(upload_folder, media_dir_name), embeddings)
# Ingest multimodal data into redis
ingest_multimodal(file_name, os.path.join(upload_folder, media_dir_name), embeddings)

# Delete temporary media directory containing frames and annotations
shutil.rmtree(os.path.join(upload_folder, media_dir_name))
Expand Down
1 change: 1 addition & 0 deletions comps/dataprep/multimodal/redis/langchain/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ opentelemetry-sdk
Pillow
prometheus-fastapi-instrumentator
pydantic
pymupdf
python-multipart
redis
shortuuid
Expand Down
31 changes: 30 additions & 1 deletion tests/dataprep/test_dataprep_multimodal_redis_langchain.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ audio_fn="${tmp_dir}/${audio_name}.wav"
image_name="apple"
image_fn="${tmp_dir}/${image_name}.png"
caption_fn="${tmp_dir}/${image_name}.txt"
pdf_name="nke-10k-2023"
pdf_fn="${tmp_dir}/${pdf_name}.pdf"

function build_docker_images() {
cd $WORKPATH
Expand Down Expand Up @@ -132,6 +134,9 @@ tire.""" > ${transcript_fn}
echo "Downloading Audio"
wget https://github.com/intel/intel-extension-for-transformers/raw/main/intel_extension_for_transformers/neural_chat/assets/audio/sample.wav -O ${audio_fn}

echo "Downloading PDF"
wget https://raw.githubusercontent.com/opea-project/GenAIComps/main/comps/retrievers/redis/data/nke-10k-2023.pdf -O ${pdf_fn}

}

function validate_microservice() {
Expand Down Expand Up @@ -256,6 +261,30 @@ function validate_microservice() {
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test v1/ingest_with_text with a PDF file
echo "Testing ingest_with_text API with a PDF file"
URL="http://${ip_address}:$dataprep_service_port/v1/ingest_with_text"

HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F "files=@$pdf_fn" -H 'Content-Type: multipart/form-data' "$URL")
HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g')
SERVICE_NAME="dataprep - upload - file"

if [ "$HTTP_STATUS" -ne "200" ]; then
echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS"
docker logs test-comps-dataprep-multimodal-redis >> ${LOG_PATH}/dataprep_upload_file.log
exit 1
else
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *"Data preparation succeeded"* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
docker logs test-comps-dataprep-multimodal-redis >> ${LOG_PATH}/dataprep_upload_file.log
exit 1
else
echo "[ $SERVICE_NAME ] Content is as expected."
fi

# test v1/generate_captions upload video file
echo "Testing generate_captions API with video"
URL="http://${ip_address}:$dataprep_service_port/v1/generate_captions"
Expand Down Expand Up @@ -319,7 +348,7 @@ function validate_microservice() {
else
echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..."
fi
if [[ "$RESPONSE_BODY" != *${image_name}* || "$RESPONSE_BODY" != *${video_name}* || "$RESPONSE_BODY" != *${audio_name}* ]]; then
if [[ "$RESPONSE_BODY" != *${image_name}* || "$RESPONSE_BODY" != *${video_name}* || "$RESPONSE_BODY" != *${audio_name}* || "$RESPONSE_BODY" != *${pdf_name}* ]]; then
echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY"
docker logs test-comps-dataprep-multimodal-redis >> ${LOG_PATH}/dataprep_file.log
exit 1
Expand Down