更新時間:2021-10-19 來源:黑馬程序員 瀏覽量:
我們可以讓Kafka根據(jù)消費組中的消費者動態(tài)地為topic分配要消費的分區(qū)。但在某些時候,我們需要指定要消費的分區(qū),例如:
如果某個程序?qū)⒛硞€指定分區(qū)的數(shù)據(jù)保存到外部存儲中,例如:Redis、MySQL,那么保存數(shù)據(jù)的時候,只需要消費該指定的分區(qū)數(shù)據(jù)即可
如果某個程序是高可用的,在程序出現(xiàn)故障時將自動重啟(例如:后面我們將學習的Flink、Spark程序)。這種情況下,程序?qū)闹付ǖ姆謪^(qū)重新開始消費數(shù)據(jù)。
如何進行手動消費分區(qū)中的數(shù)據(jù)呢?
1. 不再使用之前的 subscribe 方法訂閱主題,而使用 「assign」方法指定想要消費的消息
String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
2. 一旦指定了分區(qū),就可以就像前面的示例一樣,在循環(huán)中調(diào)用「poll」方法消費消息
注意
1. 當手動管理消費分區(qū)時,即使GroupID是一樣的,Kafka的組協(xié)調(diào)器都將不再起作用
2. 如果消費者失敗,也將不再自動進行分區(qū)重新分配
IOC底層實現(xiàn)原理介紹,手動實現(xiàn)IOC容器