57 changes: 57 additions & 0 deletions docs/guides/code_examples/open_telemetry/instrument_crawler.py
@@ -0,0 +1,57 @@
import asyncio

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import set_tracer_provider

from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext
from crawlee.otel import CrawlerInstrumentor
from crawlee.storages import Dataset, KeyValueStore, RequestQueue


def instrument_crawler() -> None:
"""Add instrumentation to the crawler."""
resource = Resource.create(
{
'service.name': 'ExampleCrawler',
'service.version': '1.0.0',
'environment': 'development',
}
)

# Set up the OpenTelemetry tracer provider and exporter
provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint='localhost:4317', insecure=True)
provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
set_tracer_provider(provider)
# Instrument the crawler with OpenTelemetry
CrawlerInstrumentor(
instrument_classes=[RequestQueue, KeyValueStore, Dataset]
).instrument()


async def main() -> None:
"""Run the crawler."""
instrument_crawler()

crawler = ParselCrawler(max_requests_per_crawl=100)
kvs = await KeyValueStore.open()

@crawler.pre_navigation_hook
async def pre_nav_hook(_: BasicCrawlingContext) -> None:
# Simulate some pre-navigation processing
await asyncio.sleep(0.01)

@crawler.router.default_handler
async def handler(context: ParselCrawlingContext) -> None:
await context.push_data({'url': context.request.url})
await kvs.set_value(key='url', value=context.request.url)
await context.enqueue_links()

await crawler.run(['https://crawlee.dev/'])


if __name__ == '__main__':
asyncio.run(main())
54 changes: 54 additions & 0 deletions docs/guides/open_telemetry.mdx
@@ -0,0 +1,54 @@
---
id: otel
title: Trace and optimize crawlers
description: How to instrument crawlers with OpenTelemetry
---

import ApiLink from '@site/src/components/ApiLink';
import CodeBlock from '@theme/CodeBlock';

import InstrumentCrawler from '!!raw-loader!./code_examples/open_telemetry/instrument_crawler.py';

[OpenTelemetry](https://opentelemetry.io/) is a collection of APIs, SDKs, and tools to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) that helps you analyze your software’s performance and behavior. In the context of crawler development, it can be used to better understand how the crawler works internally, identify bottlenecks, debug, log metrics, and more. This guide assumes at least a basic understanding of OpenTelemetry. A good place to start is [What is OpenTelemetry](https://opentelemetry.io/docs/what-is-opentelemetry/).

This guide shows how to set up OpenTelemetry and instrument a specific crawler to see traces of the individual requests it processes. OpenTelemetry on its own does not provide an out-of-the-box tool for convenient visualisation of the exported data (apart from printing to the console), but several good tools are available. In this guide, we will use [Jaeger](https://www.jaegertracing.io/) to visualise the telemetry data. To better understand concepts such as exporter, collector, and visualisation backend, please refer to the [OpenTelemetry documentation](https://opentelemetry.io/docs/collector/).


## Set up Jaeger

This section shows how to set up the environment locally to run the example code and visualise the telemetry data in Jaeger, which will run locally in a [Docker](https://www.docker.com/) container.

To start the preconfigured Docker container, you can use the following command:

```bash
docker run -d --name jaeger -e COLLECTOR_OTLP_ENABLED=true -p 16686:16686 -p 4317:4317 -p 4318:4318 jaegertracing/all-in-one:latest
```
For more details about the Jaeger setup, see the [getting started](https://www.jaegertracing.io/docs/2.7/getting-started/) section of its documentation.
You can open the Jaeger UI in your browser by navigating to [http://localhost:16686](http://localhost:16686).

## Instrument the crawler

Now you can instrument the crawler to send telemetry data to Jaeger and run it. To have the Python environment ready, install either **crawlee[all]** or **crawlee[otel]**. This ensures that the OpenTelemetry dependencies are installed and you can run the example code snippet.
In the following example, the function `instrument_crawler` contains the instrumentation setup and is called before the crawler starts. If you have already set up Jaeger, you can simply run the following code snippet.

<CodeBlock className="language-python">
{InstrumentCrawler}
</CodeBlock>

## Analyze the results

In the Jaeger UI, you can search for different traces, apply filtering, compare traces, view their detailed attributes, view timing details, and more. For a detailed description of the tool's capabilities, please refer to the [Jaeger documentation](https://www.jaegertracing.io/docs/1.47/deployment/frontend-ui/#trace-page).

![Jaeger search view](/img/guides/jaeger_otel_search_view_example.png 'Example visualisation of search view in Jaeger')
![Jaeger trace view](/img/guides/jaeger_otel_trace_example.png 'Example visualisation of crawler request trace in Jaeger')

You can use different tools to consume the OpenTelemetry data that might better suit your needs. Please see the list of known vendors in the [OpenTelemetry documentation](https://opentelemetry.io/ecosystem/vendors/).

## Customize the instrumentation

You can customize the <ApiLink to="class/CrawlerInstrumentor">`CrawlerInstrumentor`</ApiLink>. Depending on the arguments used during its initialization, the instrumentation is applied to different parts of the Crawlee code. By default, it instruments a set of functions that gives a fairly good picture of how each individual request is handled. To turn this default instrumentation off, pass `request_handling_instrumentation=False` during initialization. You can also extend the instrumentation by passing the `instrument_classes=[...]` initialization argument with classes you want to be auto-instrumented; all their public methods will be instrumented automatically. Bear in mind that instrumentation has a runtime cost: the more instrumentation is used, the more overhead it adds to the crawler execution.
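
For illustration, here is a minimal sketch (assuming the tracer provider is already configured, as in the example above) that turns off the default request-handling instrumentation and auto-instruments only the storage classes:

```python
from crawlee.otel import CrawlerInstrumentor
from crawlee.storages import Dataset, KeyValueStore, RequestQueue

# Disable the default request-handling spans and auto-instrument
# only the public methods of the storage classes.
CrawlerInstrumentor(
    instrument_classes=[RequestQueue, KeyValueStore, Dataset],
    request_handling_instrumentation=False,
).instrument()
```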


You can also create your own instrumentation by selecting only the methods you want to instrument. For more details, see the <ApiLink to="class/CrawlerInstrumentor">`CrawlerInstrumentor`</ApiLink> source code and the [OpenTelemetry documentation for Python](https://opentelemetry.io/docs/languages/python/).
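
As a rough sketch of that approach, following the same wrapper pattern the `CrawlerInstrumentor` uses internally (and assuming a tracer provider is already configured), you could wrap a single coroutine method yourself:

```python
from typing import Any

from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from crawlee.storages import KeyValueStore

tracer = get_tracer(__name__)


async def traced_set_value(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
    # Open a span around the wrapped coroutine call.
    with tracer.start_as_current_span('KeyValueStore.set_value'):
        return await wrapped(*args, **kwargs)


# Instrument only the single method we are interested in.
wrap_function_wrapper(KeyValueStore, 'set_value', traced_set_value)
```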

If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/crawlee-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU).
17 changes: 17 additions & 0 deletions pyproject.toml
@@ -61,10 +61,17 @@ all = [
"html5lib>=1.0",
"inquirer>=3.3.0",
"jaro-winkler>=2.0.3",
"opentelemetry-api>=1.34.1",
"opentelemetry-distro[otlp]>=0.54",
"opentelemetry-instrumentation>=0.54",
"opentelemetry-instrumentation-httpx>=0.54",
"opentelemetry-sdk>=1.34.1",
"opentelemetry-semantic-conventions>=0.54",
"parsel>=1.10.0",
"playwright>=1.27.0",
"scikit-learn>=1.6.0",
"typer>=0.12.0",
"wrapt>=1.17.0",
]
adaptive-crawler = [
"jaro-winkler>=2.0.3",
@@ -76,6 +83,15 @@ cli = ["cookiecutter>=2.6.0", "inquirer>=3.3.0", "rich>=13.9.0", "typer>=0.12.0"]
curl-impersonate = ["curl-cffi>=0.9.0"]
parsel = ["parsel>=1.10.0"]
playwright = ["playwright>=1.27.0"]
otel = [
"opentelemetry-api>=1.34.1",
"opentelemetry-distro[otlp]>=0.54",
"opentelemetry-instrumentation>=0.54",
"opentelemetry-instrumentation-httpx>=0.54",
"opentelemetry-sdk>=1.34.1",
"opentelemetry-semantic-conventions>=0.54",
"wrapt>=1.17.0",
]

[project.scripts]
crawlee = "crawlee._cli:cli"
@@ -254,6 +270,7 @@ module = [
"cookiecutter.*", # Untyped and stubs not available
"inquirer.*", # Untyped and stubs not available
"warcio.*", # Example code shows WARC files creation.
"wrapt" # Untyped and stubs not available
]
ignore_missing_imports = true

49 changes: 36 additions & 13 deletions src/crawlee/crawlers/_basic/_context_pipeline.py
Expand Up @@ -21,6 +21,38 @@
TMiddlewareCrawlingContext = TypeVar('TMiddlewareCrawlingContext', bound=BasicCrawlingContext)


class _Middleware(Generic[TMiddlewareCrawlingContext, TCrawlingContext]):
"""Helper wrapper class to make the middleware easily observable by open telemetry instrumentation."""

def __init__(
self,
middleware: Callable[
[TCrawlingContext],
AsyncGenerator[TMiddlewareCrawlingContext, Exception | None],
],
input_context: TCrawlingContext,
) -> None:
self.generator = middleware(input_context)
self.input_context = input_context
self.output_context: TMiddlewareCrawlingContext | None = None

async def action(self) -> TMiddlewareCrawlingContext:
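        """Advance the middleware generator to its first yield and return the enhanced context it produces."""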
self.output_context = await self.generator.__anext__()
return self.output_context

async def cleanup(self, final_consumer_exception: Exception | None) -> None:
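        """Resume the middleware generator after the consumer has finished, letting it run its post-yield cleanup code."""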
try:
await self.generator.asend(final_consumer_exception)
except StopAsyncIteration:
pass
except ContextPipelineInterruptedError as e:
raise RuntimeError('Invalid state - pipeline interrupted in the finalization step') from e
except Exception as e:
raise ContextPipelineFinalizationError(e, self.output_context or self.input_context) from e
else:
raise RuntimeError('The middleware yielded more than once')


@docs_group('Classes')
class ContextPipeline(Generic[TCrawlingContext]):
"""Encapsulates the logic of gradually enhancing the crawling context with additional information and utilities.
@@ -57,15 +89,15 @@ async def __call__(
Exceptions from the consumer function are wrapped together with the final crawling context.
"""
chain = list(self._middleware_chain())
cleanup_stack: list[AsyncGenerator[Any, Exception | None]] = []
        cleanup_stack: list[_Middleware[Any, Any]] = []
final_consumer_exception: Exception | None = None

try:
for member in reversed(chain):
if member._middleware: # noqa: SLF001
middleware_instance = member._middleware(crawling_context) # noqa: SLF001
middleware_instance = _Middleware(middleware=member._middleware, input_context=crawling_context) # noqa: SLF001
try:
result = await middleware_instance.__anext__()
result = await middleware_instance.action()
except SessionError: # Session errors get special treatment
raise
except StopAsyncIteration as e:
@@ -88,16 +120,7 @@
raise RequestHandlerError(e, crawling_context) from e
finally:
for middleware_instance in reversed(cleanup_stack):
try:
result = await middleware_instance.asend(final_consumer_exception)
except StopAsyncIteration: # noqa: PERF203
pass
except ContextPipelineInterruptedError as e:
raise RuntimeError('Invalid state - pipeline interrupted in the finalization step') from e
except Exception as e:
raise ContextPipelineFinalizationError(e, crawling_context) from e
else:
raise RuntimeError('The middleware yielded more than once')
await middleware_instance.cleanup(final_consumer_exception)

def compose(
self,
5 changes: 5 additions & 0 deletions src/crawlee/otel/__init__.py
@@ -0,0 +1,5 @@
from crawlee.otel.crawler_instrumentor import CrawlerInstrumentor

__all__ = [
'CrawlerInstrumentor',
]
152 changes: 152 additions & 0 deletions src/crawlee/otel/crawler_instrumentor.py
@@ -0,0 +1,152 @@
from __future__ import annotations

import inspect
from typing import TYPE_CHECKING, Any

from opentelemetry.instrumentation.instrumentor import ( # type:ignore[attr-defined] # Mypy has troubles with OTEL
BaseInstrumentor,
)
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.attributes.code_attributes import CODE_FUNCTION_NAME
from opentelemetry.semconv.attributes.http_attributes import HTTP_REQUEST_METHOD
from opentelemetry.semconv.attributes.url_attributes import URL_FULL
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from crawlee._utils.docs import docs_group
from crawlee.crawlers import BasicCrawler, ContextPipeline
from crawlee.crawlers._basic._context_pipeline import _Middleware

if TYPE_CHECKING:
from collections.abc import Callable

from crawlee.crawlers import BasicCrawlingContext


@docs_group('Classes')
class CrawlerInstrumentor(BaseInstrumentor):
"""Helper class for instrumenting crawlers with OpenTelemetry."""

def __init__(
self, *, instrument_classes: list[type] | None = None, request_handling_instrumentation: bool = True
) -> None:
"""Initialize the instrumentor.

Args:
            instrument_classes: List of classes to be instrumented - all their public methods and coroutines will be
                wrapped by a generic instrumentation wrapper that creates spans for them.
            request_handling_instrumentation: If True, instrument a handpicked set of the most interesting methods in
                the request handling pipeline.
"""
self._tracer = get_tracer(__name__)

async def _simple_async_wrapper(wrapped: Any, _: Any, args: Any, kwargs: Any) -> Any:
with self._tracer.start_as_current_span(
name=wrapped.__name__, attributes={CODE_FUNCTION_NAME: wrapped.__qualname__}
):
return await wrapped(*args, **kwargs)

def _simple_wrapper(wrapped: Any, _: Any, args: Any, kwargs: Any) -> Any:
with self._tracer.start_as_current_span(
name=wrapped.__name__, attributes={CODE_FUNCTION_NAME: wrapped.__qualname__}
):
return wrapped(*args, **kwargs)

def _init_wrapper(wrapped: Any, _: Any, args: Any, kwargs: Any) -> None:
with self._tracer.start_as_current_span(
name=wrapped.__name__, attributes={CODE_FUNCTION_NAME: wrapped.__qualname__}
):
wrapped(*args, **kwargs)

self._instrumented: list[tuple[Any, str, Callable]] = []
self._simple_wrapper = _simple_wrapper
self._simple_async_wrapper = _simple_async_wrapper
self._init_wrapper = _init_wrapper

if instrument_classes:
for _class in instrument_classes:
self._instrument_all_public_methods(on_class=_class)

if request_handling_instrumentation:

            async def middleware_wrapper(wrapped: Any, instance: _Middleware, args: Any, kwargs: Any) -> Any:
with self._tracer.start_as_current_span(
name=f'{instance.generator.__name__}, {wrapped.__name__}', # type:ignore[attr-defined] # valid in our context
attributes={
URL_FULL: instance.input_context.request.url,
CODE_FUNCTION_NAME: instance.generator.__qualname__, # type:ignore[attr-defined] # valid in our context
},
):
return await wrapped(*args, **kwargs)

async def context_pipeline_wrapper(
wrapped: Any, _: ContextPipeline[BasicCrawlingContext], args: Any, kwargs: Any
) -> Any:
context = args[0]
final_context_consumer = args[1]

async def wrapped_final_consumer(*args: Any, **kwargs: Any) -> Any:
with self._tracer.start_as_current_span(
name='request_handler',
attributes={URL_FULL: context.request.url, HTTP_REQUEST_METHOD: context.request.method},
):
return await final_context_consumer(*args, **kwargs)

with self._tracer.start_as_current_span(
name='ContextPipeline',
attributes={URL_FULL: context.request.url, HTTP_REQUEST_METHOD: context.request.method},
):
return await wrapped(context, wrapped_final_consumer, **kwargs)

async def _commit_request_handler_result_wrapper(
wrapped: Callable[[Any], Any], _: BasicCrawler, args: Any, kwargs: Any
) -> Any:
context = args[0]
with self._tracer.start_as_current_span(
name='Commit results',
attributes={URL_FULL: context.request.url, HTTP_REQUEST_METHOD: context.request.method},
):
return await wrapped(*args, **kwargs)

# Handpicked interesting methods to instrument
self._instrumented.extend(
[
                    (_Middleware, 'action', middleware_wrapper),
                    (_Middleware, 'cleanup', middleware_wrapper),
(ContextPipeline, '__call__', context_pipeline_wrapper),
(BasicCrawler, '_BasicCrawler__run_task_function', self._simple_async_wrapper),
(BasicCrawler, '_commit_request_handler_result', _commit_request_handler_result_wrapper),
]
)

def instrumentation_dependencies(self) -> list[str]:
"""Return a list of python packages with versions that will be instrumented."""
return ['crawlee']

def _instrument_all_public_methods(self, on_class: type) -> None:
public_coroutines = {
name
for name, member in inspect.getmembers(on_class, predicate=inspect.iscoroutinefunction)
if not name.startswith('_')
}
public_methods = {
name
for name, member in inspect.getmembers(on_class, predicate=inspect.isfunction)
if not name.startswith('_')
} - public_coroutines

for coroutine in public_coroutines:
self._instrumented.append((on_class, coroutine, self._simple_async_wrapper))

for method in public_methods:
self._instrumented.append((on_class, method, self._simple_wrapper))

self._instrumented.append((on_class, '__init__', self._init_wrapper))

def _instrument(self, **_: Any) -> None:
for _class, method, wrapper in self._instrumented:
wrap_function_wrapper(_class, method, wrapper)

def _uninstrument(self, **_: Any) -> None:
for _class, method, wrapper in self._instrumented: # noqa: B007
unwrap(_class, method)