Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 163 additions & 156 deletions examples/fetcher_demo.py
Original file line number Diff line number Diff line change
@@ -1,192 +1,194 @@
"""
Example usage of the new fetcher architecture.
Enhanced fetcher demo with data analysis capabilities.

This file demonstrates how to use the BaseFetcher and its derived classes
for fetching different types of data.
This file demonstrates the new unified fetcher architecture with
data processing and analysis features.
"""

from typing import Any, Dict, List

from live_trade_bench.fetchers import (
BaseFetcher,
NewsFetcher,
OptionFetcher,
PolymarketFetcher,
RedditFetcher,
StockFetcher,
)
from live_trade_bench.fetchers.news_fetcher import fetch_news_data
from live_trade_bench.fetchers.polymarket_fetcher import (
fetch_market_price_with_history,
fetch_trending_markets,
)
from live_trade_bench.fetchers.stock_fetcher import (
fetch_stock_price_with_history,
fetch_trending_stocks,
)


def example_stock_fetcher() -> None:
"""Example of using StockFetcher."""
print("=== Stock Fetcher Example ===")
"""Example of enhanced StockFetcher with market analysis."""
print("=== Enhanced Stock Fetcher Example ===")

# Create a stock fetcher with custom delays
# Create a stock fetcher
stock_fetcher = StockFetcher(min_delay=0.5, max_delay=1.5)

# Fetch price data
try:
price_data = stock_fetcher.fetch_stock_data(
ticker="AAPL",
start_date="2024-01-01",
end_date="2024-01-31",
interval="1d",
)
if hasattr(
price_data, "__len__"
): # Check if it has a length (DataFrame or dict)
print(f"Fetched AAPL data (type: {type(price_data).__name__})")
# Try to convert to dict if it's a DataFrame
try:
import pandas as pd

if isinstance(price_data, pd.DataFrame):
price_dict = price_data.to_dict(orient="index")
print(f"Converted to dict with {len(price_dict)} entries")
print(f"Sample data: {list(price_dict.items())[:2]}")
else:
print(f"Data length: {len(price_data)}")
except ImportError:
print("Pandas not available, raw data received")
else:
print(f"Unexpected data type: {type(price_data)}")
except Exception as e:
print(f"Error fetching stock data: {e}")
# Get trending stocks for today
trending_stocks = fetch_trending_stocks(limit=3, for_date=None)
print(f"Trending stocks: {trending_stocks}")

# Test historical trending (for backtest)
historical_date = "2024-01-15"
historical_stocks = fetch_trending_stocks(limit=3, for_date=historical_date)
print(f"Historical stocks for {historical_date}: {historical_stocks}")

# Fetch stock data with history for multiple stocks
market_data = {}
for ticker in trending_stocks[:2]: # Test first 2 stocks
print(f"\nFetching data for {ticker}...")
stock_data = fetch_stock_price_with_history(ticker)
market_data[ticker] = stock_data

if stock_data.get("current_price"):
print(f"{ticker} current price: ${stock_data['current_price']:.2f}")
history = stock_data.get("price_history", [])
print(f"{ticker} history: {len(history)} days")
else:
print(f"Failed to fetch {ticker} data")

# Also demonstrate the fetch() method with valid modes
try:
# Get trending stocks
trending = stock_fetcher.fetch("trending_stocks", limit=5)
print(f"Trending stocks: {trending}")

# Get current price for a specific stock
current_price = stock_fetcher.fetch("stock_price", ticker="AAPL")
print(f"Current AAPL price: ${current_price}")
except Exception as e:
print(f"Error using fetch method: {e}")
print(f"Error in stock fetcher demo: {e}")


def example_news_fetcher() -> None:
"""Example of using NewsFetcher."""
print("\n=== News Fetcher Example ===")
"""Example of enhanced NewsFetcher with news analysis."""
print("\n=== Enhanced News Fetcher Example ===")

# Create a news fetcher with longer delays for web scraping
news_fetcher = NewsFetcher(min_delay=3.0, max_delay=7.0)

# Fetch news data
try:
news_data = news_fetcher.fetch(
query="Apple stock",
start_date="2024-01-01",
end_date="2024-01-31",
max_pages=2,
from live_trade_bench.fetchers.news_fetcher import fetch_news_data
from datetime import datetime, timedelta

# Calculate date range (last 7 days)
end_date = datetime.now()
start_date = end_date - timedelta(days=7)

# Fetch news for Apple
print("Fetching Apple stock news...")
news_data = fetch_news_data(
query="Apple stock earnings",
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
max_pages=1,
ticker="AAPL"
)

print(f"Fetched {len(news_data)} news articles")
if news_data:
print(f"Sample article: {news_data[0]['title']}")
print(f"Latest article: {news_data[0].get('title', 'No title')}")

# Show news summaries
print(f"\n--- Sample News ---")
for i, item in enumerate(news_data[:3], 1):
print(f"{i}. {item.get('title', 'No title')}")
if item.get('snippet'):
print(f" {item['snippet'][:100]}...")

except Exception as e:
print(f"Error fetching news data: {e}")


def example_option_fetcher() -> None:
"""Example of using OptionFetcher."""
print("\n=== Option Fetcher Example ===")

# Create an option fetcher
option_fetcher = OptionFetcher()

try:
# Get available expirations
expirations = option_fetcher.fetch_option_expirations("AAPL")
print(f"AAPL has {len(expirations)} option expirations")

if expirations:
# Get option chain for nearest expiration
option_chain = option_fetcher.fetch_option_chain("AAPL", expirations[0])
print(
f"Option chain has {len(option_chain['calls'])} calls and {len(option_chain['puts'])} puts"
)

# Calculate Greeks for a sample option
greeks = option_fetcher.calculate_option_greeks(
underlying_price=150.0,
strike=150.0,
time_to_expiry=0.25,
risk_free_rate=0.05,
volatility=0.3,
option_type="call",
)
print(f"Sample Greeks: {greeks}")
except Exception as e:
print(f"Error fetching option data: {e}")
print(f"Error in news fetcher demo: {e}")


def example_polymarket_fetcher() -> None:
"""Example of using PolymarketFetcher."""
print("\n=== Polymarket Fetcher Example ===")

# Create a Polymarket fetcher
poly_fetcher = PolymarketFetcher()
"""Example of enhanced PolymarketFetcher with market analysis."""
print("\n=== Enhanced Polymarket Fetcher Example ===")

try:
# Fetch trending markets using the unified fetch method
trending = poly_fetcher.fetch("markets", category=None, limit=5)
if isinstance(trending, list):
print(f"Found {len(trending)} trending markets")

if trending:
# Get details for first market
first_market = trending[0]
if isinstance(first_market, dict) and "id" in first_market:
market_id = first_market["id"]
details = poly_fetcher.fetch("market_details", market_id=market_id)
if isinstance(details, dict):
print(f"Market: {details.get('title', 'Unknown')}")
print(f"Outcomes: {len(details.get('outcomes', []))}")

# Get orderbook
orderbook = poly_fetcher.fetch("orderbook", market_id=market_id)
if isinstance(orderbook, dict):
print(
f"Orderbook entries: {len(orderbook.get('asks', []))}"
)
print(
f"Orderbook has {len(orderbook.get('bids', []))} bids and {len(orderbook.get('asks', []))} asks"
)
else:
print(f"Unexpected trending data type: {type(trending)}")
except Exception as e:
print(f"Error fetching Polymarket data: {e}")
# Get trending markets for today
print("Fetching trending Polymarket markets...")
trending_markets = fetch_trending_markets(limit=2, for_date=None)
print(f"Found {len(trending_markets)} trending markets")

# Test historical markets (for backtest)
historical_date = "2024-01-15"
historical_markets = fetch_trending_markets(limit=2, for_date=historical_date)
print(f"Historical markets for {historical_date}: {len(historical_markets)} markets")

if trending_markets:
# Fetch market data with history
market_data = {}
for market in trending_markets:
market_id = market.get("id")
if market_id:
print(f"\nFetching data for market: {market.get('question', market_id)}")
market_price_data = fetch_market_price_with_history(market_id)
market_data[market_id] = {
**market_price_data,
"question": market.get("question", market_id),
"url": market.get("url", "")
}

current_price = market_price_data.get("current_price")
if current_price is not None:
print(f"Current price: {current_price:.3f}")
history = market_price_data.get("price_history", [])
print(f"History: {len(history)} days")


def example_reddit_fetcher() -> None:
"""Example of using RedditFetcher."""
print("\n=== Reddit Fetcher Example ===")
except Exception as e:
print(f"Error in Polymarket fetcher demo: {e}")

# Create a Reddit fetcher
reddit_fetcher = RedditFetcher()

def example_integrated_workflow() -> None:
"""Example of integrated workflow using multiple fetchers."""
print("\n=== Integrated Fetcher Workflow ===")

try:
# Get available categories
categories = reddit_fetcher.get_available_categories()
print(f"Available categories: {categories}")

if categories:
# Get available dates for first category
dates = reddit_fetcher.get_available_dates(categories[0])
print(f"Available dates for {categories[0]}: {len(dates)} dates")

if dates:
# Fetch posts for first date
posts = reddit_fetcher.fetch_top_from_category(
category=categories[0], date=dates[0], max_limit=10
)
print(f"Fetched {len(posts)} posts")

if posts:
print(f"Top post: {posts[0]['title']}")
# Step 1: Get market universe
print("1. Getting stock universe...")
stocks = fetch_trending_stocks(limit=2, for_date=None)
print(f"Selected stocks: {stocks}")

# Step 2: Fetch market data
print("\n2. Fetching market data...")
stock_market_data = {}
for ticker in stocks:
stock_data = fetch_stock_price_with_history(ticker)
stock_market_data[ticker] = stock_data
print(f" {ticker}: ${stock_data.get('current_price', 'N/A')}")

# Step 3: Fetch news data
print("\n3. Fetching related news...")
from live_trade_bench.fetchers.news_fetcher import fetch_news_data
from datetime import datetime, timedelta

end_date = datetime.now()
start_date = end_date - timedelta(days=3)

all_news = []
for ticker in stocks[:1]: # Just test one to avoid too many requests
news = fetch_news_data(
query=f"{ticker} stock",
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
max_pages=1,
ticker=ticker
)
all_news.extend(news)
print(f" {ticker}: {len(news)} articles")

# Step 4: Test backtest data
print("\n4. Testing backtest data...")
backtest_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
historical_stocks = fetch_trending_stocks(limit=2, for_date=backtest_date)
print(f" Historical stocks for {backtest_date}: {historical_stocks}")

if all_news:
print(f"\n5. Sample news processing:")
for item in all_news[:2]:
print(f" - {item.get('title', 'No title')}")
print(f" Date: {item.get('date', 'Unknown')}")

except Exception as e:
print(f"Error fetching Reddit data: {e}")
print(f"Error in integrated workflow: {e}")


def example_context_manager() -> None:
Expand All @@ -196,8 +198,8 @@ def example_context_manager() -> None:
# Use fetcher as context manager for automatic cleanup
with StockFetcher() as fetcher:
try:
data = fetcher.fetch("AAPL", "2024-01-01", "2024-01-05", "D")
print(f"Fetched {len(data)} days of data using context manager")
trending = fetcher.get_trending_stocks(limit=3)
print(f"Fetched {len(trending)} trending stocks using context manager: {trending}")
except Exception as e:
print(f"Error: {e}")

Expand Down Expand Up @@ -229,20 +231,25 @@ def fetch(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:


def main() -> None:
"""Run all examples."""
print("Fetcher Architecture Examples")
print("=" * 50)
"""Run all enhanced fetcher examples."""
print("Enhanced Fetcher Architecture Examples")
print("=" * 60)

example_stock_fetcher()
example_news_fetcher()
example_option_fetcher()
example_polymarket_fetcher()
example_reddit_fetcher()
example_integrated_workflow()
example_context_manager()
example_custom_fetcher()

print("\n" + "=" * 50)
print("All examples completed!")
print("\n" + "=" * 60)
print("All enhanced fetcher examples completed!")
print("\nUnified fetcher interfaces demonstrated:")
print("- fetch_trending_* with for_date parameter (None=today, date=historical)")
print("- fetch_*_price_with_history for both asset types")
print("- Single interface for both live and backtest data")
print("- Completely symmetric design across asset types")
print("- Minimal, consolidated API surface")


if __name__ == "__main__":
Expand Down
Loading
Loading