芹菜腌制和卡桑德拉司机玩得不好,找不出根本原因

2024-10-05 13:18:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在经历一些我不太明白的行为。我使用Cassandra存储消息对象,并使用Celery对数据库进行异步拉入和推送。除了一个Cerry任务之外,所有的工作都很正常;其他使用相同代码/类的任务也可以工作。下面是代码逻辑的粗略分解:

db_manager = DBManager()

class User(object):
    def __init__(self, user_id):
        ... normal init stuff ...
        self.loader()

    @run_async
    def loader(self):
        ... loads from database if found, otherwise pulls from API ...

    # THIS WORKS
    @celery.task(name='user-to-db', filter=task_method)
    def to_db(self):
         # db_manager is a custom backend that handles relevant db reads, writes, etc.
         db_manager.add('users', self.user_payload)

     # THIS WORKS
     @celery.task(name='load-friends', filter=task_method)
     def load_friends(self):
          # Checks secondary redis index for friends of user
          friends = redis.srandmember('users:the-users-id:friends', self.id, 20)
          if not friends:
               profiles = load_friends_from_api(user_id=self.id)
          else:
               query = "SELECT * FROM keyspace.users WHERE id IN ({friends})".format(friends=friends)
          # Init a User object for every friend
          loaded_friends = [User(friend) for friend in profiles]
          # Returns a class container with all the instances of User(friend), accessible through a class property
          return FriendContainer(self.id, loaded_friends)

     # THIS DOES NOT WORK
     @celery.task(name='get-user-messages', filter=task_method)
     def get_user_messages(self):
          # THIS IS WHERE IT FAILS #
          messages = db_manager.get("SELECT message FROM keyspace.message_timelines WHERE user_id = {user_id}".format(user_id=self.id))
          # THAT LINE ABOVE #

          # Init a message class object for every message payload in database
          msgs = [Message(m, user=self) for m in messages]
          # Returns a message container class holding all the message objects, accessible through a class property
          return MessageContainer(msgs)

最后一个类方法引发错误:

^{pr2}$

cassandra.io.eventletreactor.message指向Cassandra中的用户定义类型,我将其用作每个用户的消息对象的容器。引发此错误的行是:

messages = db_manager.get("SELECT message FROM keyspace.message_timelines WHERE user_id = {user_id}".format(user_id=self.id))

这是来自DBManager()的方法:

class DBManager(object):
    ... stuff ...

    def get(self, query):
        # I do some stuff to prepare the query, namely substituting `WHERE this = that` for `WHERE this = ?` to create a Cassandra prepared statement.
        statement = cassandra.prepare(query_prepared)
        # I want these messages as a dict, not the default namedtuple
        cassandra.row_factory = dict_factory
        # User id is parsed out of query
        results = cassandra.execute(statement, (user_id,))
        rows = results.current_rows
        # rows is a list of dicts, no weird class references or anything in there
        return rows

我已经读到过Celery任务类外方法是一种实验性的方法,但是我不明白为什么所有其他方法都是使用相同的DBManager实例的任务都在工作。在

这个问题似乎局限于用户定义的类型message在Cassandra驱动程序中表现不佳;但是,如果我在Celery任务本身中从DBManager运行get方法,它就可以工作。也就是说,如果我将抛出错误的代码从DBManager.get复制/粘贴到User.get_user_messages,它工作得很好。如果我试图从User.get_user_messages内调用DBManager.get,它将中断。在

我就是不知道问题出在哪里。我可以很好地完成以下所有工作:

  1. 在不使用芹菜的情况下运行get_user_messages方法,它就可以工作了。在
  2. 如果我在celerry任务方法本身中运行get方法代码,那么使用运行{}方法。在
  3. 我可以运行注册为celry任务的其他方法,这些方法指向DBManager中使用Cassandra驱动程序的其他方法,甚至可以将相同的message用户定义类型插入数据库。在
  4. 我试过把所有的东西都腌制过,而且是各种各样的组合,所以我无法重现这个错误。在

我没有尝试过的东西:

  1. 将序列化程序更改为jsonyaml。db有效负载中有一些便利项不会与这两个项目中的任何一个序列化。在
  2. 使用dill代替pickle。考虑到我可以让各个部分单独工作,这似乎不需要切换序列化程序。在

我可以直接通过Cassandra驱动程序而不是我的DBManager类运行查询,但我觉得这应该是可以解决的,我只是遗漏了一些非常非常明显的东西,以至于我没有看到它。如果您有任何建议,我们将不胜感激。在

如果相关:Cassandra 3.3、cql3.4、DataStax python驱动程序3.1


Tags: 方法selfidmessagetaskdbgetdef
1条回答
网友
1楼 · 发布于 2024-10-05 13:18:41

嗯,我发现了问题,而且很明显。我想我并没有试着去酸洗所有的东西,只是大部分的东西,而且我在凌晨4点的调试中也没有发现这一点。在

无论如何,cassandra.row_factory = dict_factory在对用户定义的类型调用时,实际上并不是以dict的形式返回所有内容,它给出了一个{'label': message(x='this', y='that')}的dict,其中message是一个namedtuple。Cassandra驱动程序在类实例中动态创建namedtuple,因此pickle找不到它。在

相关问题 更多 >

    热门问题