Skip to content
This repository was archived by the owner on May 23, 2023. It is now read-only.

Commit 170f927

Browse files
condorcetyurishkuro
andcommitted
Add AsyncioScopeManager based on contextvars and supporting Tornado 6 (#118)
* Asyncio context manager with contextvars. Add different versions of tornado to travis.yml. * Make new context manager based on python 3.7 contextvars. * Inherit ContextVarsScopeManagerFix directly from ScopeManager, fix docstrings and README * Update testbed/test_multiple_callbacks/README.md Co-Authored-By: Yuri Shkuro <[email protected]> * Update testbed/test_nested_callbacks/README.md Co-Authored-By: Yuri Shkuro <[email protected]> * Update testbed/test_subtask_span_propagation/README.md Co-Authored-By: Yuri Shkuro <[email protected]> * Fix typo in testbed docs * Remove obsolete description from testbed docs * Update testbed/test_common_request_handler/README.md Co-Authored-By: Yuri Shkuro <[email protected]>
1 parent 7d2e62b commit 170f927

File tree

26 files changed

+1033
-29
lines changed

26 files changed

+1033
-29
lines changed

.travis.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,21 @@ python:
88
- "3.7"
99
- "3.8-dev"
1010

11+
env:
12+
- TORNADO=">=4,<5"
13+
- TORNADO=">=5,<6"
14+
- TORNADO=">=6"
15+
1116
matrix:
1217
allow_failures:
1318
- python: "3.8-dev"
19+
exclude:
20+
- python: "2.7"
21+
env: TORNADO=">=6"
1422

1523
install:
1624
- make bootstrap
25+
- pip install -q "tornado$TORNADO"
1726

1827
script:
1928
- make test testbed lint
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# Copyright (c) The OpenTracing Authors.
2+
#
3+
# Permission is hereby granted, free of charge, to any person obtaining a copy
4+
# of this software and associated documentation files (the "Software"), to deal
5+
# in the Software without restriction, including without limitation the rights
6+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
# copies of the Software, and to permit persons to whom the Software is
8+
# furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in
11+
# all copies or substantial portions of the Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
# THE SOFTWARE.
20+
21+
from __future__ import absolute_import
22+
23+
from contextlib import contextmanager
24+
from contextvars import ContextVar
25+
26+
from opentracing import Scope, ScopeManager
27+
28+
29+
_SCOPE = ContextVar('scope')
30+
31+
32+
class ContextVarsScopeManager(ScopeManager):
33+
"""
34+
:class:`~opentracing.ScopeManager` implementation for **asyncio**
35+
that stores the :class:`~opentracing.Scope` using ContextVar.
36+
37+
The scope manager provides automatic :class:`~opentracing.Span` propagation
38+
from parent coroutines, tasks and scheduled in event loop callbacks to
39+
their children.
40+
41+
.. code-block:: python
42+
43+
async def child_coroutine():
44+
# No need manual activation of parent span in child coroutine.
45+
with tracer.start_active_span('child') as scope:
46+
...
47+
48+
async def parent_coroutine():
49+
with tracer.start_active_span('parent') as scope:
50+
...
51+
await child_coroutine()
52+
...
53+
54+
"""
55+
56+
def activate(self, span, finish_on_close):
57+
"""
58+
Make a :class:`~opentracing.Span` instance active.
59+
60+
:param span: the :class:`~opentracing.Span` that should become active.
61+
:param finish_on_close: whether *span* should automatically be
62+
finished when :meth:`Scope.close()` is called.
63+
64+
:return: a :class:`~opentracing.Scope` instance to control the end
65+
of the active period for the :class:`~opentracing.Span`.
66+
It is a programming error to neglect to call :meth:`Scope.close()`
67+
on the returned instance.
68+
"""
69+
70+
return self._set_scope(span, finish_on_close)
71+
72+
@property
73+
def active(self):
74+
"""
75+
Return the currently active :class:`~opentracing.Scope` which
76+
can be used to access the currently active :attr:`Scope.span`.
77+
78+
:return: the :class:`~opentracing.Scope` that is active,
79+
or ``None`` if not available.
80+
"""
81+
82+
return self._get_scope()
83+
84+
def _set_scope(self, span, finish_on_close):
85+
return _ContextVarsScope(self, span, finish_on_close)
86+
87+
def _get_scope(self):
88+
return _SCOPE.get(None)
89+
90+
91+
class _ContextVarsScope(Scope):
92+
def __init__(self, manager, span, finish_on_close):
93+
super(_ContextVarsScope, self).__init__(manager, span)
94+
self._finish_on_close = finish_on_close
95+
self._token = _SCOPE.set(self)
96+
97+
def close(self):
98+
if self.manager.active is not self:
99+
return
100+
101+
_SCOPE.reset(self._token)
102+
103+
if self._finish_on_close:
104+
self.span.finish()
105+
106+
107+
@contextmanager
108+
def no_parent_scope():
109+
"""
110+
Context manager that resets current Scope. Intended to break span
111+
propagation to children coroutines, tasks or scheduled callbacks.
112+
113+
.. code-block:: python
114+
115+
from opentracing.scope_managers.contextvars import no_parent_scope
116+
117+
def periodic()
118+
# `periodic` span will be children of root only at the first time.
119+
with self.tracer.start_active_span('periodic'):
120+
# Now we break span propagation.
121+
with no_parent_scope():
122+
self.loop.call_soon(periodic)
123+
124+
with self.tracer.start_active_span('root'):
125+
self.loop.call_soon(periodic)
126+
"""
127+
token = _SCOPE.set(None)
128+
try:
129+
yield
130+
finally:
131+
_SCOPE.reset(token)

setup.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
include_package_data=True,
2727
zip_safe=False,
2828
platforms='any',
29+
install_requires=[
30+
'futures;python_version=="2.7"',
31+
],
2932
extras_require={
3033
'tests': [
3134
'doubles',
@@ -40,8 +43,7 @@
4043

4144
'six>=1.10.0,<2.0',
4245
'gevent',
43-
'tornado<6',
46+
'tornado',
4447
],
45-
':python_version == "2.7"': ['futures'],
4648
},
4749
)

testbed/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Alternatively, due to the organization of the suite, it's possible to run direct
1818

1919
## Tested frameworks
2020

21-
Currently the examples cover `threading`, `tornado`, `gevent` and `asyncio` (which requires Python 3). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.
21+
Currently the examples cover `threading`, `tornado`, `gevent`, `asyncio` (which requires Python 3) and `contextvars` (which requires Python 3.7 and higher). Each example uses their respective `ScopeManager` instance from `opentracing.scope_managers`, along with their related requirements and limitations.
2222

2323
### threading, asyncio and gevent
2424

@@ -30,6 +30,10 @@ No automatic `Span` propagation between parent and children tasks is provided, a
3030

3131
Currently, yielding over multiple children is not supported, as the context is effectively shared, and switching from coroutine to coroutine messes up the current active `Span`.
3232

33+
### contextvars
34+
35+
`ContextVarsScopeManager` uses [contextvars](https://docs.python.org/3/library/contextvars.html) module to both store **and** automatically propagate the context from parent coroutines / tasks / scheduled in event loop callbacks to their children.
36+
3337
## List of patterns
3438

3539
- [Active Span replacement](test_active_span_replacement) - Start an isolated task and query for its results in another task/thread.
@@ -54,7 +58,3 @@ testbed/
5458
```
5559

5660
Supporting all the platforms is optional, and a warning will be displayed when doing `make testbed` in such case.
57-
58-
## Flake8 support
59-
60-
Currently `flake8` does not support the Python 3 `await`/`async` syntax, and does not offer a way to ignore such syntax.

testbed/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-

testbed/__main__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
from importlib import import_module
22
import logging
33
import os
4+
import sys
45
import six
56
import unittest
7+
from tornado import version_info as tornado_version
68

79

810
enabled_platforms = [
911
'threads',
10-
'tornado',
1112
'gevent',
1213
]
14+
if tornado_version < (6, 0, 0, 0):
15+
# Including testbed for Tornado coroutines and stack context.
16+
# We don't need run testbed in case Tornado>=6, because it became
17+
# asyncio-based framework and `stack_context` was deprecated.
18+
enabled_platforms.append('tornado')
1319
if six.PY3:
1420
enabled_platforms.append('asyncio')
21+
if sys.version_info >= (3, 7):
22+
enabled_platforms.append('contextvars')
1523

1624
logging.basicConfig(level=logging.INFO)
1725
logger = logging.getLogger(__package__)
@@ -47,4 +55,6 @@ def get_test_directories():
4755
suite = loader.loadTestsFromModule(test_module)
4856
main_suite.addTests(suite)
4957

50-
unittest.TextTestRunner(verbosity=3).run(main_suite)
58+
result = unittest.TextTestRunner(verbosity=3).run(main_suite)
59+
if result.failures or result.errors:
60+
sys.exit(1)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from __future__ import print_function
2+
3+
import asyncio
4+
5+
from opentracing.mocktracer import MockTracer
6+
from ..testcase import OpenTracingTestCase
7+
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
8+
from ..utils import stop_loop_when
9+
10+
11+
class TestAsyncioContextVars(OpenTracingTestCase):
12+
def setUp(self):
13+
self.tracer = MockTracer(ContextVarsScopeManager())
14+
self.loop = asyncio.get_event_loop()
15+
16+
def test_main(self):
17+
# Start an isolated task and query for its result -and finish it-
18+
# in another task/thread
19+
span = self.tracer.start_span('initial')
20+
self.submit_another_task(span)
21+
22+
stop_loop_when(self.loop,
23+
lambda: len(self.tracer.finished_spans()) >= 3)
24+
self.loop.run_forever()
25+
26+
initial, subtask, task = self.tracer.finished_spans()
27+
28+
self.assertEmptySpan(initial, 'initial')
29+
self.assertEmptySpan(subtask, 'subtask')
30+
self.assertEmptySpan(task, 'task')
31+
32+
# task/subtask are part of the same trace,
33+
# and subtask is a child of task
34+
self.assertSameTrace(subtask, task)
35+
self.assertIsChildOf(subtask, task)
36+
37+
# initial task is not related in any way to those two tasks
38+
self.assertNotSameTrace(initial, subtask)
39+
self.assertHasNoParent(initial)
40+
41+
async def task(self, span):
42+
# Create a new Span for this task
43+
with self.tracer.start_active_span('task'):
44+
45+
with self.tracer.scope_manager.activate(span, True):
46+
# Simulate work strictly related to the initial Span
47+
pass
48+
49+
# Use the task span as parent of a new subtask
50+
with self.tracer.start_active_span('subtask'):
51+
pass
52+
53+
def submit_another_task(self, span):
54+
self.loop.create_task(self.task(span))
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from __future__ import print_function
2+
3+
4+
import asyncio
5+
6+
import opentracing
7+
from opentracing.ext import tags
8+
from opentracing.mocktracer import MockTracer
9+
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
10+
from ..testcase import OpenTracingTestCase
11+
from ..utils import get_logger, get_one_by_tag, stop_loop_when
12+
13+
14+
logger = get_logger(__name__)
15+
16+
17+
class Server(object):
18+
def __init__(self, *args, **kwargs):
19+
tracer = kwargs.pop('tracer')
20+
queue = kwargs.pop('queue')
21+
super(Server, self).__init__(*args, **kwargs)
22+
23+
self.tracer = tracer
24+
self.queue = queue
25+
26+
async def run(self):
27+
value = await self.queue.get()
28+
self.process(value)
29+
30+
def process(self, message):
31+
logger.info('Processing message in server')
32+
33+
ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message)
34+
with self.tracer.start_active_span('receive',
35+
child_of=ctx) as scope:
36+
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
37+
38+
39+
class Client(object):
40+
def __init__(self, tracer, queue):
41+
self.tracer = tracer
42+
self.queue = queue
43+
44+
async def send(self):
45+
with self.tracer.start_active_span('send') as scope:
46+
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
47+
48+
message = {}
49+
self.tracer.inject(scope.span.context,
50+
opentracing.Format.TEXT_MAP,
51+
message)
52+
await self.queue.put(message)
53+
54+
logger.info('Sent message from client')
55+
56+
57+
class TestAsyncioContextVars(OpenTracingTestCase):
58+
def setUp(self):
59+
self.tracer = MockTracer(ContextVarsScopeManager())
60+
self.queue = asyncio.Queue()
61+
self.loop = asyncio.get_event_loop()
62+
self.server = Server(tracer=self.tracer, queue=self.queue)
63+
64+
def test(self):
65+
client = Client(self.tracer, self.queue)
66+
self.loop.create_task(self.server.run())
67+
self.loop.create_task(client.send())
68+
69+
stop_loop_when(self.loop,
70+
lambda: len(self.tracer.finished_spans()) >= 2)
71+
self.loop.run_forever()
72+
73+
spans = self.tracer.finished_spans()
74+
self.assertIsNotNone(get_one_by_tag(spans,
75+
tags.SPAN_KIND,
76+
tags.SPAN_KIND_RPC_SERVER))
77+
self.assertIsNotNone(get_one_by_tag(spans,
78+
tags.SPAN_KIND,
79+
tags.SPAN_KIND_RPC_CLIENT))

testbed/test_common_request_handler/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ This example shows a `Span` used with `RequestHandler`, which is used as a middl
55
Implementation details:
66
- For `threading`, no active `Span` is consumed as the tasks may be run concurrently on different threads, and an explicit `SpanContext` has to be saved to be used as parent.
77
- For `gevent` and `asyncio`, as no automatic `Span` propagation is done, an explicit `Span` has to be saved to be used as parent (observe an instrumentation library could help to do that implicitly - we stick to the simplest case, though).
8-
- For `tornado`, as the `StackContext` automatically propapates the context (even is the tasks are called through different coroutines), we **do** leverage the active `Span`.
8+
- For `tornado` and `contextvars`, as parent `Span` propagates automatically (even if the tasks are called through different coroutines), we **do** leverage the active `Span`.
99

1010

1111
RequestHandler implementation:
@@ -20,6 +20,6 @@ RequestHandler implementation:
2020
child_of=self.context,
2121
ignore_active_span=True)
2222
else:
23-
# Used by tornado.
23+
# Used by tornado and contextvars.
2424
span = self.tracer.start_span('send')
2525
```

testbed/test_common_request_handler/test_asyncio.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from __future__ import print_function
22

3-
import functools
4-
53
import asyncio
64

75
from opentracing.ext import tags

0 commit comments

Comments
 (0)