Skip to content

Commit c564488

Browse files
authored
Merge pull request #217 from NCATSTranslator/kgx_storage
Add S3 upload functionality with EBS cleanup and source-based logging #142
2 parents b514567 + 78715be commit c564488

10 files changed

Lines changed: 1214 additions & 16 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,7 @@ releases
2020
# Documentation build artifacts (only used locally and in CI)
2121
/docs/
2222
site/
23+
24+
# Logs
25+
logs/
2326
/.claude/

Makefile

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ define HELP
5151
│ │
5252
│ test Run all tests │
5353
│ │
54+
│ upload Upload data and releases to S3 │
55+
│ upload-all Upload all sources to S3 │
56+
│ cleanup-ebs Clean up old EBS versions │
57+
│ cleanup-s3 Delete all objects from S3 bucket (DANGEROUS) │
58+
│ cleanup-s3-source Delete specific source from S3 (DANGEROUS) │
59+
│ │
5460
│ lint Lint all code │
5561
│ lint-fix Fix linting errors automatically │
5662
│ format Format all code │
@@ -175,6 +181,45 @@ release-%:
175181
@echo "Creating release for $*..."
176182
@$(RUN) python src/translator_ingest/release.py $*
177183

184+
### S3 Upload and Storage Management ###
185+
186+
.PHONY: upload
187+
upload:
188+
@echo "Uploading sources to S3: $(SOURCES)"
189+
@$(RUN) python src/translator_ingest/upload_s3.py $(SOURCES)
190+
191+
.PHONY: upload-%
192+
upload-%:
193+
@echo "Uploading $* to S3..."
194+
@$(RUN) python src/translator_ingest/upload_s3.py $*
195+
196+
.PHONY: upload-all
197+
upload-all:
198+
@echo "Uploading all sources to S3..."
199+
@$(RUN) python src/translator_ingest/upload_s3.py
200+
201+
.PHONY: cleanup-ebs
202+
cleanup-ebs:
203+
@echo "Cleaning up old versions from EBS for sources: $(SOURCES)"
204+
@for source in $(SOURCES); do \
205+
echo "Cleaning up $$source..."; \
206+
$(RUN) python -c "from translator_ingest.util.storage.s3 import cleanup_old_source_versions, cleanup_old_releases; \
207+
cleanup_old_source_versions('$$source'); cleanup_old_releases('$$source')"; \
208+
done
209+
210+
.PHONY: cleanup-s3
211+
cleanup-s3:
212+
@echo "WARNING: This will delete ALL objects from the S3 bucket!"
213+
@$(RUN) python -c "from translator_ingest.util.storage.s3 import cleanup_s3_bucket; cleanup_s3_bucket()"
214+
215+
.PHONY: cleanup-s3-source
216+
cleanup-s3-source:
217+
@echo "WARNING: This will delete source data from S3!"
218+
@for source in $(SOURCES); do \
219+
echo "Deleting $$source from S3..."; \
220+
$(RUN) python -c "from translator_ingest.util.storage.s3 import cleanup_s3_source; cleanup_s3_source('$$source')"; \
221+
done
222+
178223
### Linting, Formatting, and Cleaning ###
179224

180225
.PHONY: clean

src/translator_ingest/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212

1313
INGESTS_RELEASES_PATH = TRANSLATOR_INGEST_PATH / ".." / ".." / "releases"
1414

15+
INGESTS_LOGS_PATH = TRANSLATOR_INGEST_PATH / ".." / ".." / "logs"
16+
1517
INGESTS_PARSER_PATH = TRANSLATOR_INGEST_PATH / "ingests"
1618
INGEST_PARSER_DIR = INGESTS_PARSER_PATH.absolute()
1719

18-
INGESTS_STORAGE_URL = os.environ.get("INGESTS_STORAGE_URL", "https://stars.renci.org/var/translator/data")
19-
INGESTS_RELEASES_URL = os.environ.get("INGESTS_RELEASES_URL", "https://stars.renci.org/var/translator/releases")
20+
# Default public HTTPS endpoints for KGX storage (browser view format)
21+
INGESTS_STORAGE_URL = os.environ.get("INGESTS_STORAGE_URL", "https://kgx-storage.rtx.ai/?path=data")
22+
INGESTS_RELEASES_URL = os.environ.get("INGESTS_RELEASES_URL", "https://kgx-storage.rtx.ai/?path=releases")

src/translator_ingest/ingests/ctd/ctd.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from bs4 import BeautifulSoup
2525
from koza.model.graphs import KnowledgeGraph
2626

27+
2728
BIOLINK_AFFECTS = "biolink:affects"
2829
BIOLINK_CAUSES = "biolink:causes"
2930
BIOLINK_ASSOCIATED_WITH = "biolink:associated_with"
@@ -44,6 +45,14 @@
4445
"negative correlation": BIOLINK_NEGATIVELY_CORRELATED
4546
}
4647

48+
49+
# !!! !!! README !!! !!!
50+
# CTD implemented a CAPTCHA which unfortunately breaks dependable programmatic access for determining the version
51+
# and downloading data. If possible, opening a browser and passing the CAPTCHA at ctdbase.org should allow everything
52+
# to run. If not, when determining the latest version, the translator-ingests pipeline will fall back to a previously
53+
# successful build, so if running in an environment where passing the CAPTCHA is not possible, copy a previous CTD
54+
# directory from /data/ including the latest-build.json file and the pipeline will utilize the last successful version.
55+
4756
def get_latest_version():
4857
# CTD doesn't provide a great programmatic way to determine the latest version, but it does have a Data Status page
4958
# with a version on it. Fetch the html and parse it to determine the current version.
@@ -56,7 +65,6 @@ def get_latest_version():
5665
else:
5766
raise RuntimeError('Could not determine latest version for CTD, "pgheading" header was missing...')
5867

59-
6068
@koza.transform_record(tag="chemicals_diseases")
6169
def transform_chemical_to_disease(koza: koza.KozaTransform, record: dict[str, Any]) -> KnowledgeGraph | None:
6270
chemical = ChemicalEntity(id=f"MESH:{record["ChemicalID"]}", name=record["ChemicalName"])

src/translator_ingest/upload_s3.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
"""
2+
CLI entry point for uploading translator-ingests data and releases to S3.
3+
4+
This module provides the command-line interface for the `make upload` target.
5+
Data sources and release sources are handled as separate, independent lists.
6+
7+
Requirements:
8+
- Must run on EC2 instance with IAM role permissions
9+
- Run after `make run` and `make release` have been executed
10+
11+
Usage:
12+
# auto-discover data and release sources separately (no combining)
13+
uv run python src/translator_ingest/upload_s3.py
14+
15+
# explicit control over different source lists
16+
uv run python src/translator_ingest/upload_s3.py \
17+
--data-sources "ctd go_cam ncbigene" \
18+
--release-sources "translator_kg ctd go_cam"
19+
20+
# upload data only for specific sources
21+
uv run python src/translator_ingest/upload_s3.py --data-sources "ncbigene"
22+
23+
# upload releases only for specific sources
24+
uv run python src/translator_ingest/upload_s3.py --release-sources "translator_kg"
25+
26+
# skip cleanup after upload
27+
uv run python src/translator_ingest/upload_s3.py --no-cleanup
28+
29+
Via Makefile:
30+
make upload SOURCES="go_cam ctd"
31+
make upload-all
32+
make upload-go_cam
33+
"""
34+
35+
from pathlib import Path
36+
37+
import click
38+
39+
from translator_ingest import INGESTS_DATA_PATH, INGESTS_RELEASES_PATH
40+
from translator_ingest.util.logging_utils import get_logger, setup_logging
41+
from translator_ingest.util.storage.s3 import upload_and_cleanup
42+
43+
logger = get_logger(__name__)
44+
45+
46+
def discover_data_sources() -> list[str]:
47+
"""Discover sources from /data directory only.
48+
49+
Returns:
50+
Sorted list of source names found in /data directory
51+
"""
52+
sources = []
53+
data_path = Path(INGESTS_DATA_PATH)
54+
55+
if data_path.exists():
56+
for item in data_path.iterdir():
57+
if item.is_dir():
58+
sources.append(item.name)
59+
60+
return sorted(sources)
61+
62+
63+
def discover_release_sources() -> list[str]:
64+
"""Discover sources from /releases directory only.
65+
66+
Returns:
67+
Sorted list of source names found in /releases directory
68+
"""
69+
sources = []
70+
releases_path = Path(INGESTS_RELEASES_PATH)
71+
72+
if releases_path.exists():
73+
for item in releases_path.iterdir():
74+
if item.is_dir():
75+
sources.append(item.name)
76+
77+
return sorted(sources)
78+
79+
80+
def print_upload_summary(results: dict):
81+
"""Print formatted upload summary to console.
82+
83+
Args:
84+
results: Results dictionary from upload_and_cleanup()
85+
"""
86+
print("\n" + "=" * 80)
87+
print("S3 UPLOAD SUMMARY")
88+
print("=" * 80)
89+
print(f"Sources processed: {results['sources_processed']}")
90+
print(f"Files uploaded: {results['total_uploaded']}")
91+
print(f"Files failed: {results['total_failed']}")
92+
print(f"Data transferred: {results['total_bytes_transferred'] / (1024 * 1024 * 1024):.2f} GB")
93+
print(f"EBS space freed: {results['total_bytes_freed'] / (1024 * 1024 * 1024):.2f} GB")
94+
print("=" * 80)
95+
96+
# Per-source details
97+
print("\nPer-Source Details:")
98+
print("-" * 80)
99+
for source, stats in results['per_source_stats'].items():
100+
print(f"\n{source}:")
101+
102+
# Data upload
103+
data_upload = stats.get('data_upload', {})
104+
if 'error' in data_upload:
105+
print(f" Data upload: ERROR - {data_upload['error']}")
106+
else:
107+
print(f" Data upload: {data_upload.get('uploaded', 0)} files, "
108+
f"{data_upload.get('bytes_transferred', 0) / (1024 * 1024):.2f} MB")
109+
110+
# Releases upload
111+
releases_upload = stats.get('releases_upload', {})
112+
if 'error' in releases_upload:
113+
print(f" Releases upload: ERROR - {releases_upload['error']}")
114+
else:
115+
print(f" Releases upload: {releases_upload.get('uploaded', 0)} files, "
116+
f"{releases_upload.get('bytes_transferred', 0) / (1024 * 1024):.2f} MB")
117+
118+
# Cleanup stats
119+
data_cleanup = stats.get('data_cleanup', {})
120+
if data_cleanup:
121+
print(f" Data cleanup: {data_cleanup.get('deleted', 0)} versions deleted, "
122+
f"{data_cleanup.get('bytes_freed', 0) / (1024 * 1024 * 1024):.2f} GB freed")
123+
124+
releases_cleanup = stats.get('releases_cleanup', {})
125+
if releases_cleanup:
126+
print(f" Releases cleanup: {releases_cleanup.get('deleted', 0)} releases deleted, "
127+
f"{releases_cleanup.get('bytes_freed', 0) / (1024 * 1024 * 1024):.2f} GB freed")
128+
129+
print("\n" + "=" * 80 + "\n")
130+
131+
132+
@click.command()
133+
@click.option(
134+
"--data-sources",
135+
help="Space-separated list of sources to upload from /data (e.g., 'ctd go_cam ncbigene')"
136+
)
137+
@click.option(
138+
"--release-sources",
139+
help="Space-separated list of sources to upload from /releases (e.g., 'translator_kg ctd go_cam')"
140+
)
141+
@click.option("--no-cleanup", is_flag=True, help="Skip EBS cleanup after upload")
142+
def main(data_sources, release_sources, no_cleanup):
143+
"""Upload translator-ingests data and releases to S3.
144+
145+
If no sources are specified, automatically discovers sources from /data
146+
and /releases directories separately (no combining).
147+
148+
Examples:
149+
\b
150+
# auto-discover both data and release sources separately
151+
uv run python src/translator_ingest/upload_s3.py
152+
153+
\b
154+
# explicit control over different source lists
155+
uv run python src/translator_ingest/upload_s3.py \\
156+
--data-sources "ctd go_cam ncbigene" \\
157+
--release-sources "translator_kg ctd go_cam"
158+
159+
\b
160+
# upload data only for specific sources
161+
uv run python src/translator_ingest/upload_s3.py --data-sources "ncbigene"
162+
163+
\b
164+
# upload releases only for specific sources
165+
uv run python src/translator_ingest/upload_s3.py --release-sources "translator_kg"
166+
167+
\b
168+
# skip cleanup after upload
169+
uv run python src/translator_ingest/upload_s3.py --no-cleanup
170+
"""
171+
setup_logging(source="upload")
172+
173+
# Parse source lists from space-separated strings
174+
data_source_list = None
175+
release_source_list = None
176+
177+
if data_sources:
178+
data_source_list = data_sources.split()
179+
logger.info(f"Data sources specified: {', '.join(data_source_list)}")
180+
181+
if release_sources:
182+
release_source_list = release_sources.split()
183+
logger.info(f"Release sources specified: {', '.join(release_source_list)}")
184+
185+
# Auto-discover if neither specified
186+
if data_sources is None and release_sources is None:
187+
logger.info("No sources specified, auto-discovering sources separately...")
188+
189+
data_source_list = discover_data_sources()
190+
release_source_list = discover_release_sources()
191+
192+
if not data_source_list and not release_source_list:
193+
logger.warning("No sources found in /data or /releases directories")
194+
print("No sources found to upload.")
195+
return
196+
197+
logger.info(f"Discovered {len(data_source_list)} data sources: {', '.join(data_source_list) if data_source_list else 'none'}")
198+
logger.info(f"Discovered {len(release_source_list)} release sources: {', '.join(release_source_list) if release_source_list else 'none'}")
199+
200+
logger.info("Starting S3 upload...")
201+
logger.info(f"Data sources: {data_source_list if data_source_list else 'none'}")
202+
logger.info(f"Release sources: {release_source_list if release_source_list else 'none'}")
203+
logger.info(f"Cleanup: {not no_cleanup}")
204+
205+
# Execute upload and cleanup
206+
results = upload_and_cleanup(
207+
data_sources=data_source_list,
208+
release_sources=release_source_list,
209+
cleanup=not no_cleanup
210+
)
211+
212+
# Print summary
213+
print_upload_summary(results)
214+
215+
# Exit with error if there were failures
216+
if results['total_failed'] > 0:
217+
logger.error(f"Upload completed with {results['total_failed']} failures")
218+
exit(1)
219+
else:
220+
logger.info("Upload completed successfully")
221+
222+
223+
if __name__ == "__main__":
224+
main()

0 commit comments

Comments
 (0)