我想给卡夫卡寄一个大的CSV。基本结构是读取CSV的一行并用头压缩它。
a = dict(zip(header, line.split(",")
然后将其转换为json:
message = json.dumps(a)
然后我使用kafka python库发送消息
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
使用PyS点火
我已经很容易地从CSV文件创建了消息的RDDsc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
现在我要发送这些消息:我定义了一个函数
def sendkafka(message):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
return producer.send_messages('topic',message)
然后我创建一个新的RDD来发送消息
sentRDD = messageRDD.map(lambda x: kafkasend(x))
然后我打电话给sentRDD.count()
开始搅动和发送信息
不幸的是,这是非常缓慢的。它每秒发送1000条信息。这是一个10节点集群,每个4个CPU,8gb内存。
相比之下,在一个1000万行csv上创建消息大约需要7秒。~大约2gb
我认为问题在于我在函数中实例化了一个kafka生产者。然而,如果我不这样做,斯帕克抱怨说,即使我已经尝试在全球范围内定义了它,但它并不存在。
也许有人能对如何解决这个问题有所启发。
谢谢你
您可以为每个分区创建一个生产者,并使用
mapPartitions
或foreachPartition
:如果上面的方法不能帮助您使用asynchronous producer扩展它。
在Spark 2.x中,也可以使用Kafka数据源。你必须包括
spark-sql-kafka
jar,匹配的Spark和Scala版本(分别是2.2.0和2.11):将数据转换为
DataFrame
(如果它还没有DataFrame
):使用
DataFrameWriter
编写:相关问题 更多 >
编程相关推荐