我可以在没有服务器回调的情况下更新bokeh图吗?

2024-09-27 07:28:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我希望Bokeh在python中运行的独立算法的结果返回结果时定期和任意地更新,而不是基于Bokeh接口的任何输入。在

我尝试过各种解决方案,但它们都依赖于对某个UI事件的回调或周期性回调,如下面的代码所示。在

import numpy as np
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, Plot, LinearAxis, Grid
from bokeh.models.glyphs import MultiLine
from time import sleep
from random import randint


def getData():  # simulate data acquisition
    # run slow algorith
    sleep(randint(2,7)) #simulate slowness of algorithm
    return dict(xs=np.random.rand(50, 2).tolist(), ys=np.random.rand(50, 2).tolist())


# init plot
source = ColumnDataSource(data=getData())

plot = Plot(
    title=None, plot_width=600, plot_height=600,
    min_border=0, toolbar_location=None)

glyph = MultiLine(xs="xs", ys="ys", line_color="#8073ac", line_width=0.1)
plot.add_glyph(source, glyph)

xaxis = LinearAxis()
plot.add_layout(xaxis, 'below')

yaxis = LinearAxis()
plot.add_layout(yaxis, 'left')

plot.add_layout(Grid(dimension=0, ticker=xaxis.ticker))
plot.add_layout(Grid(dimension=1, ticker=yaxis.ticker))
curdoc().add_root(plot)


# update plot
def update():
    bokeh_source = getData()
    source.stream(bokeh_source, rollover=50)

curdoc().add_periodic_callback(update, 100)

这看起来确实管用,但这是处理事情的最佳方式吗?与其让Bokeh每隔100毫秒更新一次,我可以在新数据可用时将其推送到它吗?在

谢谢


Tags: fromimportaddsourceplotnpbokehrandom
1条回答
网友
1楼 · 发布于 2024-09-27 07:28:45

您可以使用zmq和asyncio来完成。以下是bokeh服务器的代码,它在异步协同程序中等待数据:

from bokeh.models import ColumnDataSource
from bokeh.plotting import figure, curdoc
from functools import partial
from tornado.ioloop import IOLoop
import zmq.asyncio

doc = curdoc()

context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:1234")
socket.setsockopt(zmq.SUBSCRIBE, b"")

def update(new_data):
    source.stream(new_data, rollover=50)

async def loop():
    while True:
        new_data = await socket.recv_pyobj()
        doc.add_next_tick_callback(partial(update, new_data))

source = ColumnDataSource(data=dict(x=[0], y=[0]))

plot = figure(height=300)
plot.line(x='x', y='y', source=source)

doc.add_root(plot)
IOLoop.current().spawn_callback(loop)

要发送数据,只需在另一个python进程中运行以下代码:

^{pr2}$

相关问题 更多 >

    热门问题