有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在Grpc中通过MethodDescriptor的InputStream调用服务器?

我想在客户端通过MethodDescriptor的InputStream调用grpc服务器,但没有成功。这是我的代码:

环境
jdk 1.8 
grpc 1.33.1

grpc所有依赖项的版本为:

<!-- Aggregate gRPC artifact (grpc-all) pinned to 1.33.1.
     NOTE(review): scope "provided" is normally not packaged with the application;
     confirm the runtime classpath really supplies these jars. -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-all</artifactId>
    <version>1.33.1</version>
    <scope>provided</scope>
 </dependency>

在服务器端,我的EchoServiceImpl继承了EchoServiceGrpc.EchoServiceImplBase,并重写了echo RPC方法。在构建服务器之前,我导出了InputStream服务。

服务器
// 1. implements service
/**
 * Echo service implementation registered as a Spring bean.
 *
 * <p>Overrides the generated {@code echo} handler to log the incoming message
 * and answer with a fixed acknowledgement.
 */
@Service
public class EchoServiceImpl extends EchoServiceGrpc.EchoServiceImplBase {

    /**
     * Handles a single echo request.
     *
     * @param request          the incoming request; its message is printed to stdout
     * @param responseObserver receives exactly one response followed by completion
     */
    @Override
    public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
        // Log once per request (the original printed the same line twice).
        System.out.println("Received: " + request.getMessage());
        EchoResponse response = EchoResponse.newBuilder()
                .setMessage("ReceivedHELLO")
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

// 2. export  inputstream  Service
  /**
   * Binds the given bean as a gRPC service and records two definitions for it:
   * the typed one and a wrapper whose messages are raw {@code InputStream}s.
   *
   * <p>NOTE(review): both definitions presumably carry the same service name, so a
   * server builder may keep only one of them - confirm against the grpc-java
   * handler registry.
   *
   * @param bean the service bean; cast to {@code BindableService} without a type check
   */
  private void exportService(final Object bean) {
        final ServerServiceDefinition typedDefinition = ((BindableService) bean).bindService();
        try {
            // Wrap first so that a failure here leaves the list untouched.
            final ServerServiceDefinition streamDefinition =
                    ServerInterceptors.useInputStreamMessages(typedDefinition);

            serviceDefinitions.add(typedDefinition);
            serviceDefinitions.add(streamDefinition);
        } catch (Exception e) {
            log.error("export service is fail", e);
        }
    }

// 3. start Grpc Server
/**
 * Registers every exported service definition on a fresh server builder and
 * starts the gRPC server. An {@code IOException} during start-up is logged
 * instead of being propagated.
 */
private void startGrpcServer() {
        final ServerBuilder<?> builder = grpcServerBuilder.buildServerBuilder();
        final List<ServerServiceDefinition> exported = grpcClientBeanPostProcessor.getServiceDefinitions();
        for (final ServerServiceDefinition definition : exported) {
            builder.addService(definition);
        }

        try {
            // The started Server handle is not retained, matching the original behaviour.
            builder.build().start();
            log.info("Grpc server started successfully");
        } catch (IOException e) {
            log.error("Grpc server failed to start", e);
        }
    }

在客户端,我创建了InputStream类型的MethodDescriptor,其RequestMarshaller使用InputStream,但目前调用仍未成功。谁能提供一些建议?谢谢~~~

客户端
/**
 * Demo client that invokes a unary gRPC method through a {@link MethodDescriptor}
 * whose request and response types are raw {@link InputStream}s.
 */
public class ClientDemo {

    /** Descriptors already built, keyed by their full gRPC method name ({@code service/method}). */
    private static final Map<String, MethodDescriptor<InputStream, InputStream>> methodDescriptorCache =
            Maps.newHashMap();

    /**
     * Pass-through marshaller: outgoing streams are sent as-is, incoming streams
     * are copied into memory before being handed to the caller.
     */
    final static MethodDescriptor.Marshaller<InputStream> marshaller = new MethodDescriptor.Marshaller<InputStream>() {
        @Override
        public InputStream stream(final InputStream value) {
            return value;
        }

        @Override
        public InputStream parse(final InputStream stream) {
            // Copy eagerly: the result is consumed later (via a future), and gRPC may
            // close/release the transport stream once the listener callback returns.
            try {
                final java.io.ByteArrayOutputStream copy = new java.io.ByteArrayOutputStream();
                final byte[] chunk = new byte[4096];
                int read;
                while ((read = stream.read(chunk)) != -1) {
                    copy.write(chunk, 0, read);
                }
                // ByteArrayInputStream supports mark/reset, like the streams returned before.
                return new ByteArrayInputStream(copy.toByteArray());
            } catch (java.io.IOException e) {
                throw new java.io.UncheckedIOException("failed to buffer gRPC message", e);
            }
        }
    };


    /**
     * Sends one request to {@code echo.EchoService/echo} on localhost:8080 and
     * prints the response stream.
     *
     * @param args unused
     * @throws ExecutionException   if the call fails (for example with a gRPC status error)
     * @throws InterruptedException if interrupted while waiting for the response
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.build channel
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
        try {
            //2. create InputStream MethodDescriptor
            MethodDescriptor<InputStream, InputStream> echo = createInputStreamMethodDescriptor("echo.EchoService", "echo");

            //3. channel.newCall()
            ClientCall<InputStream, InputStream> call =
                    channel.newCall(echo, CallOptions.DEFAULT);

            //4. input params
            // NOTE(review): the server presumably still decodes these bytes as a protobuf
            // message, so a JSON payload is the likely cause of the UNKNOWN status -
            // confirm and send the protobuf-serialized request bytes instead.
            String req = "{\"message\":\"input stream\"}";
            InputStream request = new ByteArrayInputStream(req.getBytes(StandardCharsets.UTF_8));
            System.out.println(request);

            //5. get result
            ListenableFuture<InputStream> res = ClientCalls.futureUnaryCall(call, request);
            System.out.println(res.get());
        } finally {
            // Release the channel's resources even when the call fails.
            channel.shutdownNow();
        }
    }

    /**
     * Returns a unary {@code InputStream}-to-{@code InputStream} method descriptor
     * for the given service and method, building and caching it on first use.
     *
     * @param clazzName  fully qualified gRPC service name, e.g. {@code echo.EchoService}
     * @param methodName method name within the service
     * @return the cached or newly built descriptor
     */
    public static io.grpc.MethodDescriptor<InputStream, InputStream> createInputStreamMethodDescriptor(String clazzName, String methodName) {
        // "service/method" is unambiguous, unlike plain concatenation of the two names.
        final String fullMethodName = io.grpc.MethodDescriptor.generateFullMethodName(clazzName, methodName);
        MethodDescriptor<InputStream, InputStream> methodDescriptor = methodDescriptorCache.get(fullMethodName);
        if (methodDescriptor == null) {
            methodDescriptor = io.grpc.MethodDescriptor.<InputStream, InputStream> newBuilder()
                    .setType(MethodDescriptor.MethodType.UNARY)//
                    .setFullMethodName(fullMethodName)//
                    .setRequestMarshaller(marshaller)//
                    .setResponseMarshaller(marshaller)//
                    .setSafe(false)//
                    .setIdempotent(false)//
                    .build();
            methodDescriptorCache.put(fullMethodName, methodDescriptor);
        }
        return methodDescriptor;
    }
}
异常消息
 [grpc-nio-worker-ELG-1-2] DEBUG io.grpc.netty.NettyClientHandler - [id: 0xa764352c, L:/127.0.0.1:55783 - R:localhost/127.0.0.1:8080] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 2] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
Exception in thread "main" java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:564)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:545)
    at org.apache.shenyu.examples.grpc.test.ClientDemo2.main(ClientDemo2.java:56)
Caused by: io.grpc.StatusRuntimeException: UNKNOWN
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1


共 (1) 个答案