diff --git a/affirm/CHRONON_BOOTCAMP.md b/affirm/CHRONON_BOOTCAMP.md new file mode 100644 index 0000000000..dde126a079 --- /dev/null +++ b/affirm/CHRONON_BOOTCAMP.md @@ -0,0 +1,355 @@ +# Chronon GroupBy Bootcamp + +## Overview + +This bootcamp is designed for **end users** who want to learn Chronon GroupBy features through hands-on practice. It provides step-by-step commands to get you up and running quickly with minimal complexity. + +## What You'll Learn + +- Create your first GroupBy feature +- Run backfill computations +- Upload features to online store +- Test online serving +- Validate your results + +## Prerequisites + +- Docker installed and running +- Basic command-line knowledge +- No prior Chronon experience required + +## Quick Start + +### 1. Setup Environment + +```bash +# Clone Chronon repository +git clone git@github.com:airbnb/chronon.git +cd chronon/affirm + +# Start the bootcamp environment (minimal setup) +./setup-chronon-bootcamp.sh +``` + +Wait for the setup to complete. You should see: +``` +✅ Sample data has been copied to MinIO and is ready for GroupBy development! +📊 Parquet files available at: s3a://chronon/warehouse/data/ +📁 purchases.parquet (~155 records), users.parquet (100 records) +``` + +### 2. Verify Services + +```bash +# Check all services are running +docker-compose -f docker-compose-bootcamp.yml ps +``` + +All containers should show "Up" status. + +## Hands-On Tutorial + +### Step 1: Setup Team Information + +Before creating your GroupBy, you need to setup team information: + +```bash +# Create teams.json file in the root directory +cat > teams.json << 'EOF' +{ + "bootcamp": { + "description": "Bootcamp team for learning Chronon", + "namespace": "default", + "user": "bootcamp_user", + "production": { + "backfill": { + "EXECUTOR_CORES": "2", + "DRIVER_MEMORY": "1G", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + }, + "upload": { + "EXECUTOR_CORES": "2", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + } + } + } +} +EOF +``` + +### Step 2: Create Your First GroupBy + +Create a new GroupBy configuration: + +```bash +# Create the configuration directory +mkdir -p group_bys/bootcamp + +# Create your GroupBy configuration +cat > group_bys/bootcamp/user_purchase_features.py << 'EOF' +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.query import Query, select +from ai.chronon.group_by import GroupBy, Aggregation, Operation, Window, TimeUnit + +# Define the source using sample data +source = Source( + events=EventSource( + table="purchases", # Sample purchase data + query=Query( + selects=select("user_id", "purchase_price", "item_category"), + time_column="ts" + ) + ) +) + +# Define time windows +window_sizes = [ + Window(length=1, timeUnit=TimeUnit.DAYS), # 1 day + Window(length=7, timeUnit=TimeUnit.DAYS), # 7 days +] + +# Create the GroupBy configuration +v1 = GroupBy( + sources=[source], + keys=["user_id"], + aggregations=[ + Aggregation( + input_column="purchase_price", + operation=Operation.SUM, + windows=window_sizes + ), + Aggregation( + input_column="purchase_price", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="purchase_price", + operation=Operation.AVERAGE, + windows=window_sizes + ), + ], + online=True, +) +EOF +``` + +### Step 3: Compile Your Configuration + +```bash +# Compile the GroupBy configuration +python3 api/py/ai/chronon/repo/compile.py \ + --conf group_bys/bootcamp/user_purchase_features.py \ + --force-overwrite +``` + +You should see compilation success 
message. + +### Step 4: Run Backfill + +```bash +# Run backfill for December 1st, 2023 +python3 api/py/ai/chronon/repo/run.py \ + --conf production/group_bys/bootcamp/user_purchase_features.v1 \ + --mode backfill \ + --ds 2023-12-01 \ + --repo /srv/chronon +``` + +**Monitor Progress**: Open http://localhost:8080 to watch your job in Spark UI. + +### Step 5: Check Your Results + +```bash +# Access Spark SQL to check results +docker-compose -f docker-compose-bootcamp.yml exec chronon-main spark-sql +``` + +Inside Spark SQL: +```sql +-- List your tables +SHOW TABLES LIKE '*bootcamp*'; + +-- Check your computed features +SELECT * FROM default.bootcamp_user_purchase_features_v1 +WHERE ds = '2023-12-01' +LIMIT 10; + +-- Exit Spark SQL +EXIT; +``` + +### Step 6: Upload to Online Store + +```bash +# Upload features to MongoDB for online serving +python3 api/py/ai/chronon/repo/run.py \ + --conf production/group_bys/bootcamp/user_purchase_features.v1 \ + --mode upload \ + --ds 2023-12-01 \ + --repo /srv/chronon +``` + +### Step 7: Test Online Serving + +```bash +# Test fetching features for a specific user +python3 api/py/ai/chronon/repo/run.py \ + --mode fetch \ + --conf-type group_bys \ + --name bootcamp/user_purchase_features.v1 \ + -k '{"user_id":"user_1"}' \ + --repo /srv/chronon +``` + +You should see JSON output with feature values like: +```json +{ + "user_id": "user_1", + "purchase_price_sum_1d": 45.67, + "purchase_price_count_1d": 3, + "purchase_price_avg_1d": 15.22, + "purchase_price_sum_7d": 123.45, + "purchase_price_count_7d": 8, + "purchase_price_avg_7d": 15.43 +} +``` + +## Validation Exercise + +### Step 8: Manual Verification + +Let's verify your results are correct: + +```bash +# Check the source data for user_1 on 2023-12-01 +docker-compose -f docker-compose-bootcamp.yml exec chronon-main spark-sql +``` + +```sql +-- Load sample data +CREATE OR REPLACE TEMPORARY VIEW purchases AS +SELECT * FROM parquet.`s3a://chronon/warehouse/data/purchases/purchases.parquet`; + +-- Check user_1's purchases on 2023-12-01 +SELECT + user_id, + purchase_price, + ts +FROM purchases +WHERE user_id = 'user_1' +AND DATE(ts) = '2023-12-01' +ORDER BY ts; + +-- Manual calculation +SELECT + user_id, + SUM(purchase_price) as manual_sum_1d, + COUNT(*) as manual_count_1d, + AVG(purchase_price) as manual_avg_1d +FROM purchases +WHERE user_id = 'user_1' +AND DATE(ts) = '2023-12-01' +GROUP BY user_id; + +EXIT; +``` + +Compare the manual calculation with your GroupBy results - they should match! 
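
If you prefer to cross-check in Python rather than Spark SQL, the sketch below recomputes the same aggregates with PySpark (for example in the Jupyter notebook at http://localhost:8888). This is a minimal validation sketch, not the GroupBy engine itself: the `window_aggs` helper, the inclusive-end-date window convention, and reading the Parquet file straight from MinIO are assumptions made for illustration, so adjust them if your backfill bucketed events differently.

```python
# Hypothetical cross-check: recompute 1-day and 7-day aggregates from the raw Parquet
# data and compare them with the GroupBy output for ds = 2023-12-01.
# Assumptions: the Spark session can read s3a:// paths (as configured by the setup
# script) and windows end on the ds date, inclusive.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("BootcampValidation").getOrCreate()

purchases = spark.read.parquet("s3a://chronon/warehouse/data/purchases/purchases.parquet")
end_date = "2023-12-01"

def window_aggs(days):
    """Sum / count / avg of purchase_price for the `days`-day window ending on end_date."""
    start = F.date_sub(F.lit(end_date), days - 1)
    return (
        purchases
        .where((F.to_date("ts") >= start) & (F.to_date("ts") <= F.lit(end_date)))
        .groupBy("user_id")
        .agg(
            F.sum("purchase_price").alias(f"manual_sum_{days}d"),
            F.count("purchase_price").alias(f"manual_count_{days}d"),
            F.avg("purchase_price").alias(f"manual_avg_{days}d"),
        )
    )

# Side-by-side view of the 1-day and 7-day manual aggregates per user
window_aggs(1).join(window_aggs(7), "user_id", "outer").orderBy("user_id").show(10)
```

The numbers produced here should line up with the `*_1d` and `*_7d` feature values you fetched in Step 7; if they do not, the most likely culprits are timezone handling of `ts` or a different window boundary convention.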
+ +## Next Steps + +### Try Different Aggregations + +Modify your GroupBy configuration to try different operations: + +```python +# Add to your aggregations list +Aggregation( + input_column="purchase_price", + operation=Operation.MAX, # Maximum purchase + windows=window_sizes +), +Aggregation( + input_column="purchase_price", + operation=Operation.MIN, # Minimum purchase + windows=window_sizes +), +``` + +### Try Different Windows + +```python +# Add longer windows +window_sizes = [ + Window(length=1, timeUnit=TimeUnit.DAYS), # 1 day + Window(length=7, timeUnit=TimeUnit.DAYS), # 7 days + Window(length=30, timeUnit=TimeUnit.DAYS), # 30 days + Window(length=90, timeUnit=TimeUnit.DAYS), # 90 days +] +``` + +### Run Date Ranges + +```bash +# Run backfill for multiple days +python3 api/py/ai/chronon/repo/run.py \ + --conf production/group_bys/bootcamp/user_purchase_features.v1 \ + --mode backfill \ + --start-ds 2023-12-01 \ + --end-ds 2023-12-07 \ + --repo /srv/chronon +``` + +## Troubleshooting + +### Common Issues + +**Compilation Error**: +```bash +# Make sure you're in the chronon directory +cd /path/to/chronon +python3 api/py/ai/chronon/repo/compile.py --conf group_bys/bootcamp/user_purchase_features.py --force-overwrite +``` + +**Backfill Error**: +```bash +# Check if Spark is running +docker-compose -f docker-compose-bootcamp.yml ps spark-master + +# Check Spark UI +open http://localhost:8080 +``` + +**No Results**: +```bash +# Check if your table was created +docker-compose -f docker-compose-bootcamp.yml exec chronon-main spark-sql -e "SHOW TABLES LIKE '*bootcamp*';" +``` + +### Getting Help + +- **Spark UI**: http://localhost:8080 - Monitor job execution +- **MinIO Console**: http://localhost:9001 - Check data storage +- **Jupyter**: http://localhost:8888 - Interactive data exploration + +## Congratulations! 🎉 + +You've successfully completed the Chronon GroupBy bootcamp! You now know how to: + +- ✅ Create GroupBy configurations +- ✅ Compile and run backfill jobs +- ✅ Upload features to online store +- ✅ Test online serving +- ✅ Validate your results + +## What's Next? + +- **Explore More**: Try different aggregations and windows +- **Production**: Learn about Airflow orchestration in the Developer Guide +- **Advanced**: Study the full Developer Guide for production workflows +- **Join Community**: Connect with other Chronon users + +Happy feature engineering! 🚀 diff --git a/affirm/CHRONON_DEVELOPER_GUIDE.md b/affirm/CHRONON_DEVELOPER_GUIDE.md new file mode 100644 index 0000000000..d3f332b9ca --- /dev/null +++ b/affirm/CHRONON_DEVELOPER_GUIDE.md @@ -0,0 +1,675 @@ +# Chronon Developer Guide + +## Overview + +This guide is designed for **Chronon operators and developers** who need to understand the complete Chronon system architecture, production workflows, and how to simulate production environments locally using Docker. It covers both the technical implementation details and production-like orchestration. + +## Table of Contents + +1. [System Architecture](#system-architecture) +2. [Production Workflows](#production-workflows) +3. [Local Development Setup](#local-development-setup) +4. [GroupBy Development Workflow](#groupby-development-workflow) +5. [Airflow Integration](#airflow-integration) +6. [Production Simulation](#production-simulation) +7. [Monitoring and Troubleshooting](#monitoring-and-troubleshooting) +8. 
[Best Practices](#best-practices) + +## System Architecture + +### Key Components + +- **Spark**: Batch processing engine for GroupBy computations +- **Iceberg**: Table format for ACID transactions and time travel +- **MongoDB**: Online KV store for local development (Chronon's built-in support) +- **DynamoDB**: Online KV store for production environments +- **Airflow**: Workflow orchestration and job scheduling +- **S3/MinIO**: Object storage for Iceberg tables and raw data + +## Production Workflows + +### How Production Works + +1. **Data Ingestion**: Raw data flows into data warehouse (S3/HDFS) +2. **Feature Definition**: MLEs define GroupBy configurations in Python +3. **Compilation**: Python configs are compiled to Thrift JSON +4. **Orchestration**: Airflow DAGs automatically discover and schedule jobs +5. **Execution**: Spark processes GroupBy computations +6. **Storage**: Results stored in Iceberg tables (S3) and DynamoDB (online serving) +7. **Serving**: Features served via online API + +### Key Components + +- **Spark**: Batch processing engine for GroupBy computations +- **Iceberg**: Table format for ACID transactions and time travel +- **MongoDB**: Online KV store for local development (Chronon's built-in support) +- **DynamoDB**: Online KV store for production environments +- **Airflow**: Workflow orchestration and job scheduling +- **S3/MinIO**: Object storage for Iceberg tables and raw data + +## Local Development Setup + +### Prerequisites + +```bash +# Check Docker is installed +docker --version +docker-compose --version + +# Start Docker if it's not running +open -a Docker # macOS +# or start Docker Desktop from Applications + +# Check ports are available +lsof -i :8080 -i :8085 -i :8888 -i :9000 -i :9001 -i :27017 -i :8000 +``` + +### Start Development Environment + +```bash +# Clone Chronon repository +git clone git@github.com:airbnb/chronon.git +cd chronon/affirm + +# Start the developer environment (full setup) +./setup-chronon-developer.sh +``` + +### Verify Services + +```bash +# Check all containers are running +docker-compose -f docker-compose-developer.yml ps +``` + +**Expected Output**: +``` +NAME IMAGE STATUS +chronon-airflow-db-1 postgres:13 Up (healthy) +chronon-chronon-main-1 ezvz/chronon Up +chronon-dynamodb-local-1 amazon/dynamodb-local:latest Up +chronon-jupyter-1 jupyter/pyspark-notebook:latest Up (healthy) +chronon-minio-1 minio/minio:latest Up (healthy) +chronon-mongodb-1 mongo:latest Up (healthy) +chronon-spark-master-1 bitnami/spark:3.5.0 Up +chronon-spark-worker-1 bitnami/spark:3.5.0 Up +chronon-spark-worker-2 bitnami/spark:3.5.0 Up +``` + +### Access Points + +- **Spark Master**: http://localhost:8080 - GroupBy computation +- **Jupyter Notebooks**: http://localhost:8888 (token: chronon-dev) - Data exploration +- **MinIO Console**: http://localhost:9001 (minioadmin/minioadmin) - S3 storage +- **MongoDB**: localhost:27017 - Online feature serving +- **Airflow Dashboard**: http://localhost:8085 (admin/admin) - Production-like orchestration +- **DynamoDB Local**: http://localhost:8000 - Optional testing + +## GroupBy Development Workflow + +### 1. Explore Sample Data + +The setup script has already copied sample Parquet files to MinIO. 
You can explore them: + +```python +# In Jupyter Notebook (http://localhost:8888) +from pyspark.sql import SparkSession + +spark = SparkSession.builder.appName("DataExploration").getOrCreate() + +# Load sample data from MinIO +purchases_df = spark.read.parquet("s3a://chronon/warehouse/data/purchases/purchases.parquet") +users_df = spark.read.parquet("s3a://chronon/warehouse/data/users/users.parquet") + +# Explore the data +purchases_df.show(10) +purchases_df.describe().show() +``` + +### 2. Setup Team Information + +Before compiling, you need to create a `teams.json` file to define your team configuration: + +```bash +# Create teams.json file in the root directory +cat > teams.json << 'EOF' +{ + "my_team": { + "description": "My team for GroupBy development", + "namespace": "default", + "user": "developer", + "production": { + "backfill": { + "EXECUTOR_CORES": "2", + "DRIVER_MEMORY": "1G", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + }, + "upload": { + "EXECUTOR_CORES": "2", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + } + } + } +} +EOF +``` + +**Key Points**: +- **Team name** (`my_team`) must match your directory structure +- **Namespace** defines the database namespace for your tables +- **Production settings** configure Spark resources for different job types +- **User** field is used for job ownership and permissions + +### 3. Define GroupBy Configuration + +Create your GroupBy configuration in Python: + +```python +# group_bys/my_team/user_purchase_features.py +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.query import Query, select +from ai.chronon.group_by import GroupBy, Aggregation, Operation, Window, TimeUnit + +# Define the source using sample data from MinIO +source = Source( + events=EventSource( + table="purchases", # Temp view created from MinIO Parquet file + query=Query( + selects=select("user_id", "purchase_price", "item_category"), + time_column="ts" # Event timestamp column + ) + ) +) + +# Define time windows for aggregations +window_sizes = [ + Window(length=1, timeUnit=TimeUnit.DAYS), # 1 day + Window(length=7, timeUnit=TimeUnit.DAYS), # 7 days + Window(length=30, timeUnit=TimeUnit.DAYS), # 30 days +] + +# Create the GroupBy configuration +v1 = GroupBy( + sources=[source], + keys=["user_id"], # Primary key for grouping + aggregations=[ + # Sum of purchase prices in different windows + Aggregation( + input_column="purchase_price", + operation=Operation.SUM, + windows=window_sizes + ), + # Count of purchases in different windows + Aggregation( + input_column="purchase_price", + operation=Operation.COUNT, + windows=window_sizes + ), + # Average purchase price in different windows + Aggregation( + input_column="purchase_price", + operation=Operation.AVERAGE, + windows=window_sizes + ), + # Last 10 purchase prices (no window = all time) + Aggregation( + input_column="purchase_price", + operation=Operation.LAST_K(10), + ), + ], + online=True, # Enable online serving +) +``` + +### 4. Compile Configuration + +Convert your Python configuration to Thrift JSON: + +```bash +# Compile the GroupBy configuration +python3 api/py/ai/chronon/repo/compile.py \ + --conf group_bys/my_team/user_purchase_features.py \ + --force-overwrite + +# This creates: production/group_bys/my_team/user_purchase_features.v1 +``` + +### 5. 
Run Local Backfill (Direct) + +Execute the GroupBy computation for a specific date: + +```bash +# Run backfill for a specific date +python3 api/py/ai/chronon/repo/run.py \ + --conf production/group_bys/my_team/user_purchase_features.v1 \ + --mode backfill \ + --ds 2023-12-01 \ + --repo /srv/chronon + +# For date ranges +python3 api/py/ai/chronon/repo/run.py \ + --conf production/group_bys/my_team/user_purchase_features.v1 \ + --mode backfill \ + --start-ds 2023-12-01 \ + --end-ds 2023-12-07 \ + --repo /srv/chronon +``` + +### 6. Check Output in Iceberg Tables + +Verify your computed features in Iceberg tables: + +```bash +# Access Spark SQL +docker-compose -f docker-compose-developer.yml exec chronon-main spark-sql + +# Inside Spark SQL +USE default; +SHOW TABLES LIKE '*user_purchase*'; + +# Check the computed features +SELECT * FROM default.my_team_user_purchase_features_v1 +WHERE ds = '2023-12-01' +LIMIT 10; + +# Verify it's an Iceberg table +DESCRIBE EXTENDED default.my_team_user_purchase_features_v1; +# Should show: Table Type: ICEBERG +``` + +### 7. Upload to Online Store + +Upload computed features to MongoDB for online serving: + +```bash +# Upload features to MongoDB +python3 api/py/ai/chronon/repo/run.py \ + --conf production/group_bys/my_team/user_purchase_features.v1 \ + --mode upload \ + --ds 2023-12-01 \ + --repo /srv/chronon +``` + +### 8. Test Online Serving + +Test fetching features from the online store: + +```bash +# Test fetch for a specific user +python3 api/py/ai/chronon/repo/run.py \ + --mode fetch \ + --conf-type group_bys \ + --name my_team/user_purchase_features.v1 \ + -k '{"user_id":"user_1"}' \ + --repo /srv/chronon +``` + +## Airflow Integration + +### Production-Like Spark Job Submission + +Airflow is the **standard way** to submit Spark jobs in production environments. Using Airflow locally gives you a production-like development experience. + +### Why Use Airflow for GroupBy Development + +- **Production-like workflow** - Same job submission process as production +- **Spark job orchestration** - Proper job scheduling and dependency management +- **Monitoring and alerting** - Web UI for job status and failure notifications +- **Error handling** - Automatic retries and failure notifications +- **Resource management** - Proper Spark cluster resource allocation +- **Job history** - Track job execution history and performance + +### Submit via Airflow (Production-Like) + +Instead of using `run.py` directly, you can submit your compiled JSON configs to Airflow using the OSS Chronon DAGs. This provides a production-like workflow with proper orchestration, monitoring, and error handling. 
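
For orientation, the sketch below shows roughly the shape of a batch DAG that wraps `run.py`. It is illustrative only and is **not** the OSS Chronon DAG code: the DAG id, task names, and the idea of shelling out to `run.py` with `BashOperator` are assumptions made to show the pattern. The real DAGs discover your compiled JSON configs and build these tasks for you, as described in the steps that follow.

```python
# Illustrative sketch only -- NOT the OSS Chronon DAG. It shows the shape of a daily
# batch DAG that backfills a GroupBy and then uploads it to the online store.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

CONF = "production/group_bys/my_team/user_purchase_features.v1"  # assumed path
REPO = "/srv/chronon"

with DAG(
    dag_id="chronon_batch_dag_my_team_sketch",  # hypothetical DAG id
    start_date=datetime(2023, 12, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    backfill = BashOperator(
        task_id="backfill_task",
        bash_command=(
            "python3 api/py/ai/chronon/repo/run.py "
            f"--conf {CONF} --mode backfill --ds {{{{ ds }}}} --repo {REPO}"
        ),
    )
    upload = BashOperator(
        task_id="upload_task",
        bash_command=(
            "python3 api/py/ai/chronon/repo/run.py "
            f"--conf {CONF} --mode upload --ds {{{{ ds }}}} --repo {REPO}"
        ),
    )
    backfill >> upload  # upload runs only after the backfill succeeds
```

The key design point is the dependency chain: upload is gated on a successful backfill, and Airflow's `{{ ds }}` macro supplies the partition date, which is exactly the role the auto-generated Chronon DAGs play in production.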
+ +#### Step 1: Copy Configs to Airflow DAGs Folder + +Copy your compiled JSON configs to the Airflow DAGs folder where Chronon DAGs can discover them: + +```bash +# Create the team directory in Airflow DAGs +docker-compose -f docker-compose-developer.yml exec airflow-webserver \ + mkdir -p /opt/airflow/dags/chronon/group_bys/my_team + +# Copy your compiled GroupBy config +docker cp production/group_bys/my_team/user_purchase_features.v1 \ + chronon-airflow-webserver-1:/opt/airflow/dags/chronon/group_bys/my_team/ + +# Verify the file was copied +docker-compose -f docker-compose-developer.yml exec airflow-webserver \ + ls -la /opt/airflow/dags/chronon/group_bys/my_team/ +``` + +#### Step 2: Configure Chronon Constants + +Edit the Airflow constants file to point to your configs and configure the DAG behavior: + +```bash +# Access the Airflow container +docker-compose -f docker-compose-developer.yml exec airflow-webserver bash + +# Edit the constants file +vi /opt/airflow/dags/chronon/constants.py +``` + +Update the constants: +```python +# In /opt/airflow/dags/chronon/constants.py +import os + +# Path to your compiled JSON configs +CHRONON_PATH = "/opt/airflow/dags/chronon" + +# Team name (should match your directory structure) +TEST_TEAM_NAME = "my_team" + +# Concurrency settings (adjust based on your resources) +GROUP_BY_BATCH_CONCURRENCY = 5 +JOIN_CONCURRENCY = 3 + +# Time partition columns +time_parts = ["ds", "ts", "hr"] + +# Spark configuration +SPARK_MASTER = "spark://spark-master:7077" +SPARK_DRIVER_MEMORY = "1g" +SPARK_EXECUTOR_MEMORY = "2g" +SPARK_EXECUTOR_CORES = "2" + +# Online store configuration (MongoDB for local dev) +ONLINE_STORE_CONFIG = { + "host": "mongodb", + "port": 27017, + "database": "admin", + "username": "admin", + "password": "admin" +} +``` + +#### Step 3: Restart Airflow Services + +Restart Airflow to pick up the new configuration: + +```bash +# Restart Airflow services +docker-compose -f docker-compose-developer.yml restart airflow-webserver airflow-scheduler + +# Wait for services to be ready +sleep 30 +``` + +#### Step 4: Monitor DAGs in Airflow UI + +1. **Access Airflow UI**: http://localhost:8085 (admin/admin) +2. **Find your DAG**: Look for `chronon_batch_dag_my_team` +3. **Check DAG structure**: Click on the DAG to see the task graph +4. **Trigger manually**: Click "Trigger DAG" to run your GroupBy +5. **Monitor progress**: Watch task execution in real-time + +#### Step 5: Verify Airflow Execution + +```bash +# Check DAG status +docker-compose -f docker-compose-developer.yml exec airflow-webserver \ + airflow dags list | grep chronon + +# Check specific DAG runs +docker-compose -f docker-compose-developer.yml exec airflow-webserver \ + airflow dags state chronon_batch_dag_my_team 2023-12-01 + +# View DAG logs +docker-compose -f docker-compose-developer.yml exec airflow-webserver \ + airflow tasks logs chronon_batch_dag_my_team backfill_task 2023-12-01 + +# Check task details +docker-compose -f docker-compose-developer.yml exec airflow-webserver \ + airflow tasks list chronon_batch_dag_my_team +``` + +#### Step 6: Monitor Spark Jobs + +While Airflow is running your DAG, monitor the Spark jobs: + +1. **Spark UI**: http://localhost:8080 - See job progress and resource usage +2. **Airflow UI**: http://localhost:8085 - See task status and logs +3. 
**Command Line**: Use the verification commands above + +#### Step 7: Verify Results + +After the DAG completes successfully: + +```bash +# Check if your GroupBy table was created +docker-compose -f docker-compose-developer.yml exec chronon-main spark-sql -e " +SHOW TABLES LIKE '*my_team*'; +SELECT * FROM default.my_team_user_purchase_features_v1 WHERE ds = '2023-12-01' LIMIT 5; +" + +# Check online store (MongoDB) +docker-compose -f docker-compose-developer.yml exec mongodb mongosh -u admin -p admin --authenticationDatabase admin -e " +use admin +db.my_team_user_purchase_features_v1.find().limit(3) +" +``` + +### Understanding the OSS Chronon DAGs + +The OSS Chronon DAGs automatically discover your JSON configs and create DAGs with the following structure: + +``` +chronon_batch_dag_my_team +├── backfill_task # Runs GroupBy backfill +├── upload_task # Uploads features to online store +└── metadata_upload_task # Uploads metadata +``` + +**DAG Naming Convention**: +- `chronon_batch_dag_{team_name}` - For GroupBy configs +- `chronon_join_{join_name}` - For Join configs +- `chronon_staging_query_{team_name}` - For Staging Query configs + +### Benefits of Airflow Approach + +- ✅ **Production-like workflow** - Same as production environment +- ✅ **Automatic discovery** - DAGs auto-generate from JSON configs +- ✅ **Scheduling** - Daily/weekly automated execution +- ✅ **Monitoring** - Web UI for job status and logs +- ✅ **Error handling** - Automatic retries and failure notifications +- ✅ **Dependency management** - Chain multiple GroupBys together +- ✅ **Resource management** - Proper Spark cluster allocation +- ✅ **Job history** - Track execution history and performance + +## Production Simulation + +### Understanding Auto-Generated DAGs + +Chronon automatically creates DAGs based on your configurations: + +- **GroupBy DAGs**: `chronon_batch_dag_{team_name}` - Runs backfill and upload jobs +- **Join DAGs**: `chronon_join_{join_name}` - Runs join computations +- **Staging Query DAGs**: `chronon_staging_query_{team_name}` - Runs staging queries + +### Workflow Comparison + +| Approach | Use Case | Pros | Cons | +|----------|----------|------|------| +| **Direct run.py** | Development, testing, debugging | Fast iteration, immediate feedback, direct control | Manual execution, no scheduling | +| **Airflow + OSS DAGs** | Production simulation, scheduling | Production-like, automated, monitoring, error handling | Setup overhead, less direct control | + +### Hybrid Approach: Development + Production Simulation + +**Recommended Workflow**: + +1. **Initial Development** (Direct run.py): + ```bash + # Quick iteration and testing + python3 api/py/ai/chronon/repo/run.py --conf ... --mode backfill --ds 2023-12-01 + ``` + +2. **Production Simulation** (Airflow with OSS DAGs): + ```bash + # Copy compiled JSON configs to Airflow DAGs folder + # Use OSS Chronon DAGs for production-like execution + # Monitor via Airflow UI at http://localhost:8085 + ``` + +3. **Production Deployment**: + ```bash + # Same Airflow DAGs and JSON configs work in production + # No changes needed for deployment + ``` + +## Monitoring and Troubleshooting + +### Real-time Monitoring During GroupBy Execution + +#### 1. 
Spark UI Monitoring (Best Option) + +**Access**: http://localhost:8080 + +**What to Monitor**: +- **Jobs Tab**: See all Spark jobs (active, completed, failed) +- **Stages Tab**: Monitor individual stage progress +- **Executors Tab**: Check resource usage (CPU, memory, disk) +- **SQL Tab**: See SQL query execution plans and metrics +- **Environment Tab**: Verify Spark configuration + +#### 2. Command Line Monitoring + +```bash +# Monitor Spark logs in real-time +docker-compose -f docker-compose-developer.yml logs -f chronon-main + +# Check specific Spark application +docker-compose -f docker-compose-developer.yml exec chronon-main \ + spark-submit --status + +# Monitor system resources +docker stats chronon-chronon-main-1 +``` + +#### 3. Airflow Monitoring + +- Access Airflow UI at http://localhost:8085 +- Monitor DAG runs, task status, and logs +- Set up alerts for failed jobs +- Review execution times and resource usage + +### Common Errors and Solutions + +#### Compilation Errors +```bash +# Error: Module not found +# Solution: Check PYTHONPATH and imports +export PYTHONPATH=/srv/chronon:$PYTHONPATH + +# Error: Invalid source configuration +# Solution: Verify table exists and columns are correct +spark-sql -e "DESCRIBE data.your_table;" +``` + +#### Backfill Errors +```bash +# Error: Out of memory +# Solution: Reduce parallelism or increase memory +python3 api/py/ai/chronon/repo/run.py --conf ... --mode backfill --ds 2023-12-01 --parallelism 1 + +# Error: Table not found +# Solution: Check source table exists +spark-sql -e "SHOW TABLES IN data;" +``` + +#### Upload Errors +```bash +# Error: MongoDB connection failed +# Solution: Check MongoDB is running +docker-compose -f docker-compose-developer.yml exec mongodb mongosh --eval "db.adminCommand('ping')" + +# Error: Invalid key format +# Solution: Check key format in fetch command +python3 api/py/ai/chronon/repo/run.py --mode fetch --conf-type group_bys --name team/feature.v1 -k '{"user_id":"123"}' +``` + +## Best Practices + +### Feature Naming Conventions + +```python +# Good naming examples +Aggregation( + input_column="purchase_amount", + operation=Operation.SUM, + windows=[Window(length=7, timeUnit=TimeUnit.DAYS)] +) +# Results in: purchase_amount_sum_7d + +# Use descriptive names +Aggregation( + input_column="user_engagement_score", + operation=Operation.AVERAGE, + windows=[Window(length=30, timeUnit=TimeUnit.DAYS)] +) +# Results in: user_engagement_score_avg_30d +``` + +### Testing Strategies + +1. **Start Small**: Test with single day, small user set +2. **Validate Incrementally**: Check each aggregation separately +3. **Use Test Data**: Create controlled test scenarios +4. **Compare Manually**: Verify key calculations by hand +5. **Monitor Resources**: Watch Spark UI for performance issues + +### Documentation Requirements + +```python +# Document your GroupBy configuration +""" +User Purchase Features + +This GroupBy computes user purchase aggregations for the last 1, 7, and 30 days. + +Features: +- purchase_amount_sum_{window}: Total amount spent +- purchase_amount_count_{window}: Number of purchases +- purchase_amount_avg_{window}: Average purchase amount + +Source: data.purchases table +Key: user_id +Windows: 1d, 7d, 30d +""" + +v1 = GroupBy( + sources=[source], + keys=["user_id"], + aggregations=[...], + online=True, +) +``` + +### Version Control Workflow + +```bash +# 1. Create feature branch +git checkout -b feature/user-purchase-features + +# 2. Develop and test locally +# ... development work ... + +# 3. 
Commit changes +git add group_bys/my_team/user_purchase_features.py +git commit -m "Add user purchase aggregations for 1d, 7d, 30d windows" + +# 4. Test in staging environment +# ... deploy to staging ... + +# 5. Create pull request +git push origin feature/user-purchase-features +``` + +--- + +This guide provides everything you need to understand Chronon's architecture, develop GroupBy features, and simulate production workflows locally. Use it as a reference for both development and production deployment. diff --git a/affirm/docker-compose-bootcamp.yml b/affirm/docker-compose-bootcamp.yml new file mode 100644 index 0000000000..05c6e0900d --- /dev/null +++ b/affirm/docker-compose-bootcamp.yml @@ -0,0 +1,135 @@ +services: + minio: + image: minio/minio:latest + ports: + - "9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + command: server /data --console-address ":9001" + volumes: + - minio_data:/data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + mongodb: + image: mongo:latest + ports: + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: admin + MONGO_INITDB_ROOT_PASSWORD: admin + volumes: + - mongodb_data:/opt/mongo/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 30s + timeout: 10s + retries: 3 + + spark-master: + image: apache/spark:3.5.2 + ports: + - "8080:8080" # Master UI + - "7077:7077" # Master RPC + environment: + SPARK_MASTER_HOST: spark-master + SPARK_MASTER_PORT: "7077" + SPARK_MASTER_WEBUI_PORT: "8080" + SPARK_DAEMON_JAVA_OPTS: -Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/tmp/spark-recovery + command: + [ + "/opt/spark/bin/spark-class","org.apache.spark.deploy.master.Master", + "--host","spark-master","--port","7077","--webui-port","8080" + ] + volumes: + - spark_events:/opt/spark/spark-events + healthcheck: + test: ["CMD-SHELL", "ps -ef | grep -q '[o]rg.apache.spark.deploy.master.Master' || exit 1; command -v wget >/dev/null 2>&1 && wget -qO- http://127.0.0.1:8080 >/dev/null || true"] + interval: 5s + timeout: 5s + retries: 24 + start_period: 5s + restart: unless-stopped + + spark-worker: + image: apache/spark:3.5.2 + depends_on: + spark-master: + condition: service_healthy + minio: + condition: service_started + command: + [ + "/opt/spark/bin/spark-class","org.apache.spark.deploy.worker.Worker", + "spark://spark-master:7077", + "--cores","2","--memory","2G", + "--webui-port","8081" + ] + environment: + SPARK_WORKER_DIR: /opt/spark/work-dir + volumes: + - spark_events:/opt/spark/spark-events + restart: unless-stopped + + chronon-main: + image: ezvz/chronon + command: bash -c "spark-shell -i scripts/data-loader.scala && tail -f /dev/null" + ports: + - "4040:4040" + - "4041:4041" + environment: + USER: root + SPARK_SUBMIT_PATH: spark-submit + PYTHONPATH: /srv/chronon + SPARK_VERSION: "3.5.2" + JOB_MODE: spark://spark-master:7077 + PARALLELISM: "4" + EXECUTOR_MEMORY: 2G + EXECUTOR_CORES: "2" + DRIVER_MEMORY: 1G + CHRONON_LOG_TABLE: default.chronon_log_table + CHRONON_ONLINE_CLASS: ai.chronon.quickstart.online.ChrononMongoOnlineImpl + CHRONON_ONLINE_ARGS: -Zuser=admin -Zpassword=admin -Zhost=mongodb -Zport=27017 -Zdatabase=admin + SPARK_SQL_EXTENSIONS: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions + SPARK_SQL_CATALOG_SPARK_CATALOG: org.apache.iceberg.spark.SparkSessionCatalog + SPARK_SQL_CATALOG_SPARK_CATALOG_TYPE: hive + 
SPARK_SQL_CATALOG_SPARK_CATALOG_WAREHOUSE: s3a://chronon/warehouse + SPARK_CHRONON_TABLE_WRITE_FORMAT: iceberg + SPARK_CHRONON_TABLE_READ_FORMAT: iceberg + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + S3_ENDPOINT: http://minio:9000 + S3_PATH_STYLE_ACCESS: "true" + volumes: + - ../api/py/test/sample:/srv/chronon + - ./scripts:/srv/scripts + - spark_events:/opt/spark/spark-events + depends_on: + - spark-master + - minio + - mongodb + + jupyter: + image: jupyter/pyspark-notebook:latest + ports: + - "8888:8888" + environment: + JUPYTER_ENABLE_LAB: "yes" + SPARK_MASTER: spark://spark-master:7077 + JUPYTER_TOKEN: chronon-dev + volumes: + - ../api/py:/home/jovyan/work/chronon-api + - ../api/py/test/sample:/home/jovyan/work/sample-data + depends_on: + - spark-master + - chronon-main + +volumes: + minio_data: + mongodb_data: + spark_events: diff --git a/affirm/docker-compose-developer.yml b/affirm/docker-compose-developer.yml new file mode 100644 index 0000000000..ffdb0f09ad --- /dev/null +++ b/affirm/docker-compose-developer.yml @@ -0,0 +1,218 @@ +# Enhanced Chronon Docker Compose with Spark + Iceberg + MongoDB + S3 + Airflow + +services: + + # MinIO for S3-compatible storage + minio: + image: minio/minio:latest + ports: + - "9000:9000" # API + - "9001:9001" # Console + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + command: server /data --console-address ":9001" + volumes: + - minio_data:/data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + # MongoDB - Chronon's built-in KV store support + mongodb: + image: mongo:latest + ports: + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: admin + MONGO_INITDB_ROOT_PASSWORD: admin + volumes: + - mongodb_data:/opt/mongo/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 30s + timeout: 10s + retries: 3 + + # Note: Polaris removed for simplicity - using Spark's built-in Hive catalog instead + # This is sufficient for local GroupBy development and testing + + # Airflow Webserver (OPTIONAL - comment out if you don't need orchestration) + airflow-webserver: + image: apache/airflow:2.5.3 + ports: + - "8085:8080" + environment: + - AIRFLOW__CORE__EXECUTOR=LocalExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-db:5432/airflow + - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-db:5432/airflow + - AIRFLOW__CORE__FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session + - AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK=true + - _PIP_ADDITIONAL_REQUIREMENTS=apache-airflow-providers-postgres + volumes: + - ./airflow/dags:/opt/airflow/dags + - ./airflow/logs:/opt/airflow/logs + - ./airflow/plugins:/opt/airflow/plugins + command: webserver + depends_on: + - airflow-db + - airflow-scheduler + # Simplified health check - just check if container is running + # Airflow webserver can take time to fully initialize + + # Airflow Scheduler (OPTIONAL - comment out if you don't need orchestration) + airflow-scheduler: + image: apache/airflow:2.5.3 + environment: + - AIRFLOW__CORE__EXECUTOR=LocalExecutor + - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-db:5432/airflow + - 
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow-db:5432/airflow + - AIRFLOW__CORE__FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho= + - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true + - AIRFLOW__CORE__LOAD_EXAMPLES=false + - AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session + - AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK=true + - _PIP_ADDITIONAL_REQUIREMENTS=apache-airflow-providers-postgres + volumes: + - ./airflow/dags:/opt/airflow/dags + - ./airflow/logs:/opt/airflow/logs + - ./airflow/plugins:/opt/airflow/plugins + command: scheduler + depends_on: + - airflow-db + # Simplified health check - just check if container is running + # Airflow scheduler can take time to fully initialize + + # Airflow Database (PostgreSQL) - Using PostgreSQL 13 with legacy authentication + airflow-db: + image: postgres:13 + environment: + POSTGRES_DB: airflow + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_INITDB_ARGS: "--auth-host=md5 --auth-local=md5" + volumes: + - airflow_db_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U airflow"] + interval: 30s + timeout: 10s + retries: 3 + + # DynamoDB Local + dynamodb-local: + image: amazon/dynamodb-local:latest + ports: + - "8000:8000" + command: "-jar DynamoDBLocal.jar -sharedDb -dbPath /data" + volumes: + - dynamodb_data:/data + # Note: DynamoDB Local doesn't have curl, so we skip health check + # The service will be considered ready when the container starts + + # Spark Master + spark-master: + image: apache/spark:3.5.2 + ports: + - "8080:8080" # Spark UI + - "7077:7077" # Spark Master + environment: + - SPARK_MODE=master + - SPARK_RPC_AUTHENTICATION_ENABLED=no + - SPARK_RPC_ENCRYPTION_ENABLED=no + - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no + - SPARK_SSL_ENABLED=no + volumes: + - spark_events:/opt/apache/spark/spark-events + + # Spark Worker + spark-worker: + image: apache/spark:3.5.2 + depends_on: + - spark-master + - minio + environment: + - SPARK_MODE=worker + - SPARK_MASTER_URL=spark://spark-master:7077 + - SPARK_WORKER_MEMORY=2G + - SPARK_WORKER_CORES=2 + - SPARK_RPC_AUTHENTICATION_ENABLED=no + - SPARK_RPC_ENCRYPTION_ENABLED=no + - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no + - SPARK_SSL_ENABLED=no + volumes: [] + scale: 2 + + # Chronon Main Container with all components + chronon-main: + image: ezvz/chronon + command: bash -c "spark-shell -i scripts/data-loader.scala && tail -f /dev/null" + ports: + - "4040:4040" # Spark UI + - "4041:4041" # Spark UI (backup) + environment: + - USER=root + - SPARK_SUBMIT_PATH=spark-submit + - PYTHONPATH=/srv/chronon + - SPARK_VERSION=3.5.2 + - JOB_MODE=spark://spark-master:7077 + - PARALLELISM=4 + - EXECUTOR_MEMORY=2G + - EXECUTOR_CORES=2 + - DRIVER_MEMORY=1G + - CHRONON_LOG_TABLE=default.chronon_log_table + - CHRONON_ONLINE_CLASS=ai.chronon.quickstart.online.ChrononMongoOnlineImpl + - CHRONON_ONLINE_ARGS=-Zuser=admin -Zpassword=admin -Zhost=mongodb -Zport=27017 -Zdatabase=admin + # Iceberg Configuration with Spark's built-in Hive catalog + - SPARK_SQL_EXTENSIONS=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions + - SPARK_SQL_CATALOG_SPARK_CATALOG=org.apache.iceberg.spark.SparkSessionCatalog + - SPARK_SQL_CATALOG_SPARK_CATALOG_TYPE=hive + - SPARK_SQL_CATALOG_SPARK_CATALOG_WAREHOUSE=s3a://chronon/warehouse + - SPARK_CHRONON_TABLE_WRITE_FORMAT=iceberg + - SPARK_CHRONON_TABLE_READ_FORMAT=iceberg + # S3 Configuration + - AWS_ACCESS_KEY_ID=minioadmin + - 
AWS_SECRET_ACCESS_KEY=minioadmin + - S3_ENDPOINT=http://minio:9000 + - S3_PATH_STYLE_ACCESS=true + # DynamoDB Configuration + - AWS_DEFAULT_REGION=us-east-1 + - DYNAMODB_ENDPOINT=http://dynamodb:8000 + volumes: + - ../api/py/test/sample:/srv/chronon + - ./scripts:/srv/scripts + - spark_events:/opt/spark/spark-events + depends_on: + - spark-master + - minio + - mongodb + - airflow-webserver + - dynamodb-local + + # Jupyter Notebook for development (ESSENTIAL for GroupBy verification) + jupyter: + image: jupyter/pyspark-notebook:latest + ports: + - "8888:8888" + environment: + - JUPYTER_ENABLE_LAB=yes + - SPARK_MASTER=spark://spark-master:7077 + - JUPYTER_TOKEN=chronon-dev + volumes: + - ../api/py:/home/jovyan/work/chronon-api + - ../api/py/test/sample:/home/jovyan/work/sample-data + depends_on: + - spark-master + - chronon-main + +volumes: + minio_data: + mongodb_data: + airflow_db_data: + dynamodb_data: + spark_events: diff --git a/affirm/group_bys/bootcamp/user_purchase_features.py b/affirm/group_bys/bootcamp/user_purchase_features.py new file mode 100644 index 0000000000..999c565da0 --- /dev/null +++ b/affirm/group_bys/bootcamp/user_purchase_features.py @@ -0,0 +1,45 @@ +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.query import Query, select +from ai.chronon.group_by import GroupBy, Aggregation, Operation, Window, TimeUnit + +# Define the source using sample data +source = Source( + events=EventSource( + table="purchases", # Sample purchase data + query=Query( + selects=select("user_id", "purchase_price", "item_category"), + time_column="ts" + ) + ) +) + +# Define time windows +window_sizes = [ + Window(length=1, timeUnit=TimeUnit.DAYS), # 1 day + Window(length=7, timeUnit=TimeUnit.DAYS), # 7 days +] + +# Create the GroupBy configuration +v1 = GroupBy( + sources=[source], + keys=["user_id"], + aggregations=[ + Aggregation( + input_column="purchase_price", + operation=Operation.SUM, + windows=window_sizes + ), + Aggregation( + input_column="purchase_price", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="purchase_price", + operation=Operation.AVERAGE, + windows=window_sizes + ), + ], + online=True, + backfill_start_date="2023-12-01", # Start date for backfill +) \ No newline at end of file diff --git a/affirm/sample_data/README.md b/affirm/sample_data/README.md new file mode 100644 index 0000000000..5e1b488df2 --- /dev/null +++ b/affirm/sample_data/README.md @@ -0,0 +1,43 @@ +# Sample Data for Chronon Bootcamp + +This directory contains pre-generated Parquet files for the Chronon GroupBy bootcamp. 
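
For a quick sanity check of these files outside Docker, something like the snippet below works. It assumes `pandas` and `pyarrow` are installed on your machine and that you run it from the `affirm/` directory; inside the Docker environment you would normally read the same data via Spark instead.

```python
# Quick local peek at the sample Parquet files (assumes pandas + pyarrow are installed
# and the script is run from the affirm/ directory).
import pandas as pd

purchases = pd.read_parquet("sample_data/purchases.parquet")
users = pd.read_parquet("sample_data/users.parquet")

print(purchases.head())                              # a few purchase events
print(purchases["item_category"].value_counts())     # distribution of categories
print(users.head())                                  # a few user records
```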
+ +## Files + +- **`purchases.parquet`** - Purchase event data (~155 records) +- **`users.parquet`** - User demographic data (100 records) + +## Data Overview + +### Purchases Table +- **Records**: ~155 purchase events +- **Date Range**: December 1-7, 2023 +- **Columns**: + - `user_id`: User identifier (user_1 to user_100) + - `purchase_price`: Purchase amount ($2.58 - $89.69) + - `item_category`: Product category (electronics, books, clothing, food, home) + - `ts`: Purchase timestamp + +### Users Table +- **Records**: 100 users +- **Columns**: + - `user_id`: User identifier (user_1 to user_100) + - `age`: User age (18-65) + - `city`: User city (New York, San Francisco, Chicago, Miami) + - `signup_date`: User signup date + +## Usage + +The setup script automatically loads these files into Spark tables: +- `data.purchases` - Purchase events +- `data.users` - User demographics + +## Regenerating Data + +To regenerate the sample data with different parameters: + +```bash +python3 scripts/generate_sample_parquet.py +``` + +This will create new Parquet files with the same structure but different random data. diff --git a/affirm/sample_data/purchases.parquet b/affirm/sample_data/purchases.parquet new file mode 100644 index 0000000000..f891870ba2 Binary files /dev/null and b/affirm/sample_data/purchases.parquet differ diff --git a/affirm/sample_data/users.parquet b/affirm/sample_data/users.parquet new file mode 100644 index 0000000000..061d7ea1f9 Binary files /dev/null and b/affirm/sample_data/users.parquet differ diff --git a/affirm/setup-chronon-bootcamp.sh b/affirm/setup-chronon-bootcamp.sh new file mode 100755 index 0000000000..d6489bb12f --- /dev/null +++ b/affirm/setup-chronon-bootcamp.sh @@ -0,0 +1,394 @@ +#!/bin/bash + +# Chronon Bootcamp Setup Script +# Minimal setup for learning Chronon GroupBy features +# Includes: Spark + Iceberg + MongoDB + MinIO + Jupyter + +set -e # Exit on any error + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Configuration +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +COMPOSE_FILE="docker-compose-bootcamp.yml" +MINIO_ALIAS="local" +MINIO_ENDPOINT="http://localhost:9000" +MINIO_ACCESS_KEY="minioadmin" +MINIO_SECRET_KEY="minioadmin" + +echo "🎓 Starting Chronon Bootcamp (minimal setup)" + +# Function to print colored output +print_status() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +print_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +print_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to check if command exists +command_exists() { + command -v "$1" >/dev/null 2>&1 +} + +# Function to check prerequisites +check_prerequisites() { + print_status "Checking prerequisites..." + + # Check Docker + if ! command_exists docker; then + print_error "Docker is not installed. Please install Docker first." + exit 1 + fi + + # Check Docker Compose + if ! command_exists docker-compose; then + print_error "Docker Compose is not installed. Please install Docker Compose first." + exit 1 + fi + + # Check if Docker daemon is running + if ! docker info >/dev/null 2>&1; then + print_error "Docker daemon is not running. Please start Docker first." + exit 1 + fi + + # Check for port conflicts + local ports=(9000 9001 27017 8080 7077 8888) + for port in "${ports[@]}"; do + if lsof -i :$port >/dev/null 2>&1; then + print_warning "Port $port is already in use. This may cause conflicts." 
+ fi + done + + print_success "Prerequisites check completed" +} + +# Function to wait for service to be ready +wait_for_service() { + local service_name="$1" + local test_command="$2" + local max_attempts=30 + local attempt=1 + + print_status "Waiting for $service_name to be ready..." + + while [ $attempt -le $max_attempts ]; do + if eval "$test_command" >/dev/null 2>&1; then + print_success "$service_name is ready!" + return 0 + fi + + print_status "Attempt $attempt/$max_attempts - $service_name not ready yet, waiting..." + sleep 5 + ((attempt++)) + done + + print_error "$service_name failed to start within expected time" + return 1 +} + +# Function to cleanup existing setup +cleanup_existing_setup() { + print_status "Cleaning up existing setup..." + + # Stop and remove containers + if [ -f "$COMPOSE_FILE" ]; then + docker-compose -f "$COMPOSE_FILE" down --volumes --remove-orphans 2>/dev/null || true + fi + + # Remove any orphaned containers + docker container prune -f >/dev/null 2>&1 || true + + # Clean up networks + docker network prune -f >/dev/null 2>&1 || true + + print_success "Cleanup completed" +} + +# Function to start core services +start_core_services() { + print_status "Starting core services..." + + # Bootcamp: Start only essential services + docker-compose -f "$COMPOSE_FILE" up -d minio mongodb + + # Wait for services to be ready + wait_for_service "MinIO" "curl -f http://localhost:9000/minio/health/live" + wait_for_service "MongoDB" "docker-compose -f $COMPOSE_FILE exec mongodb mongosh --eval 'db.adminCommand(\"ping\")'" + + print_success "Core services started" +} + +# Function to configure MinIO +configure_minio() { + print_status "Configuring MinIO..." + + # Install MinIO client if not present + if ! command_exists mc; then + print_status "Installing MinIO client..." + if [[ "$OSTYPE" == "darwin"* ]]; then + # macOS + if command_exists brew; then + brew install minio/stable/mc + else + print_warning "Homebrew not found. Please install MinIO client manually: https://docs.min.io/docs/minio-client-quickstart-guide.html" + return 1 + fi + else + # Linux + curl -O https://dl.min.io/client/mc/release/linux-amd64/mc + chmod +x mc + sudo mv mc /usr/local/bin/ + fi + fi + + # Configure MinIO client + if ! mc alias list | grep -q "$MINIO_ALIAS"; then + mc alias set "$MINIO_ALIAS" "$MINIO_ENDPOINT" "$MINIO_ACCESS_KEY" "$MINIO_SECRET_KEY" + print_success "MinIO client configured" + else + print_status "MinIO client already configured" + fi + + # Create buckets + mc mb "$MINIO_ALIAS/chronon" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse/data" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse/data/purchases" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse/data/users" --ignore-existing + + print_success "MinIO buckets created" +} + +# Function to copy sample data to MinIO +copy_sample_data_to_minio() { + print_status "Copying sample data to MinIO..." + + # Check if sample data files exist + if [ ! -f "sample_data/purchases.parquet" ] || [ ! -f "sample_data/users.parquet" ]; then + print_warning "Sample data files not found. Generating them now..." + if command_exists python3; then + python3 scripts/generate_sample_parquet.py + else + print_error "Python3 not found. Please generate sample data manually." + return 1 + fi + fi + + # Copy Parquet files directly to MinIO + print_status "Copying Parquet files to MinIO..." 
+ + # Copy purchases data + if mc cp sample_data/purchases.parquet "$MINIO_ALIAS/chronon/warehouse/data/purchases/" >/dev/null 2>&1; then + print_success "Copied purchases.parquet to MinIO" + else + print_warning "Failed to copy purchases.parquet to MinIO" + fi + + # Copy users data + if mc cp sample_data/users.parquet "$MINIO_ALIAS/chronon/warehouse/data/users/" >/dev/null 2>&1; then + print_success "Copied users.parquet to MinIO" + else + print_warning "Failed to copy users.parquet to MinIO" + fi + + # Verify files are in MinIO + print_status "Verifying files in MinIO..." + if mc ls "$MINIO_ALIAS/chronon/warehouse/data/purchases/" | grep -q "purchases.parquet" && \ + mc ls "$MINIO_ALIAS/chronon/warehouse/data/users/" | grep -q "users.parquet"; then + print_success "Sample data copied to MinIO successfully!" + print_status "📊 Files available at: s3a://chronon/warehouse/data/" + print_status "📁 purchases.parquet (~155 records)" + print_status "📁 users.parquet (100 records)" + print_status "📅 Data spans 7 days (Dec 1-7, 2023)" + print_status "🏷️ Categories: electronics, books, clothing, food, home" + else + print_warning "Some files may not have been copied successfully" + print_status "You can access the data directly from sample_data/ directory" + fi +} + +# Function to create Spark tables from parquet files +create_spark_tables() { + print_status "Creating Spark tables from parquet data..." + + # Download S3A JARs if not present + if [ ! -f "/tmp/hadoop-aws-3.2.4.jar" ]; then + print_status "Downloading Hadoop AWS JARs..." + curl -s -o /tmp/hadoop-aws-3.2.4.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.4/hadoop-aws-3.2.4.jar + curl -s -o /tmp/aws-java-sdk-bundle-1.11.1026.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar + fi + + # Copy JARs to container + CHRONON_CONTAINER=$(docker-compose -f "$COMPOSE_FILE" ps -q chronon-main) + docker cp /tmp/hadoop-aws-3.2.4.jar ${CHRONON_CONTAINER}:/tmp/ >/dev/null 2>&1 + docker cp /tmp/aws-java-sdk-bundle-1.11.1026.jar ${CHRONON_CONTAINER}:/tmp/ >/dev/null 2>&1 + + # Create partitioned purchases table + print_status "Creating partitioned 'purchases' table..." + docker-compose -f "$COMPOSE_FILE" exec -T chronon-main spark-sql \ + --jars /tmp/hadoop-aws-3.2.4.jar,/tmp/aws-java-sdk-bundle-1.11.1026.jar \ + --conf spark.hadoop.fs.s3a.endpoint=http://minio:9000 \ + --conf spark.hadoop.fs.s3a.access.key=minioadmin \ + --conf spark.hadoop.fs.s3a.secret.key=minioadmin \ + --conf spark.hadoop.fs.s3a.path.style.access=true \ + --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ + --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \ + -e "CREATE TABLE IF NOT EXISTS purchases (user_id STRING, purchase_price DOUBLE, item_category STRING, ts BIGINT) USING PARQUET PARTITIONED BY (ds STRING); INSERT INTO purchases SELECT user_id, purchase_price, item_category, CAST(UNIX_TIMESTAMP(ts) * 1000 AS BIGINT) as ts, DATE_FORMAT(ts, 'yyyy-MM-dd') as ds FROM parquet.\`s3a://chronon/warehouse/data/purchases/purchases.parquet\`" >/dev/null 2>&1 + + if [ $? -eq 0 ]; then + print_success "Created partitioned 'purchases' table with 7 partitions (2023-12-01 to 2023-12-07)" + else + print_warning "Failed to create purchases table - you may need to create it manually" + fi +} + +# Function to start processing services +start_processing_services() { + print_status "Starting processing services..." 
+ + # Start Spark (required for GroupBy processing) + docker-compose -f "$COMPOSE_FILE" up -d spark-master spark-worker + + # Wait for Spark to be ready + wait_for_service "Spark Master" "curl -f http://localhost:8080" + + print_success "Processing services started" +} + +# Function to start development tools +start_development_tools() { + print_status "Starting development tools..." + + # Start Chronon main container and Jupyter + docker-compose -f "$COMPOSE_FILE" up -d chronon-main jupyter + + # Give services time to start + sleep 10 + + print_success "Development tools started" +} + +# Function to verify services +verify_services() { + print_status "Verifying all services..." + + local services=( + "MinIO:http://localhost:9001" + "Spark:http://localhost:8080" + "Jupyter:http://localhost:8888" + ) + + for service in "${services[@]}"; do + local name="${service%%:*}" + local url="${service##*:}" + + if curl -f "$url" >/dev/null 2>&1; then + print_success "$name is accessible at $url" + else + print_warning "$name may not be fully ready at $url" + fi + done + + # Test MongoDB + if docker-compose -f "$COMPOSE_FILE" exec mongodb mongosh --eval "db.adminCommand('ping')" >/dev/null 2>&1; then + print_success "MongoDB is accessible" + else + print_warning "MongoDB may not be fully ready" + fi + + # Test Iceberg functionality with Hive catalog + print_status "Testing Iceberg functionality..." + # Give Spark more time to initialize + sleep 15 + + if docker-compose -f "$COMPOSE_FILE" exec chronon-main spark-sql -e " + CREATE TABLE IF NOT EXISTS test_iceberg (id INT, name STRING) USING ICEBERG; + INSERT INTO test_iceberg VALUES (1, 'test'); + SELECT * FROM test_iceberg; + DROP TABLE test_iceberg; + " >/dev/null 2>&1; then + print_success "Iceberg functionality verified" + else + print_warning "Iceberg test failed - Spark may still be initializing" + print_status "You can test Iceberg manually later with: docker-compose -f $COMPOSE_FILE exec chronon-main spark-sql" + fi + + print_success "Service verification completed" +} + +# Function to display access information +display_access_info() { + print_success "Chronon Bootcamp Setup Completed!" + echo + echo "=== Access Information ===" + echo "MinIO Console: http://localhost:9001 (minioadmin/minioadmin)" + echo "Spark Master: http://localhost:8080" + echo "Jupyter Notebooks: http://localhost:8888 (token: chronon-dev) - ESSENTIAL for GroupBy verification" + echo "MongoDB: localhost:27017" + echo + echo "=== Next Steps ===" + echo "🎓 Follow the bootcamp: CHRONON_BOOTCAMP.md" + echo "1. Access Jupyter at http://localhost:8888 for data exploration" + echo "2. Use Spark at http://localhost:8080 for batch processing" + echo "3. Monitor MinIO at http://localhost:9001 for storage" + echo + echo "=== Sample Data Ready ===" + echo "✅ Sample data has been copied to MinIO and is ready for GroupBy development!" 
+ echo "📊 Parquet files available at: s3a://chronon/warehouse/data/" + echo "📁 purchases.parquet (~155 records), users.parquet (100 records)" + echo "📅 Data spans 7 days (Dec 1-7, 2023) with realistic purchase patterns" + echo "🏷️ Categories: electronics, books, clothing, food, home" + echo + echo "=== Start Learning ===" + echo "🎓 Follow the bootcamp: CHRONON_BOOTCAMP.md" + echo "🔍 Optional: Explore data in Jupyter at http://localhost:8888" + echo + echo "=== Useful Commands ===" + echo "Stop all services: docker-compose -f $COMPOSE_FILE down" + echo "View logs: docker-compose -f $COMPOSE_FILE logs [service-name]" + echo "Restart service: docker-compose -f $COMPOSE_FILE restart [service-name]" + echo + echo "=== Troubleshooting ===" + echo "If services don't start properly:" + echo "1. Check Docker is running: docker info" + echo "2. Check port conflicts: lsof -i :9000 -i :8080 -i :8888 -i :27017" + echo "3. View service logs: docker-compose -f $COMPOSE_FILE logs [service-name]" + echo "4. Restart specific service: docker-compose -f $COMPOSE_FILE restart [service-name]" +} + +# Main execution +main() { + print_status "Starting Chronon Bootcamp setup..." + + check_prerequisites + cleanup_existing_setup + start_core_services + configure_minio + copy_sample_data_to_minio + create_spark_tables + start_processing_services + start_development_tools + verify_services + display_access_info + + print_success "Chronon Bootcamp setup completed successfully!" +} + +# Run main function +main "$@" diff --git a/affirm/setup-chronon-developer.sh b/affirm/setup-chronon-developer.sh new file mode 100755 index 0000000000..2f3e883298 --- /dev/null +++ b/affirm/setup-chronon-developer.sh @@ -0,0 +1,442 @@ +#!/bin/bash + +# Chronon Developer Setup Script +# Full production-like setup for advanced development +# Includes: Spark + Iceberg + MongoDB + MinIO + Airflow + DynamoDB Local + +set -e # Exit on any error + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Configuration +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +COMPOSE_FILE="docker-compose-developer.yml" +MINIO_ALIAS="local" +MINIO_ENDPOINT="http://localhost:9000" +MINIO_ACCESS_KEY="minioadmin" +MINIO_SECRET_KEY="minioadmin" + +echo "👨‍💻 Starting Chronon Developer (full setup)" + +# Function to print colored output +print_status() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +print_success() { + echo -e "${GREEN}[SUCCESS]${NC} $1" +} + +print_warning() { + echo -e "${YELLOW}[WARNING]${NC} $1" +} + +print_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to check if command exists +command_exists() { + command -v "$1" >/dev/null 2>&1 +} + +# Function to check prerequisites +check_prerequisites() { + print_status "Checking prerequisites..." + + # Check Docker + if ! command_exists docker; then + print_error "Docker is not installed. Please install Docker first." + exit 1 + fi + + # Check Docker Compose + if ! command_exists docker-compose; then + print_error "Docker Compose is not installed. Please install Docker Compose first." + exit 1 + fi + + # Check if Docker daemon is running + if ! docker info >/dev/null 2>&1; then + print_error "Docker daemon is not running. Please start Docker first." + exit 1 + fi + + # Check for port conflicts + local ports=(9000 9001 27017 8080 7077 8888 8085 8000) + for port in "${ports[@]}"; do + if lsof -i :$port >/dev/null 2>&1; then + print_warning "Port $port is already in use. This may cause conflicts." 
+ fi + done + + print_success "Prerequisites check completed" +} + +# Function to wait for service to be ready +wait_for_service() { + local service_name="$1" + local test_command="$2" + local max_attempts=30 + local attempt=1 + + print_status "Waiting for $service_name to be ready..." + + while [ $attempt -le $max_attempts ]; do + if eval "$test_command" >/dev/null 2>&1; then + print_success "$service_name is ready!" + return 0 + fi + + print_status "Attempt $attempt/$max_attempts - $service_name not ready yet, waiting..." + sleep 5 + ((attempt++)) + done + + print_error "$service_name failed to start within expected time" + return 1 +} + +# Function to cleanup existing setup +cleanup_existing_setup() { + print_status "Cleaning up existing setup..." + + # Stop and remove containers + if [ -f "$COMPOSE_FILE" ]; then + docker-compose -f "$COMPOSE_FILE" down --volumes --remove-orphans 2>/dev/null || true + fi + + # Remove any orphaned containers + docker container prune -f >/dev/null 2>&1 || true + + # Clean up networks + docker network prune -f >/dev/null 2>&1 || true + + print_success "Cleanup completed" +} + +# Function to start core services +start_core_services() { + print_status "Starting core services..." + + # Developer: Start all services including Airflow and DynamoDB + docker-compose -f "$COMPOSE_FILE" up -d minio mongodb airflow-db dynamodb-local + + # Wait for services to be ready + wait_for_service "MinIO" "curl -f http://localhost:9000/minio/health/live" + wait_for_service "MongoDB" "docker-compose -f $COMPOSE_FILE exec mongodb mongosh --eval 'db.adminCommand(\"ping\")'" + wait_for_service "Airflow DB" "docker-compose -f $COMPOSE_FILE exec airflow-db pg_isready -U airflow" + + # DynamoDB Local doesn't have health check, just wait a bit for it to start + print_status "Waiting for DynamoDB Local to start..." + sleep 10 + + print_success "Core services started" +} + +# Function to configure MinIO +configure_minio() { + print_status "Configuring MinIO..." + + # Install MinIO client if not present + if ! command_exists mc; then + print_status "Installing MinIO client..." + if [[ "$OSTYPE" == "darwin"* ]]; then + # macOS + if command_exists brew; then + brew install minio/stable/mc + else + print_warning "Homebrew not found. Please install MinIO client manually: https://docs.min.io/docs/minio-client-quickstart-guide.html" + return 1 + fi + else + # Linux + curl -O https://dl.min.io/client/mc/release/linux-amd64/mc + chmod +x mc + sudo mv mc /usr/local/bin/ + fi + fi + + # Configure MinIO client + if ! mc alias list | grep -q "$MINIO_ALIAS"; then + mc alias set "$MINIO_ALIAS" "$MINIO_ENDPOINT" "$MINIO_ACCESS_KEY" "$MINIO_SECRET_KEY" + print_success "MinIO client configured" + else + print_status "MinIO client already configured" + fi + + # Create buckets + mc mb "$MINIO_ALIAS/chronon" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse/data" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse/data/purchases" --ignore-existing + mc mb "$MINIO_ALIAS/chronon/warehouse/data/users" --ignore-existing + + print_success "MinIO buckets created" +} + +# Function to copy sample data to MinIO +copy_sample_data_to_minio() { + print_status "Copying sample data to MinIO..." + + # Check if sample data files exist + if [ ! -f "sample_data/purchases.parquet" ] || [ ! -f "sample_data/users.parquet" ]; then + print_warning "Sample data files not found. Generating them now..." 
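+        # The generator invoked below is expected to write sample_data/purchases.parquet and
+        # sample_data/users.parquet, which the copy steps further down look for.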
+ if command_exists python3; then + python3 scripts/generate_sample_parquet.py + else + print_error "Python3 not found. Please generate sample data manually." + return 1 + fi + fi + + # Copy Parquet files directly to MinIO + print_status "Copying Parquet files to MinIO..." + + # Copy purchases data + if mc cp sample_data/purchases.parquet "$MINIO_ALIAS/chronon/warehouse/data/purchases/" >/dev/null 2>&1; then + print_success "Copied purchases.parquet to MinIO" + else + print_warning "Failed to copy purchases.parquet to MinIO" + fi + + # Copy users data + if mc cp sample_data/users.parquet "$MINIO_ALIAS/chronon/warehouse/data/users/" >/dev/null 2>&1; then + print_success "Copied users.parquet to MinIO" + else + print_warning "Failed to copy users.parquet to MinIO" + fi + + # Verify files are in MinIO + print_status "Verifying files in MinIO..." + if mc ls "$MINIO_ALIAS/chronon/warehouse/data/purchases/" | grep -q "purchases.parquet" && \ + mc ls "$MINIO_ALIAS/chronon/warehouse/data/users/" | grep -q "users.parquet"; then + print_success "Sample data copied to MinIO successfully!" + print_status "📊 Files available at: s3a://chronon/warehouse/data/" + print_status "📁 purchases.parquet (~155 records)" + print_status "📁 users.parquet (100 records)" + print_status "📅 Data spans 7 days (Dec 1-7, 2023)" + print_status "🏷️ Categories: electronics, books, clothing, food, home" + else + print_warning "Some files may not have been copied successfully" + print_status "You can access the data directly from sample_data/ directory" + fi +} + +# Function to start processing services +start_processing_services() { + print_status "Starting processing services..." + + # Start Spark (required for GroupBy processing) + docker-compose -f "$COMPOSE_FILE" up -d spark-master spark-worker + + # Wait for Spark to be ready + wait_for_service "Spark Master" "curl -f http://localhost:8080" + + # Developer: Start Airflow (for production-like orchestration) + print_status "Starting Airflow services (production-like orchestration)..." + docker-compose -f "$COMPOSE_FILE" up -d airflow-scheduler airflow-webserver + + # Give Airflow time to initialize (don't wait for it to be ready) + print_status "Airflow services starting in background (may take time to initialize)..." + + print_success "Processing services started" +} + +# Function to start development tools +start_development_tools() { + print_status "Starting development tools..." + + # Start Chronon main container and Jupyter + docker-compose -f "$COMPOSE_FILE" up -d chronon-main jupyter + + # Give services time to start + sleep 10 + + print_success "Development tools started" +} + +# Function to setup Airflow (production-like orchestration) +setup_airflow() { + print_status "Setting up Airflow for production-like Spark job submission..." + + # Create Airflow directories + mkdir -p airflow/dags airflow/logs airflow/plugins + + # Wait for Airflow services to be ready + print_status "Waiting for Airflow services to initialize..." + sleep 30 + + # Initialize Airflow database + print_status "Initializing Airflow database..." + if docker-compose -f "$COMPOSE_FILE" exec airflow-webserver airflow db init 2>/dev/null; then + print_success "Airflow database initialized successfully" + + # Create admin user + print_status "Creating Airflow admin user..." 
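+        # Note: the admin/admin credentials created below are intended for this local
+        # development stack only; change them if the Airflow UI is exposed beyond localhost.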
+ if docker-compose -f "$COMPOSE_FILE" exec airflow-webserver airflow users create \ + --username admin \ + --firstname Admin \ + --lastname User \ + --role Admin \ + --email admin@example.com \ + --password admin 2>/dev/null; then + print_success "Airflow admin user created (admin/admin)" + else + print_warning "Airflow admin user creation failed - you can create it manually later" + fi + + # Test Airflow connectivity + if docker-compose -f "$COMPOSE_FILE" exec airflow-webserver airflow dags list >/dev/null 2>&1; then + print_success "Airflow is ready for Spark job submission!" + else + print_warning "Airflow may still be initializing - check http://localhost:8085" + fi + + else + print_warning "Airflow database initialization failed" + print_status "This may be due to PostgreSQL compatibility issues" + print_status "You can still use Chronon GroupBy features with direct run.py commands" + print_status "To troubleshoot Airflow: docker-compose -f $COMPOSE_FILE logs airflow-webserver" + fi + + print_success "Airflow setup completed" +} + +# Function to verify services +verify_services() { + print_status "Verifying all services..." + + local services=( + "MinIO:http://localhost:9001" + "Spark:http://localhost:8080" + "Jupyter:http://localhost:8888" + ) + + for service in "${services[@]}"; do + local name="${service%%:*}" + local url="${service##*:}" + + if curl -f "$url" >/dev/null 2>&1; then + print_success "$name is accessible at $url" + else + print_warning "$name may not be fully ready at $url" + fi + done + + # Test MongoDB + if docker-compose -f "$COMPOSE_FILE" exec mongodb mongosh --eval "db.adminCommand('ping')" >/dev/null 2>&1; then + print_success "MongoDB is accessible" + else + print_warning "MongoDB may not be fully ready" + fi + + # Test Airflow (separate check since it can take time to initialize) + if docker-compose -f "$COMPOSE_FILE" ps airflow-webserver | grep -q "Up"; then + print_success "Airflow webserver container is running" + # Try to access the web interface + if curl -f http://localhost:8085 >/dev/null 2>&1; then + print_success "Airflow web interface is accessible at http://localhost:8085" + else + print_warning "Airflow web interface may still be initializing" + fi + else + print_warning "Airflow webserver may not be fully ready" + fi + + # Test DynamoDB Local (separate check since it doesn't have curl) + if docker-compose -f "$COMPOSE_FILE" ps dynamodb-local | grep -q "Up"; then + print_success "DynamoDB Local container is running" + else + print_warning "DynamoDB Local may not be fully ready" + fi + + # Test Iceberg functionality with Hive catalog + print_status "Testing Iceberg functionality..." + # Give Spark more time to initialize + sleep 15 + + if docker-compose -f "$COMPOSE_FILE" exec chronon-main spark-sql -e " + CREATE TABLE IF NOT EXISTS test_iceberg (id INT, name STRING) USING ICEBERG; + INSERT INTO test_iceberg VALUES (1, 'test'); + SELECT * FROM test_iceberg; + DROP TABLE test_iceberg; + " >/dev/null 2>&1; then + print_success "Iceberg functionality verified" + else + print_warning "Iceberg test failed - Spark may still be initializing" + print_status "You can test Iceberg manually later with: docker-compose -f $COMPOSE_FILE exec chronon-main spark-sql" + fi + + print_success "Service verification completed" +} + +# Function to display access information +display_access_info() { + print_success "Chronon Developer Setup Completed!" 
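+    # Optional: DynamoDB Local accepts dummy credentials, so if the AWS CLI is installed it
+    # can be spot-checked with, for example:
+    #   AWS_ACCESS_KEY_ID=dummy AWS_SECRET_ACCESS_KEY=dummy \
+    #     aws dynamodb list-tables --endpoint-url http://localhost:8000 --region us-east-1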
+ echo + echo "=== Access Information ===" + echo "MinIO Console: http://localhost:9001 (minioadmin/minioadmin)" + echo "Spark Master: http://localhost:8080" + echo "Jupyter Notebooks: http://localhost:8888 (token: chronon-dev) - ESSENTIAL for GroupBy verification" + echo "MongoDB: localhost:27017" + echo "DynamoDB Local: http://localhost:8000" + echo "Airflow Dashboard: http://localhost:8085 (admin/admin) - PRODUCTION-LIKE Spark job submission" + echo + echo "=== Next Steps ===" + echo "👨‍💻 Follow the developer guide: CHRONON_DEVELOPER_GUIDE.md" + echo "1. Access Jupyter at http://localhost:8888 for GroupBy development and verification" + echo "2. Use Spark at http://localhost:8080 for batch processing" + echo "3. Monitor MinIO at http://localhost:9001 for storage" + echo "4. Check Airflow dashboard at http://localhost:8085 for production-like Spark job submission" + echo + echo "=== Sample Data Ready ===" + echo "✅ Sample data has been copied to MinIO and is ready for GroupBy development!" + echo "📊 Parquet files available at: s3a://chronon/warehouse/data/" + echo "📁 purchases.parquet (~155 records), users.parquet (100 records)" + echo "📅 Data spans 7 days (Dec 1-7, 2023) with realistic purchase patterns" + echo "🏷️ Categories: electronics, books, clothing, food, home" + echo + echo "=== Start Development ===" + echo "👨‍💻 Follow the developer guide: CHRONON_DEVELOPER_GUIDE.md" + echo "🔍 Optional: Explore data in Jupyter at http://localhost:8888" + echo "🚀 Production-like: Use Airflow DAGs with compiled JSON configs" + echo + echo "=== Useful Commands ===" + echo "Stop all services: docker-compose -f $COMPOSE_FILE down" + echo "View logs: docker-compose -f $COMPOSE_FILE logs [service-name]" + echo "Restart service: docker-compose -f $COMPOSE_FILE restart [service-name]" + echo + echo "=== Troubleshooting ===" + echo "If services don't start properly:" + echo "1. Check Docker is running: docker info" + echo "2. Check port conflicts: lsof -i :9000 -i :8080 -i :8888 -i :27017 -i :8085 -i :8000" + echo "3. View service logs: docker-compose -f $COMPOSE_FILE logs [service-name]" + echo "4. Restart specific service: docker-compose -f $COMPOSE_FILE restart [service-name]" +} + +# Main execution +main() { + print_status "Starting Chronon Developer setup..." + + check_prerequisites + cleanup_existing_setup + start_core_services + configure_minio + copy_sample_data_to_minio + start_processing_services + start_development_tools + setup_airflow + verify_services + display_access_info + + print_success "Chronon Developer setup completed successfully!" 
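+    # Tip: to tear the whole stack down later (including data volumes), run:
+    #   docker-compose -f docker-compose-developer.yml down --volumes --remove-orphans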
+} + +# Run main function +main "$@" diff --git a/api/py/test/sample/production/group_bys/bootcamp/user_purchase_features.v1 b/api/py/test/sample/production/group_bys/bootcamp/user_purchase_features.v1 new file mode 100644 index 0000000000..b6be210c2b --- /dev/null +++ b/api/py/test/sample/production/group_bys/bootcamp/user_purchase_features.v1 @@ -0,0 +1,81 @@ +{ + "metaData": { + "name": "bootcamp.user_purchase_features.v1", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_purchases_ds\", \"spec\": \"purchases/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": {}, + "outputNamespace": "default", + "team": "bootcamp", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "purchases", + "query": { + "selects": { + "user_id": "user_id", + "purchase_price": "purchase_price", + "item_category": "item_category" + }, + "timeColumn": "ts", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "purchase_price", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 7, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "purchase_price", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 7, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "purchase_price", + "operation": 8, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 7, + "timeUnit": 1 + } + ] + } + ], + "backfillStartDate": "2023-12-01" +} \ No newline at end of file diff --git a/api/py/test/sample/teams.json b/api/py/test/sample/teams.json index 39f7a25559..2de73421a7 100644 --- a/api/py/test/sample/teams.json +++ b/api/py/test/sample/teams.json @@ -1,69 +1,39 @@ { "default": { - "table_properties": { - "source": "chronon" - }, - "common_env": { - "VERSION": "latest", - "SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit", - "JOB_MODE": "local[*]", - "HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", - "CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", - "CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host= -Zkv-port=", - "PARTITION_COLUMN": "ds", - "PARTITION_FORMAT": "yyyy-MM-dd" - }, - "production": { - "backfill" : { - "EXECUTOR_CORES": "1", - "DRIVER_MEMORY": "15G", - "EXECUTOR_MEMORY": "8G", - "PARALLELISM": "4000", - "MAX_EXECUTORS": "1000" - }, - "upload" : { - "EXECUTOR_CORES": "1", - "EXECUTOR_MEMORY": "8G", - "PARALLELISM": "1000", - "MAX_EXECUTORS": "1000" + "description": "Default team configuration", + "namespace": "default", + "user": "default_user", + "table_properties": {}, + "production": { + "backfill": { + "EXECUTOR_CORES": "2", + "DRIVER_MEMORY": "1G", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" }, - "streaming" : { + "upload": { "EXECUTOR_CORES": "2", - "EXECUTOR_MEMORY": "4G", - "PARALLELISM": "16" + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" } } }, - "sample_team": { - "description": "Team description", - "namespace": "chronon_db", - "user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler", + "bootcamp": { + "description": "Bootcamp team for learning Chronon", + "namespace": "default", + "user": "bootcamp_user", "production": { - "backfill" : { - "EXECUTOR_CORES": "4" - } - }, - "dev": { - "backfill" : { + "backfill": { + "EXECUTOR_CORES": "2", + 
"DRIVER_MEMORY": "1G", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + }, + "upload": { "EXECUTOR_CORES": "2", - "DRIVER_MEMORY": "30G" + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" } } - }, - "kaggle": { - "description": "Workspace for kaggle competitions", - "namespace": "default" - }, - "quickstart": { - "description": "Used for the quickstart example", - "namespace": "default" - }, - "unit_test": { - "description": "Used for the unit test cases", - "namespace": "default" - }, - "cs_ds": { - "description": "Used for unit testing purposes", - "namespace": "default" } -} +} \ No newline at end of file diff --git a/group_bys/bootcamp/user_purchase_features.py b/group_bys/bootcamp/user_purchase_features.py new file mode 100644 index 0000000000..999c565da0 --- /dev/null +++ b/group_bys/bootcamp/user_purchase_features.py @@ -0,0 +1,45 @@ +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.query import Query, select +from ai.chronon.group_by import GroupBy, Aggregation, Operation, Window, TimeUnit + +# Define the source using sample data +source = Source( + events=EventSource( + table="purchases", # Sample purchase data + query=Query( + selects=select("user_id", "purchase_price", "item_category"), + time_column="ts" + ) + ) +) + +# Define time windows +window_sizes = [ + Window(length=1, timeUnit=TimeUnit.DAYS), # 1 day + Window(length=7, timeUnit=TimeUnit.DAYS), # 7 days +] + +# Create the GroupBy configuration +v1 = GroupBy( + sources=[source], + keys=["user_id"], + aggregations=[ + Aggregation( + input_column="purchase_price", + operation=Operation.SUM, + windows=window_sizes + ), + Aggregation( + input_column="purchase_price", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="purchase_price", + operation=Operation.AVERAGE, + windows=window_sizes + ), + ], + online=True, + backfill_start_date="2023-12-01", # Start date for backfill +) \ No newline at end of file diff --git a/production/group_bys/bootcamp/user_purchase_features.v1 b/production/group_bys/bootcamp/user_purchase_features.v1 new file mode 100644 index 0000000000..b6be210c2b --- /dev/null +++ b/production/group_bys/bootcamp/user_purchase_features.v1 @@ -0,0 +1,81 @@ +{ + "metaData": { + "name": "bootcamp.user_purchase_features.v1", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_purchases_ds\", \"spec\": \"purchases/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": {}, + "outputNamespace": "default", + "team": "bootcamp", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "purchases", + "query": { + "selects": { + "user_id": "user_id", + "purchase_price": "purchase_price", + "item_category": "item_category" + }, + "timeColumn": "ts", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "purchase_price", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 7, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "purchase_price", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 7, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "purchase_price", + "operation": 8, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 7, + "timeUnit": 1 + } + ] + } + ], + "backfillStartDate": "2023-12-01" +} \ No newline at end of file diff --git 
a/scripts/spark_submit.sh b/scripts/spark_submit.sh new file mode 100755 index 0000000000..bf26d923bf --- /dev/null +++ b/scripts/spark_submit.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash + +# +# Copyright (C) 2023 The Chronon Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +### ******************* NOTE *************************** +### This is just a template, you will most likely need to modify this file to get things to work + +### Consider adding the following arguments to spark submit in your prod env. We do not include them by default, because it can cause issues on local runs on M1 Macbooks. +###--conf spark.io.compression.codec=zstd \ +###--conf spark.io.compression.zstd.level=2 \ +###--conf spark.io.compression.zstd.bufferSize=1M \ + +### ******************* END **************************** + +set -euxo pipefail +CHRONON_WORKING_DIR=${CHRONON_TMPDIR:-/tmp}/${USER} +mkdir -p ${CHRONON_WORKING_DIR} +export TEST_NAME="${APP_NAME}_${USER}_test" +unset PYSPARK_DRIVER_PYTHON +unset PYSPARK_PYTHON +unset SPARK_HOME +unset SPARK_CONF_DIR +export LOG4J_FILE="${CHRONON_WORKING_DIR}/log4j_file" +cat > ${LOG4J_FILE} << EOF +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n +log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] {%c{1}} %L - %m%n +log4j.logger.ai.chronon=INFO +EOF +$SPARK_SUBMIT_PATH \ +--driver-java-options " -Dlog4j.configuration=file:${LOG4J_FILE}" \ +--conf "spark.executor.extraJavaOptions= -XX:ParallelGCThreads=4 -XX:+UseParallelGC -XX:+UseCompressedOops" \ +--conf spark.sql.shuffle.partitions=${PARALLELISM:-4000} \ +--conf spark.dynamicAllocation.maxExecutors=${MAX_EXECUTORS:-1000} \ +--conf spark.default.parallelism=${PARALLELISM:-4000} \ +--conf spark.local.dir=${CHRONON_WORKING_DIR} \ +--conf spark.jars.ivy=${CHRONON_WORKING_DIR} \ +--conf spark.executor.cores=${EXECUTOR_CORES:-1} \ +--conf spark.chronon.partition.column="${PARTITION_COLUMN:-ds}" \ +--conf spark.chronon.partition.format="${PARTITION_FORMAT:-yyyy-MM-dd}" \ +--conf spark.chronon.backfill.validation.enabled="${ENABLE_VALIDATION:-false}" \ +--deploy-mode client \ +--master "${JOB_MODE:-yarn}" \ +--executor-memory "${EXECUTOR_MEMORY:-2G}" \ +--driver-memory "${DRIVER_MEMORY:-1G}" \ +--conf spark.app.name=${APP_NAME} \ +--conf spark.chronon.outputParallelismOverride=${OUTPUT_PARALLELISM:--1} \ +--conf spark.chronon.rowCountPerPartition=${ROW_COUNT_PER_PARTITION:--1} \ +--conf spark.hadoop.fs.s3a.endpoint=http://localhost:9000 \ +--conf spark.hadoop.fs.s3a.access.key=minioadmin \ +--conf spark.hadoop.fs.s3a.secret.key=minioadmin \ +--conf spark.hadoop.fs.s3a.path.style.access=true \ +--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ +--conf 
spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \ +--jars "/tmp/hadoop-aws-3.2.4.jar,/tmp/aws-java-sdk-bundle-1.11.1026.jar,${CHRONON_ONLINE_JAR:-}" \ +"$@" 2>&1 | +grep --line-buffered -v "YarnScheduler:70" | +grep --line-buffered -v "TransportResponseHandler:144" | +grep --line-buffered -v "TransportClient:331" | +grep --line-buffered -v "io.netty.channel.AbstractChannel" | +grep --line-buffered -v "ClosedChannelException" | +grep --line-buffered -v "TransportResponseHandler:154" | +grep --line-buffered -v "TransportRequestHandler:293" | +grep --line-buffered -v "TransportResponseHandler:144" | +tee ${CHRONON_WORKING_DIR}/${APP_NAME}_spark.log + + + diff --git a/teams.json b/teams.json new file mode 100644 index 0000000000..2de73421a7 --- /dev/null +++ b/teams.json @@ -0,0 +1,39 @@ +{ + "default": { + "description": "Default team configuration", + "namespace": "default", + "user": "default_user", + "table_properties": {}, + "production": { + "backfill": { + "EXECUTOR_CORES": "2", + "DRIVER_MEMORY": "1G", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + }, + "upload": { + "EXECUTOR_CORES": "2", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + } + } + }, + "bootcamp": { + "description": "Bootcamp team for learning Chronon", + "namespace": "default", + "user": "bootcamp_user", + "production": { + "backfill": { + "EXECUTOR_CORES": "2", + "DRIVER_MEMORY": "1G", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + }, + "upload": { + "EXECUTOR_CORES": "2", + "EXECUTOR_MEMORY": "2G", + "PARALLELISM": "4" + } + } + } +} \ No newline at end of file
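
For reference, the `scripts/spark_submit.sh` template above is driven entirely by environment variables. The sketch below shows one way it might be invoked; the app name, spark-submit path, application jar path, and driver class are illustrative assumptions rather than values defined in this change, and the Driver's own subcommand arguments are omitted.

```bash
# Minimal invocation sketch for scripts/spark_submit.sh (all values are placeholders).
export APP_NAME=bootcamp_user_purchase_backfill        # hypothetical app name
export SPARK_SUBMIT_PATH=/opt/spark/bin/spark-submit   # path to your spark-submit binary
export JOB_MODE="local[*]"                             # override the yarn default for local runs
export EXECUTOR_MEMORY=2G DRIVER_MEMORY=1G PARALLELISM=4

# Everything after the script name is passed straight through to spark-submit ("$@"),
# so the main class and application jar go here; both the class and jar path are assumptions.
bash scripts/spark_submit.sh \
  --class ai.chronon.spark.Driver \
  /path/to/chronon-spark-assembly.jar
```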