Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
"""

import os
from pathlib import Path
from typing import Optional

from crewai import Agent, Crew, Process, Task
from crewai.memory import LongTermMemory
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # type: ignore[import-not-found]
)
Expand All @@ -28,6 +31,10 @@
# Make sure to set the OPENAI_API_KEY environment variable
os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"

# Store in project directory
project_root = Path(__file__).parent
storage_dir = project_root / "crewai_storage"


def create_research_writer_crew(crew_name: Optional[str] = None) -> Crew:
"""
Expand Down Expand Up @@ -107,6 +114,10 @@ def create_research_writer_crew(crew_name: Optional[str] = None) -> Crew:
tasks=[research_task, analysis_task, writing_task],
verbose=False,
process=Process.sequential,
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(db_path=f"{storage_dir}/memory.db")
),
)

# Note: crew.key is auto-generated and read-only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
_CrewKickoffWrapper,
_ExecuteCoreWrapper,
_FlowKickoffAsyncWrapper,
_LongTermMemorySaveWrapper,
_LongTermMemorySearchWrapper,
_ShortTermMemorySaveWrapper,
_ShortTermMemorySearchWrapper,
_ToolUseWrapper,
)
from openinference.instrumentation.crewai.version import __version__
Expand All @@ -28,6 +32,10 @@ class CrewAIInstrumentor(BaseInstrumentor): # type: ignore
"_original_execute_core",
"_original_crew_kickoff",
"_original_flow_kickoff_async",
"_original_long_term_memory_save",
"_original_long_term_memory_search",
"_original_short_term_memory_save",
"_original_short_term_memory_search",
"_original_tool_use",
"_tracer",
)
Expand Down Expand Up @@ -73,6 +81,50 @@ def _instrument(self, **kwargs: Any) -> None:
wrapper=flow_kickoff_async_wrapper,
)

long_term_memory_save_wrapper = _LongTermMemorySaveWrapper(tracer=self._tracer)
self._original_long_term_memory_save = getattr(
import_module("crewai.memory.long_term.long_term_memory").LongTermMemory, "save", None
)
wrap_function_wrapper(
module="crewai.memory.long_term.long_term_memory",
name="LongTermMemory.save",
wrapper=long_term_memory_save_wrapper,
)

long_term_memory_search_wrapper = _LongTermMemorySearchWrapper(tracer=self._tracer)
self._original_long_term_memory_search = getattr(
import_module("crewai.memory.long_term.long_term_memory").LongTermMemory, "search", None
)
wrap_function_wrapper(
module="crewai.memory.long_term.long_term_memory",
name="LongTermMemory.search",
wrapper=long_term_memory_search_wrapper,
)

short_term_memory_save_wrapper = _ShortTermMemorySaveWrapper(tracer=self._tracer)
self._original_short_term_memory_save = getattr(
import_module("crewai.memory.short_term.short_term_memory").ShortTermMemory,
"save",
None,
)
wrap_function_wrapper(
module="crewai.memory.short_term.short_term_memory",
name="ShortTermMemory.save",
wrapper=short_term_memory_save_wrapper,
)

short_term_memory_search_wrapper = _ShortTermMemorySearchWrapper(tracer=self._tracer)
self._original_short_term_memory_search = getattr(
import_module("crewai.memory.short_term.short_term_memory").ShortTermMemory,
"search",
None,
)
wrap_function_wrapper(
module="crewai.memory.short_term.short_term_memory",
name="ShortTermMemory.search",
wrapper=short_term_memory_search_wrapper,
)

use_wrapper = _ToolUseWrapper(tracer=self._tracer)
self._original_tool_use = getattr(
import_module("crewai.tools.tool_usage").ToolUsage, "_use", None
Expand All @@ -99,6 +151,28 @@ def _uninstrument(self, **kwargs: Any) -> None:
crew_module.Flow.kickoff_async = self._original_flow_kickoff_async
self._original_flow_kickoff_async = None

if self._original_long_term_memory_save is not None:
long_term_memory_module = import_module("crewai.memory.long_term.long_term_memory")
long_term_memory_module.LongTermMemory.save = self._original_long_term_memory_save
self._original_long_term_memory_save = None

if self._original_long_term_memory_search is not None:
long_term_memory_module = import_module("crewai.memory.long_term.long_term_memory")
long_term_memory_module.LongTermMemory.search = self._original_long_term_memory_search
self._original_long_term_memory_search = None

if self._original_short_term_memory_save is not None:
short_term_memory_module = import_module("crewai.memory.short_term.short_term_memory")
short_term_memory_module.ShortTermMemory.save = self._original_short_term_memory_save
self._original_short_term_memory_save = None

if self._original_short_term_memory_search is not None:
short_term_memory_module = import_module("crewai.memory.short_term.short_term_memory")
short_term_memory_module.ShortTermMemory.search = (
self._original_short_term_memory_search
)
self._original_short_term_memory_search = None

if self._original_tool_use is not None:
tool_usage_module = import_module("crewai.tools.tool_usage")
tool_usage_module.ToolUsage._use = self._original_tool_use
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
import time
from enum import Enum
from inspect import signature
from typing import Any, Callable, Iterator, List, Mapping, Optional, Tuple, cast
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Tuple, cast

from opentelemetry import context as context_api
from opentelemetry import trace as trace_api
Expand Down Expand Up @@ -43,6 +44,8 @@ def _flatten(mapping: Optional[Mapping[str, Any]]) -> Iterator[Tuple[str, Attrib
for index, sub_mapping in enumerate(value):
for sub_key, sub_value in _flatten(sub_mapping):
yield f"{key}.{index}.{sub_key}", sub_value
elif isinstance(value, List) and any(isinstance(item, str) for item in value):
value = ", ".join(map(str, value))
else:
if isinstance(value, Enum):
value = value.value
Expand Down Expand Up @@ -167,6 +170,21 @@ def _find_parent_agent(current_role: str, agents: List[Any]) -> Optional[str]:
return None


def _log_span_event(event_name: str, attributes: Dict[str, Any]) -> None:
    """Record *attributes* on the active span, both as a span event and as
    span attributes prefixed with *event_name*.

    No-op when there is no active, recording span.
    """
    span = trace_api.get_current_span()
    if not span or not span.is_recording():
        return

    # Flatten nested mappings once and reuse for both representations.
    flat = dict(_flatten(attributes))
    span.add_event(event_name, flat)
    span.set_attributes({f"{event_name}.{key}": value for key, value in flat.items()})


class _ExecuteCoreWrapper:
def __init__(self, tracer: trace_api.Tracer) -> None:
self._tracer = tracer
Expand Down Expand Up @@ -377,6 +395,204 @@ async def __call__(
return flow_output


class _LongTermMemorySaveWrapper:
    """Wraps ``LongTermMemory.save`` to emit a span event describing the save."""

    def __init__(self, tracer: trace_api.Tracer) -> None:
        self._tracer = tracer

    def __call__(
        self,
        wrapped: Callable[..., Any],
        instance: Any,
        args: Tuple[Any, ...],
        kwargs: Mapping[str, Any],
    ) -> Any:
        if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
            return wrapped(*args, **kwargs)

        # Extract the memory item BEFORE calling wrapped(): the original code
        # read args[0] only after a successful call, so the except path raised
        # NameError on `item` and masked the real exception.
        item = args[0] if args else kwargs.get("item")
        attributes: Dict[str, Any] = {
            "agent_role": getattr(item, "agent", None),
            "value": getattr(item, "task", None),
            "metadata": getattr(item, "metadata", None),
            "source_type": "long_term_memory",
        }
        try:
            start_time = time.time()
            response = wrapped(*args, **kwargs)
            attributes.update(
                {
                    "expected_output": getattr(item, "expected_output", None),
                    "datetime": getattr(item, "datetime", None),
                    "quality": getattr(item, "quality", None),
                    "save_time_ms": (time.time() - start_time) * 1000,
                }
            )
        except Exception as exception:
            attributes["error"] = str(exception)
            # Emit the error event before re-raising; the original built these
            # attributes but `raise` skipped the logging call entirely.
            _log_span_event("long_term_memory.save", attributes)
            raise

        _log_span_event("long_term_memory.save", attributes)
        return response


class _LongTermMemorySearchWrapper:
    """Wraps ``LongTermMemory.search`` to emit a span event with the query,
    results, and timing."""

    def __init__(self, tracer: trace_api.Tracer) -> None:
        self._tracer = tracer

    def __call__(
        self,
        wrapped: Callable[..., Any],
        instance: Any,
        args: Tuple[Any, ...],
        kwargs: Mapping[str, Any],
    ) -> Any:
        if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
            return wrapped(*args, **kwargs)

        # Resolve arguments BEFORE calling wrapped(): the original bound
        # `query`/`limit` only after a successful call, so the except path
        # raised NameError and masked the real exception. Also honor
        # positional `latest_n`, which the original ignored.
        query = args[0] if args else kwargs.get("task")
        limit = args[1] if len(args) > 1 else kwargs.get("latest_n", 3)
        attributes: Dict[str, Any] = {
            "task": query,
            "latest_n": limit,
            "source_type": "long_term_memory",
        }
        try:
            start_time = time.time()
            response = wrapped(*args, **kwargs)
            attributes["results"] = response
            attributes["query_time_ms"] = (time.time() - start_time) * 1000
        except Exception as exception:
            attributes["error"] = str(exception)
            # Emit the error event before re-raising; the original never
            # logged it because `raise` preceded the logging call.
            _log_span_event("long_term_memory.search", attributes)
            raise

        _log_span_event("long_term_memory.search", attributes)
        return response


class _ShortTermMemorySaveWrapper:
    """Wraps ``ShortTermMemory.save`` to emit a span event describing the save."""

    def __init__(self, tracer: trace_api.Tracer) -> None:
        self._tracer = tracer

    def __call__(
        self,
        wrapped: Callable[..., Any],
        instance: Any,
        args: Tuple[Any, ...],
        kwargs: Mapping[str, Any],
    ) -> Any:
        if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
            return wrapped(*args, **kwargs)

        # Build attributes up front. The original guarded on
        # `kwargs is not None`, which is always true for a wrapt wrapper.
        attributes: Dict[str, Any] = {
            **kwargs,
            "source_type": "short_term_memory",
        }
        try:
            start_time = time.time()
            response = wrapped(*args, **kwargs)
            attributes["save_time_ms"] = (time.time() - start_time) * 1000
        except Exception as exception:
            attributes["error"] = str(exception)
            # Emit the error event before re-raising; the original built these
            # attributes but `raise` skipped the logging call entirely.
            _log_span_event("short_term_memory.save", attributes)
            raise

        _log_span_event("short_term_memory.save", attributes)
        return response


class _ShortTermMemorySearchWrapper:
    """Wraps ``ShortTermMemory.search`` to emit a span event with the query,
    results, and timing."""

    def __init__(self, tracer: trace_api.Tracer) -> None:
        self._tracer = tracer

    def __call__(
        self,
        wrapped: Callable[..., Any],
        instance: Any,
        args: Tuple[Any, ...],
        kwargs: Mapping[str, Any],
    ) -> Any:
        if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
            return wrapped(*args, **kwargs)

        # Resolve arguments BEFORE calling wrapped(): the original bound
        # `query`/`limit`/`score_threshold` only after a successful call, so
        # the except path raised NameError and masked the real exception.
        # Also fall back to kwargs, which the original ignored.
        query = args[0] if args else kwargs.get("query", "")
        limit = args[1] if len(args) > 1 else kwargs.get("limit", 5)
        score_threshold = (
            args[2] if len(args) > 2 else kwargs.get("score_threshold", 0.6)
        )
        attributes: Dict[str, Any] = {
            "query": query,
            "limit": limit,
            "score_threshold": score_threshold,
            "source_type": "short_term_memory",
        }
        try:
            start_time = time.time()
            response = wrapped(*args, **kwargs)
            attributes["results"] = response
            attributes["query_time_ms"] = (time.time() - start_time) * 1000
        except Exception as exception:
            attributes["error"] = str(exception)
            # Emit the error event before re-raising; the original never
            # logged it because `raise` preceded the logging call.
            _log_span_event("short_term_memory.search", attributes)
            raise

        _log_span_event("short_term_memory.search", attributes)
        return response


class _ToolUseWrapper:
def __init__(self, tracer: trace_api.Tracer) -> None:
self._tracer = tracer
Expand Down