Merged
4 changes: 4 additions & 0 deletions .github/workflows/docker/compose/finetuning-compose.yaml
@@ -11,3 +11,7 @@ services:
build:
dockerfile: comps/finetuning/src/Dockerfile.intel_hpu
image: ${REGISTRY:-opea}/finetuning-gaudi:${TAG:-latest}
finetuning-xtune:
build:
dockerfile: comps/finetuning/src/Dockerfile.xtune
image: ${REGISTRY:-opea}/finetuning-xtune:${TAG:-latest}
20 changes: 20 additions & 0 deletions comps/finetuning/deployment/docker_compose/compose.yaml
@@ -15,6 +15,26 @@ services:
- HF_TOKEN=${HF_TOKEN}
ipc: host
restart: always
finetuning-xtune:
image: ${REGISTRY:-opea}/finetuning-xtune:${TAG:-latest}
container_name: finetuning-xtune
ports:
- "${PORT1:-8015}:8015"
- "${PORT2:-8265}:8265"
- "${PORT3:-7860}:7860"
environment:
- no_proxy=${no_proxy}
- https_proxy=${https_proxy}
- http_proxy=${http_proxy}
- HF_TOKEN=${HF_TOKEN}
devices:
- "/dev/dri:/dev/dri"
volumes:
- ${DATA:-/data}:${DATA:-/data}
group_add:
- ${RENDER_GROUP_ID:-110}
ipc: host
restart: always
finetuning-gaudi:
extends: finetuning
image: ${REGISTRY:-opea}/finetuning-gaudi:${TAG:-latest}
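The new `finetuning-xtune` service maps the host render device and a dataset directory, so `DATA` and `RENDER_GROUP_ID` need to be set on the host before `docker compose up`. A minimal sketch, assuming a typical Ubuntu host where the GPU render node belongs to the `render` group (the dataset path is illustrative):

```bash
# Illustrative host-side setup for the finetuning-xtune service above
export DATA=/path/to/datasets                                # mounted 1:1 into the container; defaults to /data
export RENDER_GROUP_ID=$(getent group render | cut -d: -f3)  # host render group id; the compose file defaults to 110
export HF_TOKEN=${your_huggingface_token}
```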
56 changes: 56 additions & 0 deletions comps/finetuning/src/Dockerfile.xtune
@@ -0,0 +1,56 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

# Use the same Python version as Ray
FROM python:3.10.14

ARG HF_TOKEN
ARG DATA

ENV HF_TOKEN=$HF_TOKEN
ENV DATA=$DATA

RUN useradd -m -s /bin/bash user && \
mkdir -p /home/user && \
chown -R user /home/user/

RUN wget -qO - https://repositories.intel.com/gpu/intel-graphics.key | gpg --yes --dearmor --output /usr/share/keyrings/intel-graphics.gpg && \
echo "deb [arch=amd64,i386 signed-by=/usr/share/keyrings/intel-graphics.gpg] https://repositories.intel.com/gpu/ubuntu jammy unified" | \
tee /etc/apt/sources.list.d/intel-gpu-jammy.list &&\
apt update -y &&\
apt install -y \
libze-intel-gpu1 libze1 intel-opencl-icd clinfo \
libze-dev intel-ocloc \
intel-level-zero-gpu-raytracing \
vim \
rsync

COPY comps /home/user/comps


RUN chown -R user /home/user/comps/finetuning

ENV PATH=$PATH:/home/user/.local/bin
RUN cd /home/user/comps/finetuning/src/integrations/xtune && git config --global user.name "test" && git config --global user.email "test" && bash prepare_xtune.sh

RUN python -m pip install --upgrade pip setuptools peft && \
python -m pip install -r /home/user/comps/finetuning/src/requirements.txt && \
python -m pip install --no-deps transformers==4.45.0 datasets==2.21.0 accelerate==0.34.2 peft==0.12.0

ENV PYTHONPATH=$PYTHONPATH:/home/user



WORKDIR /home/user/comps/finetuning/src

RUN echo PKGPATH=$(python3 -c "import pkg_resources; print(pkg_resources.get_distribution('oneccl-bind-pt').location)") >> run.sh && \
echo 'export LD_LIBRARY_PATH=$PKGPATH/oneccl_bindings_for_pytorch/opt/mpi/lib/:$LD_LIBRARY_PATH' >> run.sh && \
echo 'source $PKGPATH/oneccl_bindings_for_pytorch/env/setvars.sh' >> run.sh && \
echo 'export FINETUNING_COMPONENT_NAME="XTUNE_FINETUNING"' >> run.sh && \
echo ray start --head --dashboard-host=0.0.0.0 >> run.sh && \
echo export RAY_ADDRESS=http://localhost:8265 >> run.sh && \
echo python opea_finetuning_microservice.py >> run.sh && \
echo 'export DATA=$DATA' >> run.sh && \
echo 'ZE_AFFINITY_MASK=0 llamafactory-cli webui &' >> run.sh

CMD bash run.sh
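For reference, a sketch of the `run.sh` these `echo` lines assemble. Because the first `echo` is unquoted, `PKGPATH` is resolved to a concrete site-packages path at image build time (the value shown is illustrative):

```bash
PKGPATH=/usr/local/lib/python3.10/site-packages   # resolved at build time; illustrative value
export LD_LIBRARY_PATH=$PKGPATH/oneccl_bindings_for_pytorch/opt/mpi/lib/:$LD_LIBRARY_PATH
source $PKGPATH/oneccl_bindings_for_pytorch/env/setvars.sh
export FINETUNING_COMPONENT_NAME="XTUNE_FINETUNING"
ray start --head --dashboard-host=0.0.0.0
export RAY_ADDRESS=http://localhost:8265
python opea_finetuning_microservice.py
# Note: the two lines below only run once the microservice above exits, since it is not backgrounded.
export DATA=$DATA
ZE_AFFINITY_MASK=0 llamafactory-cli webui &
```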
13 changes: 12 additions & 1 deletion comps/finetuning/src/README.md
@@ -39,7 +39,9 @@ ray start --address='${head_node_ip}:6379'

```bash
export HF_TOKEN=${your_huggingface_token}
python finetuning_service.py
# export FINETUNING_COMPONENT_NAME="which component you want to run"
# export FINETUNING_COMPONENT_NAME="OPEA_FINETUNING" or export FINETUNING_COMPONENT_NAME="XTUNE_FINETUNING"
python opea_finetuning_microservice.py
```

## 🚀2. Start Microservice with Docker (Option 2)
@@ -99,6 +101,10 @@ cd ../deployment/docker_compose
docker compose -f compose.yaml up finetuning-gaudi -d
```

### 2.3 Setup Xtune on Arc A770

Please follow the [Xtune doc](./integrations/xtune/README.md) to install Xtune on Arc A770.
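
The service added in `deployment/docker_compose/compose.yaml` can then be started the same way as the Gaudi variant; a sketch, assuming `HF_TOKEN`, `DATA`, and `RENDER_GROUP_ID` are already set for your host:

```bash
cd ../deployment/docker_compose
docker compose -f compose.yaml up finetuning-xtune -d
```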

## 🚀3. Consume Finetuning Service

### 3.1 Upload a training file
@@ -261,6 +267,11 @@ curl http://${your_ip}:8015/v1/finetune/list_checkpoints -X POST -H "Content-Typ

After the fine-tuning job is done, the fine-tuned model can be chosen from the listed checkpoints and used in other microservices. For example, a fine-tuned reranking model can be used in the [reranks](../../rerankings/src/README.md) microservice by assigning its path to the environment variable `RERANK_MODEL_ID`, a fine-tuned embedding model can be used in the [embeddings](../../embeddings/src/README.md) microservice by assigning its path to the environment variable `model`, and an instruction-tuned LLM can be used in the [llms](../../llms/src/text-generation/README.md) microservice by assigning its path to the environment variable `your_hf_llm_model`.
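
For example, pointing the reranking microservice at a fine-tuned checkpoint might look like the following; the path is hypothetical and should be replaced with one returned by `list_checkpoints`:

```bash
# Hypothetical checkpoint directory returned by the list_checkpoints call above
export RERANK_MODEL_ID=/path/to/output/ft-job-xxxx/checkpoint-500
```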

### 3.5 Xtune

Once you have completed `2.3 Setup Xtune on Arc A770`, you can access the Xtune web UI at http://localhost:7860/.
Please see the [Xtune doc](./integrations/xtune/README.md) for details.
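
A hedged example of submitting an Xtune CLIP job through the finetuning API; the field names follow the new `XtuneConfig`, the endpoint mirrors the existing job-submission route, and all values are illustrative:

```bash
curl http://${your_ip}:8015/v1/fine_tuning/jobs \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "training_file": "placeholder.json",
    "model": "ViT-B-16",
    "General": {
      "xtune_config": {
        "tool": "clip",
        "trainer": "clip_finetune",
        "model": "ViT-B-16",
        "dataset": "caltech101",
        "dataset_root": "/data/",
        "device": "XPU"
      }
    }
  }'
```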

## 🚀4. Descriptions for Finetuning parameters

We utilize [OpenAI finetuning parameters](https://platform.openai.com/docs/api-reference/fine-tuning) and extend it with more customizable parameters, see the definitions at [finetune_config](https://github.com/opea-project/GenAIComps/blob/main/comps/finetuning/src/integrations/finetune_config.py).
16 changes: 16 additions & 0 deletions comps/finetuning/src/integrations/finetune_config.py
@@ -37,6 +37,21 @@ class LoraConfig(BaseModel):
target_modules: Optional[List[str]] = None


class XtuneConfig(BaseModel):
tool: str = ""
trainer: str = ""
model: str = ""
config_file: str = ""
dataset: str = ""
dataset_root: str = ""
device: str = ""

@validator("tool")
def check_tool(cls, v: str):
assert v in ["", "clip", "adaclip"]
return v


class GeneralConfig(BaseModel):
base_model: str = None
tokenizer_name: Optional[str] = None
@@ -48,6 +63,7 @@ class GeneralConfig(BaseModel):
save_strategy: str = "no"
config: LoadConfig = LoadConfig()
lora_config: Optional[LoraConfig] = LoraConfig()
xtune_config: Optional[XtuneConfig] = XtuneConfig()
enable_gradient_checkpointing: bool = False
task: str = "instruction_tuning"

241 changes: 241 additions & 0 deletions comps/finetuning/src/integrations/xtune.py
@@ -0,0 +1,241 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import random
import re
import time
import urllib.parse
import uuid
from pathlib import Path
from typing import Dict

from fastapi import BackgroundTasks, File, Form, HTTPException, UploadFile
from pydantic_yaml import to_yaml_file
from ray.job_submission import JobSubmissionClient

from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry
from comps.cores.proto.api_protocol import (
FileObject,
FineTuningJob,
FineTuningJobCheckpoint,
FineTuningJobIDRequest,
FineTuningJobList,
UploadFileRequest,
)
from comps.finetuning.src.integrations.finetune_config import FinetuneConfig, FineTuningParams

logger = CustomLogger("opea")

DATASET_BASE_PATH = "datasets"
JOBS_PATH = "jobs"
OUTPUT_DIR = "output"

if not os.path.exists(DATASET_BASE_PATH):
os.mkdir(DATASET_BASE_PATH)
if not os.path.exists(JOBS_PATH):
os.mkdir(JOBS_PATH)
if not os.path.exists(OUTPUT_DIR):
os.mkdir(OUTPUT_DIR)

FineTuningJobID = str
CheckpointID = str
CheckpointPath = str

CHECK_JOB_STATUS_INTERVAL = 5 # Check every 5 secs

global ray_client
ray_client: JobSubmissionClient = None

running_finetuning_jobs: Dict[FineTuningJobID, FineTuningJob] = {}
finetuning_job_to_ray_job: Dict[FineTuningJobID, str] = {}
checkpoint_id_to_checkpoint_path: Dict[CheckpointID, CheckpointPath] = {}


# Add a background task to periodically update job status
def update_job_status(job_id: FineTuningJobID):
while True:
job_status = ray_client.get_job_status(finetuning_job_to_ray_job[job_id])
status = str(job_status).lower()
# Ray status "stopped" is OpenAI status "cancelled"
status = "cancelled" if status == "stopped" else status
logger.info(f"Status of job {job_id} is '{status}'")
running_finetuning_jobs[job_id].status = status
if status == "succeeded" or status == "cancelled" or status == "failed":
break
time.sleep(CHECK_JOB_STATUS_INTERVAL)


async def save_content_to_local_disk(save_path: str, content):
save_path = Path(save_path)
try:
if isinstance(content, str):
with open(save_path, "w", encoding="utf-8") as file:
file.write(content)
else:
with save_path.open("wb") as fout:
content = await content.read()
fout.write(content)
except Exception as e:
logger.info(f"Write file failed. Exception: {e}")
raise HTTPException(status_code=500, detail=f"Write file {save_path} failed. Exception: {e}")


async def upload_file(purpose: str = Form(...), file: UploadFile = File(...)):
return UploadFileRequest(purpose=purpose, file=file)


@OpeaComponentRegistry.register("XTUNE_FINETUNING")
class XtuneFinetuning(OpeaComponent):
"""A specialized finetuning component derived from OpeaComponent for finetuning services."""

def __init__(self, name: str, description: str, config: dict = None):
super().__init__(name, "finetuning", description, config)

def create_finetuning_jobs(self, request: FineTuningParams, background_tasks: BackgroundTasks):
model = request.model
train_file = request.training_file
finetune_config = FinetuneConfig(General=request.General)
if finetune_config.General.xtune_config.device == "XPU":
flag = 1
else:
flag = 0
if os.getenv("HF_TOKEN", None):
finetune_config.General.config.token = os.getenv("HF_TOKEN", None)

job = FineTuningJob(
id=f"ft-job-{uuid.uuid4()}",
model=model,
created_at=int(time.time()),
training_file=train_file,
hyperparameters={},
status="running",
seed=random.randint(0, 1000) if request.seed is None else request.seed,
)

finetune_config_file = f"{JOBS_PATH}/{job.id}.yaml"
to_yaml_file(finetune_config_file, finetune_config)

global ray_client
ray_client = JobSubmissionClient() if ray_client is None else ray_client
if finetune_config.General.xtune_config.tool == "clip":
ray_job_id = ray_client.submit_job(
# Entrypoint shell command to execute
entrypoint=f"cd integrations/xtune/src/llamafactory/clip_finetune && export DATA={finetune_config.General.xtune_config.dataset_root} && bash scripts/clip_finetune/{finetune_config.General.xtune_config.trainer}.sh {finetune_config.General.xtune_config.dataset} {finetune_config.General.xtune_config.model} 0 {finetune_config.General.xtune_config.device} > /tmp/test.log 2>&1 || true",
)

else:
if flag == 1:
ray_job_id = ray_client.submit_job(
# Entrypoint shell command to execute
entrypoint=f"cd integrations/xtune/src/llamafactory/adaclip_finetune && python train.py --config {finetune_config.General.xtune_config.config_file} --frames_dir {finetune_config.General.xtune_config.dataset_root}{finetune_config.General.xtune_config.dataset}/frames --top_k 16 --freeze_cnn --frame_agg mlp --resume {finetune_config.General.xtune_config.model} --xpu --batch_size 8 > /tmp/test.log 2>&1 || true",
)
else:
ray_job_id = ray_client.submit_job(
# Entrypoint shell command to execute
entrypoint=f"cd integrations/xtune/src/llamafactory/adaclip_finetune && python train.py --config {finetune_config.General.config_file} --frames_dir {finetune_config.General.dataset_root}{finetune_config.General.dataset}/frames --top_k 16 --freeze_cnn --frame_agg mlp --resume {finetune_config.General.model}--batch_size 8 > /tmp/test.log 2>&1 || true",
)

logger.info(f"Submitted Ray job: {ray_job_id} ...")

running_finetuning_jobs[job.id] = job
finetuning_job_to_ray_job[job.id] = ray_job_id

background_tasks.add_task(update_job_status, job.id)

return job

def list_finetuning_jobs(self):
finetuning_jobs_list = FineTuningJobList(data=list(running_finetuning_jobs.values()), has_more=False)

return finetuning_jobs_list

def retrieve_finetuning_job(self, request: FineTuningJobIDRequest):
fine_tuning_job_id = request.fine_tuning_job_id

job = running_finetuning_jobs.get(fine_tuning_job_id)
if job is None:
raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!")
return job

def cancel_finetuning_job(self, request: FineTuningJobIDRequest):
fine_tuning_job_id = request.fine_tuning_job_id

ray_job_id = finetuning_job_to_ray_job.get(fine_tuning_job_id)
if ray_job_id is None:
raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!")

global ray_client
ray_client = JobSubmissionClient() if ray_client is None else ray_client
ray_client.stop_job(ray_job_id)

job = running_finetuning_jobs.get(fine_tuning_job_id)
job.status = "cancelled"
return job

def list_finetuning_checkpoints(self, request: FineTuningJobIDRequest):
fine_tuning_job_id = request.fine_tuning_job_id

job = running_finetuning_jobs.get(fine_tuning_job_id)
if job is None:
raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!")
output_dir = os.path.join(OUTPUT_DIR, job.id)
checkpoints = []
if os.path.exists(output_dir):
# Iterate over the contents of the directory and add an entry for each
files = os.listdir(output_dir)
for file in files: # Loop over directory contents
file_path = os.path.join(output_dir, file)
if os.path.isdir(file_path) and file.startswith("checkpoint"):
steps = re.findall(r"\d+", file)[0]
checkpointsResponse = FineTuningJobCheckpoint(
id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID
created_at=int(time.time()), # Use the current timestamp
fine_tuned_model_checkpoint=file_path, # Directory path itself
fine_tuning_job_id=fine_tuning_job_id,
object="fine_tuning.job.checkpoint",
step_number=steps,
)
checkpoints.append(checkpointsResponse)
if job.status == "succeeded":
checkpointsResponse = FineTuningJobCheckpoint(
id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID
created_at=int(time.time()), # Use the current timestamp
fine_tuned_model_checkpoint=output_dir, # Directory path itself
fine_tuning_job_id=fine_tuning_job_id,
object="fine_tuning.job.checkpoint",
)
checkpoints.append(checkpointsResponse)

return checkpoints

async def upload_training_files(self, request: UploadFileRequest):
file = request.file
if file is None:
raise HTTPException(status_code=404, detail="upload file failed!")
filename = urllib.parse.quote(file.filename, safe="")
save_path = os.path.join(DATASET_BASE_PATH, filename)
await save_content_to_local_disk(save_path, file)

fileBytes = os.path.getsize(save_path)
fileInfo = FileObject(
id=f"file-{uuid.uuid4()}",
object="file",
bytes=fileBytes,
created_at=int(time.time()),
filename=filename,
purpose="fine-tune",
)

return fileInfo

def invoke(self, *args, **kwargs):
pass

def check_health(self) -> bool:
"""Checks the health of the component.

Returns:
bool: True if the component is healthy, False otherwise.
"""
return True