MarketStore Python驱动程序
pymarketstore2的Python项目详细描述
pymarketstore
MarketStore的Python驱动程序
生成状态:
pymarketstore可以从MarketStore查询和写入财务TimeSeries数据
用2.7,3.3+测试
如何安装
$ pip install pymarketstore
示例
In [1]: import pymarketstore as pymkts
## query data
In [2]: param = pymkts.Params('BTC', '1Min', 'OHLCV', limit=10)
In [3]: cli = pymkts.Client()
In [4]: reply = cli.query(param)
In [5]: reply.first().df()
Out[5]:
Open High Low Close Volume
Epoch
2018-01-17 17:19:00+00:00 10400.00 10400.25 10315.00 10337.25 7.772154
2018-01-17 17:20:00+00:00 10328.22 10359.00 10328.22 10337.00 14.206040
2018-01-17 17:21:00+00:00 10337.01 10337.01 10180.01 10192.15 7.906481
2018-01-17 17:22:00+00:00 10199.99 10200.00 10129.88 10160.08 28.119562
2018-01-17 17:23:00+00:00 10140.01 10161.00 10115.00 10115.01 11.283704
2018-01-17 17:24:00+00:00 10115.00 10194.99 10102.35 10194.99 10.617131
2018-01-17 17:25:00+00:00 10194.99 10240.00 10194.98 10220.00 8.586766
2018-01-17 17:26:00+00:00 10210.02 10210.02 10101.00 10138.00 6.616969
2018-01-17 17:27:00+00:00 10137.99 10138.00 10108.76 10124.94 9.962978
2018-01-17 17:28:00+00:00 10124.95 10142.39 10124.94 10142.39 2.262249
## write data
In [7]: import numpy as np
In [8]: import pandas as pd
In [9]: data = np.array([(pd.Timestamp('2017-01-01 00:00').value / 10**9, 10.0)], dtype=[('Epoch', 'i8'), ('Ask', 'f4')])
In [10]: cli.write(data, 'TEST/1Min/Tick')
Out[10]: {'responses': None}
In [11]: cli.query(pymkts.Params('TEST', '1Min', 'Tick')).first().df()
Out[11]:
Ask
Epoch
2017-01-01 00:00:00+00:00 10.0
客户机
pymkts.Client(endpoint='http://localhost:5993/rpc')
使用终结点构造客户端对象。
查询
pymkts.Client#query(symbols, timeframe, attrgroup, start=None, end=None, limit=None, limit_from_start=False)
可以使用pymkts.Params
生成参数。
- 符号:单个符号的字符串或多符号查询的符号字符串列表
- 时间表:时间表字符串
- 属性组:属性组字符串。符号、时间范围和属性组组成一个bucket键,在服务器中进行查询
- 开始:unix epoch second(int)、datetime对象或时间戳字符串。结果将只包括时间戳等于或晚于此时间的数据。
- 结束:unix epoch second(int)、datetime对象或时间戳字符串。结果将只包括时间戳等于或早于此时间的数据。
- 限制:要返回的记录数,从开始边界或结束边界开始计算。
- limit_from_start:boolean表示
limit
来自起始边界。默认为false。
将Params
的一个或多个实例传递给Client.query()
。它将返回QueryReply
对象,该对象保存从服务器返回的内部numpy数组数据。
写入
pymkts.Client#write(data, tbk)
您可以通过Client.write()
方法将numpy数组写入服务器。数据参数必须是numpy的recarray typewith
int64中名为Epoch
的列,在第一列处键入。tbk
是数据记录的bucket键。
列出符号
pymkts.Client#list_symbols()
将返回存储在服务器中的所有符号的列表。
服务器版本
pymkts.Client#server_version()
从服务器响应返回MarketStore版本头的字符串。
流媒体
如果服务器支持WebSocket流,则可以使用
pymkts.StreamConn
类。为了方便起见,您可以调用pymkts.Client#stream()
来获取具有相同服务器的实例
作为rest客户端的信息。
一旦有了这个实例,您将通过 方法或装饰器。这些方法接受 要筛选要对其执行操作的流的正则表达式。
要真正连接并开始从服务器接收消息,
您将使用流名称调用run()
。默认情况下,它订阅
通过*/*/*
向所有人致意。
pymkts.Client#stream()
返回一个StreamConn
,它是到服务器的websocket连接。
pymkts.StreamConn#(endpoint)
创建到endpoint
服务器的连接实例。终点
字符串是带有“ws”或“wss”方案以及端口和路径的完整url。
pymkts.StreamConn#register(stream_path, func)
@pymkts.StreamConn#on(stream_path)
向连接添加新的消息处理程序。函数将被调用
使用handler(StreamConn, {"key": "...", "data": {...,}})
如果键
(时间段键)与stream_path
正则表达式匹配。
on
方法是register
的装饰版本。
pymkts.StreamConn#run([stream1, stream2, ...])
开始与服务器通信并进入一个不确定的循环。它 在引发未处理的异常之前不返回,在这种情况下 连接已关闭,因此需要执行重试。而且,因为这是 一个阻塞方法,您可能需要在后台线程中运行它。
示例代码如下。
import pymarketstore as pymkts
conn = pymkts.StreamConn('ws://localhost:5993/ws')
@conn.on(r'^BTC/')
def on_btc(conn, msg):
print('received btc', msg['data'])
conn.run(['BTC/*/*']) # runs until exception
-> received btc {'Open': 4370.0, 'High': 4372.93, 'Low': 4370.0, 'Close': 4371.74, 'Volume': 3.3880948699999993, 'Epoch': 1507299600}