SQLAlchemy对postgresql插入可伸缩性的唯一检查

2024-10-04 11:25:24 发布

您现在位置:Python中文网/ 问答频道 /正文

有人能帮我理解我做错了什么吗?在

下面所有的工作都符合要求,但我遇到了可伸缩性问题-

On the first run i fetched ~70,000 rows into a blank table in ~2-3 s

On the 2nd run i fetched ~80,000 rows into the same table in ~5 min

On the 3rd run i fetched ~50,000 rows into the same table in ~30 min    

On the 4th run i fetched ~120,000 rows into the same table in ~1 hr    

On the 5th run i fetched ~100,000 rows into the same table in ~2 hr

每次运行代码时,我都会看到在这个活动结束时客户端和数据库之间稳定的~600KB/s的流量

因此,正如您所看到的,所有这些列的哈希检查似乎没有很好的伸缩性

我的代码试图实现什么?

我需要将每日股票数据添加到postgres数据库中。数据源每天只更新一次API响应如下-

{'instrument_token': '210011653'
'exchange_token': '820358'
'tradingsymbol': 'COLG17MAY1020.00PE'
'name': ''
'last_price': 0.0
'expiry': '2017-05-25'
'strike': 1020.0
'tick_size': 0.05
'lot_size': 700
'instrument_type': 'PE'
'segment': 'BFO-OPT'
'exchange': 'BFO'} 

响应中的项目和行数每天都在更改 在给定的一天,我可以在一个响应中获取50000-120000行(即大约20-30MB的csv数据)。发送请求将获取给定日期的相同数据。在

因此,核心问题是-我希望避免将同一行添加到数据库中两次,以防在同一天多次获取数据。

我已经试过了

我是一个db新手,我的想法是自动增加一个id,并添加一个data\u date列,所以我的模式如下-

^{pr2}$

我建立了一个这样的班级-

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, mapper, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Numeric, String, MetaData, Table, ForeignKey, DateTime, union
from sqlalchemy.engine.url import URL

engine = create_engine('postgresql://blah')
Base = declarative_base(engine)

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = {}

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
               )

class Instrument(UniqueMixin, Base):

        __tablename__ = 'instruments'
        __table_args__ = {'autoload':True}
        __table__ = Table('instruments', Base.metadata,
            Column('id', Integer, primary_key=True),
            Column('data_date', String),
            Column('instrument_token', Integer),
            Column('exchange_token', Integer),
            Column('tradingsymbol', String),
            Column('name', String),
            Column('last_price', Numeric),
            Column('expiry', Integer),
            Column('strike', Numeric),
            Column('tick_size', Numeric),
            Column('lot_size', Integer),
            Column('instrument_type', String),
            Column('segment', String),
            Column('exchange', String))


        @classmethod
        def unique_hash(cls, data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange):
            return data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange
        @classmethod
        def unique_filter(cls, query, data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange):
            return query.filter(Instrument.data_date == data_date, Instrument.instrument_token == instrument_token, Instrument.exchange_token == exchange_token, Instrument.tradingsymbol == tradingsymbol, Instrument.name == name, Instrument.last_price == last_price, Instrument.expiry == expiry, Instrument.strike == strike, Instrument.tick_size == tick_size, Instrument.lot_size == lot_size, Instrument.instrument_type == instrument_type, Instrument.segment == segment, Instrument.exchange == exchange)



        def __init__(self, data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange):

            self.data_date = data_date
            self.instrument_token = instrument_token
            self.exchange_token = exchange_token
            self.tradingsymbol = tradingsymbol
            self.name = name
            self.last_price = last_price
            self.expiry = expiry
            self.strike = strike
            self.tick_size = tick_size
            self.lot_size = lot_size
            self.instrument_type = instrument_type
            self.segment = segment
            self.exchange = exchange

        def __repr__(self):
            return "<Instruments - '%s': '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s'>" % (
                                                                                                                                        self.id, 
                                                                                                                                        self.data_date, 
                                                                                                                                        self.instrument_token, 
                                                                                                                                        self.exchange_token, 
                                                                                                                                        self.tradingsymbol, 
                                                                                                                                        self.name, 
                                                                                                                                        self.last_price, 
                                                                                                                                        self.expiry, 
                                                                                                                                        self.strike, 
                                                                                                                                        self.tick_size, 
                                                                                                                                        self.lot_size, 
                                                                                                                                        self.instrument_type, 
                                                                                                                                        self.segment, 
                                                                                                                                        self.exchange
                                                                                                                                        )

插入数据的代码如下-

    for instrument in response:
        #print(instrument)
        if instrument['expiry'] == '' :
            instrument['expiry'] = null()
        market_instrument = Instrument.as_unique(self.session, 
                                                    data_date = datetime.date.today().isoformat(), 
                                                    instrument_token =  instrument['instrument_token'], 
                                                    exchange_token =    instrument['exchange_token'], 
                                                    tradingsymbol = instrument['tradingsymbol'], 
                                                    name =  instrument['name'], 
                                                    last_price =    instrument['last_price'], 
                                                    expiry =    instrument['expiry'], 
                                                    strike =    instrument['strike'],
                                                    tick_size = instrument['tick_size'],
                                                    lot_size =  instrument['lot_size'], 
                                                    instrument_type =   instrument['instrument_type'], 
                                                    segment =   instrument['segment'], 
                                                    exchange = instrument['exchange'], 
                                                    )
        self.session.add(market_instrument)
    self.session.commit()

我正在考虑的选项

你觉得什么最好?在

选项1 不再作为\u unique使用(

再创建一个data_update_date表(data_date(primary),status(boolean)),在成功的每日插入结束时更新

为今天的日期检查data\u update_date,并跳过整个块的添加(如果存在)

但是,这个选项并不能帮助我了解我的as_unique函数是否还有其他需要更正的错误

选项2 使用powa和配置文件设置新数据库

查找并解决瓶颈

我使用的是官方的postgres docker映像,我遇到了一个死胡同扩展debian基和其他必需的扩展

看起来centos会简单得多,所以我要创建一个新的dockerfile来完成这个任务

但是,由于我是postgresql&sqlalchemy的新手,我还需要您对我的代码是否存在一些明显问题的意见

选项3 仅哈希少数列

除了身份证,我可以把前3个拼凑出来

但是我不知道怎么做

仅仅减少hash classdef参数就会导致参数的数量少于类中定义的数量,因此插入失败

选项4

我既没有嫁给postgresql也没有嫁给sqlalchemy

我应该使用非ORM方法吗?在

或者,我应该使用数据库以外的东西来存储这种数据吗

我在AWS上的m2.large实例上运行这个,它应该具有正确的性能,所以可能我使用了错误的方法来存储数据 如果在插入过程中出现这种情况,在进行技术分析时多个线程将无法使用。。。在

我应该改用hadoop之类的东西吗?在

此外,这个选项的一个明显的缺点是另一个学习曲线,以适应hadoop。。。在


Tags: nameselftokendatasizedateexchangecolumn
1条回答
网友
1楼 · 发布于 2024-10-04 11:25:24

我运行了一些db profiling on the bulk insert operation

缓存命中率为100%

我看不到任何磁盘io

抱歉,我现在不能发布超过2个链接,因此我无法向您显示命中率和磁盘命中率的图表,因此您只需相信我的话:)

as_unique方法基本上是使用一种非常低效的方法来工作的,这种方法会用疯狂的大量查询来攻击数据库。如果有的话,我想这只是作为这个服务器build+config的一个很好的基准,这让我对它的缓存友好工作负载的性能非常满意

正如来自各种响应的提示所指出的,瓶颈在于模式以及代码中实现插入的方式

我是这样解决问题的-

1。添加多列唯一索引

CREATE UNIQUE INDEX market_daily_uq_idx ON instruments (
data_date, 
instrument_token, 
exchange_token, 
tradingsymbol, 
instrument_type, 
segment, 
exchange
);

2。使用.on\u conflict\u do_nothing()

^{pr2}$

这非常有效&things are much faster now,从而解决了核心问题

非常感谢大家的帮助、提示和建议!在

相关问题 更多 >