擅长:python、mysql、java
<p>经过一些研究,我能够解决这个问题:</p>
<p>在生产者端,指定分区:</p>
<pre><code>self.producer.send(topic_name, value=data,partition=0)
</code></pre>
<p>在消费者方面,</p>
<pre><code>consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer.assign([TopicPartition('trial', 0)])
</code></pre>