diff --git a/src/hitachione/__init__.py b/src/hitachione/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hitachione/agents/__init__.py b/src/hitachione/agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hitachione/agents/knowledge_retrieval.py b/src/hitachione/agents/knowledge_retrieval.py new file mode 100644 index 0000000..d6b41fa --- /dev/null +++ b/src/hitachione/agents/knowledge_retrieval.py @@ -0,0 +1,87 @@ +"""Knowledge Retrieval Agent – consults Weaviate for context enrichment. + +Responsibilities: + • Resolve entity aliases (e.g. "Google" → GOOGL) + • Find prior top-lists or summaries relevant to the query + • Provide entity hints the Orchestrator can use for planning + +Non-blocking: failures return partial / empty results rather than crashing. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from ..config.settings import ( + WEAVIATE_COLLECTION, WEAVIATE_API_KEY, WEAVIATE_HTTP_HOST, +) +from ..models.schemas import TaskContext + +logger = logging.getLogger(__name__) + + +def _weaviate_client(): + """Build a *sync* Weaviate client.""" + import weaviate + from weaviate.auth import AuthApiKey + + if WEAVIATE_HTTP_HOST.endswith(".weaviate.cloud"): + return weaviate.connect_to_weaviate_cloud( + cluster_url=f"https://{WEAVIATE_HTTP_HOST}", + auth_credentials=AuthApiKey(WEAVIATE_API_KEY), + ) + return weaviate.connect_to_custom( + http_host=WEAVIATE_HTTP_HOST, + http_port=443, http_secure=True, + grpc_host=WEAVIATE_HTTP_HOST, grpc_port=443, grpc_secure=True, + auth_credentials=AuthApiKey(WEAVIATE_API_KEY), + ) + + +class KnowledgeRetrievalAgent: + """Look up aliases, entity hints, and prior summaries from the KB.""" + + def run(self, ctx: TaskContext) -> dict[str, Any]: + """Return ``{"aliases": {...}, "entity_hints": [...], "summaries": [...]}``.""" + result: dict[str, Any] = {"aliases": {}, "entity_hints": [], "summaries": []} + try: + client = _weaviate_client() + 
try: + col = client.collections.get(WEAVIATE_COLLECTION) + + # 1) BM25 search on the user query for entity hints / summaries + resp = col.query.bm25( + query=ctx.user_query, + limit=8, + return_properties=[ + "text", "title", "ticker", "company", + "dataset_source", "date", + ], + ) + for obj in resp.objects: + p = {k: v for k, v in obj.properties.items() if v is not None} + ticker = p.get("ticker") + company = p.get("company") + if ticker and company: + result["aliases"][company] = ticker + if ticker and ticker not in result["entity_hints"]: + result["entity_hints"].append(ticker) + result["summaries"].append( + f"[{p.get('dataset_source','')} | {p.get('date','')}] " + f"{p.get('title','')}" + ) + + # 2) If the context already has entities, resolve them + for entity in ctx.entities: + if entity.upper() not in result["aliases"].values(): + result["aliases"][entity] = entity.upper() + + finally: + client.close() + + except Exception as exc: + logger.warning("KnowledgeRetrievalAgent error: %s", exc) + ctx.uncertainties.append(f"KB lookup failed: {exc}") + + return result diff --git a/src/hitachione/agents/orchestrator.py b/src/hitachione/agents/orchestrator.py new file mode 100644 index 0000000..f4c9451 --- /dev/null +++ b/src/hitachione/agents/orchestrator.py @@ -0,0 +1,300 @@ +"""Orchestrator – the agentic ReAct loop (Plan → Act → Observe → Reflect). + +This is the top-level entry point. Given a free-text user prompt it: + +1. **Parses intent** (rank, compare, snapshot, event_reaction, …) +2. **Plans** subgoals and identifies information gaps +3. **Acts** by calling sub-agents (KB retrieval, company retrieval, + researcher, synthesizer, reviewer) +4. **Observes** outputs and assesses sufficiency +5. **Reflects** — if the Reviewer flags issues and we have iterations left, + revises the plan and loops +6. **Stops** when the Reviewer says OK, the budget is exhausted, or + information gain is negligible +7. 
**Returns** a clean ``SynthesizedAnswer`` with rationale + caveats + +Usage:: + + from hitachione.agents.orchestrator import Orchestrator + answer = Orchestrator().run("Top 3 tech stocks of 2024") + print(answer.markdown) +""" + +from __future__ import annotations + +import json +import logging +import re +import sys +from pathlib import Path +from typing import Any + +from openai import OpenAI + +# Ensure tools directory is importable +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from ..config.settings import ( + MAX_ITERATIONS, OPENAI_API_KEY, OPENAI_BASE_URL, PLANNER_MODEL, +) +from ..models.schemas import ( + Intent, SynthesizedAnswer, TaskContext, +) +from ..services.tracing import Tracer +from .knowledge_retrieval import KnowledgeRetrievalAgent +from .researcher import ResearcherAgent +from .reviewer import ReviewerAgent +from .synthesizer import SynthesizerAgent + +logger = logging.getLogger(__name__) + +# ── Intent parsing prompt ──────────────────────────────────────────────── + +_INTENT_PROMPT = """\ +Classify the user's financial query into exactly one intent and extract +structured fields. Return ONLY valid JSON (no markdown fences): + +{ + "intent": "", + "entities": ["", ...], + "timeframe": "", + "sector": "" +} + +Rules: +- "entities" must be uppercase ticker symbols when identifiable. +- If the user mentions company names instead of tickers, map them to tickers + (e.g. "Tesla" → "TSLA", "Apple" → "AAPL", "Google" → "GOOGL"). +- If unsure about tickers, leave entities empty. +- Keep it concise. 
+""" + +# ── Company retrieval helper ───────────────────────────────────────────── + +def _find_symbols(query: str) -> list[str]: + """Call the company filtering tool to discover relevant tickers.""" + try: + from ..tools.company_filtering_tool.tool import find_relevant_symbols + return find_relevant_symbols(query, use_llm_filter=True) + except Exception as exc: + logger.warning("Company filtering tool error: %s", exc) + return [] + + +# ── Orchestrator ───────────────────────────────────────────────────────── + +class Orchestrator: + """Agentic ReAct orchestrator for financial intelligence queries.""" + + def __init__(self, max_iterations: int = MAX_ITERATIONS): + self.max_iter = max_iterations + self.kb_agent = KnowledgeRetrievalAgent() + self.researcher = ResearcherAgent() + self.synthesizer = SynthesizerAgent() + self.reviewer = ReviewerAgent() + + # ── public API ────────────────────────────────────────────────────── + + def run( + self, + user_query: str, + *, + default_timeframe: str = "", + metadata: dict[str, Any] | None = None, + ) -> SynthesizedAnswer: + """Execute the full plan-act-observe-reflect loop.""" + + tracer = Tracer.start( + "orchestrator_run", + metadata={"query": user_query, **(metadata or {})}, + ) + + ctx = TaskContext(user_query=user_query, timeframe=default_timeframe) + + # ── STEP 1: Parse intent ──────────────────────────────────────── + with tracer.span("intent_parse") as sp: + self._parse_intent(ctx) + sp.update(output={ + "intent": ctx.intent.value, + "entities": ctx.entities, + "timeframe": ctx.timeframe, + "sector": ctx.sector, + }) + ctx.observations.append( + f"Intent={ctx.intent.value}, entities={ctx.entities}, " + f"timeframe={ctx.timeframe}, sector={ctx.sector}" + ) + + answer = SynthesizedAnswer() + + for iteration in range(1, self.max_iter + 1): + ctx.iteration = iteration + logger.info("── Iteration %d/%d ──", iteration, self.max_iter) + + # ── STEP 2: Plan ──────────────────────────────────────────── + with 
tracer.span("planning", metadata={"iteration": iteration}) as sp: + plan = self._plan(ctx) + ctx.plan = plan + sp.update(output=plan) + ctx.observations.append(f"Plan (iter {iteration}): {plan}") + + # ── STEP 3: Act – KB retrieval ────────────────────────────── + with tracer.span("knowledge_retrieval") as sp: + kb_data = self.kb_agent.run(ctx) + sp.update(output=kb_data) + # Merge entity hints into context + for hint in kb_data.get("entity_hints", []): + if hint not in ctx.entities: + ctx.entities.append(hint) + + # ── STEP 3b: Act – Company retrieval (if entities empty) ─── + if not ctx.entities: + with tracer.span("company_retrieval") as sp: + ctx.observations.append("No entities yet – calling company filter") + symbols = _find_symbols(ctx.user_query) + ctx.entities = symbols + sp.update(output=symbols) + + if not ctx.entities: + ctx.uncertainties.append("Could not identify any tickers") + answer.caveats.append( + "No tickers could be identified for this query." + ) + answer.markdown = ( + f"I wasn't able to identify specific tickers for: " + f"*{user_query}*. Please try including ticker symbols " + f"(e.g. AAPL, TSLA)." 
+ ) + answer.confidence = 0.0 + tracer.end(output={"markdown": answer.markdown}) + return answer + + # ── STEP 4: Act – Research ────────────────────────────────── + with tracer.span("research_fanout") as sp: + research = self.researcher.run(ctx, ctx.entities) + sp.update(output={ + "count": len(research), + "tickers": [r.ticker for r in research], + }) + + # ── STEP 5: Act – Synthesize ──────────────────────────────── + with tracer.span("synthesizer") as sp: + answer = self.synthesizer.run(ctx, research) + sp.update(output={ + "confidence": answer.confidence, + "caveats": answer.caveats, + "md_length": len(answer.markdown), + }) + + # ── STEP 6: Observe – Review ──────────────────────────────── + with tracer.span("reviewer") as sp: + feedback = self.reviewer.run(ctx, answer) + sp.update(output={ + "ok": feedback.ok, + "missing": feedback.missing, + "notes": feedback.notes, + }) + ctx.observations.append( + f"Reviewer (iter {iteration}): ok={feedback.ok}, " + f"notes={feedback.notes}" + ) + + # ── STEP 7: Reflect & decide to stop ─────────────────────── + if feedback.ok: + logger.info("Reviewer OK – stopping") + break + + if iteration < self.max_iter: + # Reflect: try to address missing items + with tracer.span("reflection") as sp: + adjustments = self._reflect(ctx, feedback) + sp.update(output=adjustments) + ctx.observations.append(f"Reflection: {adjustments}") + else: + logger.info("Max iterations reached – returning best effort") + answer.caveats.append( + "Maximum analysis iterations reached; some data may be incomplete." 
+ ) + + tracer.end(output={"confidence": answer.confidence, "caveats": answer.caveats}) + return answer + + # ── Private helpers ───────────────────────────────────────────────── + + def _parse_intent(self, ctx: TaskContext) -> None: + """Use LLM to classify intent and extract entities / timeframe.""" + try: + client = OpenAI(base_url=OPENAI_BASE_URL, api_key=OPENAI_API_KEY) + resp = client.chat.completions.create( + model=PLANNER_MODEL, + messages=[ + {"role": "system", "content": _INTENT_PROMPT}, + {"role": "user", "content": ctx.user_query}, + ], + temperature=0, + ) + raw = (resp.choices[0].message.content or "").strip() + # Strip markdown fences if present + if "```" in raw: + raw = raw.split("```")[1] + if raw.startswith("json"): + raw = raw[4:] + raw = raw.strip() + data = json.loads(raw) + except Exception as exc: + logger.warning("Intent parse error: %s", exc) + data = {} + + intent_str = data.get("intent", "mixed") + try: + ctx.intent = Intent(intent_str) + except ValueError: + ctx.intent = Intent.MIXED + + ctx.entities = [e.upper() for e in data.get("entities", [])] + ctx.timeframe = data.get("timeframe", ctx.timeframe) or "" + ctx.sector = data.get("sector", "") or "" + + def _plan(self, ctx: TaskContext) -> list[str]: + """Generate a simple step plan from the current context.""" + steps = [] + if not ctx.entities: + steps.append("Discover relevant entities via KB + company filter") + else: + steps.append(f"Research entities: {', '.join(ctx.entities)}") + + if ctx.intent in (Intent.RANK, Intent.COMPARE): + steps.append("Fetch sentiment + performance for each entity") + steps.append("Score & rank entities") + elif ctx.intent == Intent.SNAPSHOT: + steps.append("Fetch latest data for the entity") + elif ctx.intent == Intent.EVENT_REACTION: + steps.append("Fetch recent news + price reaction") + else: + steps.append("Fetch sentiment + performance data") + + steps.append("Synthesize answer with rationale + caveats") + steps.append("Review for quality and 
completeness") + + return steps + + def _reflect(self, ctx: TaskContext, feedback) -> dict[str, Any]: + """Adjust the context based on reviewer feedback.""" + adjustments: dict[str, Any] = {"action": "none"} + + missing = feedback.missing + # If entities are missing research, they may need to be re-fetched + missing_entities = [ + m for m in missing + if "not researched" in m.lower() + ] + if missing_entities: + adjustments["action"] = "retry_missing_entities" + adjustments["details"] = missing_entities + + # If confidence is low, try broader KB search + if any("confidence" in m.lower() for m in missing): + adjustments["action"] = "broaden_search" + ctx.observations.append("Broadening search due to low confidence") + + return adjustments diff --git a/src/hitachione/agents/researcher.py b/src/hitachione/agents/researcher.py new file mode 100644 index 0000000..3b9d21b --- /dev/null +++ b/src/hitachione/agents/researcher.py @@ -0,0 +1,73 @@ +"""Researcher Agent – per-entity data fetch with async fan-out. + +For each ticker the Researcher: + 1. Calls the sentiment analysis tool → rating 1-10 + rationale + 2. Calls the performance analysis tool → score 1-10 + outlook + 3. 
Captures errors per entity (never crashes the whole run)
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import sys
+from pathlib import Path
+from typing import Any
+
+# NOTE(review): `asyncio` is imported but the fan-out below is sequential —
+# the "async fan-out" in the module docstring is aspirational. Confirm before
+# relying on concurrent behaviour.
+
+# Ensure the tools can import their helpers
+sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
+
+from ..models.schemas import CompanyResearch, TaskContext, ToolError
+
+logger = logging.getLogger(__name__)
+
+
+# ── Lazy imports for the existing tools (avoid circular / heavy init) ──
+
+def _sentiment(ticker: str) -> dict[str, Any]:
+    """Run the sentiment tool for *ticker*; imported lazily per call."""
+    from ..tools.sentiment_analysis_tool.tool import analyze_ticker_sentiment_sync
+    return analyze_ticker_sentiment_sync(ticker)
+
+
+def _performance(ticker: str) -> dict[str, Any]:
+    """Run the performance tool for *ticker*; imported lazily per call."""
+    from ..tools.performance_analysis_tool.tool import analyse_stock_performance
+    return analyse_stock_performance(ticker)
+
+
+def _research_one(ticker: str) -> CompanyResearch:
+    """Fetch sentiment + performance for one ticker (sync).
+
+    Each tool failure is recorded as a ``ToolError`` on the result instead of
+    propagating, so one bad ticker never aborts the whole research pass.
+    """
+    cr = CompanyResearch(ticker=ticker)
+
+    # Sentiment
+    try:
+        cr.sentiment = _sentiment(ticker)
+        # Keep at most 5 reference strings as citation snippets.
+        refs = cr.sentiment.get("references", [])
+        cr.news_snippets = [str(r) for r in refs][:5]
+    except Exception as exc:
+        logger.warning("Sentiment error for %s: %s", ticker, exc)
+        cr.errors.append(ToolError(entity=ticker, tool="sentiment", error=str(exc)))
+
+    # Performance
+    try:
+        cr.performance = _performance(ticker)
+    except Exception as exc:
+        logger.warning("Performance error for %s: %s", ticker, exc)
+        cr.errors.append(ToolError(entity=ticker, tool="performance", error=str(exc)))
+
+    return cr
+
+
+class ResearcherAgent:
+    """Fan-out research across a list of entities."""
+
+    def run(self, ctx: TaskContext, entities: list[str]) -> list[CompanyResearch]:
+        """Research every entity; accumulate errors without crashing.
+
+        Observations and uncertainties are appended to *ctx* (the shared
+        blackboard) so the Orchestrator/Reviewer can see data gaps.
+        """
+        results: list[CompanyResearch] = []
+        for ticker in entities:
+            ctx.observations.append(f"Researching {ticker}...")
+            cr = _research_one(ticker)
+            results.append(cr)
+            if cr.errors:
+                # Surface every tool failure as an explicit uncertainty.
+                for e in
cr.errors: + ctx.uncertainties.append(f"{e.tool} failed for {e.entity}: {e.error}") + return results diff --git a/src/hitachione/agents/reviewer.py b/src/hitachione/agents/reviewer.py new file mode 100644 index 0000000..bae492b --- /dev/null +++ b/src/hitachione/agents/reviewer.py @@ -0,0 +1,63 @@ +"""Reviewer Agent – deterministic quality gate for synthesized answers. + +Checks: + 1. Entity coverage: were all requested entities actually researched? + 2. Score completeness: does each entity have sentiment + performance? + 3. Answer quality: is the markdown non-empty and reasonably long? + 4. Confidence threshold: is overall confidence above the minimum? + +Returns ``ReviewFeedback(ok=True/False, missing=[...], notes=...)``. +""" + +from __future__ import annotations + +import logging + +from ..config.settings import QUALITY_THRESHOLD +from ..models.schemas import ( + ReviewFeedback, SynthesizedAnswer, TaskContext, +) + +logger = logging.getLogger(__name__) + + +class ReviewerAgent: + """Deterministic checks – no LLM calls, fast and predictable.""" + + def run(self, ctx: TaskContext, answer: SynthesizedAnswer) -> ReviewFeedback: + fb = ReviewFeedback() + missing: list[str] = [] + + # 1. Entity coverage + researched = {cr.ticker for cr in answer.raw_research} + for entity in ctx.entities: + if entity.upper() not in researched: + missing.append(f"Entity {entity} not researched") + + # 2. Score completeness + for cr in answer.raw_research: + if not cr.sentiment or cr.sentiment.get("rating") is None: + missing.append(f"{cr.ticker}: missing sentiment rating") + if not cr.performance or cr.performance.get("performance_score") is None: + missing.append(f"{cr.ticker}: missing performance score") + + # 3. Answer quality + if len(answer.markdown) < 50: + missing.append("Answer text too short") + + # 4. 
Confidence + if answer.confidence < QUALITY_THRESHOLD: + missing.append( + f"Confidence {answer.confidence:.2f} below threshold " + f"{QUALITY_THRESHOLD:.2f}" + ) + + fb.missing = missing + fb.ok = len(missing) == 0 + fb.notes = ( + "All checks passed." if fb.ok + else f"{len(missing)} issue(s) found: {'; '.join(missing[:5])}" + ) + + logger.info("Reviewer: ok=%s, issues=%d", fb.ok, len(missing)) + return fb diff --git a/src/hitachione/agents/synthesizer.py b/src/hitachione/agents/synthesizer.py new file mode 100644 index 0000000..65c083b --- /dev/null +++ b/src/hitachione/agents/synthesizer.py @@ -0,0 +1,146 @@ +"""Synthesizer Agent – composes a ranked / comparative answer from research. + +Takes a list of ``CompanyResearch`` objects and produces a user-facing +``SynthesizedAnswer`` with: + • Markdown answer (ranked list or comparison table) + • Rationale explaining the scoring / ranking + • Caveats for partial data + • Citations from news snippets + • Confidence estimate +""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +from openai import OpenAI + +from ..config.settings import OPENAI_BASE_URL, OPENAI_API_KEY, WORKER_MODEL +from ..models.schemas import ( + CompanyResearch, Intent, SynthesizedAnswer, TaskContext, +) + +logger = logging.getLogger(__name__) + +# ── Prompt templates ───────────────────────────────────────────────────── + +_SYSTEM = ( + "You are a financial intelligence synthesizer. " + "Given structured research data for companies, compose a clear, " + "well-structured Markdown answer for the user. " + "Include a rationale section explaining your reasoning. " + "If data is partial, add explicit caveats. " + "If news references are available, include them as citations. " + "Do NOT give investment advice – present analytics and insights only. " + "Return raw Markdown text (not wrapped in JSON)." 
+) + + +def _build_research_block(research: list[CompanyResearch]) -> str: + """Serialize research data into a text block for the LLM.""" + parts: list[str] = [] + for cr in research: + lines = [f"## {cr.ticker}"] + if cr.sentiment: + lines.append( + f"- Sentiment: rating={cr.sentiment.get('rating')}, " + f"label={cr.sentiment.get('label')}, " + f"rationale={cr.sentiment.get('rationale','')[:200]}" + ) + if cr.performance: + lines.append( + f"- Performance: score={cr.performance.get('performance_score')}, " + f"outlook={cr.performance.get('outlook')}, " + f"justification={cr.performance.get('justification','')[:200]}" + ) + if cr.news_snippets: + lines.append("- News references: " + "; ".join(cr.news_snippets[:3])) + if cr.errors: + lines.append( + "- ⚠ Data gaps: " + + ", ".join(f"{e.tool} ({e.error[:60]})" for e in cr.errors) + ) + parts.append("\n".join(lines)) + return "\n\n".join(parts) + + +def _estimate_confidence(research: list[CompanyResearch]) -> float: + """Heuristic confidence score 0-1 based on data completeness.""" + if not research: + return 0.0 + scores: list[float] = [] + for cr in research: + s = 0.0 + if cr.sentiment and cr.sentiment.get("rating") is not None: + s += 0.5 + if cr.performance and cr.performance.get("performance_score") is not None: + s += 0.5 + scores.append(s) + return sum(scores) / len(scores) + + +class SynthesizerAgent: + """Compose a user-facing answer from per-entity research.""" + + def run( + self, + ctx: TaskContext, + research: list[CompanyResearch], + ) -> SynthesizedAnswer: + answer = SynthesizedAnswer(raw_research=research) + answer.confidence = _estimate_confidence(research) + + # Gather caveats + for cr in research: + for e in cr.errors: + answer.caveats.append(f"{e.entity}: {e.tool} unavailable ({e.error[:80]})") + if answer.confidence < 0.5: + answer.caveats.append("Low overall data coverage – results are best-effort.") + + # Gather citations + for cr in research: + answer.citations.extend(cr.news_snippets[:3]) + + 
# Build the research context for the LLM + data_block = _build_research_block(research) + user_msg = ( + f"User query: {ctx.user_query}\n" + f"Intent: {ctx.intent.value}\n" + f"Timeframe: {ctx.timeframe or 'not specified'}\n\n" + f"Research data:\n{data_block}" + ) + + try: + client = OpenAI(base_url=OPENAI_BASE_URL, api_key=OPENAI_API_KEY) + resp = client.chat.completions.create( + model=WORKER_MODEL, + messages=[ + {"role": "system", "content": _SYSTEM}, + {"role": "user", "content": user_msg}, + ], + temperature=0.2, + ) + md = (resp.choices[0].message.content or "").strip() + answer.markdown = md + # Extract first paragraph as rationale summary + answer.rationale = md.split("\n\n")[0] if md else "" + except Exception as exc: + logger.error("Synthesizer LLM error: %s", exc) + # Fallback: build a simple text answer from raw data + answer.markdown = self._fallback_markdown(ctx, research) + answer.rationale = "Generated from raw data (LLM unavailable)." + + return answer + + @staticmethod + def _fallback_markdown( + ctx: TaskContext, research: list[CompanyResearch] + ) -> str: + lines = [f"## Results for: {ctx.user_query}\n"] + for cr in research: + sent = cr.sentiment.get("rating", "?") if cr.sentiment else "?" + perf = cr.performance.get("performance_score", "?") if cr.performance else "?" + lines.append(f"- **{cr.ticker}**: sentiment={sent}/10, performance={perf}/10") + return "\n".join(lines) diff --git a/src/hitachione/config/__init__.py b/src/hitachione/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hitachione/config/settings.py b/src/hitachione/config/settings.py new file mode 100644 index 0000000..fe6483d --- /dev/null +++ b/src/hitachione/config/settings.py @@ -0,0 +1,38 @@ +"""Centralised settings for the multi-agent financial intelligence system. + +All secrets come from env-vars / .env – never hard-coded. 
+""" + +from __future__ import annotations + +import os +from pathlib import Path +from dotenv import load_dotenv + +# Load .env from project root +_PROJECT_ROOT = Path(__file__).resolve().parents[3] +load_dotenv(_PROJECT_ROOT / ".env") + +# ── LLM ────────────────────────────────────────────────────────────────── +OPENAI_BASE_URL = os.getenv( + "OPENAI_BASE_URL", + "https://generativelanguage.googleapis.com/v1beta/openai/", +) +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") +PLANNER_MODEL = os.getenv("DEFAULT_PLANNER_MODEL", "gemini-2.5-flash") +WORKER_MODEL = os.getenv("DEFAULT_WORKER_MODEL", "gemini-2.5-flash") + +# ── Weaviate ───────────────────────────────────────────────────────────── +WEAVIATE_COLLECTION = os.getenv("WEAVIATE_COLLECTION_NAME", "Hitachi_finance_news") +WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY", "") +WEAVIATE_HTTP_HOST = os.getenv("WEAVIATE_HTTP_HOST", "localhost") +WEAVIATE_GRPC_HOST = os.getenv("WEAVIATE_GRPC_HOST", "localhost") + +# ── Langfuse (optional) ───────────────────────────────────────────────── +LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY") +LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY") +LANGFUSE_HOST = os.getenv("LANGFUSE_HOST", "https://us.cloud.langfuse.com") + +# ── Orchestrator defaults ──────────────────────────────────────────────── +MAX_ITERATIONS = int(os.getenv("ORCHESTRATOR_MAX_ITERATIONS", "2")) +QUALITY_THRESHOLD = float(os.getenv("ORCHESTRATOR_QUALITY_THRESHOLD", "0.6")) diff --git a/src/hitachione/main.py b/src/hitachione/main.py new file mode 100644 index 0000000..2322208 --- /dev/null +++ b/src/hitachione/main.py @@ -0,0 +1,60 @@ +"""Entry point for the multi-agent financial intelligence system. + +Usage:: + + python -m src.hitachione.main # launch Gradio UI + python -m src.hitachione.main --cli # one-shot CLI mode +""" + +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path + +# Ensure project root is on sys.path so `utils` etc. 
resolve
+sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
+    datefmt="%H:%M:%S",
+)
+
+
+def main() -> None:
+    """Parse CLI arguments and dispatch to one-shot CLI mode or the Gradio UI.
+
+    ``--cli`` / ``--query`` run a single orchestrator query and print the
+    answer, caveats, and confidence to stdout; otherwise the Gradio app is
+    launched on ``--port``.
+    """
+    parser = argparse.ArgumentParser(description="Financial Intelligence Agent")
+    parser.add_argument(
+        "--cli", action="store_true", help="Run a single query from stdin"
+    )
+    parser.add_argument(
+        "--query", type=str, default="", help="Query string (CLI mode)"
+    )
+    parser.add_argument(
+        "--port", type=int, default=7860, help="Gradio server port"
+    )
+    args = parser.parse_args()
+
+    if args.cli or args.query:
+        # Import lazily so CLI mode does not pay the Gradio import cost.
+        from hitachione.agents.orchestrator import Orchestrator
+
+        query = args.query or input("Enter query: ")
+        orch = Orchestrator()
+        answer = orch.run(query)
+        print(answer.markdown)
+        if answer.caveats:
+            print("\nCaveats:")
+            for c in answer.caveats:
+                print(f"  - {c}")
+        print(f"\nConfidence: {answer.confidence:.0%}")
+    else:
+        from hitachione.ui.app import build_app
+
+        demo = build_app()
+        # BUG FIX: --port was parsed but never forwarded to Gradio, so the
+        # flag had no effect. Pass it through as server_port.
+        demo.launch(share=True, server_port=args.port)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/hitachione/models/__init__.py b/src/hitachione/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/hitachione/models/schemas.py b/src/hitachione/models/schemas.py
new file mode 100644
index 0000000..f770049
--- /dev/null
+++ b/src/hitachione/models/schemas.py
@@ -0,0 +1,91 @@
+"""Shared data models for the multi-agent financial intelligence system.
+
+All models are plain dataclasses – no heavy framework dependency.
+"""
+
+from __future__ import annotations
+
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from typing import Any
+
+
+# ── Intent taxonomy ────────────────────────────────────────────────────── 
+
+class Intent(str, Enum):
+    RANK = "rank"
+    COMPARE = "compare"
+    SNAPSHOT = "snapshot"
+    EVENT_REACTION = "event_reaction"
+    FUNDAMENTALS = "fundamentals"
+    MACRO = "macro"
+    MIXED = "mixed"
+
+
+# ── Per-run context (scratchpad / blackboard) ──────────────────────────── 
+
+@dataclass
+class TaskContext:
+    """Short-lived context for a single orchestrator run."""
+
+    run_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
+    user_query: str = ""
+    intent: Intent = Intent.MIXED
+    timeframe: str = ""  # e.g. "last 3 years", "2024 Q3"
+    sector: str = ""  # e.g. "automotive"
+    entities: list[str] = field(default_factory=list)  # ticker symbols
+    constraints: dict[str, Any] = field(default_factory=dict)
+
+    # Blackboard – accumulates across iterations
+    plan: list[str] = field(default_factory=list)
+    observations: list[str] = field(default_factory=list)
+    uncertainties: list[str] = field(default_factory=list)
+    iteration: int = 0
+    # FIX: datetime.utcnow() is deprecated since Python 3.12 and returns a
+    # naive datetime. Use an explicit timezone-aware UTC timestamp instead
+    # (the ISO string now carries a "+00:00" offset).
+    timestamp: str = field(
+        default_factory=lambda: datetime.now(timezone.utc).isoformat(
+            timespec="seconds"
+        )
+    )
+
+
+# ── Tool / agent outputs ──────────────────────────────────────────────── 
+
+@dataclass
+class ToolError:
+    """Captures a non-fatal error from a tool call."""
+
+    entity: str
+    tool: str
+    error: str
+
+
+@dataclass
+class CompanyResearch:
+    """Research bundle for one company/ticker."""
+
+    ticker: str
+    sentiment: dict[str, Any] = field(default_factory=dict)
+    performance: dict[str, Any] = field(default_factory=dict)
+    news_snippets: list[str] = field(default_factory=list)
+    errors: list[ToolError] = field(default_factory=list)
+
+
+@dataclass
+class SynthesizedAnswer:
+    """Final user-facing answer produced by the Synthesizer."""
+
+    markdown: str = ""
+    rationale: str =
"" + caveats: list[str] = field(default_factory=list) + citations: list[str] = field(default_factory=list) + confidence: float = 0.0 # 0-1 + raw_research: list[CompanyResearch] = field(default_factory=list) + + +@dataclass +class ReviewFeedback: + """Output of the Reviewer agent.""" + + ok: bool = False + missing: list[str] = field(default_factory=list) + notes: str = "" diff --git a/src/hitachione/services/__init__.py b/src/hitachione/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hitachione/services/tracing.py b/src/hitachione/services/tracing.py new file mode 100644 index 0000000..7eeca74 --- /dev/null +++ b/src/hitachione/services/tracing.py @@ -0,0 +1,156 @@ +"""Lightweight Langfuse tracing helpers. + +If Langfuse keys are not set the helpers become no-ops so the system +runs cleanly without observability configured. + +Uses the imperative Langfuse Python SDK span API: + - ``Langfuse.start_span(name)`` creates a root span + auto-trace + - ``LangfuseSpan.start_span(name)`` creates a nested child span + - ``LangfuseSpan.update(output=…)`` attaches data + - ``LangfuseSpan.end()`` closes the span +""" + +from __future__ import annotations + +import logging +from contextlib import contextmanager +from typing import Any, Generator + +logger = logging.getLogger(__name__) + +# ── Try to import Langfuse; degrade gracefully ────────────────────────── +_langfuse = None +try: + from langfuse import Langfuse + from ..config.settings import LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST + + if LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY: + _langfuse = Langfuse( + public_key=LANGFUSE_PUBLIC_KEY, + secret_key=LANGFUSE_SECRET_KEY, + host=LANGFUSE_HOST, + ) + if _langfuse.auth_check(): + logger.info("Langfuse tracing enabled (auth OK)") + else: + logger.warning("Langfuse auth check failed – tracing disabled") + _langfuse = None + else: + logger.info("Langfuse keys not set – tracing disabled") +except Exception as exc: + 
logger.debug("Langfuse unavailable: %s", exc) + + +# ── Public helpers ────────────────────────────────────────────────────── + +class Tracer: + """Thin wrapper around a Langfuse trace / span tree. + + Usage:: + + tracer = Tracer.start("orchestrator_run", metadata={...}) + with tracer.span("intent_parse") as sp: + sp.update(output={"intent": "rank"}) + tracer.end(output=final_answer) + + The root span acts as the top-level container. All child spans + created via ``tracer.span(...)`` are nested under it so the full + Plan → Act → Observe → Reflect flow is visible in Langfuse. + """ + + def __init__(self, root_span: Any | None = None): + self._root = root_span # LangfuseSpan or None + self._trace_id: str | None = None + if root_span is not None: + self._trace_id = root_span.trace_id + + # --- factory --- + @classmethod + def start( + cls, + name: str, + *, + user_id: str = "", + metadata: dict | None = None, + ) -> "Tracer": + if _langfuse is None: + return cls(None) + try: + root = _langfuse.start_span(name=name, metadata=metadata or {}) + # Attach trace-level info (name, user_id, tags) + root.update_trace( + name=name, + user_id=user_id or None, + metadata=metadata or {}, + ) + logger.info("Langfuse trace started: %s", root.trace_id) + return cls(root) + except Exception as exc: + logger.warning("Langfuse trace start error: %s", exc) + return cls(None) + + # --- span context-manager --- + @contextmanager + def span(self, name: str, **kwargs) -> Generator["_Span", None, None]: + """Create a child span nested under the root.""" + sp = _Span.create(name, parent=self._root, **kwargs) + try: + yield sp + finally: + sp.finish() + + # --- finalise --- + def end(self, *, output: Any = None): + if self._root is not None: + try: + if output is not None: + self._root.update(output=output) + self._root.end() + except Exception as exc: + logger.debug("Langfuse root span end error: %s", exc) + if _langfuse is not None: + try: + _langfuse.flush() + except Exception as exc: + 
logger.debug("Langfuse flush error: %s", exc) + + @property + def trace_id(self) -> str | None: + return self._trace_id + + +class _Span: + """One span inside a trace.""" + + def __init__(self, name: str, lang_span: Any | None = None): + self.name = name + self._span = lang_span + + @classmethod + def create(cls, name: str, parent: Any | None = None, **kwargs) -> "_Span": + """Create a span as a child of *parent* (a ``LangfuseSpan``).""" + if parent is None: + return cls(name, None) + try: + child = parent.start_span( + name=name, + metadata=kwargs.get("metadata"), + ) + return cls(name, child) + except Exception as exc: + logger.debug("Langfuse child span error (%s): %s", name, exc) + return cls(name, None) + + def update(self, **kwargs): + if self._span is not None: + try: + self._span.update(**kwargs) + except Exception: + pass + + def finish(self): + if self._span is not None: + try: + self._span.end() + except Exception: + pass diff --git a/src/hitachione/tools/__init__.py b/src/hitachione/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hitachione/ui/__init__.py b/src/hitachione/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/hitachione/ui/app.py b/src/hitachione/ui/app.py new file mode 100644 index 0000000..6c7b1c6 --- /dev/null +++ b/src/hitachione/ui/app.py @@ -0,0 +1,72 @@ +"""Gradio UI for the multi-agent financial intelligence system. + +Uses gr.ChatInterface (same pattern as the other bootcamp apps) +so the user types a financial prompt and gets a Markdown answer. +""" + +from __future__ import annotations + +import logging + +import gradio as gr + +from ..agents.orchestrator import Orchestrator + +logger = logging.getLogger(__name__) + +_orchestrator = Orchestrator() + + +def _respond( + message: str, + history: list[dict], +) -> str: + """Process one user query and return the assistant's answer. + + The orchestrator can take 30-60 s, during which Gradio shows a spinner. 
+    """
+    if not message.strip():
+        return "*Please enter a financial query.*"
+
+    try:
+        answer = _orchestrator.run(message.strip())
+    except Exception as exc:
+        # Render failures into the chat instead of raising: Gradio would
+        # otherwise swallow the exception and show a generic error toast.
+        logger.exception("Orchestrator error")
+        return f"**Error:** {exc}"
+
+    # Build final output
+    parts = [answer.markdown]
+
+    if answer.caveats:
+        parts.append("\n---\n**Caveats:**")
+        for c in answer.caveats:
+            parts.append(f"- {c}")
+
+    if answer.citations:
+        # Cap at 5 citations to keep the chat answer compact.
+        parts.append("\n**Citations:**")
+        for i, cit in enumerate(answer.citations[:5], 1):
+            parts.append(f"{i}. {cit}")
+
+    parts.append(f"\n*Confidence: {answer.confidence:.0%}*")
+
+    return "\n".join(parts)
+
+
+def build_app() -> gr.ChatInterface:
+    """Build and return the Gradio ChatInterface app.
+
+    Wires ``_respond`` (which delegates to the module-level Orchestrator)
+    into a single-textbox chat UI with example prompts.
+    """
+    demo = gr.ChatInterface(
+        fn=_respond,
+        title="🏦 Financial Intelligence Agent",
+        description=(
+            "Ask any financial question – ranking, comparison, snapshot, "
+            "event reaction, and more."
+        ),
+        examples=[
+            "Top 3 tech stocks of 2024",
+            "Compare TSLA vs AAPL vs NVDA",
+            "What moved NVDA after last earnings?",
+        ],
+        chatbot=gr.Chatbot(height=600),
+        textbox=gr.Textbox(lines=1, placeholder="Enter your financial query…"),
+    )
+    return demo