事件源作為一種應用程式體系結構模式越來越流行。事件源涉及將應用程式進行的狀態更改建模為事件的不可變序列或「日誌」。事件源不是在現場修改應用程式的狀態,而是將觸髮狀態更改的事件存儲在不可變的日誌中,並將狀態更改建模為對日誌中事件的響應。我們之前曾寫過有關事件源,Apache Kafka及其相關性的文章。在本文中,我將進一步探討這些想法,並展示流處理(尤其是Kafka Streams)如何幫助將事件源和CQRS付諸實踐。
讓我們舉個例子。考慮一個類似於Facebook的社交網絡應用程式(儘管完全是假設的),當用戶更新其Facebook個人資料時會更新個人資料資料庫。當用戶更新其個人資料時,需要通知多個應用程式-搜索應用程式,以便可以將用戶的個人資料重新編制索引以便可以在更改的屬性上進行搜索;新聞訂閱源應用程式,以便用戶的聯繫可以找到有關個人資料更新的信息;數據倉庫ETL應用程式將最新的概要文件數據加載到支持各種分析查詢等的中央數據倉庫中。
基於事件源的架構
事件來源涉及更改配置文件Web應用程式,以將配置文件更新建模為事件(發生的重要事件),並將其寫入中央日誌(例如Kafka主題)。在這種情況下,所有需要響應配置文件更新事件的應用程式,只需訂閱Kafka主題並創建各自的物化視圖-可以寫緩存,在Elasticsearch中為事件建立索引或簡單地計算in -內存聚合。個人檔案Web應用程式本身也訂閱了相同的Kafka主題,並將更新內容寫入個人檔案資料庫。
事件溯源:一些權衡
使用事件源對應用程式進行建模有許多優點-它提供了對對象進行的每個狀態更改的完整日誌;因此故障排除更加容易。通過將用戶意圖表示為不可變事件的有序日誌,事件源為企業提供了審核和合規性日誌,這還具有提供數據源的額外好處。它支持彈性應用程式;回滾應用程式等於倒退事件日誌和重新處理數據。具有較好的性能特點;寫入和讀取可以獨立縮放。它實現了鬆散耦合的應用程式體系結構。它使向基於微服務的體系結構過渡變得更容易。但最重要的是:
事件源支持構建前向兼容的應用程式體系結構,即將來可以添加更多需要處理同一事件但創建不同實例化視圖的應用程式的能力。
對於上述優點,也有一些缺點。事件源具有更高的學習曲線;這是一個陌生的新編程模型。事件日誌可能涉及更多的查詢工作,因為它需要將事件轉換為適合查詢的所需物化狀態。
那是對事件源和一些權衡的快速介紹。本文無意探討事件源的細節或提倡其用途。您可以在此處閱讀有關事件來源和各種折衷方法的更多信息。
Kafka作為事件溯源的支柱
事件源與Apache Kafka相關。這是如何進行的-事件來源涉及維護多個應用程式可以訂閱的不可變事件序列。 Kafka是一種高性能,低延遲,可擴展和持久的日誌,已被全球數千家公司使用,並經過了大規模的實戰測試。因此,Kafka是存儲事件的自然支柱,同時向基於事件源的應用程式體系結構發展。
事件溯源和CQRS
此外,事件源和CQRS應用程式體系結構模式也相關。命令查詢責任隔離(CQRS)是最常用於事件源的應用程式體系結構模式。 CQRS涉及在內部將應用程式分為兩部分-命令端命令系統更新狀態,而查詢端則在不更改狀態的情況下獲取信息。 CQRS提供了關注點分離–命令或寫端與業務有關;它不關心查詢,數據上的不同實例化視圖,針對性能的實例化視圖的最佳存儲等。另一方面,查詢或讀取端全部與讀取訪問權限有關。其主要目的是使查詢快速高效。
Refactoring an application using event sourcing and CQRS
事件源與CQRS一起工作的方式是使應用程式的一部分在對事件日誌或Kafka主題的寫入過程中對更新進行建模。這與事件處理程序配對,該事件處理程序訂閱Kafka主題,根據需要轉換事件,並將實例化視圖寫入讀取存儲。最後,應用程式的讀取部分針對讀取存儲發出查詢。
CQRS具有一些優點-它使負載與寫入和讀取分離,從而可以分別縮放。各種讀取路徑本身可以獨立縮放。此外,可以針對應用程式的查詢模式優化讀取存儲;圖形應用程式可以將Neo4j用作其讀取存儲,搜索應用程式可以使用Lucene索引,而簡單的內容服務Web應用程式可以使用嵌入式緩存。除了技術優勢之外,CQRS還具有組織上的優勢-通過將寫入和讀取路徑分離,您可以使負責寫入和讀取路徑的業務邏輯的團隊脫鉤。
本文僅涉及CQRS細微差別的表面。如果您想了解更多信息,建議閱讀Martin Fowler和Udi Dahan關於該主題的文章。
到目前為止,我已經對事件源和CQRS進行了介紹,並描述了Kafka如何自然地將這些應用程式架構模式付諸實踐。但是,流處理在何處以及如何進入畫面?
CQRS和Kafka的Streams API
這是流處理,尤其是Kafka Streams如何啟用CQRS的方法。事件處理程序訂閱事件日誌(Kafka主題),使用事件,處理這些事件,並將結果更新應用於讀取存儲。對事件流進行低延遲轉換的過程稱為流處理。在Apache Kafka的0.10版本中,社區發布了Kafka Streams。一個強大的流處理引擎,用於對Kafka主題上的轉換進行建模。
Kafka Streams非常適合在應用程式內部構建事件處理程序組件,該應用程式旨在使用CQRS進行事件來源。它是一個庫,因此可以將其嵌入任何標準Java應用程式中,以對事件流進行轉換建模。例如,這是一個使用Kafka Streams進行字數統計的代碼片段;您可以在Confluent示例github存儲庫中訪問整個程序的代碼。
KStreamBuilder builder = new KStreamBuilder();
KStream
textLines = builder.stream(stringSerde, stringSerde,"TextLinesTopic"); Pattern pattern = Pattern.compile("\\\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KStream
wordCounts = textLines .flatMapValues(value-> Arrays.asList(pattern.split(value.toLowerCase())))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
因此,可以輕鬆地將應用程式內的事件處理程序表示為Kafka Streams拓撲,但更進一步,有兩個不同的選項可用於將事件處理程序的輸出建模為對應用程式狀態進行建模的數據存儲的更新。
採取1:將應用程式狀態建模為外部數據存儲
Kafka Streams拓撲的輸出可以是Kafka主題(如上例所示),也可以寫入外部數據存儲(如關係數據庫)。 從世界的角度來看,事件處理程序建模為Kafka Streams拓撲,而應用程式狀態建模為用戶信任和操作的外部數據存儲。 執行CQRS的此選項主張使用Kafka Streams僅對事件處理程序建模,而將應用程式狀態保留在外部數據存儲中,該外部數據存儲是Kafka Streams拓撲的最終輸出。
以2:在Kafka Streams中將應用程式狀態建模為本地狀態
作為一種替代方法,除了對事件處理程序進行建模之外,Kafka Streams還提供了一種對應用程式狀態進行建模的有效方法-它支持開箱即用的本地,分區和持久狀態。此本地狀態可以是RocksDB存儲,也可以是內存中的哈希映射。
運作方式是,將嵌入Kafka Streams庫以進行有狀態流處理的應用程式的每個實例都託管應用程式狀態的子集,建模為狀態存儲的碎片或分區。狀態存儲區的分區方式與應用程式的密鑰空間相同。結果,服務於到達特定應用程式實例的查詢所需的所有數據在狀態存儲碎片中本地可用。 Kafka Streams通過透明地將對狀態存儲所做的所有更新記錄到高度可用且持久的Kafka主題中,來提供對該本地狀態存儲的容錯功能。因此,如果應用程式實例死亡,並且託管的本地狀態存儲碎片丟失,則Kafka Streams只需讀取高度可用的Kafka主題並將狀態數據重新填充即可重新創建狀態存儲碎片。
實際上,Kafka Streams將Kafka用作其本地嵌入式資料庫的提交日誌。這正是在封面下設計傳統資料庫的方式-事務或重做日誌是事實的源頭,而表只是對存儲在事務日誌中的數據的物化視圖。
Kafka Streams中的本地,分區,持久狀態
將Kafka Streams用於使用CQRS構建的有狀態應用程式還具有更多優勢– Kafka Streams還內置了負載平衡和故障轉移功能。如果一個應用程式實例失敗,則Kafka Streams會自動在其餘應用程式實例之間重新分配Kafka主題的分區以及內部狀態存儲碎片。同樣,Kafka Streams允許彈性縮放。如果啟動了使用Kafka Streams執行CQRS的應用程式的新實例,它將自動在新啟動的應用程式實例之間平均移動狀態存儲的現有碎片以及Kafka主題的分區。所有這些功能都以透明的方式提供給Kafka Streams用戶。
需要使用Kafka Streams轉換為基於CQRS的模式的應用程式不必擔心應用程式及其狀態的容錯性,可用性和可伸縮性。
該嵌入式,分區且持久的狀態存儲通過Kafka Streams獨有的一流抽象-KTable向用戶公開。
Kafka流中的交互式查詢
在即將發布的Apache Kafka版本中,Kafka Streams將允許其嵌入式狀態存儲可查詢。
Kafka Streams中的這一獨特功能-交互式查詢(以前被Kafka社區稱為Queryable State)-也使其適合將CQRS設計模式應用於應用程式。事件處理程序被建模為Kafka Streams拓撲,該拓撲將數據生成到讀取存儲,該存儲不過是Kafka Streams內部的嵌入式狀態存儲。應用程式的讀取部分將StateStore API用於狀態存儲,並基於其get()API來提供讀取服務。
使用Kafka和Kafka Streams的事件源和基於CQRS的應用程式
Kafka Streams中的交互式查詢的情況
請注意,使用交互式查詢功能在Kafka Streams中使用嵌入式狀態存儲純粹是可選的,並非對所有應用程式都有意義。有時,您只想使用您知道並信任的外部資料庫。或者,在使用Kafka Streams時,您也可以將數據發送到外部資料庫(例如Cassandra),並讓應用程式的讀取部分查詢該數據。
但是,何時使用像這樣的本地嵌入式應用程式狀態才有意義?這裡有一些利弊考慮-
缺點
- 現在生成的應用程式是有狀態的,需要多加註意才能進行管理。
- 它涉及遠離您知道和信任的數據存儲。
優點
- 移動的零件更少;只是您的應用程式和Kafka集群。您不必部署,維護和操作外部資料庫即可存儲應用程式所需的狀態。
- 它可以更快,更有效地使用應用程式狀態。數據對於您的應用程式是本地的(在內存中或可能在SSD上);您可以快速訪問它。這對於需要訪問大量應用程式狀態的應用程式特別有用。而且,在進行聚合以進行流處理的商店和商店應答查詢之間沒有數據重複。
- 它提供了更好的隔離;狀態在應用程式內。一個惡意應用程式無法淹沒其他有狀態應用程式共享的中央數據存儲。
- 它具有靈活性。內部應用程式狀態可以針對應用程式所需的查詢模式進行優化。
使用Kafka做事件溯源和CQRS:大贏家
我上面列出的利弊體現了所涉及的各種折衷,但是,我認為,朝著此應用程式體系結構邁進的最重要的勝利就是應用程式升級變得更加簡單。處理應用程式的非停機升級的傳統模型(依賴於外部資料庫來確定其應用程式狀態)相當複雜。無需停機升級就不需要同時運行新版本和舊版本的應用程式。升級幾個實例後,如果發現錯誤,則需要能夠透明地將負載切換回同一應用程式的舊實例。鑒於新實例和舊實例將需要更新外部資料庫中的相同表,因此需要格外小心,以在不破壞狀態存儲中數據的情況下進行此類無停機升級。
現在,對於依賴於本地嵌入式狀態的有狀態應用程式,考慮相同的無停機升級問題。通過此模型,您可以與舊版本一起推出新版本的應用程式(在Kafka Streams中具有不同的應用程式ID)。每個人都擁有按照其應用程式業務邏輯版本指示的方式處理的應用程式狀態副本。您可以逐步將流量從舊的引導到新的。如果新版本的某個錯誤會在應用程式狀態存儲區中產生意外結果,那麼您始終可以將其丟棄,修復該錯誤,重新部署該應用程式並讓其從日誌中重建其狀態。
放在一起:零售庫存應用
現在讓我們以一個例子來說明如何將本文介紹的概念付諸實踐-如何使用Kafka和Kafka Streams為應用程式啟用事件源和CQRS。
樣本零售應用程式體系結構
考慮一個實體零售商的應用程式,該應用程式管理所有商店的庫存; 當新貨到達或發生新銷售時,它會更新庫存表,並且要知道商店庫存的當前狀態,它會查詢庫存表。
具有事件源的零售應用程式架構—由Kafka提供支持
如果我們將事件採購體系結構模式應用於此Inventory應用,則新的貨件將在Shipments Kafka主題中表示為事件。 同樣,新銷售將以Sales Kafka主題(可能由Sales應用程式編寫)中的事件表示。 為簡單起見,我們假設「銷售」和「發貨」主題中的Kafka消息的關鍵字是{商店ID,商品ID},而值是商店中商品數量的計數。
Inventory應用程式內的事件處理程序被建模為Kafka Streams拓撲,該拓撲連接了Sales和Shipments Kafka主題。 聯接操作創建並更新狀態存儲庫InventoryTable,該狀態存儲庫表示以連續方式更新的清單的當前狀態。
連接操作的內部結構以構建庫存表
可以將這樣的應用程式部署在不同計算機上的多個實例中(如下圖所示)。而且,InventoryApp的每個實例都承載InventoryTable的分片的子集,其中包含此聯接操作的結果。當用戶查詢InventoryApp來了解商店中某商品的當前庫存數量時,
- 運行InventoryApp的隨機伺服器收到一個請求:GET / inventory / stores / {store id} / items / {item id} / count
- 它使用Kafka Streams實例上的metadataForKey()API來獲取商店的StreamsMetadata和密鑰。 StreamsMetadata保存Kafka Streams拓撲中每個商店的主機和埠信息。應用程式使用StreamsMetadata檢查該實例是否具有包含關鍵字{store id,item id}的InventoryTable分區。如果是這樣,它將使用本地Kafka Streams實例上的store(「 InventoryTable」)api來獲取該商店並對其進行查詢。
- 如果不是,它將為當前持有包含{store id,item id}的Kafka分區的實例找到主機/埠,並轉發GET請求到/ inventory / stores / {store id} / items / {item id} / count到在該主機上運行的InventoryApp實例。
- 向用戶返回庫存檔點
在Kafka Streams中使用交互式查詢的InventoryState應用程式
要了解有關「交互式查詢」功能的更多信息,請閱讀其文檔。除了這些資源之外,請參閱Capital One的演示文稿,該演示文稿將在實踐中應用本文中介紹的一些思想,並概述使用Kafka Streams的基於REST,事件源,CQRS和響應流處理的應用程式體系結構。
如上例所示,存儲和查詢本地狀態對於某些有狀態應用程式可能沒有意義。有時,您想將狀態存儲在您知道並信任的外部資料庫中。例如,在上面的示例中,您可以使用Kafka Streams通過join操作來計算庫存數量,但選擇將結果寫入外部資料庫並查詢。
但是,值得注意的是,構建具有查詢本地狀態的有狀態應用程式有許多優點,如本文前面所述。
結論性思想
事件尋源為應用程式使用零損失協議記錄其固有的不可避免的狀態變化提供了一種有效的方法。這意味著恢復既簡單又高效,因為它完全基於日記或像Kafka這樣的有序日誌。 CQRS更進一步,將原始事件變成可查詢的視圖;精心形成的與其他業務流程相關的視圖。 Kafka的Streams API提供了以流方式創建這些視圖所需的聲明性功能,以及可擴展的查詢層,因此用戶可以直接與此視圖進行交互。結果是在Apache Kafka上構建了適用的基於事件源和CQRS的應用程式體系結構;允許此類應用程式還利用Kafka的核心競爭力-性能,可伸縮性,安全性,可靠性和大規模採用。
最重要的是,以這種方式構建有狀態的應用程式可使組織最終獲得鬆散耦合的應用程式體系結構-一種具有彈性和可伸縮性,更易於故障排除和升級的應用程式體系結構,最重要的是,該體系結構具有前向兼容性。
對更多感興趣?
如果您喜歡本文,則可能需要繼續使用以下資源,以了解有關Apache Kafka上流處理的更多信息:
- 使用Apache Kafka的流SQL引擎KSQL入門,並遵循Stream Processing Cookbook中的各種教程和示例快速入門。
- 開始使用Kafka Streams API來構建自己的實時應用程式和微服務。
- 觀看我們的分為三部分的在線講座系列,了解KSQL如何工作的來龍去脈,並學習如何有效地使用它來執行監視,安全性和異常檢測,在線數據集成,應用程式開發,流ETL等。
- 通過Docker瀏覽有關Kafka Streams API的Confluent教程,並使用我們的Confluent演示應用程式。
原文:https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/
本文:https://pub.intelligentx.net/node/788
討論:請加入知識星球或者小紅圈【首席架構師圈】