Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:
workflows:
setup:
jobs:
- setup
- setup
5 changes: 5 additions & 0 deletions ddtrace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@


telemetry.install_excepthook()
# In order to support 3.12, we start the writer upon initialization.
# See https://github.com/python/cpython/pull/104826.
# Telemetry events will only be sent after the `app-started` is queued.
# This will occur when the agent writer starts.
telemetry.telemetry_writer.enable()

from ._monkey import patch # noqa: E402
from ._monkey import patch_all # noqa: E402
Expand Down
6 changes: 4 additions & 2 deletions ddtrace/internal/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ def _excepthook(tp, value, root_traceback):
error_msg = "{}:{} {}".format(filename, lineno, str(value))
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)

telemetry_writer.app_shutdown()
telemetry_writer.disable()
if telemetry_writer.started is False:
telemetry_writer._app_started_event(False)
telemetry_writer._app_dependencies_loaded_event()
telemetry_writer.app_shutdown()

return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)

Expand Down
6 changes: 3 additions & 3 deletions ddtrace/internal/telemetry/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from typing import List
from typing import Tuple

import ddtrace
from ddtrace.internal.compat import PY3
from ddtrace.internal.constants import DEFAULT_SERVICE_NAME
from ddtrace.internal.packages import get_distributions
from ddtrace.internal.runtime.container import get_container_info
from ddtrace.internal.utils.cache import cached
from ddtrace.version import get_version

from ...settings import _config as config
from ..hostname import get_hostname
Expand Down Expand Up @@ -63,7 +63,7 @@ def _get_application(key):
"env": env or "",
"language_name": "python",
"language_version": _format_version_info(sys.version_info),
"tracer_version": ddtrace.__version__,
"tracer_version": get_version(),
"runtime_name": platform.python_implementation(),
"runtime_version": _format_version_info(sys.implementation.version) if PY3 else "",
"products": _get_products(),
Expand All @@ -88,7 +88,7 @@ def get_application(service, version, env):
def _get_products():
# type: () -> Dict
return {
"appsec": {"version": ddtrace.__version__, "enabled": config._appsec_enabled},
"appsec": {"version": get_version(), "enabled": config._appsec_enabled},
}


Expand Down
18 changes: 6 additions & 12 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ def enable(self):

if self._is_periodic:
self.start()
atexit.register(self.app_shutdown)
return True

self.status = ServiceStatus.RUNNING
Expand Down Expand Up @@ -291,14 +290,18 @@ def add_error(self, code, msg, filename, line_number):
msg = "%s:%s: %s" % (filename, line_number, msg)
self._error = (code, msg)

def _app_started_event(self):
# type: () -> None
def _app_started_event(self, register_app_shutdown=True):
# type: (bool) -> None
"""Sent when TelemetryWriter is enabled or forks"""
if self._forked:
# app-started events should only be sent by the main process
return
# List of configurations to be collected

self.started = True
if register_app_shutdown:
atexit.register(self.app_shutdown)

self.add_configurations(
[
(TELEMETRY_TRACING_ENABLED, config._tracing_enabled, "unknown"),
Expand Down Expand Up @@ -595,15 +598,6 @@ def periodic(self, force_flush=False):
for telemetry_event in telemetry_events:
self._client.send_event(telemetry_event)

def start(self, *args, **kwargs):
# type: (...) -> None
super(TelemetryWriter, self).start(*args, **kwargs)
# Queue app-started event after the telemetry worker thread is running
if self.started is False:
self._app_started_event()
self._app_dependencies_loaded_event()
self.started = True

def app_shutdown(self):
self._app_closing_event()
self.periodic(force_flush=True)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,8 @@ def _send_payload(self, payload, count, client):
def start(self):
super(AgentWriter, self).start()
try:
telemetry_writer.enable()
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()

# appsec remote config should be enabled/started after the global tracer and configs
# are initialized
Expand Down
82 changes: 45 additions & 37 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
def test_enable(test_agent_session, run_python_code_in_subprocess):
code = """
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.service import ServiceStatus

telemetry_writer.enable()

assert telemetry_writer.status == ServiceStatus.RUNNING
assert telemetry_writer._worker is not None
"""

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
Expand All @@ -17,26 +22,10 @@ def test_enable(test_agent_session, run_python_code_in_subprocess):
assert stdout == b"", stderr
assert stderr == b""

events = test_agent_session.get_events()
assert len(events) == 3

# Same runtime id is used
assert events[0]["runtime_id"] == events[1]["runtime_id"]
assert events[0]["request_type"] == "app-closing"
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-started"
assert events[2]["payload"]["error"] == {"code": 0, "message": ""}


@pytest.mark.snapshot
def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess):
"""assert telemetry events are generated after the first trace is flushed to the agent"""
# Using ddtrace-run and/or importing ddtrace alone should not enable telemetry
# Telemetry data should only be sent after the first trace to the agent
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import ddtrace")
assert status == 0, stderr
# No trace and No Telemetry
assert len(test_agent_session.get_events()) == 0

# Submit a trace to the agent in a subprocess
code = 'from ddtrace import tracer; span = tracer.trace("test-telemetry"); span.finish()'
Expand All @@ -58,13 +47,19 @@ def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run
def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
"""assert app-started/app-closing events are only sent in parent process"""
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

import os

from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.telemetry import telemetry_writer

# We have to start before forking since fork hooks are not enabled until after enabling
telemetry_writer.enable()
telemetry_writer._app_started_event()

if os.fork() == 0:
# Send multiple started events to confirm none get sent
Expand All @@ -78,27 +73,29 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr
assert stderr == b""
assert stderr == b"", stderr

runtime_id = stdout.strip().decode("utf-8")

requests = test_agent_session.get_requests()

# We expect 2 events from the parent process to get sent, but none from the child process
assert len(requests) == 3
assert len(requests) == 2
# Validate that the runtime id sent for every event is the parent processes runtime id
assert requests[0]["body"]["runtime_id"] == runtime_id
assert requests[0]["body"]["request_type"] == "app-closing"
assert requests[1]["body"]["runtime_id"] == runtime_id
assert requests[1]["body"]["request_type"] == "app-dependencies-loaded"
assert requests[1]["body"]["runtime_id"] == runtime_id
assert requests[2]["body"]["request_type"] == "app-started"
assert requests[2]["body"]["runtime_id"] == runtime_id
assert requests[1]["body"]["request_type"] == "app-started"


def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess):
"""assert app-heartbeat events are only sent in parent process when no other events are queued"""
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

import os

from ddtrace.internal.runtime import get_runtime_id
Expand All @@ -120,7 +117,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr
assert stderr == b""
assert stderr == b"", stderr

runtime_id = stdout.strip().decode("utf-8")

Expand All @@ -138,6 +135,11 @@ def test_heartbeat_interval_configuration(run_python_code_in_subprocess):
env = os.environ.copy()
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "61"
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

from ddtrace import config
assert config._telemetry_heartbeat_interval == 61

Expand All @@ -156,6 +158,11 @@ def test_logs_after_fork(run_python_code_in_subprocess):
# Regression test: telemetry writer should not log an error when a process forks
_, err, status, _ = run_python_code_in_subprocess(
"""
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

import ddtrace
import logging
import os
Expand All @@ -167,7 +174,7 @@ def test_logs_after_fork(run_python_code_in_subprocess):
)

assert status == 0, err
assert err == b""
assert err == b"", err


def test_app_started_error_handled_exception(test_agent_session, run_python_code_in_subprocess):
Expand Down Expand Up @@ -250,6 +257,9 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

from ddtrace import patch, tracer
patch(raise_errors=False, sqlite3=True)

# Create a span to start the telemetry writer
tracer.trace("hi").finish()
"""

_, stderr, status, _ = run_python_code_in_subprocess(code)
Expand All @@ -260,15 +270,11 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

events = test_agent_session.get_events()

assert len(events) == 5
# Same runtime id is used
assert (
events[0]["runtime_id"]
== events[1]["runtime_id"]
== events[2]["runtime_id"]
== events[3]["runtime_id"]
== events[4]["runtime_id"]
)
assert len(events) > 1
for event in events:
# Same runtime id is used
assert event["runtime_id"] == events[0]["runtime_id"]

integrations_events = [event for event in events if event["request_type"] == "app-integrations-change"]

assert len(integrations_events) == 1
Expand All @@ -277,12 +283,14 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
)

metric_events = [event for event in events if event["request_type"] == "generate-metrics"]

metric_events = [
event
for event in events
if event["request_type"] == "generate-metrics"
and event["payload"]["series"][0]["metric"] == "integration_errors"
]
assert len(metric_events) == 1
assert metric_events[0]["payload"]["namespace"] == "tracers"
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors"
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
Expand Down
11 changes: 2 additions & 9 deletions tests/telemetry/test_telemetry_metrics_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import subprocess
import sys
import time

import pytest

Expand All @@ -28,8 +27,6 @@ def _build_env():
def gunicorn_server(telemetry_metrics_enabled="true", token=None):
cmd = ["ddtrace-run", "gunicorn", "-w", "1", "-b", "0.0.0.0:8000", "tests.telemetry.app:app"]
env = _build_env()
env["DD_TELEMETRY_METRICS_ENABLED"] = telemetry_metrics_enabled
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "1.0"
env["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] = "X-Datadog-Test-Session-Token:{}".format(token)
env["DD_TRACE_AGENT_URL"] = os.environ.get("DD_TRACE_AGENT_URL", "")
env["DD_TRACE_DEBUG"] = "true"
Expand Down Expand Up @@ -90,19 +87,15 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200
# DD_TELEMETRY_HEARTBEAT_INTERVAL is set to 1 second
time.sleep(1)
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200

events = test_agent_session.get_events()
metrics = list(filter(lambda event: event["request_type"] == "generate-metrics", events))
assert len(metrics) == 2
assert len(metrics) == 1
assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 2.0
assert metrics[1]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[1]["payload"]["series"][0]["points"][0][1] == 3.0
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5


def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand Down
11 changes: 5 additions & 6 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ def test_send_failing_request(mock_status, telemetry_writer):
with httpretty.enabled():
httpretty.register_uri(httpretty.POST, telemetry_writer._client.url, status=mock_status)
with mock.patch("ddtrace.internal.telemetry.writer.log") as log:
# sends failing app-closing event
telemetry_writer.app_shutdown()
# sends failing app-heartbeat event
telemetry_writer.periodic()
# asserts unsuccessful status code was logged
log.debug.assert_called_with(
"failed to send telemetry to the Datadog Agent at %s. response: %s",
Expand All @@ -395,13 +395,11 @@ def test_telemetry_graceful_shutdown(telemetry_writer, test_agent_session, mock_
telemetry_writer.app_shutdown()

events = test_agent_session.get_events()
assert len(events) == 3
assert len(events) == 1

# Reverse chronological order
assert events[0]["request_type"] == "app-closing"
assert events[0] == _get_request_body({}, "app-closing", 3)
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-started"
assert events[0] == _get_request_body({}, "app-closing", 1)


def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_session):
Expand All @@ -410,6 +408,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se

# Ensure telemetry writer is initialized to send periodic events
telemetry_writer._is_periodic = True
telemetry_writer.started = True
# Assert default telemetry interval is 10 seconds and the expected periodic threshold and counts are set
assert telemetry_writer.interval == 10
assert telemetry_writer._periodic_threshold == 5
Expand Down