使用Python中的Peewee在sqlddl中使用SQLite触发器和日期时间默认值

2024-09-29 22:52:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个SQLite表定义如下:

create table if not exists KeyValuePair (
    key         CHAR(255) primary key not null,
    val         text not null,
    fup         timestamp default current_timestamp not null,  -- time of first upload
    lup         timestamp default current_timestamp not null  -- time of last upload
);

create trigger if not exists entry_first_insert after insert
on KeyValuePair
begin
    update KeyValuePair set lup = current_timestamp where key = new.key;
end;

create trigger if not exists entry_last_updated after update of value
on KeyValuePair
begin
    update KeyValuePair set lup = current_timestamp where key = old.key;
end;

我试图用Python为这个表写一个peewee.Model。到目前为止,我得到的是:

^{pr2}$

当我检查最后一行生成的SQL时,我得到:

CREATE TABLE "keyvaluepair" (
    "key" CHAR(255) NOT NULL PRIMARY KEY,
    "val" TEXT NOT NULL,
    "fup" DATETIME NOT NULL,
    "lup" DATETIME NOT NULL
);

所以我现在有两个问题:

  1. 我无法找到实现entry_first_insertentry_last_updated触发器行为的方法。peewee是否支持触发器?如果不是,有没有办法只从.sql文件而不是Model类定义创建表?在
  2. 有没有办法使fuplup的默认值分配给SQL定义吗?在

Tags: keyif定义createexistsnotcurrentnull
3条回答

不久前,我偶然发现了这个问题,并花了一些时间想出了一个支持PeeWee触发器的优化设计(受上述答案的启发)。我很高兴我们最终实现了它,并希望与大家分享这一点。在某个时候,我会为这个做一个公关。

在PeeWee中创建触发器和触发器侦听器

目的

本文档分两部分介绍如何进行此操作:

  • 如何向数据库中的模型添加Trigger。在
  • 如何创建一个ListenThread,该函数将在每次更新表时通知回调函数。在

如何实施

这个设计的好处是你只需要一个项目:TriggerModelMixin模型。然后很容易创建侦听器来订阅/拥有回调方法。

TriggerModelMixin可以复制粘贴为:

class TriggerModelMixin(Model):
    """ PeeWee Model with support for triggers.
    This will create a trigger that on all table updates will send
    a NOTIFY to {tablename}_updates.
    Note that it will also take care of updating the triggers as
    appropriate/necesary.
    """

    _template = """
    CREATE OR REPLACE FUNCTION {function_name}()
        RETURNS trigger AS
        $BODY$
        BEGIN
          PERFORM pg_notify(
            CAST('{notify_channel_name}' AS text),
            row_to_json(NEW)::text);
          RETURN NEW;
        END;
        $BODY$
        LANGUAGE plpgsql VOLATILE
        COST 100;
        ALTER FUNCTION {function_name}() OWNER TO postgres;

    DROP TRIGGER IF EXISTS {trigger_name} ON "{tablename}";

    CREATE TRIGGER {trigger_name}
        AFTER INSERT OR UPDATE OR DELETE
        ON "{tablename}"
        {frequency}
        EXECUTE PROCEDURE {function_name}();
    """

    function_name_template = "{table_name}updatesfunction"
    trigger_name_template = "{table_name}updatestrigger"
    notify_channel_name_template = "{table_name}updates"
    frequency = "FOR EACH ROW"

    @classmethod
    def get_notify_channel(cls):
        table_name = cls._meta.table_name
        return cls.notify_channel_name_template.format(**{"table_name": table_name})

    @classmethod
    def create_table(cls, fail_silently=False):
        """ Create table and triggers """
        super(TriggerModelMixin, cls).create_table()

        table_name = cls._meta.table_name
        notify_channel = cls.get_notify_channel()
        function_name = cls.function_name_template.format(**{"table_name": table_name})
        trigger_name = cls.trigger_name_template.format(**{"table_name": table_name})

        trigger = cls._template.format(**{
                                            "function_name": function_name,
                                            "trigger_name": trigger_name,
                                            "notify_channel_name": notify_channel,
                                            "tablename": table_name,
                                            "frequency": cls.frequency
                                         }
                                       )
        logger.info(f"Creating Triggers for {cls}")
        cls._meta.database.execute_sql(str(trigger))

    @classmethod
    def create_db_listener(cls):
        ''' Returns an object that will listen to the database notify channel
        and call a specified callback function if triggered.
        '''

        class Trigger_Listener:
            def __init__(self, db_model):
                self.db_model = db_model
                self.running = True
                self.test_mode = False
                self.channel_name = ""

            def stop(self):
                self.running = False

            def listen_and_call(self, f, *args, timeout: int = 5, sync=False):
                ''' Start listening and call the callback method `f` if a
                trigger notify is received.
                This has two styles: sync (blocking) and async (non-blocking)
                Note that `f` must have `record` as a keyword parameter - this
                will be the record that sent the notification.
                '''
                if sync:
                    return self.listen_and_call_sync(f, *args, timeout=timeout)
                else:
                    t = threading.Thread(
                        target=self.listen_and_call_sync,
                        args=(f, *args),
                        kwargs={'timeout': timeout}
                    )
                    t.start()

            def listen_and_call_sync(self, f, *args, timeout: int = 5):
                ''' Call callback function `f` when the channel is notified. '''
                self.channel_name = self.db_model.get_notify_channel()
                db = self.db_model._meta.database

                db.execute_sql(f"LISTEN {self.channel_name};")
                conn = db.connection()
                while self.running:
                    # The if see's if the response is non-null
                    if not select.select([conn], [], [], timeout) == ([], [], []):
                        # Wait for the bytes to become fully available in the buffer
                        conn.poll()
                        while conn.notifies:
                            record = conn.notifies.pop(0)
                            logger.info(f"Trigger recieved with record {record}")
                            f(*args, record=record)
                    if self.test_mode:
                        break

        return Trigger_Listener(cls)

实施示例:

^{pr2}$

如何使用这个

1。向数据库中的模型添加触发器

这很容易。只需将mixin TriggerModelMixin添加到要添加支持的模型中。这个Mixin将处理触发器的创建,以及在调用触发器时通知的侦听方法。

Example of adding trigger support to a model

2。创建ListenThread以进行回调

侦听器有两种模式:async(非阻塞)和{}(阻塞)。默认情况下,它将是non-blocking,如果希望它被阻塞,可以用sync=True来更改它。

要使用它(无论哪种情况),请创建一个回调方法。请注意,当接收到更新(记录是串行处理的)时,此回调方法将被阻塞,因此在该方法中不要有重载或I/O。此方法的唯一要求是record的键控参数,该参数将作为字典返回数据库中的记录。

在此基础上,只需创建侦听器,然后调用listen_and_call

Example of adding a callback method and registering it with the trigger.

可以重写插入时间戳的模型的save函数。有关示例,请参见TimeStampModel

这两个问题我都想好了。此解决方案实际上在sqlddl中强制使用所需的触发器和默认时间戳。

首先,我们定义一个便利类来包装触发器的SQL。对于peewee.Node对象,有一种更合适的方法来实现这一点,但我没有时间为这个项目深入研究所有这些内容。这个Trigger类只提供字符串格式,以输出正确的sql来创建触发器。

class Trigger(object):
    """Trigger template wrapper for use with peewee ORM."""

    _template = """
    {create} {name} {when} {trigger_op}
    on {tablename}
    begin
        {op} {tablename} {sql} where {pk} = {old_new}.{pk};
    end;
    """

    def __init__(self, table, name, when, trigger_op, op, sql, safe=True):
        self.create = 'create trigger' + (' if not exists' if safe else '')
        self.tablename = table._meta.name
        self.pk = table._meta.primary_key.name
        self.name = name
        self.when = when
        self.trigger_op = trigger_op
        self.op = op
        self.sql = sql
        self.old_new = 'new' if trigger_op.lower() == 'insert' else 'old'

    def __str__(self):
        return self._template.format(**self.__dict__)

接下来,我们定义一个继承BaseModel的类TriggerTable。这个类重写默认的create_table,以便在创建表之后创建触发器。如果任何触发器未能创建,则整个创建操作都将回滚。

^{pr2}$

下一步是创建一个类BetterDateTimeField。如果default实例变量设置为datetime.datetime.now函数,则此Field对象重写默认的__ddl__以附加一个“default current}时间戳”字符串。当然有更好的方法可以做到这一点,但这一个抓住了基本的用例。

class BetterDateTimeField(pw.DateTimeField):
    """Propogate defaults to database layer."""

    def __ddl__(self, column_type):
        """Return a list of Node instances that defines the column."""
        ddl = super(BetterDateTimeField, self).__ddl__(column_type)
        if self.default == datetime.datetime.now:
            ddl.append(pw.SQL('DEFAULT current_timestamp'))
        return ddl

最后,我们定义了新的和改进的KeyValuePair模型,其中包含了触发器和日期时间字段的改进。我们通过创建表来结束Python代码。

class KeyValuePair(TriggerTable):
    """DurableHashMap entries are key-value pairs."""

    key = pw.FixedCharField(primary_key=True, max_length=255)
    val = pw.TextField(null=False)
    fup = BetterDateTimeField(
        verbose_name='first_updated', null=False, default=datetime.datetime.now)
    lup = BetterDateTimeField(
        verbose_name='last_updated', null=False, default=datetime.datetime.now)

    @classmethod
    def triggers(cls):
        return (
            cls.new_trigger(
                'kvp_first_insert', 'after', 'insert', 'update',
                'set lup = current_timestamp'),
            cls.new_trigger(
                'kvp_last_udpated', 'after', 'update', 'update',
                'set lup = current_timestamp')
        )

KeyValuePair.create_table()

现在模式已正确创建:

sqlite> .schema keyvaluepair
CREATE TABLE "keyvaluepair" ("key" CHAR(255) NOT NULL PRIMARY KEY, "val" TEXT NOT NULL, "fup" DATETIME NOT NULL DEFAULT current_timestamp, "lup" DATETIME NOT NULL DEFAULT current_timestamp);
CREATE TRIGGER kvp_first_insert after insert
    on keyvaluepair
    begin
        update keyvaluepair set lup = current_timestamp where key = new.key;
    end;
CREATE TRIGGER kvp_last_udpated after update
    on keyvaluepair
    begin
        update keyvaluepair set lup = current_timestamp where key = old.key;
    end;
sqlite> insert into keyvaluepair (key, val) values ('test', 'test-value');
sqlite> select * from keyvaluepair;
test|test-value|2015-12-07 21:58:05|2015-12-07 21:58:05
sqlite> update keyvaluepair set val = 'test-value-two' where key = 'test';
sqlite> select * from keyvaluepair;
test|test-value-two|2015-12-07 21:58:05|2015-12-07 21:58:22

相关问题 更多 >

    热门问题