<p>最后我做的是,除了sqlite3和threading之外,我没有使用peewee、SqliteQueueDatabase或任何ORM。在</p>
<p>使用一种单例技巧,我基本上有一个对象的一个实例,它有一个连接属性,因此一个连接实例被所有线程共享。当连接到sqlite时,我必须设置<code>check_same_thread=False</code>,否则不同的线程不能共享同一个连接。在</p>
<p>以下是简化版本:</p>
<pre><code>import sqlite3
import threading
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Dao(object, metaclass=Singleton):
def __init__(self, conf=None):
self.lock = threading.Lock()
self.conn = sqlite3.connect(
conf.db,
check_same_thread=False
)
</code></pre>
<p>我第一次在主线程中实例化<code>Dao</code>类并将配置传递给<code>__init__</code>。后来无论哪个线程需要使用数据库,它都只创建了一个<code>Dao</code>对象。但是由于Singleton技巧,调用者只获得了对已经存在的实例的引用,该实例还包括已经建立的连接。在</p>
<p>我将所有的DB操作作为方法添加到这个<code>Dao</code>类中。为了避免重复锁定,我使用<code>wraps</code>创建了装饰器。在</p>
^{pr2}$
<p>对于只读操作,我有一个类似的包装器,但是没有提交/回滚。我在其中设置了是否在只读操作期间执行锁定的配置,只是为了能够在prod中发生DB并发问题时在不需要新版本的情况下调整锁定行为</p>
<p>现在我所要做的就是将我的自定义事务装饰器添加到<code>Dao</code>方法中。(在现实生活中,事务可以由几个SQL命令组成。因此,我有一些没有事务注释的原子方法,它们从来没有直接从外部调用。它们只由其他一些<code>Dao</code>方法调用,这些方法在单个事务中执行多个调用,因此这些复杂的方法具有事务注释。对于这些事务的大小和速度,我非常小心,因为在我的例子中,锁机制基本上阻止了其他线程同时处理数据库。)</p>
<p>因此<code>Dao</code>方法可以类似于:</p>
<pre><code>@transaction_read_write
def set_processed_files(self, id, num_files):
cur = self.conn.cursor()
cur.execute("UPDATE jobs SET num_files = ? WHERE job_id = ?", (num_files, id))
</code></pre>
<p>最后,我决定不使用peewee,但我希望在我的示例中有一些有用的东西。在</p>