有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Kafka 2.9.1 producer 0.8.2.1编译与运行时依赖关系

因此,在api 0.8.2中,生产者的Kakfa配置属性发生了变化;在完成这项工作并让我的java Producer编译之后,我得到了一个异常。生产者针对的是我的Kafka_2.9.1-0.8.2.1集群的节点,我得到了关于DefaultSerializer未实例化的异常:

Exception in thread "main" org.apache.kafka.common.KafkaException: Could not instantiate class kafka.serializer.DefaultEncoder Does it have a public no-argument constructor?
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:235)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:136)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:216)
........

鉴于这是在Kakfa中实现的,我想知道使用Kafka作为依赖项进行编译是否还不够,因为在运行时可能需要打包到一个或多个Kafka jar中。我还没有找到关于这个的文档(最新的或其他的)。有我丢失的生产者运行时jar吗

作为参考,我把我的build.gradle放在这里(有点乱)。编译中的excludes是在已经获得此错误后添加的新内容,因此无论依赖项块中是否有这些行,都会发生此错误。我确实尝试过在0.8.2中只依赖kafka-client模块,但我认为这对制作人不起作用。文件如下:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.7.0'
    }
}

group 'lamblin'
version '0.1-SNAPSHOT'

apply plugin: 'application'
apply plugin: "com.google.protobuf"

sourceCompatibility = 1.7
targetCompatibility = 1.7

// Eliminates bootstrap class warning from javac
//tasks.withType(Compile) {
//    options.bootClasspath = "$JDK6_HOME/jre/lib/rt.jar"
//}

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile group: 'com.google.guava', name: 'guava', version: '18.0'
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.0.0-beta-1'
    compile group: 'com.google.transit', name: 'gtfs-realtime-bindings', version: '0.0.4'
    compile group: 'com.offbytwo', name: 'docopt', version: '0.6.0.20150202'
    //compile group: 'org.apache.kafka', name: 'kafka_2.9.1', version: '0.8.2.1' {
    compile ('org.apache.kafka:kafka_2.9.1:0.8.2.1') {
        exclude group: 'com.sun.jmx', module: 'jmxri'
        exclude group: 'javax.jmx', module: 'jms'
        exclude group: 'com.sun.jdmk', module: 'jmxtools'
    }
}

protobuf {
    generateProtoTasks {
        all().each { task ->
            task.builtins {
                python { }
            }
        }
    }
    protoc {
        //  artifact = 'com.google.protobuf:protoc:3.0.0-alpha-3'
        artifact = 'com.google.protobuf:protoc:2.6.1'
    }
}
// First Application Script
mainClassName = "com.insight.lamblin.GtfsToJson"
applicationName = "gtfsToJson"

// Subsequent Scripts
task createAllStartScripts() << {
    // This task is added to by a loop over the scripts array creating-sub tasks
}
def scripts = [ 'gtfsToJson': 'com.insight.lamblin.GtfsToJson',
                'rawGtfsKafkaProducer': 'com.insight.lamblin.RawGtfsKafkaProducer'
]
scripts.each() { scriptName, className ->
    def t = tasks.create(name: scriptName+'StartScript', type: CreateStartScripts) {
        mainClassName = className
        applicationName = scriptName
        outputDir = new File(project.buildDir, 'scripts')
        classpath = jar.outputs.files + project.configurations.runtime
    }
    applicationDistribution.into("bin") {
        from(t)
        fileMode = 0755
    }
    createAllStartScripts.dependsOn(t)
}

共 (1) 个答案

  1. # 1 楼答案

    场景:在一个不起眼的教堂地下室里,一圈折叠金属椅被各种中年男性和少数女性占据,他们中的大多数人都戴着眼镜,看起来不感兴趣。一盒咖啡和一些半个甜甜圈依次放在一个塑料板上,放在靠近入口的一堵墙旁的一张有缺口、无盖的折叠桌上

    丹尼尔:嗨,我叫丹尼尔,我是。。。(抽泣)。。。文档略读器
    小组(慢慢地):欢迎丹尼尔

    那个剧本是因为我的卡夫卡问题似乎只吸引了蟋蟀,所以我在这里保持一点有趣。。。感到孤独

    在我的辩护中,有10部看似权威的卡夫卡。阿帕奇。组织生产商属性设置上的文档。在几乎所有设置属性的示例中,kafka.serializer.DefaultSerializer是非常突出和常见的,而Java producer示例完全没有关于属性或运行示例代码的详细信息

    此外,尽管名称为“default”,但该属性没有默认值,因此需要设置它。这似乎是一个愚蠢的细节,但对卡夫卡开发团队的某些人来说,这一定是有道理的

    当运行用Java编写的Kafka producer时,producer应该从几个可用的Java特定编码器中设置编码器。前面提到的一个似乎是Scala特有的。对于Java,您对^{}感兴趣,而与默认序列化程序等价的是:^{}。如果你设置了key.serializervalue.serializer,它应该可以工作。更好的设置方法是使用类似^{}ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG中的静态字符串

    设置有点像:

    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.ProducerConfig; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    ... 
    import java.util.Properties;
    ...
    Properties props = new Properties(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
              "172.31.22.7:9092,172.31.22.6:9092,172.31.22.5:9092,172.31.22.4:9092"); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
              "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
              "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props); 
    ...