ASGI Websocket服务器的设计考虑了简单性。
microsockets的Python项目详细描述
微软软件包
ASGI Websocket服务器的设计考虑了简单性
安装
pip install microsockets
制作应用程序
^{pr2}$添加事件处理程序
@app.on("join")asyncdefhandle(ws):room=ws.payload["room"]user=ws.payload["user"]# Rooms managed by rooms.RoomManager, which can be subclassed/replacedawaitws.join(room)# Emit event back to client, may pass a payload as second argumentawaitws.emit("joined")# Broadcast to all other users in the room. skip_self=False would send it to current client as well.awaitws.broadcast("user joined",json.dumps(dict(user=user)),to=[room])
添加中间件
@app.hooks.before_onasyncdefload_payload(ws,func):ws.payload=json.loads(ws.payload)awaitfunc(ws)
使用ASGI服务器运行
pip install uvicorn
uvicorn example:app
安装JS客户端
npm install microsockets
生成客户端处理程序并发出事件
importMicroSocketfrommicrosockets;constsocket=MicroSocket("ws://127.0.0.1:8000");// socket.on(event, handler): Add event handler// socket.off(event, handler?): Remove event handler(s)// socket.send(event, payload): Overridden WebSocket method, requires event. Payload defaults to "".// MicroSocket returns a modified WebSocket, so the full WebSocket API is still available.socket.onopen=function(e){document.write("[open] Connection established <br />");document.write("Joining room <br />");socket.send("join",JSON.stringify({room:"gamers",user:Math.random()}));};socket.on("joined",function(payload){document.write(`[joined] Payload: ${payload} <br />`);});socket.on("user joined",function(payload){document.write(`[user joined] Payload: ${payload} <br />`);});
运行example.py
并在多个浏览器选项卡中打开example.html
,以查看这个基本示例。在
- 项目
标签: