peewee multiprocessing: 'peewee.OperationalError: disk I/O error' from a separate database connection

Posted 2024-09-30 12:28:22


I am trying to use multiprocessing to run a CPU-intensive job in the background. I want that process to write its results to an SQLite database using the peewee ORM.

To do this, I am trying to override the model classes' Meta.database after spawning the process, so that the new process gets its own separate DB connection.

import multiprocessing

from peewee import Model
from playhouse.sqlite_ext import SqliteExtDatabase

def get_db():
    # `path` points at the SQLite database file.
    db = SqliteExtDatabase(path)
    return db

class BaseModel(Model):
    class Meta:
        database = get_db()

# Many other models, including Job

class Batch(BaseModel):

    def multi(self):
        def background_proc():
            # Trying to override Meta's db connection for the new process.
            BaseModel._meta.database = get_db()
            job = Job.get_by_id(1)
            print("working in the background")

        process = multiprocessing.Process(target=background_proc)
        process.start()

The error when executing my_batch.multi():

Process Process-1:
Traceback (most recent call last):
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 3099, in execute_sql
    cursor.execute(sql, params or ())
sqlite3.OperationalError: disk I/O error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/layne/.pyenv/versions/3.7.6/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/layne/.pyenv/versions/3.7.6/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/layne/Desktop/pydatasci/pydatasci/aidb/__init__.py", line 1249, in background_proc
    job = Job.get_by_id(1)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 6395, in get_by_id
    return cls.get(cls._meta.primary_key == pk)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 6384, in get
    return sq.get()
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 6807, in get
    return clone.execute(database)[0]
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 1886, in inner
    return method(self, database, *args, **kwargs)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 1957, in execute
    return self._execute(database)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 2129, in _execute
    cursor = database.execute(self)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 3112, in execute
    return self.execute_sql(sql, params, commit=commit)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 3106, in execute_sql
    self.commit()
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 2873, in __exit__
    reraise(new_type, new_type(exc_value, *exc_args), traceback)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 183, in reraise
    raise value.with_traceback(tb)
  File "/Users/layne/.pyenv/versions/3.7.6/envs/jupyterlab/lib/python3.7/site-packages/peewee.py", line 3099, in execute_sql
    cursor.execute(sql, params or ())
peewee.OperationalError: disk I/O error

I had this working with threads, but threads turn out to be genuinely hard to terminate (short of just breaking out of a loop), and a CPU-intensive (not IO-bound) job should be using multiprocessing anyway.

Update: look into peewee's database proxy: http://docs.peewee-orm.com/en/latest/peewee/database.html#dynamically-defining-a-database
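For reference, the proxy approach from that link would look roughly like the sketch below (assuming the same path, Job model and worker setup as above); the real database is only attached to the proxy once the child process has started:

import multiprocessing

from peewee import DatabaseProxy, Model
from playhouse.sqlite_ext import SqliteExtDatabase

database_proxy = DatabaseProxy()  # placeholder, no connection yet

class BaseModel(Model):
    class Meta:
        database = database_proxy

# Job and the other models subclass BaseModel as before

def background_proc():
    # Attach a real database to the proxy inside the child process,
    # so this process gets its own SQLite connection.
    database_proxy.initialize(SqliteExtDatabase(path))
    job = Job.get_by_id(1)
    print("working in the background")

process = multiprocessing.Process(target=background_proc)
process.start()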


2 Answers

I think the problem was this:

In the separate process, I wasn't closing the existing connection before trying to replace it with a new one.

def background_proc():
    db = BaseModel._meta.database
    db.close()  # <-- this: close the connection inherited from the parent
    BaseModel._meta.database = get_db()

This works, and I can keep using the original connection in the main process (or in whatever calls aren't going through multiprocessing).
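Putting the fix back into the worker from the question, the child process ends up looking roughly like this (a sketch reusing get_db, BaseModel and Job from above):

def background_proc():
    # Close the SQLite connection inherited from the parent process...
    BaseModel._meta.database.close()
    # ...then give this process its own connection before querying.
    BaseModel._meta.database = get_db()
    job = Job.get_by_id(1)
    print("working in the background")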

Maybe initializing the DB object in each process will help you:

def get_db():
    db = SqliteExtDatabase(path)
    return db

class BaseModel(Model):

    def __init__(self, database, **kwargs):
        super().__init__(**kwargs)
        # _meta.database is what peewee consults when running queries.
        self._meta.database = database

# Many other models

class Batch(BaseModel):

    def multi(self):
        def background_proc():
            # Build a fresh DB connection inside the new process.
            db = get_db()
            basemodel = BaseModel(db)
            # do something like basemodel.insert(name='Alex')
            job = Job(db)
            result = job.get_by_id(1)
            print(result)
            print("working in the background")

        process = multiprocessing.Process(target=background_proc)
        process.start()
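If you would rather not touch _meta by hand, peewee's Database.bind_ctx() can also rebind the models to a per-process database for the duration of a block. A minimal sketch, assuming the Job model and path from the question:

import multiprocessing

from playhouse.sqlite_ext import SqliteExtDatabase

def background_proc():
    # Open a brand-new connection in this process and bind the models
    # to it only while the block is active.
    db = SqliteExtDatabase(path)
    with db.bind_ctx([Job]):
        job = Job.get_by_id(1)
        print("working in the background")
    db.close()

process = multiprocessing.Process(target=background_proc)
process.start()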
