有人能帮我理解我做错了什么吗?在
下面所有的工作都符合要求,但我遇到了可伸缩性问题-
On the first run i fetched ~70,000 rows into a blank table in ~2-3 s On the 2nd run i fetched ~80,000 rows into the same table in ~5 min On the 3rd run i fetched ~50,000 rows into the same table in ~30 min On the 4th run i fetched ~120,000 rows into the same table in ~1 hr On the 5th run i fetched ~100,000 rows into the same table in ~2 hr
每次运行代码时,我都会看到在这个活动结束时客户端和数据库之间稳定的~600KB/s的流量
因此,正如您所看到的,所有这些列的哈希检查似乎没有很好的伸缩性
我的代码试图实现什么?
我需要将每日股票数据添加到postgres数据库中。数据源每天只更新一次API响应如下-
{'instrument_token': '210011653'
'exchange_token': '820358'
'tradingsymbol': 'COLG17MAY1020.00PE'
'name': ''
'last_price': 0.0
'expiry': '2017-05-25'
'strike': 1020.0
'tick_size': 0.05
'lot_size': 700
'instrument_type': 'PE'
'segment': 'BFO-OPT'
'exchange': 'BFO'}
响应中的项目和行数每天都在更改 在给定的一天,我可以在一个响应中获取50000-120000行(即大约20-30MB的csv数据)。发送请求将获取给定日期的相同数据。在
因此,核心问题是-我希望避免将同一行添加到数据库中两次,以防在同一天多次获取数据。
我已经试过了
我是一个db新手,我的想法是自动增加一个id,并添加一个data\u date列,所以我的模式如下-
^{pr2}$我建立了一个这样的班级-
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, mapper, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Numeric, String, MetaData, Table, ForeignKey, DateTime, union
from sqlalchemy.engine.url import URL
engine = create_engine('postgresql://blah')
Base = declarative_base(engine)
def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
cache = getattr(session, '_unique_cache', None)
if cache is None:
session._unique_cache = cache = {}
key = (cls, hashfunc(*arg, **kw))
if key in cache:
return cache[key]
else:
with session.no_autoflush:
q = session.query(cls)
q = queryfunc(q, *arg, **kw)
obj = q.first()
if not obj:
obj = constructor(*arg, **kw)
session.add(obj)
cache[key] = obj
return obj
class UniqueMixin(object):
@classmethod
def unique_hash(cls, *arg, **kw):
raise NotImplementedError()
@classmethod
def unique_filter(cls, query, *arg, **kw):
raise NotImplementedError()
@classmethod
def as_unique(cls, session, *arg, **kw):
return _unique(
session,
cls,
cls.unique_hash,
cls.unique_filter,
cls,
arg, kw
)
class Instrument(UniqueMixin, Base):
__tablename__ = 'instruments'
__table_args__ = {'autoload':True}
__table__ = Table('instruments', Base.metadata,
Column('id', Integer, primary_key=True),
Column('data_date', String),
Column('instrument_token', Integer),
Column('exchange_token', Integer),
Column('tradingsymbol', String),
Column('name', String),
Column('last_price', Numeric),
Column('expiry', Integer),
Column('strike', Numeric),
Column('tick_size', Numeric),
Column('lot_size', Integer),
Column('instrument_type', String),
Column('segment', String),
Column('exchange', String))
@classmethod
def unique_hash(cls, data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange):
return data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange
@classmethod
def unique_filter(cls, query, data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange):
return query.filter(Instrument.data_date == data_date, Instrument.instrument_token == instrument_token, Instrument.exchange_token == exchange_token, Instrument.tradingsymbol == tradingsymbol, Instrument.name == name, Instrument.last_price == last_price, Instrument.expiry == expiry, Instrument.strike == strike, Instrument.tick_size == tick_size, Instrument.lot_size == lot_size, Instrument.instrument_type == instrument_type, Instrument.segment == segment, Instrument.exchange == exchange)
def __init__(self, data_date, instrument_token, exchange_token, tradingsymbol, name, last_price, expiry, strike, tick_size, lot_size, instrument_type, segment, exchange):
self.data_date = data_date
self.instrument_token = instrument_token
self.exchange_token = exchange_token
self.tradingsymbol = tradingsymbol
self.name = name
self.last_price = last_price
self.expiry = expiry
self.strike = strike
self.tick_size = tick_size
self.lot_size = lot_size
self.instrument_type = instrument_type
self.segment = segment
self.exchange = exchange
def __repr__(self):
return "<Instruments - '%s': '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s' - '%s'>" % (
self.id,
self.data_date,
self.instrument_token,
self.exchange_token,
self.tradingsymbol,
self.name,
self.last_price,
self.expiry,
self.strike,
self.tick_size,
self.lot_size,
self.instrument_type,
self.segment,
self.exchange
)
插入数据的代码如下-
for instrument in response:
#print(instrument)
if instrument['expiry'] == '' :
instrument['expiry'] = null()
market_instrument = Instrument.as_unique(self.session,
data_date = datetime.date.today().isoformat(),
instrument_token = instrument['instrument_token'],
exchange_token = instrument['exchange_token'],
tradingsymbol = instrument['tradingsymbol'],
name = instrument['name'],
last_price = instrument['last_price'],
expiry = instrument['expiry'],
strike = instrument['strike'],
tick_size = instrument['tick_size'],
lot_size = instrument['lot_size'],
instrument_type = instrument['instrument_type'],
segment = instrument['segment'],
exchange = instrument['exchange'],
)
self.session.add(market_instrument)
self.session.commit()
我正在考虑的选项
你觉得什么最好?在
选项1 不再作为\u unique使用(
再创建一个data_update_date表(data_date(primary),status(boolean)),在成功的每日插入结束时更新
为今天的日期检查data\u update_date,并跳过整个块的添加(如果存在)
但是,这个选项并不能帮助我了解我的as_unique函数是否还有其他需要更正的错误
选项2 使用powa和配置文件设置新数据库
查找并解决瓶颈
我使用的是官方的postgres docker映像,我遇到了一个死胡同扩展debian基和其他必需的扩展
看起来centos会简单得多,所以我要创建一个新的dockerfile来完成这个任务
但是,由于我是postgresql&sqlalchemy的新手,我还需要您对我的代码是否存在一些明显问题的意见
选项3 仅哈希少数列
除了身份证,我可以把前3个拼凑出来
但是我不知道怎么做
仅仅减少hash classdef参数就会导致参数的数量少于类中定义的数量,因此插入失败
选项4
我既没有嫁给postgresql也没有嫁给sqlalchemy
我应该使用非ORM方法吗?在
或者,我应该使用数据库以外的东西来存储这种数据吗
我在AWS上的m2.large实例上运行这个,它应该具有正确的性能,所以可能我使用了错误的方法来存储数据 如果在插入过程中出现这种情况,在进行技术分析时多个线程将无法使用。。。在
我应该改用hadoop之类的东西吗?在
此外,这个选项的一个明显的缺点是另一个学习曲线,以适应hadoop。。。在
我运行了一些db profiling on the bulk insert operation
缓存命中率为100%
我看不到任何磁盘io
抱歉,我现在不能发布超过2个链接,因此我无法向您显示命中率和磁盘命中率的图表,因此您只需相信我的话:)
as_unique方法基本上是使用一种非常低效的方法来工作的,这种方法会用疯狂的大量查询来攻击数据库。如果有的话,我想这只是作为这个服务器build+config的一个很好的基准,这让我对它的缓存友好工作负载的性能非常满意
正如来自各种响应的提示所指出的,瓶颈在于模式以及代码中实现插入的方式
我是这样解决问题的-
1。添加多列唯一索引
2。使用.on\u conflict\u do_nothing()
^{pr2}$这非常有效&things are much faster now,从而解决了核心问题
非常感谢大家的帮助、提示和建议!在
相关问题 更多 >
编程相关推荐