之前我們介紹過了 Kafka 整體架構,Kafka 生產者,Kafka 生產的消息最終流向哪裡呢?當然是需要消費了,要不只產生一系列數據沒有任何作用啊,如果把 Kafka 比作餐廳的話,那麼生產者就是廚師的角色,消費者就是客人,只有廚師的話,那麼炒出來的菜沒有人吃也沒有意義,如果只有客人沒有廚師的話,誰會去這個店吃飯呢?!所以如果你看完前面的文章意猶未盡的話,可以繼續讓你爽一爽。如果你沒看過前面的文章,那就從現在開始讓你爽。
應用程式使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的消息,然後再把他們保存起來。應用程式首先需要創建一個 KafkaConsumer 對象,訂閱主題並開始接受消息,驗證消息並保存結果。一段時間後,生產者往主題寫入的速度超過了應用程式驗證數據的速度,這時候該如何處理?如果只使用單個消費者的話,應用程式會跟不上消息生成的速度,就像多個生產者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參與消費主題中的消息,對消息進行分流處理。
Kafka 消費者從屬於 消費者群組 。一個群組中的消費者訂閱的都是 相同 的主題,每個消費者接收主題一部分分區的消息。下面是一個 Kafka 分區消費示意圖
上圖中的主題 T1 有四個分區,分別是分區0、分區1、分區2、分區3,我們創建一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部消息。由於一個消費者處理四個生產者發送到分區的消息,壓力有些大,需要幫手來幫忙分擔任務,於是就演變為下圖
這樣一來,消費者的消費能力就大大提高了,但是在某些環境下比如用戶產生消息特別多的時候,生產者產生的消息仍舊讓消費者吃不消,那就繼續增加消費者。
如上圖所示,每個分區所產生的消息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那麼多餘的消費者將會閒置,如下圖所示
向群組中增加消費者是橫向伸縮消費能力的主要方式。總而言之,我們可以通過增加消費組的消費者來進行 水平擴展提升消費能力 。這也是為什麼建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閒的,沒有任何幫助。
Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變為下圖這樣
在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬於不同的應用。
總結起來就是如果應用需要讀取全量消息,那麼請為該應用設置一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者。
消費者組是什麼
消費者組(Consumer Group) 是由一個或多個消費者實例(Consumer Instance)組成的群組,具有可擴展性和可容錯性的一種機制。消費者組內的消費者 共享 一個消費者組ID,這個ID 也叫做 Group ID ,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區的消息,多餘的消費者會閒置,派不上用場。
我們在上面提到了兩種消費方式
消費者重平衡
我們從上面的 消費者演變圖 中可以知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其全部分區的消息,後來有一個消費者加入群組,隨後又有更多的消費者加入群組,而新加入的消費者實例 分攤 了最初消費者的部分消息,這種把分區的所有權通過一個消費者轉到其他消費者的行為稱為 重平衡 ,英文名也叫做 Rebalance 。如下圖所示
重平衡非常重要,它為消費者群組帶來了 高可用性 和 伸縮性 ,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們並不希望發生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區被重新分配給另一個消費者時,消息當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程式。
消費者通過向 組織協調者 (Kafka Broker)發送心跳來維護自己是消費者組的一員並確認其擁有的分區。對於不同不的消費群體來說,其組織協調者可以是不同的。只要消費者定期發送心跳,就會認為消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。
如果過了一段時間 Kafka 停止發送心跳了,會話(Session)就會過期,組織協調者就會認為這個 Consumer 已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止發送消息,組織協調者會等待幾秒鐘,確認它死亡了才會觸發重平衡。在這段時間裡, 死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開群組,組織協調者會觸發一次重平衡,儘量降低處理停頓。
重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現在社區還無法修改。
重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關於 Serial 收集器的描述):
更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結束。 Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在後台自動發起並完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。
也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......
創建消費者
上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的
在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 對象與創建 KafkaProducer 對象十分相似 --- 把需要傳遞給消費者的屬性放在 properties 對象中,後面我們會著重討論 Kafka 的一些配置,這裡我們先簡單的創建一下,使用3個屬性就足矣,分別是 bootstrap.server , key.deserializer , value.deserializer 。
還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪個消費者群組。創建不屬於任何一個群組的消費者也是可以的
Properties properties = new Properties();
properties.put("bootstrap.server","192.168.1.9:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumerconsumer = new KafkaConsumer<>(properties);
主題訂閱
創建好消費者之後,下一步就開始訂閱主題了。 subscribe() 方法接受一個主題列表作為參數,使用起來比較簡單
consumer.subscribe(Collections.singletonList("customerTopic"));
為了簡單我們只訂閱了一個主題 customerTopic ,參數傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創建了新的主題,並且主題的名字與正則表達式相匹配,那麼會立即觸發一次重平衡,消費者就可以讀取新的主題。
要訂閱所有與 test 相關的主題,可以這樣做
consumer.subscribe("test.*");
輪詢
我們知道,Kafka 是支持訂閱/發布模式的,生產者發送數據給 Kafka Broker,那麼消費者是如何知道生產者發送了數據呢?其實生產者產生的數據消費者是不知道的,KafkaConsumer 採用輪詢的方式定期去 Kafka Broker 中進行數據的檢索,如果有數據就用來消費,如果沒有就再繼續輪詢等待,下面是輪詢等待的具體實現
try {
while (true) {
ConsumerRecordsrecords = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecordrecord : records) {
int updateCount = 1;
if (map.containsKey(record.value())) {
updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
}finally {
consumer.close();
}
線程安全性
在同一個群組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規則,一個消費者使用一個線程,如果一個消費者群組中多個消費者都想要運行的話,那麼必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。
消費者配置
到目前為止,我們學習了如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關的配置說明。大部分參數都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數。
該屬性指定了消費者從伺服器獲取記錄的最小位元組數。broker 在收到消費者的數據請求時,如果可用的數據量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數據,但消費者的 CPU 使用率很高,那麼就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值調大可以降低 broker 的工作負載。
我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時才會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數據流入 kafka 的話,消費者獲取的最小數據量要求就得不到滿足,最終導致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數值設置的小一些。如果 fetch.max.wait.ms 被設置為 100 毫秒的延遲,而 fetch.min.bytes 的值設置為 1MB,那麼 Kafka 在收到消費者請求後,要麼返回 1MB 的數據,要麼在 100 ms 後返回所有可用的數據。就看哪個條件首先被滿足。
該屬性指定了伺服器從每個分區里返回給消費者的 最大位元組數 。它的默認值時 1MB,也就是說, KafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超過 max.partition.fetch.bytes 指定的位元組。如果一個主題有20個分區和5個消費者,那麼每個消費者需要 至少 4 MB的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的位元組數(通過 max.message.size 屬性配置大), 否則消費者可能無法讀取這些消息,導致消費者一直掛起重試 。 在設置該屬性時,另外一個考量的因素是消費者處理數據的時間。消費者需要頻繁的調用 poll() 方法來避免會話過期和發生分區再平衡,如果單次調用poll() 返回的數據太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。
這個屬性指定了消費者在被認為死亡之前可以與伺服器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內發送心跳給群組協調器,就會被認定為死亡,協調器就會觸發重平衡。把它的分區分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向群組協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設置的比默認值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的重平衡。把該屬性的值設置得大一些,可以減少意外的重平衡,不過檢測節點崩潰需要更長的時間。
該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下的該如何處理。它的默認值是 latest ,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據。另一個值是 earliest ,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區的記錄。
我們稍後將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,為了儘量避免出現重複數據和數據丟失,可以把它設置為 false,由自己控制何時提交偏移量。如果把它設置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率
我們知道,分區會分配給群組中的消費者。 PartitionAssignor 會根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者,Kafka 有兩個默認的分配策略 Range 和 RoundRobin
該屬性可以是任意字符串,broker 用他來標識從客戶端發送過來的消息,通常被用在日誌、度量指標和配額中
該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢中需要處理的數據量。
socket 在讀寫數據時用到的 TCP 緩衝區也可以設置大小。如果它們被設置為 -1,就使用作業系統默認值。如果生產者或消費者與 broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
特殊偏移
我們上面提到,消費者在每次調用 poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組裡的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區中的位置(偏移量)
消費者會向一個叫做 _consumer_offset 的特殊主題中發送消息,這個主題會保存每次所發送消息中的分區偏移量,這個主題的主要作用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題發送消息,正常情況下不觸發重平衡,這個主題是不起作用的,當觸發重平衡後,消費者停止工作,每個消費者可能會分到對應的分區,這個主題就是讓消費者能夠繼續處理消息所設置的。
如果提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的消息就會被重複處理
如果提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的消息將會丟失
既然 _consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面我們就來說一下
提交方式
KafkaConsumer API 提供了多種方式來提交偏移量
自動提交
最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設置為true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那麼就會提交從上一次輪詢中返回的偏移量。
提交當前偏移量
把 auto.commit.offset 設置為 false,可以讓應用程式決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後馬上返回,如果提交失敗就拋出異常。
commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄後要確保調用了 commitSync(),否則還是會有丟失消息的風險,如果發生了在均衡,從最近一批消息到發生在均衡之間的所有消息都將被重複處理。
異步提交
異步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於異步提交不會進行重試,同步提交會一致進行重試。
同步和異步組合提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。
因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量。
提交特定的偏移量
消費者API允許調用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。