Datadog spans lost in a Python thread pool

Posted 2024-05-20 03:43:21


I have a function that runs in a thread pool, but its span only shows up in the Datadog trace UI when it runs outside the pool. In the screenshot below you can see the span nested under sync_work but not under async_work:

(Screenshot: the do_something spans appear under sync_work but are missing under async_work.)

Here is my code, in a script called ddtrace_threadpool_example.py:

from concurrent.futures import ThreadPoolExecutor
from ddtrace import tracer
from random import random
import time


def perform_work(input_value):
    with tracer.trace('do_something') as _:
        seconds = random()
        time.sleep(seconds)
        return input_value**2


def sync_work(input_values):
    with tracer.trace('sync_work') as _:
        results = []
        for input_value in input_values:
            result = perform_work(input_value=input_value)
            results.append(result)
        return results


def async_work(input_values):
    with tracer.trace('async_work') as _:
        thread_pool = ThreadPoolExecutor(max_workers=10)
        futures = thread_pool.map(
            lambda input_value:
            perform_work(input_value=input_value),
            input_values
        )
        results = list(futures)
        return results


@tracer.wrap(service='ddtrace-example')
def start_work():
    input_values = list(range(15))
    sync_results = sync_work(input_values=input_values)
    print(sync_results)
    async_results = async_work(input_values=input_values)
    print(async_results)


if __name__ == '__main__':
    start_work()

I run the script with python ddtrace_threadpool_example.py. I'm on Python 3.7, and pip freeze shows ddtrace==0.29.0.


1 Answer

After talking to Datadog support, this appears to be a known issue:

Thanks for your patience while we looked into this issue. We're currently investigating this along with PoolExecutors and will reach out with updates. Right now it looks like those child spans within the async call lose context, so they appear disconnected.
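The "lose context" part can be illustrated without ddtrace at all: the tracer at this version keeps the active span in per-thread storage, so worker threads spawned by ThreadPoolExecutor start with nothing. A minimal stdlib sketch of the same effect, using threading.local as a stand-in for the tracer's internal storage (the names here are illustrative, not ddtrace APIs):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_context = threading.local()  # stand-in for the tracer's per-thread storage


def set_active_span(name):
    _context.span = name


def get_active_span():
    # Each thread has its own `_context` attributes; a thread that never
    # set one sees nothing.
    return getattr(_context, "span", None)


set_active_span("async_work")  # parent span set on the main thread

with ThreadPoolExecutor(max_workers=2) as pool:
    seen_in_worker = pool.submit(get_active_span).result()

print(get_active_span())  # "async_work" on the main thread
print(seen_in_worker)     # None: the worker thread has empty storage
```

The child span created in the worker therefore has no parent to attach to, which is why it drops out of the async_work trace.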

The workaround for now is to pass the parent context in explicitly. Add this line before invoking the thread pool executor:

current_context = tracer.get_call_context()

Then pass that context to the function that runs in the thread pool:

perform_work(
    input_value=input_value,
    parent_context=current_context
)

and use it to create a span inside the function, like this:

span = tracer.start_span('do_something', child_of=parent_context)
seconds = random()
time.sleep(seconds)
span.finish()
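One caveat with the manual start_span/finish pairing (a general pattern, not specific to ddtrace): if the work raises, span.finish() never runs and the span leaks. Wrapping the body in try/finally avoids that. Sketched here with a minimal stand-in span class so the pattern is self-contained; with ddtrace you would use the real span from tracer.start_span(..., child_of=parent_context):

```python
class FakeSpan:
    """Stand-in for a tracer span, just to demonstrate the pattern."""

    def __init__(self):
        self.finished = False

    def finish(self):
        self.finished = True


def perform_work(input_value, span):
    try:
        if input_value < 0:
            raise ValueError("negative input")
        return input_value ** 2
    finally:
        span.finish()  # runs on success and on exception alike


ok_span = FakeSpan()
result = perform_work(3, ok_span)

err_span = FakeSpan()
try:
    perform_work(-1, err_span)
except ValueError:
    pass  # the span was still finished in the finally block
```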

The full example looks like this:

from concurrent.futures import ThreadPoolExecutor
from ddtrace import tracer
from random import random
import time


def perform_work(input_value, parent_context=None):
    # Explicitly parent the span so it stays attached to the trace
    # even when this runs in a worker thread.
    span = tracer.start_span('do_something', child_of=parent_context)
    seconds = random()
    time.sleep(seconds)
    span.finish()
    return input_value ** 2


def sync_work(input_values):
    with tracer.trace('sync_work') as _:
        results = []
        for input_value in input_values:
            result = perform_work(input_value=input_value)
            results.append(result)
        return results


def async_work(input_values):
    with tracer.trace('async_work') as _:
        current_context = tracer.get_call_context()  # captured on the main thread
        thread_pool = ThreadPoolExecutor(max_workers=10)
        futures = thread_pool.map(
            lambda input_value:
            perform_work(
                input_value=input_value,
                parent_context=current_context
            ),
            input_values
        )
        results = list(futures)
        return results


@tracer.wrap(service='ddtrace-example')
def start_work():
    input_values = list(range(15))
    sync_results = sync_work(input_values=input_values)
    print(sync_results)
    async_results = async_work(input_values=input_values)
    print(async_results)


if __name__ == '__main__':
    start_work()

This produces a result like the following:

(Screenshot: the do_something spans now appear under async_work as well.)
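The workaround generalizes: anything stored per-thread has to be handed to the pool explicitly. On Python 3.7+, the stdlib contextvars module supports the same explicit-propagation pattern; this is a general illustration of that idea, not something ddtrace 0.29.0 does internally:

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

parent_span = contextvars.ContextVar("parent_span", default=None)


def perform_work(value):
    # Visible only because each task runs inside a copy of the
    # main thread's context.
    assert parent_span.get() == "async_work"
    return value ** 2


parent_span.set("async_work")

with ThreadPoolExecutor(max_workers=4) as pool:
    # copy_context() is called on the main thread once per task, so every
    # worker gets its own snapshot and no Context is entered concurrently.
    futures = [
        pool.submit(contextvars.copy_context().run, perform_work, value)
        for value in range(5)
    ]
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16]
```

Note the per-task copy_context(): a single Context object cannot be entered by two threads at once, so each submitted task must run inside its own copy.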
