From ef29cdd47d8a9737225a68fe73d1c02f9b3854b3 Mon Sep 17 00:00:00 2001 From: deep1401 Date: Fri, 13 Mar 2026 15:54:13 -0600 Subject: [PATCH 1/4] Move all trackio dbs to a common folder and then keep syncing through that so multiple runs appear in one trackio --- .../routers/compute_provider.py | 24 +++- api/transformerlab/routers/trackio.py | 11 ++ .../schemas/compute_providers.py | 4 + .../services/trackio_service.py | 40 ++++++- lab-sdk/src/lab/lab_facade.py | 110 +++++++++++++----- .../components/Experiment/Tasks/JobsList.tsx | 3 +- .../Experiment/Tasks/QueueTaskModal.tsx | 39 +++++++ .../components/Experiment/Tasks/Tasks.tsx | 5 + 8 files changed, 205 insertions(+), 31 deletions(-) diff --git a/api/transformerlab/routers/compute_provider.py b/api/transformerlab/routers/compute_provider.py index c8897e701..46f472620 100644 --- a/api/transformerlab/routers/compute_provider.py +++ b/api/transformerlab/routers/compute_provider.py @@ -1760,9 +1760,27 @@ async def launch_template_on_provider( # Enable Trackio auto-init for this job if requested. When set, the lab SDK # running inside the remote script can automatically initialize Trackio - # and capture metrics for visualization in the Tasks UI. + # and capture metrics for visualization in the Tasks UI. For shared projects, + # pass project name and run name so the SDK can build trackio_runs/{experiment_id}/{project_name}/. + trackio_project_name_for_job: Optional[str] = None + trackio_run_name_for_job: Optional[str] = None if request.enable_trackio: env_vars["TLAB_TRACKIO_AUTO_INIT"] = "true" + project_name = (request.trackio_project_name or "").strip() or str(request.experiment_id) + trackio_run_name = f"{request.task_name or 'task'}-job-{job_id}" + trackio_project_name_for_job = project_name + trackio_run_name_for_job = trackio_run_name + env_vars["TLAB_TRACKIO_PROJECT_NAME"] = project_name + env_vars["TLAB_TRACKIO_RUN_NAME"] = trackio_run_name + # Create shared project dir so the SDK can sync into it; path is derived by dashboard when needed. + workspace_dir = await get_workspace_dir() + shared_path = storage.join( + workspace_dir, + "trackio_runs", + secure_filename(str(request.experiment_id)), + secure_filename(project_name), + ) + await storage.makedirs(shared_path, exist_ok=True) # Get TFL_STORAGE_URI from storage context tfl_storage_uri = None @@ -1914,6 +1932,10 @@ async def launch_template_on_provider( job_data["workspace_dir"] = provider_config_dict["workspace_dir"] if request.file_mounts is True and request.task_id: job_data["task_id"] = request.task_id + if trackio_project_name_for_job is not None: + job_data["trackio_project_name"] = trackio_project_name_for_job + if trackio_run_name_for_job is not None: + job_data["trackio_run_name"] = trackio_run_name_for_job for key, value in job_data.items(): if value is not None: diff --git a/api/transformerlab/routers/trackio.py b/api/transformerlab/routers/trackio.py index b49b51ec0..626b091d3 100644 --- a/api/transformerlab/routers/trackio.py +++ b/api/transformerlab/routers/trackio.py @@ -2,6 +2,7 @@ from transformerlab.routers.auth import get_user_and_team from transformerlab.services.trackio_service import ( + list_trackio_projects, start_trackio_for_job, stop_trackio_for_job, ) @@ -23,6 +24,16 @@ async def trackio_start(job_id: str, user_and_team=Depends(get_user_and_team)) - return await start_trackio_for_job(job_id, org_id=org_id, experiment_id=experiment_id) +@router.get("/trackio/projects") +async def trackio_projects(experiment_id: str, user_and_team=Depends(get_user_and_team)) -> dict: + """ + List existing TrackIO project names for an experiment (for shared-project dropdown). + """ + _ = user_and_team + projects = await list_trackio_projects(experiment_id) + return {"projects": projects} + + @router.get("/trackio/stop") async def trackio_stop(job_id: str, user_and_team=Depends(get_user_and_team)) -> dict: """ diff --git a/api/transformerlab/schemas/compute_providers.py b/api/transformerlab/schemas/compute_providers.py index 4d28002d0..f769e8e27 100644 --- a/api/transformerlab/schemas/compute_providers.py +++ b/api/transformerlab/schemas/compute_providers.py @@ -170,6 +170,10 @@ class ProviderTemplateLaunchRequest(BaseModel): default=False, description="When True, set TLAB_TRACKIO_AUTO_INIT=true in the job environment so lab SDK can auto-integrate with Trackio.", ) + trackio_project_name: Optional[str] = Field( + default=None, + description="TrackIO project name for shared project; used when enable_trackio=True. Omit or empty to use 'default'.", + ) class ProviderTemplateFileUploadResponse(BaseModel): diff --git a/api/transformerlab/services/trackio_service.py b/api/transformerlab/services/trackio_service.py index 58f1cf102..137122ff9 100644 --- a/api/transformerlab/services/trackio_service.py +++ b/api/transformerlab/services/trackio_service.py @@ -3,11 +3,12 @@ import shutil import subprocess import sys -from typing import Any, Dict +from typing import Any, Dict, List from fastapi import HTTPException from lab import HOME_DIR, storage +from lab.dirs import get_workspace_dir from lab.job import Job from werkzeug.utils import secure_filename @@ -44,10 +45,19 @@ async def start_trackio_for_job(job_id: str, org_id: str | None, experiment_id: raise HTTPException(status_code=400, detail="Job data is not a dictionary") source_path = job_data.get("trackio_db_artifact_path") + if not source_path and job_data.get("trackio_project_name") and experiment_id: + # Shared project: derive path from workspace + experiment_id + project_name + workspace_dir = await get_workspace_dir() + source_path = storage.join( + workspace_dir, + "trackio_runs", + secure_filename(str(experiment_id)), + secure_filename(str(job_data["trackio_project_name"]).strip()), + ) if not source_path: raise HTTPException( status_code=404, - detail="Trackio metrics not found for this job (trackio_db_artifact_path missing)", + detail="Trackio metrics not found for this job (trackio_db_artifact_path or trackio_project_name missing)", ) # If there's already a Trackio process for this job, just return its URL @@ -55,7 +65,7 @@ async def start_trackio_for_job(job_id: str, org_id: str | None, experiment_id: if existing and isinstance(existing.get("url"), str): return {"url": existing["url"]} - project = job_data.get("trackio_project") + project = job_data.get("trackio_project") or job_data.get("trackio_project_name") # Always run Trackio from a local temporary copy of the metrics directory. # This works for both local and remote storage backends. Use HOME_DIR so the @@ -240,3 +250,27 @@ async def stop_trackio_for_job(job_id: str) -> None: except Exception: # Ignore cleanup errors pass + + +async def list_trackio_projects(experiment_id: str) -> List[str]: + """ + List TrackIO project directory names for an experiment (shared project names). + Returns basenames of direct subdirs under workspace_dir/trackio_runs/{experiment_id}/. + """ + if not experiment_id: + return [] + workspace_dir = await get_workspace_dir() + parent = storage.join(workspace_dir, "trackio_runs", secure_filename(str(experiment_id))) + if not await storage.exists(parent) or not await storage.isdir(parent): + return [] + try: + entries = await storage.ls(parent, detail=False) + except Exception: + return [] + projects: List[str] = [] + for path in entries: + if await storage.isdir(path): + name = path.rstrip("/").split("/")[-1].split("\\")[-1] + if name: + projects.append(name) + return sorted(projects) diff --git a/lab-sdk/src/lab/lab_facade.py b/lab-sdk/src/lab/lab_facade.py index ed1fbc594..39dfc458c 100644 --- a/lab-sdk/src/lab/lab_facade.py +++ b/lab-sdk/src/lab/lab_facade.py @@ -13,6 +13,7 @@ from . import dirs from .model import Model as ModelService from . import storage +from werkzeug.utils import secure_filename from .dataset import Dataset from .task_template import TaskTemplate from .generation import GenerationModel, load_generation_model as _load_generation_model @@ -140,6 +141,8 @@ def init(self, experiment_id: str | None = None, config: Optional[Dict[str, Any] self._trackio_available = False self._trackio_managed = False auto_init_trackio = os.environ.get("TLAB_TRACKIO_AUTO_INIT", "false").lower() == "true" + trackio_project_name_env = (os.environ.get("TLAB_TRACKIO_PROJECT_NAME") or "").strip() + trackio_run_name_env = (os.environ.get("TLAB_TRACKIO_RUN_NAME") or "").strip() if auto_init_trackio: try: import trackio # type: ignore[import] @@ -148,11 +151,31 @@ def init(self, experiment_id: str | None = None, config: Optional[Dict[str, Any] self._trackio_available = True existing_run = context_vars.current_run.get() if existing_run is None: - # Use experiment_id as a natural default project name if available - project_name = str(experiment_id or "TransformerLab") - trackio.init(project=project_name) - self._trackio_managed = True - logger.info(f"📊 Trackio auto-init enabled for project '{project_name}'") + if trackio_project_name_env: + # Shared project: use temp dir, seed from shared path, init with project + run name + job_id_env = os.environ.get("_TFL_JOB_ID", "unknown") + temp_dir = f"/tmp/trackio/{job_id_env}" + os.makedirs(temp_dir, exist_ok=True) + os.environ["TRACKIO_DIR"] = temp_dir + _run_async( + self._seed_trackio_shared_path_async( + experiment_id or "", trackio_project_name_env, temp_dir + ) + ) + trackio.init( + project=trackio_project_name_env, + name=trackio_run_name_env or f"job-{job_id_env}", + ) + self._trackio_managed = True + logger.info( + f"📊 Trackio auto-init enabled for shared project '{trackio_project_name_env}'" + ) + else: + # Legacy: per-job project name + project_name = str(experiment_id or "TransformerLab") + trackio.init(project=project_name) + self._trackio_managed = True + logger.info(f"📊 Trackio auto-init enabled for project '{project_name}'") except Exception: # Silently ignore any Trackio issues; lab core behavior must not be affected self._trackio_available = False @@ -1557,6 +1580,8 @@ def capture_trackio_metadata(self, db_path: str, project: Optional[str] = None) async def async_capture_trackio_metadata(self, db_path: str, project: Optional[str] = None) -> str: """ Async implementation of capture_trackio_metadata(). + When TLAB_TRACKIO_PROJECT_NAME is set (shared project), copies to trackio_runs/{experiment_id}/{project_name}/ + and does not write trackio_db_artifact_path (dashboard derives path). """ self._ensure_initialized() @@ -1568,32 +1593,65 @@ async def async_capture_trackio_metadata(self, db_path: str, project: Optional[s if not os.path.exists(src): raise FileNotFoundError(f"Trackio path does not exist: {src}") - # Resolve the job's artifacts directory and create a dedicated 'trackio' subfolder - artifacts_dir = await self._job.get_artifacts_dir() # type: ignore[union-attr] - trackio_dir = storage.join(artifacts_dir, "trackio") + trackio_project_name_env = (os.environ.get("TLAB_TRACKIO_PROJECT_NAME") or "").strip() + if trackio_project_name_env and self._experiment is not None: + # Shared project: copy to trackio_runs/{experiment_id}/{project_name}/ (merge, do not rm) + workspace_dir = await dirs.get_workspace_dir() + trackio_dir = storage.join( + workspace_dir, + "trackio_runs", + secure_filename(str(self._experiment.id)), + secure_filename(trackio_project_name_env), + ) + await storage.makedirs(trackio_dir, exist_ok=True) + if os.path.isdir(src): + await storage.copy_dir(src, trackio_dir) + else: + base_name = posixpath.basename(src) + dest_file = storage.join(trackio_dir, base_name) + await storage.copy_file(src, dest_file) + # Do not write trackio_db_artifact_path; dashboard derives from trackio_project_name + logger.info(f"📊 Saved Trackio metrics to shared project: {trackio_dir}") + return trackio_dir + else: + # Legacy: per-job artifacts/trackio + artifacts_dir = await self._job.get_artifacts_dir() # type: ignore[union-attr] + trackio_dir = storage.join(artifacts_dir, "trackio") - # Ensure the destination directory exists and is clean - if await storage.exists(trackio_dir): - # Remove any previous Trackio data for this job to avoid stale metrics - await storage.rm_tree(trackio_dir) + if await storage.exists(trackio_dir): + await storage.rm_tree(trackio_dir) + await storage.makedirs(trackio_dir, exist_ok=True) - await storage.makedirs(trackio_dir, exist_ok=True) + if os.path.isdir(src): + await storage.copy_dir(src, trackio_dir) + else: + base_name = posixpath.basename(src) + dest_file = storage.join(trackio_dir, base_name) + await storage.copy_file(src, dest_file) - # Copy directory contents or single file into the trackio subfolder - if os.path.isdir(src): - await storage.copy_dir(src, trackio_dir) - else: - base_name = posixpath.basename(src) - dest_file = storage.join(trackio_dir, base_name) - await storage.copy_file(src, dest_file) + await self._job.update_job_data_field("trackio_db_artifact_path", trackio_dir) # type: ignore[union-attr] + if project is not None and isinstance(project, str) and project.strip() != "": + await self._job.update_job_data_field("trackio_project", project.strip()) # type: ignore[union-attr] - # Record the artifact location in job_data so the backend/UI can locate it - await self._job.update_job_data_field("trackio_db_artifact_path", trackio_dir) # type: ignore[union-attr] - if project is not None and isinstance(project, str) and project.strip() != "": - await self._job.update_job_data_field("trackio_project", project.strip()) # type: ignore[union-attr] + logger.info(f"📊 Saved Trackio metrics for job to: {trackio_dir}") + return trackio_dir - logger.info(f"📊 Saved Trackio metrics for job to: {trackio_dir}") - return trackio_dir + async def _seed_trackio_shared_path_async( + self, experiment_id: str, project_name: str, dest_dir: str + ) -> None: + """If shared project path exists, copy its contents into dest_dir (seed for new run).""" + try: + workspace_dir = await dirs.get_workspace_dir() + shared_path = storage.join( + workspace_dir, + "trackio_runs", + secure_filename(str(experiment_id)), + secure_filename(project_name), + ) + if await storage.exists(shared_path) and await storage.isdir(shared_path): + await storage.copy_dir(shared_path, dest_dir) + except Exception as e: + logger.debug("Trackio shared path seed failed: %s", e) def _capture_existing_trackio_run(self) -> None: """ diff --git a/src/renderer/components/Experiment/Tasks/JobsList.tsx b/src/renderer/components/Experiment/Tasks/JobsList.tsx index 2c9fc9f26..5d1b097fc 100644 --- a/src/renderer/components/Experiment/Tasks/JobsList.tsx +++ b/src/renderer/components/Experiment/Tasks/JobsList.tsx @@ -275,7 +275,8 @@ const JobsList: React.FC = ({ )} - {job?.job_data?.trackio_db_artifact_path && ( + {(job?.job_data?.trackio_db_artifact_path || + job?.job_data?.trackio_project_name) && (