SQLAlchemy query with filters raises an exception in an AWS Lambda (Python) running on LocalStack

Posted 2024-10-03 09:13:13

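I created an AWS Lambda function that uses SQLAlchemy to connect to a MySQL database.

The code runs successfully on my development workstation (calling the Lambda function from main) and on AWS itself as a Lambda.

I am building a test using the LocalStack Docker image. I create the Lambda on LocalStack and then invoke it. The SQLAlchemy query against the database fails with an exception. If I change the SQLAlchemy query to a simpler one, it works.

Note: see my answer below for a possible workaround.

The failing function is get_rows_by_status_and_age. This is the Lambda code: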


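import datetime
import logging

from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

# db_user, db_pass, db_host, db_schema and the Execution model are defined
# elsewhere (not shown in the original post).
# Root logger, matching the "root" logger name in the log output below.
logger = logging.getLogger()

db_connection_str=f'mysql+pymysql://{db_user}:{db_pass}@{db_host}/{db_schema}'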

def db_init():
    engine = create_engine(db_connection_str, echo=False, isolation_level="READ COMMITTED")
    # logger.info(db_connection_str)
    # logger.info(engine.dialect)
    sm = sessionmaker(bind=engine, expire_on_commit=False)
    session = sm()
    return engine, session

def get_rows_by_status_and_age(session, status_list, min_age_minutes):
    time_current = datetime.datetime.now(datetime.timezone.utc)
    time_old = time_current - datetime.timedelta(minutes=min_age_minutes)
    try:
        q = session.query(func.count(Execution.id)).filter(
            Execution.status.in_(status_list),
            Execution.updated_at < time_old)
        return q.scalar()
    except Exception as e:
        logger.error("failed with error: {}".format(e))
        raise

def lambda_handler(event, context):
    engine, session = db_init()
    try:
        count = get_rows_by_status_and_age(session, [1,6,7], 20)
        logger.info(count)
    except Exception as e:
        logger.info('Error in the lambda:')
        logger.exception(e)


if __name__ == '__main__':  # pragma: no cover
    logging.basicConfig()
    lambda_handler("", "")

The exception raised by the query above is a KeyError:

localstack_container            | 2021-08-24T08:00:48:ERROR:root: failed with error: <function comma_op at 0x7fd9677e0440>
localstack_container            | 2021-08-24T08:00:48:INFO:root: Error in the lambda:
localstack_container            | 2021-08-24T08:00:48:ERROR:root: <function comma_op at 0x7fd9677e0440>
localstack_container            | Traceback (most recent call last):
localstack_container            |   File "/tmp/localstack/lambda_script_l_656d31ba.py", line 43, in lambda_handler
localstack_container            |     count = get_rows_by_status_and_age(session, [1,6,7], 20)
localstack_container            |   File "/tmp/localstack/lambda_script_l_656d31ba.py", line 29, in get_rows_by_status_and_age
localstack_container            |     return q.scalar()
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3312, in scalar
localstack_container            |     ret = self.one()
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3282, in one
localstack_container            |     ret = self.one_or_none()
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3251, in one_or_none
localstack_container            |     ret = list(self)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3324, in __iter__
localstack_container            |     return self._execute_and_instances(context)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3349, in _execute_and_instances
localstack_container            |     result = conn.execute(querycontext.statement, self._params)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/engine/base.py", line 988, in execute
localstack_container            |     return meth(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
localstack_container            |     return connection._execute_clauseelement(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/engine/base.py", line 1098, in _execute_clauseelement
localstack_container            |     else None,
localstack_container            |   File "<string>", line 1, in <lambda>
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/elements.py", line 462, in compile
localstack_container            |     return self._compiler(dialect, bind=bind, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/elements.py", line 468, in _compiler
localstack_container            |     return dialect.statement_compiler(dialect, self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 562, in __init__
localstack_container            |     Compiled.__init__(self, dialect, statement, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 319, in __init__
localstack_container            |     self.string = self.process(self.statement, **compile_kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 350, in process
localstack_container            |     return obj._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 2092, in visit_select
localstack_container            |     for name, column in select._columns_plus_names
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 2092, in <listcomp>
localstack_container            |     for name, column in select._columns_plus_names
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 1845, in _label_select_column
localstack_container            |     return result_expr._compiler_dispatch(self, **column_clause_args)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 820, in visit_label
localstack_container            |     + self.preparer.format_label(label, labelname)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 826, in visit_label
localstack_container            |     self, within_columns_clause=False, **kw
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 1105, in visit_function
localstack_container            |     ) % {"expr": self.function_argspec(func, **kwargs)}
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 1117, in function_argspec
localstack_container            |     return func.clause_expr._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 718, in visit_grouping
localstack_container            |     return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 968, in visit_clauselist
localstack_container            |     sep = OPERATORS[clauselist.operator]
localstack_container            | KeyError: <function comma_op at 0x7fd9677e0440>

If I change the query to the following, it also fails, but with a different exception (I removed func.count()):

        q = session.query(Execution.id).filter(
            Execution.status.in_(status_list),
            Execution.updated_at < time_old)
        return q.all()

In this last case the exception is an UnsupportedCompilationError, as shown below:

localstack_container            | 2021-08-24T09:44:20:ERROR:root: failed with error: Compiler <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7fb84801d9d0> can't render element of type <function in_op at 0x7fb8464b0b90> (Background on this error at: http://sqlalche.me/e/l7de)
localstack_container            | 2021-08-24T09:44:20:INFO:root: Error in the lambda:
localstack_container            | 2021-08-24T09:44:20:ERROR:root: Compiler <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7fb84801d9d0> can't render element of type <function in_op at 0x7fb8464b0b90> (Background on this error at: http://sqlalche.me/e/l7de)
localstack_container            | Traceback (most recent call last):
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 1281, in visit_binary
localstack_container            |     opstring = OPERATORS[operator_]
localstack_container            | KeyError: <function in_op at 0x7fb8464b0b90>
localstack_container            | 
localstack_container            | During handling of the above exception, another exception occurred:
localstack_container            | 
localstack_container            | Traceback (most recent call last):
localstack_container            |   File "/tmp/localstack/lambda_script_l_a11f462d.py", line 46, in lambda_handler
localstack_container            |     count = get_rows_by_status_and_age(session, [1,6,7], 20)
localstack_container            |   File "/tmp/localstack/lambda_script_l_a11f462d.py", line 32, in get_rows_by_status_and_age
localstack_container            |     return q.all()
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/orm/query.py", line 3168, in all
localstack_container            |     return list(self)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/orm/query.py", line 3324, in __iter__
localstack_container            |     return self._execute_and_instances(context)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/orm/query.py", line 3349, in _execute_and_instances
localstack_container            |     result = conn.execute(querycontext.statement, self._params)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/engine/base.py", line 988, in execute
localstack_container            |     return meth(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
localstack_container            |     return connection._execute_clauseelement(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/engine/base.py", line 1098, in _execute_clauseelement
localstack_container            |     else None,
localstack_container            |   File "<string>", line 1, in <lambda>
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/elements.py", line 462, in compile
localstack_container            |     return self._compiler(dialect, bind=bind, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/elements.py", line 468, in _compiler
localstack_container            |     return dialect.statement_compiler(dialect, self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 562, in __init__
localstack_container            |     Compiled.__init__(self, dialect, statement, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 319, in __init__
localstack_container            |     self.string = self.process(self.statement, **compile_kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 350, in process
localstack_container            |     return obj._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 2117, in visit_select
localstack_container            |     text, select, inner_columns, froms, byfrom, kwargs
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 2216, in _compose_select_body
localstack_container            |     t = select._whereclause._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 972, in visit_clauselist
localstack_container            |     c._compiler_dispatch(self, **kw) for c in clauselist.clauses
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 970, in <genexpr>
localstack_container            |     s
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 972, in <genexpr>
localstack_container            |     c._compiler_dispatch(self, **kw) for c in clauselist.clauses
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 1283, in visit_binary
localstack_container            |     raise exc.UnsupportedCompilationError(self, operator_)
localstack_container            | sqlalchemy.exc.UnsupportedCompilationError: Compiler <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7fb84801d9d0> can't render element of type <function in_op at 0x7fb8464b0b90> (Background on this error at: http://sqlalche.me/e/l7de)

If I keep only a single filter, it works:

        q = session.query(Execution.id).filter(
            Execution.updated_at < time_old)
        return q.all()

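From trying to understand the problem, it looks like compiling the query is what fails, possibly because of database-dependent syntax. However, I printed the SQLAlchemy dialect and it is mysql, as it should be. As mentioned, the code works in every environment except the LocalStack Lambda.

This seems to be caused by the LocalStack environment.

Any ideas? Thanks.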
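
Both tracebacks end in a dictionary lookup keyed by a function object (OPERATORS[clauselist.operator] and OPERATORS[operator_]). As a hedged illustration only, and not a confirmed diagnosis of this case, here is one general way such a KeyError can arise in plain Python: if the same module source is loaded twice, the two copies define distinct function objects, so a table built from one copy does not recognise functions from the other. The names load_copy, render and SOURCE are hypothetical and this sketch does not use SQLAlchemy itself.

```python
import types

# Stand-in for a module that defines operator functions (hypothetical).
SOURCE = '''
def in_op(a, b):
    return a in b
'''


def load_copy(name):
    """Execute SOURCE into a fresh module object, like a second import would."""
    module = types.ModuleType(name)
    exec(SOURCE, module.__dict__)
    return module


first = load_copy("operators_copy")
second = load_copy("operators_copy")

# Lookup table keyed by the function objects of the FIRST copy only.
OPERATORS = {first.in_op: " IN "}


def render(operator):
    """Return the SQL text for an operator function, or raise KeyError."""
    return OPERATORS[operator]


print(render(first.in_op))            # found: same function object
print(first.in_op is second.in_op)    # the two copies are different objects

try:
    render(second.in_op)
except KeyError as exc:
    # Same name and source, but a different object, so the lookup fails.
    print("KeyError:", exc)
```

If something similar were happening here, it would be consistent with the error disappearing when the Lambda runs in its own fresh process, but the post does not establish that.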


1 answer
User
Answer 1 · Posted 2024-10-03 09:13:13

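This is not an answer, but a possible workaround.

LocalStack has an option to execute Lambda functions in a separate Docker container. With this configuration the SQLAlchemy error does not occur and the Lambda runs successfully.

To make LocalStack invoke Lambda functions in a separate Docker container, set the LAMBDA_EXECUTOR environment variable when starting the LocalStack container.

In Docker Compose it looks like this: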

services:
  ...
  localstack:
    image: localstack/localstack
    environment:
      ...
      LAMBDA_EXECUTOR: docker
      LAMBDA_DOCKER_NETWORK: testing_network
    ...
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    ...
    container_name: localstackcontainer
  ...

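Explanation:

  • LAMBDA_EXECUTOR: tells LocalStack to run Lambdas in a separate Docker container.

  • LAMBDA_DOCKER_NETWORK: tells LocalStack to attach the separate container to a specific Docker network. You would normally use the Docker network on which all your services run, so that they can communicate over it ("testing_network" in my example).

  • The volume mapping /var/run/docker.sock:/var/run/docker.sock: allows the LocalStack container to start new containers on the host.

  • Note: the Lambda code now connects to other AWS services in LocalStack using the LocalStack container name as the endpoint_url (over the Docker network). In my example that is "localstackcontainer:4566", rather than the localhost:4566 that applied when the Lambda ran inside the LocalStack container. Note that this required me to rename the LocalStack container from localstack_container to localstackcontainer, because AWS endpoints cannot contain underscores.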
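
The last note can be sketched as a small helper that builds the endpoint URL from the container name and rejects names containing underscores. This is a minimal sketch: the function name localstack_endpoint_url and the http scheme are assumptions, and only the container name and port 4566 come from the answer.

```python
def localstack_endpoint_url(container_name, port=4566, scheme="http"):
    """Build an endpoint URL for a LocalStack container addressed by name.

    The scheme default is an assumption; the answer only gives host:port.
    """
    if "_" in container_name:
        # Per the answer, endpoint host names cannot contain underscores.
        raise ValueError(
            f"container name {container_name!r} contains an underscore; "
            "rename the container before using it as an endpoint host"
        )
    return f"{scheme}://{container_name}:{port}"


endpoint_url = localstack_endpoint_url("localstackcontainer")
print(endpoint_url)

try:
    localstack_endpoint_url("localstack_container")
except ValueError as exc:
    print("rejected:", exc)
```

The resulting string would then be passed wherever the Lambda configures its AWS clients, for example as the endpoint_url argument of a boto3 client, assuming boto3 is what the Lambda uses.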
