在课堂上使用Flask

2024-09-25 08:36:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我有很多线程的应用程序。其中一个是烧瓶,用于实现(腋窝)API。它使用低负载,从不暴露在互联网上,所以内置flask web服务器是完美的。

我当前的代码如下:

class API:
    # ... all other stuff here, skipped
    def run():
        app = flask.Flask('API')

        @app.route('/cmd1')
        def cmd1():
            self.cmd1()

        @app.route('/cmd2')
        def cmd2()
            self.cmd2()

        app.run()

我觉得我做错了,因为所有的文档都说“在模块级创建烧瓶应用程序”。但我不想这样做-这会扰乱我的测试,而且API是大型应用程序的一小部分,它有自己的结构和协议(每个“应用程序”是在一个或多个线程中运行的独立类)。

我怎样才能在课堂上使用烧瓶?


Tags: runselfapiwebapp应用程序flask烧瓶
2条回答

虽然这样做,但感觉不符合烧瓶样式指南。如果需要在项目中包装Flask应用程序,请根据需要创建一个单独的类,并添加应执行的函数

from flask import Flask, Response


class EndpointAction(object):

    def __init__(self, action):
        self.action = action
        self.response = Response(status=200, headers={})

    def __call__(self, *args):
        self.action()
        return self.response


class FlaskAppWrapper(object):
    app = None

    def __init__(self, name):
        self.app = Flask(name)

    def run(self):
        self.app.run()

    def add_endpoint(self, endpoint=None, endpoint_name=None, handler=None):
        self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler))


def action():
    # Execute anything

a = FlaskAppWrapper('wrap')
a.add_endpoint(endpoint='/ad', endpoint_name='ad', handler=action)
a.run()

这里需要注意的是:

  • EndpointAction应该是一个包装器,它将执行您的函数并生成一个空的200响应。如果需要,可以编辑功能
  • 端点处理程序可以是任何定义了__call__方法的对象
  • 端点名称应唯一,因为它表示视图名称
  • 无法在应用程序之后添加终结点,因为应用程序启动后线程将阻塞。您可以通过在单独的线程上运行应用程序来启用它,但不建议动态更改URL映射,也不建议线程安全

为了完成Kostas pellelis的回答,因为我很难找到为什么响应没有直接使用操作返回值。

下面是另一个没有decorators的FLASK类:

class EndpointAction(object):

def __init__(self, action):
    self.action = action

def __call__(self, *args):
    # Perform the action
    answer = self.action()
    # Create the answer (bundle it in a correctly formatted HTTP answer)
    self.response = flask.Response(answer, status=200, headers={})
    # Send it
    return self.response

class FlaskAppWrapper(object):

def add_all_endpoints(self):
    # Add root endpoint
    self.add_endpoint(endpoint="/", endpoint_name="/", handler=self.action)

    # Add action endpoints
    self.add_endpoint(endpoint="/add_X", endpoint_name="/add_X", handler=self.add_X)
    # you can add more ... 

def add_endpoint(self, endpoint=None, endpoint_name=None, handler=None):
    self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler)) 
    # You can also add options here : "... , methods=['POST'], ... "

# ==================== ------ API Calls ------- ====================
def action(self):
    # Dummy action
    return "action" # String that will be returned and display on the webpage
    # Test it with curl 127.0.0.1:5000

def add_X(self):
    # Dummy action
    return "add_X"
    # Test it with curl 127.0.0.1:5000/add_X

相关问题 更多 >