Only one consumer is reading the data
I am using Spark Streaming with Kafka, and I have a topic with 20 partitions. When the streaming job runs, only one consumer reads data from all partitions of the topic, which slows down the read rate. Is there a way in Spark Streaming to configure one consumer per partition?
// Obtain the shared JavaStreamingContext and the Kafka consumer settings
JavaStreamingContext jsc = AnalyticsContext.getInstance().getSparkStreamContext();
Map<String, Object> kafkaParams = MessageSessionFactory.getConsumerConfigParamsMap(
        MessageSessionFactory.DEFAULT_CLUSTER_IDENTITY, consumerGroup);

// "topic" may contain several comma-separated topic names
String[] topics = topic.split(",");
Collection<String> topicCollection = Arrays.asList(topics);

// Direct stream: Spark creates one RDD partition per Kafka partition
metricStream = KafkaUtils.createDirectStream(
        jsc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicCollection, kafkaParams)
);
The consumer-group status below shows a single consumer (consumer-2) assigned to all 20 partitions:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
metric_data_spark 16 3379403197 3379436869 33672 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 7 3399030625 3399065857 35232 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 13 3389008901 3389044210 35309 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 17 3380638947 3380639928 981 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 1 3593201424 3593236844 35420 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 8 3394218406 3394252084 33678 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 19 3376897309 3376917998 20689 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 3 3447204634 3447240071 35437 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 18 3375082623 3375083663 1040 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 2 3433294129 3433327970 33841 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 9 3396324976 3396345705 20729 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 0 3582591157 3582624892 33735 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 14 3381779702 3381813477 33775 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 4 3412492002 3412525779 33777 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 11 3393158700 3393179419 20719 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 10 3392216079 3392235071 18992 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 15 3383001380 3383036803 35423 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 6 3398338540 3398372367 33827 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 12 3387738477 3387772279 33802 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
metric_data_spark 5 3408698217 3408733614 35397 consumer-2-da278f31-c368-414c-925b-d3ca4881709e /xx.xx.xx.xx consumer-2
What change do we need to make so that one consumer per partition reads the data?
# Answer 1
Because you are using the PreferConsistent location strategy, the partitions should be distributed evenly across executors.
When running spark-submit, you need to request up to 20 executors:
--num-executors 20
However, if you request more than that, you will have idle executors that do not consume Kafka data (although they may still be able to process other stages).
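As a sketch, a launch command along these lines would request one executor per Kafka partition; the main class, jar name, and resource sizes are placeholders to adapt to your cluster:

```shell
# Hypothetical launch: 20 executors for a 20-partition topic, so the
# direct stream's 20 tasks (one per partition) can all run in parallel.
spark-submit \
  --class com.example.MetricStreamingJob \
  --master yarn \
  --num-executors 20 \
  --executor-cores 1 \
  --executor-memory 2g \
  metric-streaming-job.jar
```

With --executor-cores 1 each executor consumes exactly one partition; fewer, larger executors (e.g. 5 executors with 4 cores each) would also read all 20 partitions in parallel, just with several consumer tasks per JVM.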