有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

协议缓冲区如何直接从protobuf创建GRPC客户端,而无需将其编译为java代码

当使用GRPC时,我们需要从我们的。通过协议缓冲区编译器(protoc)或使用Gradle或Maven protoc构建插件定义proto服务

Flow now: protobuf file -> java code -> gRPC client.

那么,有没有办法跳过这一步

如何创建一个通用的GRPC客户机,它可以直接从protobuf文件调用服务器,而无需编译成java代码? 或者,有没有办法在运行时生成代码

Flow expect: protobuf file -> gRPC client.

我想建立一个通用的gRPC客户端系统,输入是protobuf文件以及方法、包、消息请求的描述。。。无需为每个protobuf重新编译

多谢各位


共 (4) 个答案

  1. # 1 楼答案

    Protobuf系统确实需要运行protoc。但是,可以跳过生成的代码。您可以传递--descriptor_set_out=FILE.proto文件解析为描述符文件,而不是将--java_out--grpc_java_out之类的内容传递给protoc。描述符文件是proto-encoded ^{}。这与反射服务使用的基本格式相同

    一旦有了描述符,就可以load it a FileDescriptor at a timecreate a DynamicMessage

    然后,对于gRPC片段,您需要创建一个gRPC MethodDescriptor

    MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
        // UNKNOWN is fine, but the "correct" value can be computed from
        // methodDesc.toProto().getClientStreaming()/getServerStreaming()
        .setType(MethodDescriptor.MethodType.UNKNOWN)
        .setFullMethodName(MethodDescriptor.generateFullMethodName(
            serviceDesc.getFullName(), methodDesc.getName()))
        .setRequestMarshaller(ProtoUtils.marshaller(
            DynamicMessage.newBuilder(methodDesc.getInputType()).buildPartial()))
        .setResponseMarshaller(ProtoUtils.marshaller(
            DynamicMessage.newBuilder(methodDesc.getOutputType()).buildPartial()))
        .build();
    

    在这一点上,您就拥有了所需的一切,并且可以在gRPC中调用^{}。您还可以自由地使用^{}来使用更类似于存根API的东西

    所以动态调用是绝对可能的,并且用于grpcurl之类的事情。但这也不容易,因此通常只有在必要时才进行

  2. # 2 楼答案

    从技术上讲,两者都是可能的

    codegen只是生成了几个类;主要是protobuf消息、grpc方法描述符和存根。您可以实现它或签入生成的代码以绕过codegen。我不确定做这个tbh有什么好处。而且,如果原版被更改,这将是非常烦人的

    只要您签入一些接口/抽象类来表示生成的存根/方法描述符和protobuf消息,就可以使用byte-codegen动态地执行此操作。您必须确保这些非动态代码与proto定义同步(最有可能是运行时检查/异常)

  3. # 3 楼答案

    从技术上讲,没有什么可以防止这种情况发生。两大障碍是:

    1. 具有运行时可调用的解析器,用于读取。原型,和
    2. 有一个通用的gRPC客户机,它将服务方法名称作为文本

    两者都是可能的,但都不是微不足道的

    对于1,粗略的方法是使用描述符集选项shell/invokeprotoc生成模式二进制文件,然后将其反序列化为FileDescriptorSet(从descriptor.proto);此模型允许您访问protoc如何查看文件。一些平台还具有本机解析器(本质上是将protoc重新实现为该平台中的库),例如protobuf-net.Reflection在中这样做。净地

    对于2,here's an implementation of that in C#。即使细节有所不同,这种方法也应该可以相当方便地移植到Java中。您可以查看生成的实现,以了解它在任何特定语言中的工作方式

    (很抱歉,具体的例子是C#/.NET,但我就住在那里;方法应该是可移植的,即使具体的代码:不是直接的)

  4. # 4 楼答案

    我是用Java做的,步骤是:

    1. 调用反射服务以按方法名称获取FileDescriptorProto列表
    2. 按包名、服务名从FileDescriptorProto列表中获取方法的FileDescriptor
    3. ServiceDescriptor获取MethodDescriptor,从FileDescriptor获取
    4. 通过MethodDescriptor生成MethodDescriptor<DynamicMessage, DynamicMessage>
    5. 从JSON或其他内容生成请求DynamicMessage
    6. 调用方法
    7. 将响应内容从DynamicMessage响应解析为JSON

    您可以在项目helloworlde/grpc-java-sample#reflection中引用完整的示例


    原型是:

    syntax = "proto3";
    
    package io.github.helloworlde.grpc;
    
    option go_package = "api;grpc_gateway";
    option java_package = "io.github.helloworlde.grpc";
    option java_multiple_files = true;
    option java_outer_classname = "HelloWorldGrpc";
    
    service HelloService{
      rpc SayHello(HelloMessage) returns (HelloResponse){
      }
    }
    
    message HelloMessage {
      string message = 2;
    }
    
    message HelloResponse {
      string message = 1;
    }
    

    您自己启动该proto的服务器,并使用Java编写完整代码,如下所示:

    import com.google.protobuf.ByteString;
    import com.google.protobuf.DescriptorProtos;
    import com.google.protobuf.Descriptors;
    import com.google.protobuf.DynamicMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.TypeRegistry;
    import com.google.protobuf.util.JsonFormat;
    import io.grpc.CallOptions;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import io.grpc.MethodDescriptor;
    import io.grpc.protobuf.ProtoUtils;
    import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
    import io.grpc.reflection.v1alpha.ServerReflectionRequest;
    import io.grpc.reflection.v1alpha.ServerReflectionResponse;
    import io.grpc.stub.ClientCalls;
    import io.grpc.stub.StreamObserver;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    @Slf4j
    public class ReflectionCall {
    
        public static void main(String[] args) throws InterruptedException {
            // 反射方法的格式只支持 package.service.method 或者 package.service
            String methodSymbol = "io.github.helloworlde.grpc.HelloService.SayHello";
            String requestContent = "{\"message\": \"Reflection\"}";
    
            // 构建 Channel
            ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9090)
                                                          .usePlaintext()
                                                          .build();
            // 使用 Channel 构建 BlockingStub
            ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
            // 响应观察器
            StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
                @Override
                public void onNext(ServerReflectionResponse response) {
                    try {
                        // 只需要关注文件描述类型的响应
                        if (response.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
                            List<ByteString> fileDescriptorProtoList = response.getFileDescriptorResponse().getFileDescriptorProtoList();
                            handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContent);
                        } else {
                            log.warn("未知响应类型: " + response.getMessageResponseCase());
                        }
                    } catch (Exception e) {
                        log.error("处理响应失败: {}", e.getMessage(), e);
                    }
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
                @Override
                public void onCompleted() {
                    log.info("Complete");
                }
            };
            // 请求观察器
            StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);
    
            // 构建并发送获取方法文件描述请求
            ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder()
                                                                                            .setFileContainingSymbol(methodSymbol)
                                                                                            .build();
            requestStreamObserver.onNext(getFileContainingSymbolRequest);
            channel.awaitTermination(10, TimeUnit.SECONDS);
        }
    
        /**
         * 处理响应
         */
        private static void handleResponse(List<ByteString> fileDescriptorProtoList,
                                           ManagedChannel channel,
                                           String methodFullName,
                                           String requestContent) {
            try {
                // 解析方法和服务名称
                String fullServiceName = extraPrefix(methodFullName);
                String methodName = extraSuffix(methodFullName);
                String packageName = extraPrefix(fullServiceName);
                String serviceName = extraSuffix(fullServiceName);
    
                // 根据响应解析 FileDescriptor
                Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);
    
                // 查找服务描述
                Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
                // 查找方法描述
                Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
    
                // 发起请求
                executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    
        /**
         * 解析并查找方法对应的文件描述
         */
        private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
                                                                    String packageName,
                                                                    String serviceName) throws Exception {
    
            Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =
                    fileDescriptorProtoList.stream()
                                           .map(bs -> {
                                               try {
                                                   return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
                                               } catch (InvalidProtocolBufferException e) {
                                                   e.printStackTrace();
                                               }
                                               return null;
                                           })
                                           .filter(Objects::nonNull)
                                           .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));
    
    
            if (fileDescriptorProtoMap.isEmpty()) {
                log.error("服务不存在");
                throw new IllegalArgumentException("方法的文件描述不存在");
            }
    
            // 查找服务对应的 Proto 描述
            DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);
    
            // 获取这个 Proto 的依赖
            Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);
    
            // 生成 Proto 的 FileDescriptor
            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
        }
    
    
        /**
         * 根据包名和服务名查找相应的文件描述
         */
        private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName,
                                                                                           String serviceName,
                                                                                           Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
            for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
                if (proto.getPackage().equals(packageName)) {
                    boolean exist = proto.getServiceList()
                                         .stream()
                                         .anyMatch(s -> serviceName.equals(s.getName()));
                    if (exist) {
                        return proto;
                    }
                }
            }
    
            throw new IllegalArgumentException("服务不存在");
        }
    
        /**
         * 获取前缀
         */
        private static String extraPrefix(String content) {
            int index = content.lastIndexOf(".");
            return content.substring(0, index);
        }
    
        /**
         * 获取后缀
         */
        private static String extraSuffix(String content) {
            int index = content.lastIndexOf(".");
            return content.substring(index + 1);
        }
    
        /**
         * 获取依赖类型
         */
        private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto,
                                                                    Map<String, DescriptorProtos.FileDescriptorProto> finalDescriptorProtoMap) {
            return proto.getDependencyList()
                        .stream()
                        .map(finalDescriptorProtoMap::get)
                        .map(f -> toFileDescriptor(f, getDependencies(f, finalDescriptorProtoMap)))
                        .toArray(Descriptors.FileDescriptor[]::new);
        }
    
        /**
         * 将 FileDescriptorProto 转为 FileDescriptor
         */
        @SneakyThrows
        private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto,
                                                                   Descriptors.FileDescriptor[] dependencies) {
            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
        }
    
    
        /**
         * 执行方法调用
         */
        private static void executeCall(ManagedChannel channel,
                                        Descriptors.FileDescriptor fileDescriptor,
                                        Descriptors.MethodDescriptor originMethodDescriptor,
                                        String requestContent) throws Exception {
    
            // 重新生成 MethodDescriptor
            MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);
    
            CallOptions callOptions = CallOptions.DEFAULT;
    
            TypeRegistry registry = TypeRegistry.newBuilder()
                                                .add(fileDescriptor.getMessageTypes())
                                                .build();
    
            // 将请求内容由 JSON 字符串转为相应的类型
            JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(registry);
            DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
            parser.merge(requestContent, messageBuilder);
            DynamicMessage requestMessage = messageBuilder.build();
    
            // 调用,调用方式可以通过 originMethodDescriptor.isClientStreaming() 和 originMethodDescriptor.isServerStreaming() 推断
            DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);
    
            // 将响应解析为 JSON 字符串
            JsonFormat.Printer printer = JsonFormat.printer()
                                                   .usingTypeRegistry(registry)
                                                   .includingDefaultValueFields();
            String responseContent = printer.print(response);
    
            log.info("响应: {}", responseContent);
        }
    
        /**
         * 重新生成方法描述
         */
        private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
            // 生成方法全名
            String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
            // 请求和响应类型
            MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
                                                                                                                  .buildPartial());
            MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
                                                                                                                   .buildPartial());
    
            // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确
            return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
                    .setFullMethodName(fullMethodName)
                    .setRequestMarshaller(inputTypeMarshaller)
                    .setResponseMarshaller(outputTypeMarshaller)
                    // 使用 UNKNOWN,自动修改
                    .setType(MethodDescriptor.MethodType.UNKNOWN)
                    .build();
        }
    }