如何用rxpy重写RXJS中的订阅服务器

2024-10-01 15:35:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我不知道怎么把RXJS的订户改成Python的形式,效果不好。你知道吗

Typescript使用GRPC流形式并插入RXJS订阅服务器。Python不知道怎么写。你知道吗

RXJS代码

import * as grpc from 'grpc';
import { Observable, Subscriber } from 'rxjs';

export function subscribeToWorkspace(
  input: WorkspaceSubscriptionInput,
): Observable<EventPayload> {
  return new Observable<EventPayload>((observer: Subscriber<EventPayload>) => {
    log(`opening stream for subscription ${input.toString()}`);

    const stream: grpc.ClientReadableStream<EventPayload> =
      client.subscribeToWorkspace(input, metadata);

    stream.on('data', (eventPayload: EventPayload) => {
      if (observer.closed) {
        return;
      }
      observer.next(eventPayload);
    });

    stream.on('error', (err) => observer.error(err));
    stream.on('end', () => observer.complete());

    return () => {
      log(`closing stream for subscription ${input.toString()}`);
      stream.cancel();
    };
  });
}

Python代码

from rx import of
def subscribe_to_workspace(cls, subsinput: WorkspaceSubscriptionInput) -> Observable:
        return of(cls.client.SubscribeToWorkspace(subsinput))

Rxpy不工作。Python应该怎么做


Tags: 代码fromimportinputstreamgrpcreturnon
1条回答
网友
1楼 · 发布于 2024-10-01 15:35:42
# pip install rx

from rx import Observable, Observer
def get_strings(observer):
    observer.on_next("Ram")
    observer.on_next("Mohan")
    observer.on_next("Shyam")
    observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
       print("Received {0}".format(value))
   def on_completed(self):
       print("Finished")
   def on_error(self, error):
       print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

相关问题 更多 >

    热门问题