diff --git a/docs/demos.md b/docs/demos.md index 1f84f10..037f6a2 100644 --- a/docs/demos.md +++ b/docs/demos.md @@ -30,19 +30,21 @@ python tools/query.py -d mp.db ``` (You just type questions and it prints answers.) -## How we did the GMail demo +## How we did the Gmail demo The demo consisted of loading a large number (around 500) email messages into a database, and querying the database about those messages. The loading (ingestion) process was done ahead as it takes a long time. -We used the GMail API to download 550 messages from Guido's GMail +We used the Gmail API to download 550 messages from Guido's Gmail (details below). Given a folder with `*.eml` files in MIME format, we ran our email ingestion tool, `tools/test_email.py`. (All these details will change in the future, hopefully to be more similar to `ingest_vtt.py`.) +**TODO: Switch to describing ingest_email.py.** + The tool takes one positional argument, a directory, in which it will create a SQLite database named `gmail.db`. ```sh @@ -60,16 +62,32 @@ next file. We can then query the `gmail.db` database using the same `query.py` tool that we used for the Monty Python demo. -### How to use the GMail API to download messages +### How to use the Gmail API to download messages In the `gmail/` folder you'll find a tool named `gmail_dump.py` which -will download any number of messages (default 50) using the GMail API. -In order to use the GMail API, however, you have to create a +will download any number of messages (default 50) using the Gmail API. +In order to use the Gmail API, however, you have to create a (free) Google Cloud app and configure it appropriately. -In order to figure out how to set up the (free) Google Cloud app we -used the instructions at [GeeksForGeeks -](https://www.geeksforgeeks.org/devops/how-to-create-a-gcp-project/). +We created an app in test mode at +[Google Cloud Console](https://console.cloud.google.com) and gave it +access to the Gmail API (I forget how exactly we did this part). + +To create the needed client secret, we navigated to Client (side bar) +and clicked on "+ Create Client" (in the row of actions at the top), +selected "Desktop app", gave it a name, hit Create, scrolled down in the +resulting dialog box, and hit "Download JSON". This produced a JSON file +which should be copied into _client_secret.json_ in the gmail folder. +(The Cloud Console interface may look different for you.) + +The first time you run the gmail_dump.py script, it will take you to +a browser where you have to log in and agree to various warnings about +using an app in test mode etc. The gmail_dump.py script then writes a +file _token.json_ and you're good for a week or so. When token.json +expires, unfortunately you get a crash and you have to manually delete +it to trigger the login flow again. +(Sometimes starting a browser may fail, e.g. under WSL. Take the URL +that's printed and manually go there.) The rest of the email ingestion pipeline doesn't care where you got your `*.eml` files from -- every email provider has its own quirks. diff --git a/docs/gmail.md b/docs/gmail.md index 4c0b514..d2ef5d8 100644 --- a/docs/gmail.md +++ b/docs/gmail.md @@ -6,3 +6,7 @@ Until we have time to write this up, your best bet is to ask your favorite search engine or LLM-based chat bot for help. More TBD. +In the meantime there are some hints in [Demos][def].
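The client_secret.json / token.json dance described above is Google's standard "installed app" OAuth flow. As a rough sketch of what `gmail_dump.py` has to do (the actual script may be organized differently), assuming the usual `google-auth-oauthlib` / `google-api-python-client` packages and the `client_secret.json` produced in the step above; the helper name `get_gmail_service` is made up for illustration:

```python
# Hypothetical sketch of the installed-app OAuth flow; not the actual
# gmail_dump.py code. Requires google-auth, google-auth-oauthlib and
# google-api-python-client, plus client_secret.json in the working directory.
import os

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]


def get_gmail_service(token_file: str = "token.json",
                      secret_file: str = "client_secret.json"):
    creds = None
    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())  # silent refresh when possible
        else:
            # Opens a browser for login; under WSL etc. it prints a URL instead.
            flow = InstalledAppFlow.from_client_secrets_file(secret_file, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(token_file, "w") as f:
            f.write(creds.to_json())  # cached until the test-mode grant expires
    return build("gmail", "v1", credentials=creds)
```

Apps left in test mode get refresh tokens that expire after roughly a week, which is why `token.json` eventually stops working; deleting it just forces the login flow to run again.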
+ + +[def]: demos.md#how-to-use-the-gmail-api-to-download-messages diff --git a/tools/ingest_email.py b/tools/ingest_email.py new file mode 100644 index 0000000..fc8225a --- /dev/null +++ b/tools/ingest_email.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Email Ingestion Tool + +This script ingests email (.eml) files into a SQLite database +that can be queried using tools/query.py. + +Usage: + python tools/ingest_email.py -d email.db inbox_dump/ + python tools/ingest_email.py -d email.db message1.eml message2.eml + python query.py --database email.db --query "What was discussed?" +""" + +import argparse +import asyncio +import os +import sys +import time +from pathlib import Path + +from typeagent.aitools import utils +from typeagent.emails.email_import import import_email_from_file +from typeagent.emails.email_memory import EmailMemory +from typeagent.emails.email_message import EmailMessage +from typeagent.knowpro.convsettings import ConversationSettings +from typeagent.storage.utils import create_storage_provider + + +def create_arg_parser() -> argparse.ArgumentParser: + """Create argument parser for the email ingestion tool.""" + parser = argparse.ArgumentParser( + description="Ingest email (.eml) files into a database for querying", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument( + "paths", + nargs="+", + help="Path to one or more .eml files or directories containing .eml files", + ) + + parser.add_argument( + "-d", + "--database", + required=True, + help="Path to the SQLite database file to create/use", + ) + + parser.add_argument( + "-v", "--verbose", action="store_true", help="Show verbose/debug output" + ) + + return parser + + +def collect_email_files(paths: list[str], verbose: bool) -> list[Path]: + """Collect all .eml files from the given paths (files or directories).""" + email_files: list[Path] = [] + + for path_str in paths: + path = Path(path_str) + if not path.exists(): + print(f"Error: Path '{path}' not found", file=sys.stderr) + sys.exit(1) + + if path.is_file(): + if path.suffix.lower() == ".eml": + email_files.append(path) + else: + print(f"Error: Skipping non-.eml file: {path}", file=sys.stderr) + sys.exit(1) + elif path.is_dir(): + eml_files = sorted(path.glob("*.eml")) + if verbose: + print(f" Found {len(eml_files)} .eml files in {path}") + email_files.extend(eml_files) + else: + print(f"Error: Not a file or directory: {path}", file=sys.stderr) + sys.exit(1) + + return email_files + + +async def ingest_emails( + paths: list[str], + database: str, + verbose: bool = False, +) -> None: + """Ingest email files into a database.""" + + # Collect all .eml files + if verbose: + print("Collecting email files...") + email_files = collect_email_files(paths, verbose) + + if not email_files: + print("Error: No .eml files found", file=sys.stderr) + sys.exit(1) + + if verbose: + print(f"Found {len(email_files)} email files to ingest") + + # Load environment for model API access + if verbose: + print("Loading environment...") + utils.load_dotenv() + + # Create conversation settings and storage provider + if verbose: + print("Setting up conversation settings...") + + settings = ConversationSettings() + settings.storage_provider = await create_storage_provider( + settings.message_text_index_settings, + settings.related_term_index_settings, + database, + EmailMessage, + ) + + # Create EmailMemory + email_memory = await EmailMemory.create(settings) + + if verbose: + 
print(f"Target database: {database}") + + batch_size = settings.semantic_ref_index_settings.batch_size + if verbose: + print(f"Batch size: {batch_size}") + + # Parse and import emails + if verbose: + print("\nParsing and importing emails...") + + successful_count = 0 + failed_count = 0 + skipped_count = 0 + start_time = time.time() + + semref_coll = await settings.storage_provider.get_semantic_ref_collection() + storage_provider = settings.storage_provider + + for i, email_file in enumerate(email_files): + try: + if verbose: + print(f"\n[{i + 1}/{len(email_files)}] {email_file}") + + email = import_email_from_file(str(email_file)) + email_id = email.metadata.id + + # Check if this email was already ingested + if email_id and storage_provider.is_source_ingested(email_id): + skipped_count += 1 + if verbose: + print(f" [Already ingested, skipping]") + continue + + if verbose: + print(f" From: {email.metadata.sender}") + if email.metadata.subject: + print(f" Subject: {email.metadata.subject}") + print(f" Date: {email.timestamp}") + print(f" Body chunks: {len(email.text_chunks)}") + for chunk in email.text_chunks: + # Show first 200 chars of each chunk + preview = chunk[:200].replace("\n", " ") + if len(chunk) > 200: + preview += "..." + print(f" {preview}") + + # Pass source_id to mark as ingested atomically with the message + source_ids = [email_id] if email_id else None + await email_memory.add_messages_with_indexing( + [email], source_ids=source_ids + ) + successful_count += 1 + + # Print progress periodically + if not verbose and (i + 1) % batch_size == 0: + elapsed = time.time() - start_time + semref_count = await semref_coll.size() + print( + f" [{i + 1}/{len(email_files)}] {successful_count} imported | " + f"{semref_count} refs | {elapsed:.1f}s elapsed" + ) + + except Exception as e: + failed_count += 1 + print(f"Error processing {email_file}: {e}", file=sys.stderr) + if verbose: + import traceback + + traceback.print_exc() + + # Final summary + elapsed = time.time() - start_time + semref_count = await semref_coll.size() + + print() + if verbose: + print(f"Successfully imported {successful_count} email(s)") + if skipped_count: + print(f"Skipped {skipped_count} already-ingested email(s)") + if failed_count: + print(f"Failed to import {failed_count} email(s)") + print(f"Extracted {semref_count} semantic references") + print(f"Total time: {elapsed:.1f}s") + else: + print( + f"Imported {successful_count} emails to {database} " + f"({semref_count} refs, {elapsed:.1f}s)" + ) + if skipped_count: + print(f"Skipped: {skipped_count} (already ingested)") + if failed_count: + print(f"Failed: {failed_count}") + + # Show usage information + print() + print("To query the emails, use:") + print( + f" python tools/query.py --database '{database}' --query 'Your question here'" + ) + + +def main() -> None: + """Main entry point.""" + parser = create_arg_parser() + args = parser.parse_args() + + asyncio.run( + ingest_emails( + paths=args.paths, + database=args.database, + verbose=args.verbose, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/typeagent/emails/email_import.py b/typeagent/emails/email_import.py index 758fe76..5be5baa 100644 --- a/typeagent/emails/email_import.py +++ b/typeagent/emails/email_import.py @@ -62,7 +62,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage: recipients=_import_address_headers(msg.get_all("To", [])), cc=_import_address_headers(msg.get_all("Cc", [])), bcc=_import_address_headers(msg.get_all("Bcc", [])), - 
subject=msg.get("Subject"), + subject=msg.get("Subject"), # TODO: Remove newlines id=msg.get("Message-ID", None), ) timestamp: str | None = None diff --git a/typeagent/knowpro/conversation_base.py b/typeagent/knowpro/conversation_base.py index ab876a2..4b1d6ca 100644 --- a/typeagent/knowpro/conversation_base.py +++ b/typeagent/knowpro/conversation_base.py @@ -117,6 +117,8 @@ async def add_metadata_to_index(self) -> None: async def add_messages_with_indexing( self, messages: list[TMessage], + *, + source_ids: list[str] | None = None, ) -> AddMessagesResult: """ Add messages and build all indexes incrementally in a single transaction. @@ -128,16 +130,25 @@ async def add_messages_with_indexing( Args: messages: Messages to add + source_ids: Optional list of source IDs to mark as ingested. These are + marked within the same transaction, so if the indexing fails, the + source IDs won't be marked as ingested (for SQLite storage). Returns: Result with counts of messages/semrefs added Raises: - BaseException: Any error + Exception: Any error """ storage = await self.settings.get_storage_provider() async with storage: + # Mark source IDs as ingested before adding messages + # This way, if indexing fails, the rollback will also undo the marks + if source_ids: + for source_id in source_ids: + storage.mark_source_ingested(source_id) + start_points = IndexingStartPoints( message_count=await self.messages.size(), semref_count=await self.semantic_refs.size(), diff --git a/typeagent/knowpro/interfaces.py b/typeagent/knowpro/interfaces.py index ef0efae..396fbf8 100644 --- a/typeagent/knowpro/interfaces.py +++ b/typeagent/knowpro/interfaces.py @@ -863,12 +863,7 @@ async def get_conversation_threads(self) -> IConversationThreads: ... # Metadata management def get_conversation_metadata(self) -> ConversationMetadata: - """Get conversation metadata. - - Always returns a ConversationMetadata instance. Fields not found in - the database will be None. If no metadata exists at all, returns - an instance with all fields None. - """ + """Get conversation metadata (missing fields set to None).""" ... def set_conversation_metadata(self, **kwds: str | list[str] | None) -> None: @@ -887,12 +882,16 @@ def update_conversation_timestamps( created_at: Datetime | None = None, updated_at: Datetime | None = None, ) -> None: - """Update conversation timestamps. + """Update conversation timestamps.""" + ... - Args: - created_at: Optional creation timestamp - updated_at: Optional last updated timestamp - """ + # Ingested source tracking + def is_source_ingested(self, source_id: str) -> bool: + """Check if a source has already been ingested.""" + ... + + def mark_source_ingested(self, source_id: str) -> None: + """Mark a source as ingested (no commit; call within transaction context).""" ... 
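Taken together, the `source_ids` keyword on `add_messages_with_indexing()` and the `is_source_ingested()` / `mark_source_ingested()` protocol methods are what make re-running ingestion safe. A minimal caller-side sketch (it mirrors what `tools/ingest_email.py` above does; `storage` stands for the provider the memory was created with, e.g. `settings.storage_provider`):

```python
from typeagent.emails.email_import import import_email_from_file
from typeagent.emails.email_memory import EmailMemory


async def ingest_once(memory: EmailMemory, storage, path: str) -> None:
    """Sketch of idempotent ingestion, mirroring tools/ingest_email.py."""
    email = import_email_from_file(path)
    source_id = email.metadata.id  # e.g. the RFC 2822 Message-ID header

    if source_id and storage.is_source_ingested(source_id):
        return  # already in the database; skip re-ingestion

    # The mark and the indexing share one transaction (for the SQLite
    # provider), so a failure during indexing rolls the mark back too.
    await memory.add_messages_with_indexing(
        [email], source_ids=[source_id] if source_id else None
    )
```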
# Transaction management diff --git a/typeagent/storage/memory/provider.py b/typeagent/storage/memory/provider.py index b97199b..18e5a12 100644 --- a/typeagent/storage/memory/provider.py +++ b/typeagent/storage/memory/provider.py @@ -41,6 +41,7 @@ class MemoryStorageProvider[TMessage: IMessage](IStorageProvider[TMessage]): _message_text_index: MessageTextIndex _related_terms_index: RelatedTermsIndex _conversation_threads: ConversationThreads + _ingested_sources: set[str] def __init__( self, @@ -60,6 +61,7 @@ def __init__( self._related_terms_index = RelatedTermsIndex(related_terms_settings) thread_settings = message_text_settings.embedding_index_settings self._conversation_threads = ConversationThreads(thread_settings) + self._ingested_sources = set() async def __aenter__(self) -> "MemoryStorageProvider[TMessage]": """Enter transaction context. No-op for in-memory storage.""" @@ -138,3 +140,22 @@ def update_conversation_timestamps( updated_at: Optional last updated timestamp (ignored) """ pass + + def is_source_ingested(self, source_id: str) -> bool: + """Check if a source has already been ingested. + + Args: + source_id: External source identifier (email ID, file path, etc.) + + Returns: + True if the source has been ingested, False otherwise. + """ + return source_id in self._ingested_sources + + def mark_source_ingested(self, source_id: str) -> None: + """Mark a source as ingested. + + Args: + source_id: External source identifier (email ID, file path, etc.) + """ + self._ingested_sources.add(source_id) diff --git a/typeagent/storage/sqlite/provider.py b/typeagent/storage/sqlite/provider.py index ea54772..7920702 100644 --- a/typeagent/storage/sqlite/provider.py +++ b/typeagent/storage/sqlite/provider.py @@ -619,3 +619,36 @@ def update_conversation_timestamps( def get_db_version(self) -> int: """Get the database schema version.""" return get_db_schema_version(self.db) + + def is_source_ingested(self, source_id: str) -> bool: + """Check if a source has already been ingested. + + This is a read-only operation that can be called outside of a transaction. + + Args: + source_id: External source identifier (email ID, file path, etc.) + + Returns: + True if the source has been ingested, False otherwise. + """ + cursor = self.db.cursor() + cursor.execute( + "SELECT 1 FROM IngestedSources WHERE source_id = ?", (source_id,) + ) + return cursor.fetchone() is not None + + def mark_source_ingested(self, source_id: str) -> None: + """Mark a source as ingested. + + This performs an INSERT but does NOT commit. It should be called within + a transaction context (e.g., inside `async with storage_provider:`). + The commit happens when the transaction context exits successfully. + + Args: + source_id: External source identifier (email ID, file path, etc.) + """ + cursor = self.db.cursor() + cursor.execute( + "INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)", + (source_id,), + ) diff --git a/typeagent/storage/sqlite/schema.py b/typeagent/storage/sqlite/schema.py index 37f1db1..2cbd4ec 100644 --- a/typeagent/storage/sqlite/schema.py +++ b/typeagent/storage/sqlite/schema.py @@ -138,6 +138,14 @@ CREATE INDEX IF NOT EXISTS idx_related_fuzzy_term ON RelatedTermsFuzzy(term); """ +# Table for tracking ingested source IDs (e.g., email IDs, file paths) +# This prevents re-ingesting the same content on subsequent runs +INGESTED_SOURCES_SCHEMA = """ +CREATE TABLE IF NOT EXISTS IngestedSources ( + source_id TEXT PRIMARY KEY -- External source identifier (email ID, file path, etc.) 
+); +""" + # Type aliases for database row tuples type ShreddedMessage = tuple[ str | None, str | None, str | None, str | None, str | None, str | None @@ -260,6 +268,7 @@ def init_db_schema(db: sqlite3.Connection) -> None: cursor.execute(RELATED_TERMS_ALIASES_SCHEMA) cursor.execute(RELATED_TERMS_FUZZY_SCHEMA) cursor.execute(TIMESTAMP_INDEX_SCHEMA) + cursor.execute(INGESTED_SOURCES_SCHEMA) # Create additional indexes cursor.execute(SEMANTIC_REF_INDEX_TERM_INDEX)
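For reference, `INSERT OR IGNORE` plus the `PRIMARY KEY` on `IngestedSources` is what gives marking its at-most-once behavior, and because `mark_source_ingested()` never commits, a rollback (which is what the provider's transaction context does when indexing fails) discards an uncommitted mark. A standalone illustration of those SQLite semantics, not project code:

```python
# Toy demonstration of the IngestedSources semantics using the stdlib
# sqlite3 module; names and values here are made up for illustration.
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE IngestedSources (source_id TEXT PRIMARY KEY)")

# Marking the same source twice is a no-op thanks to INSERT OR IGNORE.
db.execute("INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)", ("<msg-1>",))
db.execute("INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)", ("<msg-1>",))
db.commit()
print(db.execute("SELECT COUNT(*) FROM IngestedSources").fetchone()[0])  # 1

# An uncommitted mark disappears on rollback (simulating an indexing failure).
db.execute("INSERT OR IGNORE INTO IngestedSources (source_id) VALUES (?)", ("<msg-2>",))
db.rollback()
print(db.execute("SELECT COUNT(*) FROM IngestedSources").fetchone()[0])  # still 1
```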