有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

一个消费者正在读取数据

我用的是spark steaming,用的是kafka,我有一个有20个分区的主题。当流媒体作业运行时,只有一个消费者正在读取所有主题中的数据,这会导致读取数据的速度变慢。我们有没有办法在spark steaming中为每个部分配置一个消费者

JavaStreamingContext jsc = AnalyticsContext.getInstance().getSparkStreamContext();
Map<String, Object> kafkaParams = MessageSessionFactory.getConsumerConfigParamsMap(MessageSessionFactory.DEFAULT_CLUSTER_IDENTITY, consumerGroup);

String[] topics = topic.split(",");
Collection<String> topicCollection = Arrays.asList(topics);
metricStream = KafkaUtils.createDirectStream(
                            jsc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.Subscribe(topicCollection, kafkaParams)
);
}

TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
metric_data_spark 16         3379403197      3379436869      33672           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 7          3399030625      3399065857      35232           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 13         3389008901      3389044210      35309           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 17         3380638947      3380639928      981             consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 1          3593201424      3593236844      35420           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 8          3394218406      3394252084      33678           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 19         3376897309      3376917998      20689           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 3          3447204634      3447240071      35437           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 18         3375082623      3375083663      1040            consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 2          3433294129      3433327970      33841           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 9          3396324976      3396345705      20729           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 0          3582591157      3582624892      33735           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 14         3381779702      3381813477      33775           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 4          3412492002      3412525779      33777           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 11         3393158700      3393179419      20719           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 10         3392216079      3392235071      18992           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 15         3383001380      3383036803      35423           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 6          3398338540      3398372367      33827           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 12         3387738477      3387772279      33802           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2
metric_data_spark 5          3408698217      3408733614      35397           consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx    consumer-2

我们需要做的更改使每个分区有一个使用者来读取数据


共 (1) 个答案

  1. # 1 楼答案

    因为你使用的是一致的安置策略,所以它应该分配给执行者

    运行Spark submit时,需要指定最多要启动20个执行者 num-executors 20

    不过,如果你做的不止这些,你会有空闲的执行者不使用卡夫卡数据(但他们可能仍然能够处理其他阶段)