API Star的WebSocket组件

apistar-websocket的Python项目详细描述


API星形WebSocket

websocket组件,用于轻松使用带有api star和uvicorn的websockets

PyPI version


API Star和Uvicorn的简易WebSocket组件。


功能

很容易使用

只需像路由处理程序上接受WebSocket的任何其他组件一样使用它 连接

# With WebSocketAutoHook in event_hooks is the easiest usage,# but see the Issues below about using WebSocketAutoHookasyncdefws_handler(ws:WebSocket):inbound_msg=awaitws.recv()...awaitws.send(outbound_msg)# Without the WebSocketAutoHookasyncdefws_handler(ws:WebSocket):awaitws.connect()inbound_msg=awaitws.recv()...awaitws.send(outbound_msg)awaitws.close()

自动连接和关闭事件挂钩。

当与WebSocketAutoHook一起使用时,websocket将在处理程序之前连接 被称为。当处理程序返回或由于错误而结束时,websocket将自动 关闭。请参阅下面关于WebSocketAutoHookIssues

安装

pip install apistar-websocket

或者对于Pipenv用户

pipenv install apistar-websocket

示例

下面是一个向连接的客户端发送100个号码并等待的基本示例 每次发送后返回消息。websocket为我们连接并关闭 通过WebSocketAutoHook

importjsonfromapistarimportASyncApp,RoutefromwebsocketimportWebSocket,WebSocketComponent,WebSocketAutoHookasyncdefwelcome_ws(path:str,ws:WebSocket):"""    A WebSocket endpoint used to play a terrible game of data ping pong.    """foriinrange(100):awaitws.send(json.dumps({"msg":f"{path} is nice","count":i,}))# We'll pretend we're playing ping pongmsg=awaitws.receive()routes=[Route('/{+path}','GET',handler=welcome_ws,name='welcome_ws'),]event_hooks=[WebSocketAutoHook]components=[WebSocketComponent()]app=ASyncApp(routes=routes,components=components,event_hooks=event_hooks,)if__name__=='__main__':main()

与上面的示例相同,但不使用WebSocketAutoHook。这里是管理员 必须管理WebSocket连接本身并作为独立的Route运行以防止 从发送响应到客户端的API Star。

importjsonfromapistarimportASyncApp,RoutefromwebsocketimportWebSocket,WebSocketComponentasyncdefwelcome_ws(path:str,ws:WebSocket):"""    A WebSocket endpoint used to subscribe to a stream of data.    """# finish the connectionawait.connect()foriinrange(100):awaitws.send(json.dumps({"msg":f"{path} is nice","count":i,}))# We'll pretend we're playing ping pongmsg=awaitws.receive()# close the connectionawaitws.close()# Must run the route standalone to prevent the attempt at an HTTP Response being sentroutes=[Route('/{+path}','GET',handler=welcome_ws,name='welcome_ws',standalone=True),]components=[WebSocketComponent()]app=ASyncApp(routes=routes,components=components,)if__name__=='__main__':main()

uvicorn

您必须使用uvicorn运行api之星,才能使apistar-websocket工作。

要在启用调试的情况下运行应用程序并在文件更改时自动重新加载应用程序, 非常适合开发,使用:

uvicorn --log-level DEBUG --reload app:app

对于生产环境,从命令开始:

uvicorn app:app

并根据需要进行调整,有关更多选项,请参见uvicorn --help

问题

使用组件的最干净的方法是不使用WebSocketAutoHook,并且作为独立的 使用连接并关闭WebSocket的处理程序进行路由。这是因为当处理程序 not用作独立路由,默认情况下,api star将在处理程序之后发送http响应 已经完成。虽然这对于常规的http请求是必需的,但对于 WebSocket连接已关闭。您可以使用WebSocketAutoHook 你的处理程序将按你所期望的那样工作。 但是为了防止api star发送http响应 WebSocketAutoHook将在关闭websocket以阻止http之后引发异常 导致API Star中出现错误条件的响应。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java在ArrayList中比较数字   java在Kotlin中使异步调用同步   让“Scala编程”junit示例在IntelliJ中工作的java问题   java Servlet侦听器未在ContextListener中设置属性   将Microsoft SQL Server数据库连接到我的Java项目   加载资源时出现java“需要注册工厂”异常   java如何使用POI检查excel中的重复记录?   java如何更改机器生成的代码   java如何确保重写的方法是同步的   用Spring编写Hibernate时的java XML奥秘   java管理mysql数据库中存储的用户权限   java如何运行。来自Javascript的jar方法   java我想在Web应用程序中进行身份验证&对桌面应用程序使用相同的凭据。我该怎么做?