使用tornado python的简单异步示例

2024-06-10 18:07:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我想找一个简单的异步服务器示例。 我有一些功能,有很多等待,数据库事务。。。等等:

def blocking_task(n):
    for i in xrange(n):
        print i
        sleep(1)
    return i

我需要在独立的进程中运行它而不阻塞。有可能吗?


Tags: in功能服务器数据库示例fortaskreturn
4条回答

Tornado设计为在一个线程中运行所有操作,但使用异步I/O尽可能避免阻塞。如果您使用的数据库具有非同步的Python绑定(理想情况下是专门针对Tornado的绑定,比如MongoDB的Motor或Postgres的momoko),那么您将能够在不阻塞服务器的情况下运行数据库查询;不需要单独的进程或线程。

为了解决您给出的示例,即调用time.sleep(1),可以使用此方法通过tornado协程异步执行此操作:

#!/usr/bin/python

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen 
import time

@gen.coroutine
def async_sleep(seconds):
    yield gen.Task(IOLoop.instance().add_timeout, time.time() + seconds)

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for i in xrange(100):
            print i
            yield async_sleep(1)
        self.write(str(i))
        self.finish()


application = tornado.web.Application([
    (r"/test", TestHandler),
    ])  

application.listen(9999)
IOLoop.instance().start()

有趣的部分是async_sleep。这个方法正在创建一个异步任务,它正在调用ioloop.add_timeout方法。add_timeout将在给定的秒数后运行指定的回调,而不会在等待超时到期时阻塞ioloop。它需要两个参数:

add_timeout(deadline, callback) # deadline is the number of seconds to wait, callback is the method to call after deadline.

正如您在上面的示例中所看到的,我们实际上只在代码中显式地为add_timeout提供了一个参数,这意味着我们最终会:

add_timeout(time.time() + seconds, ???)

我们没有提供预期的回调参数。实际上,当gen.Task执行add_timeout时,它会在显式提供的参数的末尾附加一个callback关键字参数。所以这个:

yield gen.Task(loop.add_timeout, time.time() + seconds)

导致在gen.Task()中执行此操作:

loop.add_timeout(time.time() + seconds, callback=gen.Callback(some_unique_key))

当超时后执行gen.Callback时,它表示gen.Task已完成,程序将继续执行到下一行。这个流程有点难以完全理解,至少一开始是这样的(我第一次读到它的时候确实是这样)。读几遍Tornado gen module documentation可能会有帮助。

Tornado设计为在一个线程中运行所有操作,但使用异步I/O尽可能避免阻塞。如果您使用的数据库具有非同步的Python绑定(理想情况下是专门针对Tornado的绑定,比如MongoDB的Motor或Postgres的momoko),那么您将能够在不阻塞服务器的情况下运行数据库查询;不需要单独的进程或线程。

为了解决您给出的示例,即调用time.sleep(1),可以使用此方法通过tornado协程异步执行此操作:

#!/usr/bin/python

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen 
import time

@gen.coroutine
def async_sleep(seconds):
    yield gen.Task(IOLoop.instance().add_timeout, time.time() + seconds)

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for i in xrange(100):
            print i
            yield async_sleep(1)
        self.write(str(i))
        self.finish()


application = tornado.web.Application([
    (r"/test", TestHandler),
    ])  

application.listen(9999)
IOLoop.instance().start()

有趣的部分是async_sleep。这个方法正在创建一个异步任务,它正在调用ioloop.add_timeout方法。add_timeout将在给定的秒数后运行指定的回调,而不会在等待超时到期时阻塞ioloop。它需要两个参数:

add_timeout(deadline, callback) # deadline is the number of seconds to wait, callback is the method to call after deadline.

正如您在上面的示例中所看到的,我们实际上只在代码中显式地为add_timeout提供了一个参数,这意味着我们最终会:

add_timeout(time.time() + seconds, ???)

我们没有提供预期的回调参数。实际上,当gen.Task执行add_timeout时,它会在显式提供的参数的末尾附加一个callback关键字参数。所以这个:

yield gen.Task(loop.add_timeout, time.time() + seconds)

导致在gen.Task()中执行此操作:

loop.add_timeout(time.time() + seconds, callback=gen.Callback(some_unique_key))

超时后执行gen.Callback时,它表示gen.Task已完成,程序将继续执行到下一行。这个流程有点难以完全理解,至少一开始是这样的(我第一次读到它的时候确实是这样)。读几遍Tornado gen module documentation可能会有帮助。

在这里我更新了有关龙卷风5.0的信息。Tornado 5.0添加了一个新方法^{}。在Coroutine patterns章的“调用阻塞函数”中:

The simplest way to call a blocking function from a coroutine is to use IOLoop.run_in_executor, which returns Futures that are compatible with coroutines:

@gen.coroutine def call_blocking(): yield IOLoop.current().run_in_executor(blocking_func, args)

另外,在^{}的文档中,is说:

This decorator should not be confused with the similarly-named IOLoop.run_in_executor. In general, using run_in_executor when calling a blocking method is recommended instead of using this decorator when defining a method. If compatibility with older versions of Tornado is required, consider defining an executor and using executor.submit() at the call site.

在5.0版本中,建议在调用阻塞函数的用例中使用IOLoop.run_In_executor。

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen

from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor   # `pip install futures` for python2

MAX_WORKERS = 16

class TestHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    """
    In below function goes your time consuming task
    """

    @run_on_executor
    def background_task(self):
        sm = 0
        for i in range(10 ** 8):
            sm = sm + 1

        return sm

    @tornado.gen.coroutine
    def get(self):
        """ Request that asynchronously calls background task. """
        res = yield self.background_task()
        self.write(str(res))

class TestHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        self.write('Response from server')
        self.finish()


application = tornado.web.Application([
    (r"/A", TestHandler),
    (r"/B", TestHandler2),
    ])

application.listen(5000)
IOLoop.instance().start()

当您运行上述代码时,可以在http://127.0.0.1:5000/A运行一个计算开销较大的操作,该操作不会阻止执行,请参阅访问http://127.0.0.1:5000/A后立即访问http://127.0.0.1:5000/B

相关问题 更多 >