34 changes: 26 additions & 8 deletions docs/demos.md
@@ -30,19 +30,21 @@ python tools/query.py -d mp.db
```
(You just type questions and it prints answers.)

## How we did the GMail demo
## How we did the Gmail demo

The demo consisted of loading a large number (around 500) of email
messages into a database, and then querying the database about those
messages. The loading (ingestion) step was done ahead of time because
it takes a long time.

We used the GMail API to download 550 messages from Guido's GMail
We used the Gmail API to download 550 messages from Guido's Gmail
(details below).

Given a folder with `*.eml` files in MIME format, we ran our email
ingestion tool, `tools/test_email.py`. (All these details will change
in the future, hopefully to be more similar to `ingest_vtt.py`.)

**TODO: Switch to describing ingest_email.py.**

The tool takes one positional argument, a directory, in which it will
create a SQLite database named `gmail.db`.
```sh
@@ -60,16 +62,32 @@ next file.
We can then query the `gmail.db` database using the same `query.py`
tool that we used for the Monty Python demo.
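
For example, combining the new `tools/ingest_email.py` (this invocation
is taken from its docstring below) with the query tool -- here
`inbox_dump/` is just a placeholder for your folder of `*.eml` files:
```sh
python tools/ingest_email.py -d gmail.db inbox_dump/
python tools/query.py -d gmail.db
```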

### How to use the GMail API to download messages
### How to use the Gmail API to download messages

In the `gmail/` folder you'll find a tool named `gmail_dump.py` which
will download any number of messages (default 50) using the GMail API.
In order to use the GMail API, however, you have to create a
will download any number of messages (default 50) using the Gmail API.
In order to use the Gmail API, however, you have to create a (free)
Google Cloud app and configure it appropriately.

To figure out how to set up the (free) Google Cloud app, we used the
instructions at [GeeksForGeeks
](https://www.geeksforgeeks.org/devops/how-to-create-a-gcp-project/).
We created an app in test mode at the
[Google Cloud Console](https://console.cloud.google.com) and gave it
access to the Gmail API (I forget exactly how we did this part).

To create the needed client secret, we navigated to Client (in the
sidebar) and clicked on "+ Create Client" (in the row of actions at the
top), selected "Desktop app", gave it a name, hit Create, scrolled down
in the resulting dialog box, and hit "Download JSON". This produced a
JSON file, which should be copied to _client_secret.json_ in the
`gmail/` folder. (The Cloud Console interface may look different for
you.)

The first time you run the `gmail_dump.py` script, it takes you to a
browser where you have to log in and agree to various warnings about
using an app in test mode, etc. The script then writes a file
_token.json_ and you're good for a week or so. When _token.json_
expires, unfortunately the script crashes, and you have to manually
delete the file to trigger the login flow again.
(Starting a browser may fail, e.g. under WSL. In that case, take the
URL that's printed and open it manually.)
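
As a sketch of that recovery step (it is an assumption that
`gmail_dump.py` is run from inside the `gmail/` folder and needs no
extra flags for a default run):
```sh
# token.json expired and gmail_dump.py crashed: delete it and rerun,
# which re-triggers the browser login flow described above.
cd gmail/
rm token.json
python gmail_dump.py
```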

The rest of the email ingestion pipeline doesn't care where you got
your `*.eml` files from -- every email provider has its own quirks.
4 changes: 4 additions & 0 deletions docs/gmail.md
@@ -6,3 +6,7 @@ Until we have time to write this up, your best bet is to
ask your favorite search engine or LLM-based chat bot for help.

More TBD.
In the meantime there are some hints in [Demos][def].


[def]: demos.md#how-to-use-the-gmail-api-to-download-messages
243 changes: 243 additions & 0 deletions tools/ingest_email.py
@@ -0,0 +1,243 @@
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""
Email Ingestion Tool

This script ingests email (.eml) files into a SQLite database
that can be queried using tools/query.py.

Usage:
python tools/ingest_email.py -d email.db inbox_dump/
python tools/ingest_email.py -d email.db message1.eml message2.eml
python query.py --database email.db --query "What was discussed?"
"""

import argparse
import asyncio
import os
import sys
import time
from pathlib import Path

from typeagent.aitools import utils
from typeagent.emails.email_import import import_email_from_file
from typeagent.emails.email_memory import EmailMemory
from typeagent.emails.email_message import EmailMessage
from typeagent.knowpro.convsettings import ConversationSettings
from typeagent.storage.utils import create_storage_provider


def create_arg_parser() -> argparse.ArgumentParser:
"""Create argument parser for the email ingestion tool."""
parser = argparse.ArgumentParser(
description="Ingest email (.eml) files into a database for querying",
formatter_class=argparse.RawDescriptionHelpFormatter,
)

parser.add_argument(
"paths",
nargs="+",
help="Path to one or more .eml files or directories containing .eml files",
)

parser.add_argument(
"-d",
"--database",
required=True,
help="Path to the SQLite database file to create/use",
)

parser.add_argument(
"-v", "--verbose", action="store_true", help="Show verbose/debug output"
)

return parser


def collect_email_files(paths: list[str], verbose: bool) -> list[Path]:
"""Collect all .eml files from the given paths (files or directories)."""
email_files: list[Path] = []

for path_str in paths:
path = Path(path_str)
if not path.exists():
print(f"Error: Path '{path}' not found", file=sys.stderr)
sys.exit(1)

if path.is_file():
if path.suffix.lower() == ".eml":
email_files.append(path)
else:
print(f"Error: Skipping non-.eml file: {path}", file=sys.stderr)
sys.exit(1)
elif path.is_dir():
eml_files = sorted(path.glob("*.eml"))
if verbose:
print(f" Found {len(eml_files)} .eml files in {path}")
email_files.extend(eml_files)
else:
print(f"Error: Not a file or directory: {path}", file=sys.stderr)
sys.exit(1)

return email_files


async def ingest_emails(
paths: list[str],
database: str,
verbose: bool = False,
) -> None:
"""Ingest email files into a database."""

# Collect all .eml files
if verbose:
Collaborator: Is there a cleaner way to do this verbose printing? Could there be a printVerbose() method that we call instead of print() and then it decides to print the message or not based on the verbose flag?

Collaborator: It would feel a little "clever", so I'll skip it.

print("Collecting email files...")
email_files = collect_email_files(paths, verbose)

if not email_files:
print("Error: No .eml files found", file=sys.stderr)
sys.exit(1)

if verbose:
print(f"Found {len(email_files)} email files to ingest")

# Load environment for model API access
if verbose:
print("Loading environment...")
utils.load_dotenv()

# Create conversation settings and storage provider
if verbose:
print("Setting up conversation settings...")

settings = ConversationSettings()
settings.storage_provider = await create_storage_provider(
settings.message_text_index_settings,
settings.related_term_index_settings,
database,
EmailMessage,
)

# Create EmailMemory
email_memory = await EmailMemory.create(settings)

if verbose:
print(f"Target database: {database}")

batch_size = settings.semantic_ref_index_settings.batch_size
if verbose:
print(f"Batch size: {batch_size}")

# Parse and import emails
if verbose:
print("\nParsing and importing emails...")

successful_count = 0
failed_count = 0
skipped_count = 0
start_time = time.time()

semref_coll = await settings.storage_provider.get_semantic_ref_collection()
storage_provider = settings.storage_provider

for i, email_file in enumerate(email_files):
try:
if verbose:
print(f"\n[{i + 1}/{len(email_files)}] {email_file}")

email = import_email_from_file(str(email_file))
email_id = email.metadata.id

# Check if this email was already ingested
if email_id and storage_provider.is_source_ingested(email_id):
skipped_count += 1
if verbose:
print(f" [Already ingested, skipping]")
continue

if verbose:
print(f" From: {email.metadata.sender}")
if email.metadata.subject:
print(f" Subject: {email.metadata.subject}")
print(f" Date: {email.timestamp}")
print(f" Body chunks: {len(email.text_chunks)}")
for chunk in email.text_chunks:
# Show first 200 chars of each chunk
preview = chunk[:200].replace("\n", " ")
if len(chunk) > 200:
preview += "..."
print(f" {preview}")

# Pass source_id to mark as ingested atomically with the message
source_ids = [email_id] if email_id else None
Collaborator: Maybe emit a warning when the email id doesn't exist. Do we try to create one based on the from/to/timestamp/subject hash if we don't have one otherwise?

Collaborator: We could. I'll defer doing that until there's demand.

await email_memory.add_messages_with_indexing(
[email], source_ids=source_ids
)
successful_count += 1

# Print progress periodically
if not verbose and (i + 1) % batch_size == 0:
elapsed = time.time() - start_time
semref_count = await semref_coll.size()
print(
f" [{i + 1}/{len(email_files)}] {successful_count} imported | "
f"{semref_count} refs | {elapsed:.1f}s elapsed"
)

except Exception as e:
failed_count += 1
print(f"Error processing {email_file}: {e}", file=sys.stderr)
if verbose:
import traceback

traceback.print_exc()

# Final summary
elapsed = time.time() - start_time
semref_count = await semref_coll.size()

print()
if verbose:
print(f"Successfully imported {successful_count} email(s)")
if skipped_count:
print(f"Skipped {skipped_count} already-ingested email(s)")
if failed_count:
print(f"Failed to import {failed_count} email(s)")
print(f"Extracted {semref_count} semantic references")
print(f"Total time: {elapsed:.1f}s")
else:
print(
f"Imported {successful_count} emails to {database} "
f"({semref_count} refs, {elapsed:.1f}s)"
)
if skipped_count:
print(f"Skipped: {skipped_count} (already ingested)")
if failed_count:
print(f"Failed: {failed_count}")

# Show usage information
print()
print("To query the emails, use:")
print(
f" python tools/query.py --database '{database}' --query 'Your question here'"
)


def main() -> None:
"""Main entry point."""
parser = create_arg_parser()
args = parser.parse_args()

asyncio.run(
ingest_emails(
paths=args.paths,
database=args.database,
verbose=args.verbose,
)
)


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion typeagent/emails/email_import.py
@@ -62,7 +62,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
recipients=_import_address_headers(msg.get_all("To", [])),
cc=_import_address_headers(msg.get_all("Cc", [])),
bcc=_import_address_headers(msg.get_all("Bcc", [])),
subject=msg.get("Subject"),
subject=msg.get("Subject"), # TODO: Remove newlines
id=msg.get("Message-ID", None),
)
timestamp: str | None = None
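
The `# TODO: Remove newlines` above refers to folded (multi-line)
Subject headers. A minimal sketch of what that cleanup could look like
(not part of this PR; the helper name is made up):
```python
def _clean_subject(raw: str | None) -> str | None:
    """Collapse a folded/multi-line Subject header into a single line."""
    return " ".join(raw.split()) if raw is not None else None

# e.g. subject=_clean_subject(msg.get("Subject"))
```
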
13 changes: 12 additions & 1 deletion typeagent/knowpro/conversation_base.py
@@ -117,6 +117,8 @@ async def add_metadata_to_index(self) -> None:
async def add_messages_with_indexing(
self,
messages: list[TMessage],
*,
source_ids: list[str] | None = None,
) -> AddMessagesResult:
"""
Add messages and build all indexes incrementally in a single transaction.
@@ -128,16 +130,25 @@
Args:
messages: Messages to add
source_ids: Optional list of source IDs to mark as ingested. These are
marked within the same transaction, so if the indexing fails, the
source IDs won't be marked as ingested (for SQLite storage).
Returns:
Result with counts of messages/semrefs added
Raises:
BaseException: Any error
Exception: Any error
"""
storage = await self.settings.get_storage_provider()

async with storage:
# Mark source IDs as ingested before adding messages
# This way, if indexing fails, the rollback will also undo the marks
if source_ids:
for source_id in source_ids:
storage.mark_source_ingested(source_id)

start_points = IndexingStartPoints(
message_count=await self.messages.size(),
semref_count=await self.semantic_refs.size(),
21 changes: 10 additions & 11 deletions typeagent/knowpro/interfaces.py
@@ -863,12 +863,7 @@ async def get_conversation_threads(self) -> IConversationThreads: ...

# Metadata management
def get_conversation_metadata(self) -> ConversationMetadata:
"""Get conversation metadata.

Always returns a ConversationMetadata instance. Fields not found in
the database will be None. If no metadata exists at all, returns
an instance with all fields None.
"""
"""Get conversation metadata (missing fields set to None)."""
...

def set_conversation_metadata(self, **kwds: str | list[str] | None) -> None:
@@ -887,12 +882,16 @@ def update_conversation_timestamps(
created_at: Datetime | None = None,
updated_at: Datetime | None = None,
) -> None:
"""Update conversation timestamps.
"""Update conversation timestamps."""
...

Args:
created_at: Optional creation timestamp
updated_at: Optional last updated timestamp
"""
# Ingested source tracking
def is_source_ingested(self, source_id: str) -> bool:
"""Check if a source has already been ingested."""
...

def mark_source_ingested(self, source_id: str) -> None:
"""Mark a source as ingested (no commit; call within transaction context)."""
...

# Transaction management
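
Taken together with `add_messages_with_indexing` above, a minimal usage
sketch of the new source-tracking methods (the setup of `memory` and
`storage` is assumed to follow `tools/ingest_email.py`):
```python
# Sketch only: `memory` is an EmailMemory and `storage` its storage provider.
async def ingest_once(memory, storage, email, source_id: str) -> bool:
    if storage.is_source_ingested(source_id):
        return False  # already ingested earlier; skip
    # The source ID is marked inside the same transaction as the indexing,
    # so a failed indexing run rolls back the mark as well.
    await memory.add_messages_with_indexing([email], source_ids=[source_id])
    return True
```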