一、RocketMQ
保證消費成功
PushConsumer 為了保證消息肯定消費成功,只有使用方明確表示消費成功,RocketMQ 才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功——即都會重新投遞。
業務實現消費回調的時候,若且唯若此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才會認為這批消息(默認是1條)是消費完成的。
如果這時候消息消費失敗,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就會認為這批消息消費失敗了。如果業務的回調沒有處理好而拋出異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。
為了保證消息是肯定被至少消費成功一次,RocketMQ 會把這批消息重發回 Broker(topic 不是原 topic 而是這個消費租的 RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)後,再次投遞到這個 ConsumerGroup。而如果一直這樣重複消費都持續失敗到一定次數(默認16次),就會投遞到 DLQ 死信隊列。應用可以監控死信隊列來做人工干預。
啟動的時候從哪裡消費
當新實例啟動的時候,PushConsumer 會拿到本消費組 broker 已經記錄好的消費進度(consumer offset),按照這個進度發起自己的第一次 Pull 請求。
如果這個消費進度在 Broker 並沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:
- CONSUME_FROM_LAST_OFFSET:默認策略,從該隊列最尾開始消費,即跳過歷史消息
- CONSUME_FROM_FIRST_OFFSET:從隊列最開始開始消費,即歷史消息(還儲存在 broker 的)全部消費一遍
- CONSUME_FROM_TIMESTAMP:從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
消息 ACK 機制
RocketMQ 是以consumer group+queue為單位是管理消費進度的,以一個 consumer offset 標記這個這個消費組在這條 queue 上的消費進度。如果某已存在的消費組出現了新消費實例的時候,依靠這個組的消費進度,就可以判斷第一次是從哪裡開始拉取的。
每次消息成功後,本地的消費進度會被更新,然後由定時器定時同步到 broker,以此持久化消費進度。但是每次記錄消費進度的時候,只會把一批消息中最小的 offset 值為消費進度值。
這鐘方式和傳統的一條 message 單獨 ack 的方式有本質的區別。性能上提升的同時,會帶來一個潛在的重複問題——由於消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,後面99條都消費結束了,只有2101消費一直沒有結束的情況。
在這種情況下,RocketMQ 為了保證消息肯定被消費成功,消費進度只能維持在2101,直到2101也消費結束了,本地的消費進度才能標記2200消費結束了。
在這種設計下,就有消費大量重複的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被 kill)。這條 queue 的消費進度還是維持在2101,當 queue 重新分配給新的實例的時候,新的實例從 broker 上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。
對於這個場景,RocketMQ 暫時無能為力,所以業務必須要保證消息消費的冪等性,這也是 RocketMQ 官方多次強調的態度。
消息重試機制
首先,我們需要明確,只有當消費模式為集群模式時,Broker 才會自動進行重試,對於廣播消息是不會重試的。集群消費模式下,當消息消費失敗,RocketMQ 會通過消息重試機制重新投遞消息,努力使該消息消費成功。
死信的業務處理方式
默認的處理機制中,如果我們只對消息做重複消費,達到最大重試次數之後消息就進入死信隊列了。RocketMQ 的處理方式為將達到最大重試次數(16 次)的消息標記為死信消息,將該死信消息投遞到 DLQ 死信隊列中,業務需要進行人工干預。
二、Kafka
拉取循環
Kafka 對外暴露了一個非常簡潔的 poll 方法,其內部實現了協作、分區重平衡、心跳、數據拉取等功能。
另外需要提醒的是,消費者對象不是線程安全的,也就是不能夠多個線程同時使用一個消費者對象;而且也不能夠一個線程有多個消費者對象。簡而言之,一個線程一個消費者,如果需要多個消費者那麼請使用多線程來進行一一對應。
提交(commit)與位移(offset)
當我們調用poll()時,該方法會返回我們沒有消費的消息。當消息從 broker 返回消費者時,broker 並不跟蹤這些消息是否被消費者接收到;Kafka 讓消費者自身來管理消費的位移,並向消費者提供更新位移的接口,這種更新位移方式稱為提交(commit)。
在正常情況下,消費者會發送分區的提交信息到 Kafka,Kafka 進行記錄。當消費者宕機或者新消費者加入時,Kafka 會進行重平衡,這會導致消費者負責之前並不屬於它的分區。重平衡完成後,消費者會重新獲取分區的位移,下面來看下兩種有意思的情況。
假如一個消費者在重平衡前後都負責某個分區,如果提交位移比之前實際處理的消息位移要小,那麼會導致消息重複消費,如下所示:
假如在重平衡前某個消費者拉取分區消息,在進行消息處理前提交了位移,但還沒完成處理宕機了,然後 Kafka 進行重平衡,新的消費者負責此分區並讀取提交位移,此時會「丟失」消息,如下所示:
對於所有消費者消費失敗的消息,rocketMQ 都會把重試的消息 重新 new 出來,然後投遞到主題SCHEDULE_TOPIC_XXXX下的隊列中,然後由定時任務進行調度重試,同時為了保證消息可被找到,也會將原先的 topic 存儲到 properties 中。
消費重試與死信隊列
Kafka 沒有重試機制不支持消息重試,也沒有死信隊列,因此使用 Kafka 做消息隊列時,如果遇到了消息在業務處理時出現異常,就會很難進行下一步處理。應對這種場景,需要自己實現消息重試的功能。