241 changes: 241 additions & 0 deletions tools/ingest_email.py
@@ -0,0 +1,241 @@
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Email Ingestion Tool
This script ingests email (.eml) files into a SQLite database
that can be queried using tools/query.py.
Usage:
    python tools/ingest_email.py -d email.db inbox_dump/
    python tools/ingest_email.py -d email.db message1.eml message2.eml
    python tools/query.py --database email.db --query "What was discussed?"
"""

import argparse
import asyncio
import os
import sys
import time
from pathlib import Path

from typeagent.aitools import utils
from typeagent.emails.email_import import import_email_from_file
from typeagent.emails.email_memory import EmailMemory
from typeagent.emails.email_message import EmailMessage
from typeagent.knowpro.convsettings import ConversationSettings
from typeagent.storage.utils import create_storage_provider


def create_arg_parser() -> argparse.ArgumentParser:
"""Create argument parser for the email ingestion tool."""
parser = argparse.ArgumentParser(
description="Ingest email (.eml) files into a database for querying",
formatter_class=argparse.RawDescriptionHelpFormatter,
)

parser.add_argument(
"paths",
nargs="+",
help="Path to one or more .eml files or directories containing .eml files",
)

parser.add_argument(
"-d",
"--database",
required=True,
help="Path to the SQLite database file to create/use",
)

parser.add_argument(
"-v", "--verbose", action="store_true", help="Show verbose/debug output"
)

return parser


def collect_email_files(paths: list[str], verbose: bool) -> list[Path]:
"""Collect all .eml files from the given paths (files or directories)."""
email_files: list[Path] = []

for path_str in paths:
path = Path(path_str)
if not path.exists():
print(f"Error: Path '{path}' not found", file=sys.stderr)
sys.exit(1)

if path.is_file():
if path.suffix.lower() == ".eml":
email_files.append(path)
else:
print(f"Warning: Skipping non-.eml file: {path}", file=sys.stderr)
elif path.is_dir():
eml_files = list(path.glob("*.eml"))
if verbose:
print(f" Found {len(eml_files)} .eml files in {path}")
email_files.extend(eml_files)
else:
print(f"Warning: Skipping special file: {path}", file=sys.stderr)

return email_files


async def ingest_emails(
paths: list[str],
database: str,
verbose: bool = False,
) -> None:
"""Ingest email files into a database."""

# Collect all .eml files
if verbose:
Collaborator: Is there a cleaner way to do this verbose printing? Could there be a printVerbose() method that we call instead of print() and then it decides to print the message or not based on the verbose flag?

Collaborator: It would feel a little "clever", so I'll skip it.
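For reference, a minimal sketch of the suggested helper (the make_vprint/vprint names are hypothetical; nothing like this exists in the PR):

```python
def make_vprint(verbose: bool):
    """Return a print-like function that only prints when verbose is set."""
    def vprint(*args, **kwargs):
        if verbose:
            print(*args, **kwargs)
    return vprint

# Hypothetical usage:
# vprint = make_vprint(args.verbose)
# vprint("Collecting email files...")
```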

print("Collecting email files...")
email_files = collect_email_files(paths, verbose)

if not email_files:
print("Error: No .eml files found", file=sys.stderr)
sys.exit(1)

if verbose:
print(f"Found {len(email_files)} email files to ingest")

# Load environment for model API access
if verbose:
print("Loading environment...")
utils.load_dotenv()

# Create conversation settings and storage provider
if verbose:
print("Setting up conversation settings...")

settings = ConversationSettings()
settings.storage_provider = await create_storage_provider(
settings.message_text_index_settings,
settings.related_term_index_settings,
database,
EmailMessage,
)

# Create EmailMemory
email_memory = await EmailMemory.create(settings)

if verbose:
print(f"Target database: {database}")

batch_size = settings.semantic_ref_index_settings.batch_size
if verbose:
print(f"Batch size: {batch_size}")

# Parse and import emails
if verbose:
print("\nParsing and importing emails...")

successful_count = 0
failed_count = 0
skipped_count = 0
start_time = time.time()

semref_coll = await settings.storage_provider.get_semantic_ref_collection()
storage_provider = settings.storage_provider

for i, email_file in enumerate(email_files):
try:
if verbose:
print(f"\n[{i + 1}/{len(email_files)}] {email_file}")

email = import_email_from_file(str(email_file))
email_id = email.metadata.id

# Check if this email was already ingested
if email_id and storage_provider.is_source_ingested(email_id):
skipped_count += 1
if verbose:
print(f" [Already ingested, skipping]")
continue

if verbose:
print(f" From: {email.metadata.sender}")
if email.metadata.subject:
print(f" Subject: {email.metadata.subject}")
print(f" Date: {email.timestamp}")
print(f" Body chunks: {len(email.text_chunks)}")
for chunk in email.text_chunks:
# Show first 200 chars of each chunk
preview = chunk[:200].replace("\n", " ")
if len(chunk) > 200:
preview += "..."
print(f" {preview}")

# Pass source_id to mark as ingested atomically with the message
source_ids = [email_id] if email_id else None
Collaborator: Maybe emit a warning when the email id doesn't exist. Do we try to create one based on the from/to/timestamp/subject hash if we don't have one otherwise?

Collaborator: We could. I'll defer doing that until there's demand.
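A sketch of the deferred fallback, for illustration only (the function is hypothetical; the field names follow the import code earlier in this file):

```python
import hashlib

def fallback_email_id(email) -> str:
    """Derive a stable ID from headers when Message-ID is missing (hypothetical)."""
    md = email.metadata
    key = "|".join([
        str(md.sender),
        ",".join(str(r) for r in (md.recipients or [])),
        str(email.timestamp),
        str(md.subject or ""),
    ])
    return "sha256:" + hashlib.sha256(key.encode()).hexdigest()
```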

await email_memory.add_messages_with_indexing(
[email], source_ids=source_ids
)
successful_count += 1

# Print progress periodically
if not verbose and (i + 1) % batch_size == 0:
elapsed = time.time() - start_time
semref_count = await semref_coll.size()
print(
f" [{i + 1}/{len(email_files)}] {successful_count} imported | "
f"{semref_count} refs | {elapsed:.1f}s elapsed"
)

except Exception as e:
failed_count += 1
print(f"Error processing {email_file}: {e}", file=sys.stderr)
if verbose:
import traceback

traceback.print_exc()

# Final summary
elapsed = time.time() - start_time
semref_count = await semref_coll.size()

print()
if verbose:
print(f"Successfully imported {successful_count} email(s)")
if skipped_count:
print(f"Skipped {skipped_count} already-ingested email(s)")
if failed_count:
print(f"Failed to import {failed_count} email(s)")
print(f"Extracted {semref_count} semantic references")
print(f"Total time: {elapsed:.1f}s")
else:
print(
f"Imported {successful_count} emails to {database} "
f"({semref_count} refs, {elapsed:.1f}s)"
)
if skipped_count:
print(f"Skipped: {skipped_count} (already ingested)")
if failed_count:
print(f"Failed: {failed_count}")

# Show usage information
print()
print("To query the emails, use:")
print(
f" python tools/query.py --database '{database}' --query 'Your question here'"
)


def main() -> None:
"""Main entry point."""
parser = create_arg_parser()
args = parser.parse_args()

asyncio.run(
ingest_emails(
paths=args.paths,
database=args.database,
verbose=args.verbose,
)
)


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion typeagent/emails/email_import.py
@@ -62,7 +62,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
recipients=_import_address_headers(msg.get_all("To", [])),
cc=_import_address_headers(msg.get_all("Cc", [])),
bcc=_import_address_headers(msg.get_all("Bcc", [])),
subject=msg.get("Subject"),
subject=msg.get("Subject"), # TODO: Remove newlines
id=msg.get("Message-ID", None),
)
timestamp: str | None = None
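The `# TODO: Remove newlines` above refers to folded headers, where a long Subject is continued across lines. One minimal way to handle it, assuming the header comes back as a plain string (not part of this PR):

```python
import re

subject = msg.get("Subject")
if subject is not None:
    subject = re.sub(r"\s*[\r\n]+\s*", " ", subject)  # unfold multi-line header
```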
10 changes: 10 additions & 0 deletions typeagent/knowpro/conversation_base.py
@@ -117,6 +117,7 @@ async def add_metadata_to_index(self) -> None:
async def add_messages_with_indexing(
self,
messages: list[TMessage],
source_ids: list[str] | None = None,
) -> AddMessagesResult:
"""
Add messages and build all indexes incrementally in a single transaction.
@@ -128,6 +129,9 @@ async def add_messages_with_indexing(

Args:
messages: Messages to add
source_ids: Optional list of source IDs to mark as ingested. These are
marked within the same transaction, so if the indexing fails, the
source IDs won't be marked as ingested (for SQLite storage).

Returns:
Result with counts of messages/semrefs added
@@ -138,6 +142,12 @@ async def add_messages_with_indexing(
storage = await self.settings.get_storage_provider()

async with storage:
# Mark source IDs as ingested before adding messages
# This way, if indexing fails, the rollback will also undo the marks
if source_ids:
for source_id in source_ids:
storage.mark_source_ingested(source_id)

start_points = IndexingStartPoints(
message_count=await self.messages.size(),
semref_count=await self.semantic_refs.size(),
21 changes: 10 additions & 11 deletions typeagent/knowpro/interfaces.py
@@ -863,12 +863,7 @@ async def get_conversation_threads(self) -> IConversationThreads: ...

# Metadata management
def get_conversation_metadata(self) -> ConversationMetadata:
"""Get conversation metadata.

Always returns a ConversationMetadata instance. Fields not found in
the database will be None. If no metadata exists at all, returns
an instance with all fields None.
"""
"""Get conversation metadata (missing fields set to None)."""
...

def set_conversation_metadata(self, **kwds: str | list[str] | None) -> None:
@@ -887,12 +882,16 @@ def update_conversation_timestamps(
created_at: Datetime | None = None,
updated_at: Datetime | None = None,
) -> None:
"""Update conversation timestamps.
"""Update conversation timestamps."""
...

Args:
created_at: Optional creation timestamp
updated_at: Optional last updated timestamp
"""
# Ingested source tracking
def is_source_ingested(self, source_id: str) -> bool:
"""Check if a source has already been ingested."""
...

def mark_source_ingested(self, source_id: str) -> None:
"""Mark a source as ingested (no commit; call within transaction context)."""
...

# Transaction management
21 changes: 21 additions & 0 deletions typeagent/storage/memory/provider.py
@@ -41,6 +41,7 @@ class MemoryStorageProvider[TMessage: IMessage](IStorageProvider[TMessage]):
_message_text_index: MessageTextIndex
_related_terms_index: RelatedTermsIndex
_conversation_threads: ConversationThreads
_ingested_sources: set[str]

def __init__(
self,
@@ -60,6 +61,7 @@ def __init__(
self._related_terms_index = RelatedTermsIndex(related_terms_settings)
thread_settings = message_text_settings.embedding_index_settings
self._conversation_threads = ConversationThreads(thread_settings)
self._ingested_sources = set()

async def __aenter__(self) -> "MemoryStorageProvider[TMessage]":
"""Enter transaction context. No-op for in-memory storage."""
@@ -138,3 +140,22 @@ def update_conversation_timestamps(
updated_at: Optional last updated timestamp (ignored)
"""
pass

def is_source_ingested(self, source_id: str) -> bool:
"""Check if a source has already been ingested.

Args:
source_id: External source identifier (email ID, file path, etc.)

Returns:
True if the source has been ingested, False otherwise.
"""
return source_id in self._ingested_sources

def mark_source_ingested(self, source_id: str) -> None:
"""Mark a source as ingested.

Args:
source_id: External source identifier (email ID, file path, etc.)
"""
self._ingested_sources.add(source_id)
33 changes: 33 additions & 0 deletions typeagent/storage/sqlite/provider.py
@@ -619,3 +619,36 @@ def update_conversation_timestamps(
def get_db_version(self) -> int:
"""Get the database schema version."""
return get_db_schema_version(self.db)

def is_source_ingested(self, source_id: str) -> bool:
"""Check if a source has already been ingested.

This is a read-only operation that can be called outside of a transaction.

Args:
source_id: External source identifier (email ID, file path, etc.)

Returns:
True if the source has been ingested, False otherwise.
"""
cursor = self.db.cursor()
cursor.execute(
"SELECT 1 FROM IngestedSources WHERE source_id = ?", (source_id,)
)
return cursor.fetchone() is not None

def mark_source_ingested(self, source_id: str) -> None:
"""Mark a source as ingested.

This performs an INSERT but does NOT commit. It should be called within
Collaborator: I haven't tried this, but the docs say you can do cursor.in_transaction to see if there's an active transaction. That could keep someone from calling this if there's no active transaction.

Collaborator: Meh, this function only exists so add_messages_with_indexing can call it. I'm not too worried about users calling it wrongly.

a transaction context (e.g., inside `async with storage_provider:`).
The commit happens when the transaction context exits successfully.

Args:
source_id: External source identifier (email ID, file path, etc.)
"""
cursor = self.db.cursor()
cursor.execute(
"INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)",
(source_id,),
)
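If the guard discussed in the thread above were ever added, note that Python's sqlite3 module exposes in_transaction on the Connection, not the Cursor, and that under the default autocommit handling it only becomes True once a transaction has actually begun. A sketch under those assumptions (not part of this PR):

```python
def mark_source_ingested(self, source_id: str) -> None:
    # Hypothetical guard: refuse to run outside an active transaction.
    # sqlite3.Connection.in_transaction is True only after BEGIN (explicit
    # or implicit via a prior data-modifying statement).
    if not self.db.in_transaction:
        raise RuntimeError("mark_source_ingested requires an active transaction")
    self.db.execute(
        "INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)",
        (source_id,),
    )
```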