在體系結構規劃期間選擇正確的消息傳遞系統始終是一個挑戰,但這是需要確定的最重要的考慮因素之一。作為一名開發人員,我每天都要編寫需要服務大量用戶並實時處理大量數據的應用程式。
通常,我將Java與Spring框架(Spring Boot、Spring數據、Spring雲、Spring緩存等)一起使用。Spring Boot是一個框架,它允許我比以前更快更輕鬆地完成開發過程。它已在我的組織中發揮了關鍵作用。隨著用戶數量的快速增長,我們意識到我們顯然需要每秒處理1,000,000個事件。
當我們發現Apache Kafka時,我們發現它滿足了我們的需求,可以快速處理數百萬條消息。這就是為什麼我們決定嘗試一下。從那一刻起,卡夫卡就成了我口袋裡的重要工具。你會問,我為什麼選擇它?
Apache Kafka是:
- 可伸縮的
- 容錯
- 一個很棒的發布-訂閱消息傳遞系統
- 與大多數消息傳遞系統相比,具有更高的吞吐量
- 高度耐用
- 高度可靠
- 高的性能
這就是為什麼我決定在我的項目中使用它。根據我的經驗,我在這裡提供了一個循序漸進的指南,介紹如何在Spring啟動應用程式中包含Apache Kafka,以便您也可以開始利用它的優點。
先決條件
- 本文要求您擁有Confluent平台
- 手動安裝使用ZIP和TAR檔案
- 下載
- 解壓縮它
- 按照逐步說明,您將在本地環境中啟動和運行Kafka
我建議在您的開發中使用Confluent CLI來啟動和運行Apache Kafka和流平台的其他組件。
你會從這本指南中得到什麼
閱讀完本指南後,您將擁有一個Spring Boot應用程式,其中包含一個Kafka生成器,用於向您的Kafka主題發布消息,以及一個Kafka使用者,用於讀取這些消息。
好了,讓我們開始吧!
表的內容
- 步驟1:生成項目
- 步驟2:發布/讀取來自Kafka主題的消息
- 步驟3:通過應用程式配置Kafka。yml配置文件
- 步驟4:創建一個生產者
- 第五步:創造一個消費者
- 步驟6:創建一個REST控制器
步驟1:生成項目
首先,讓我們使用Spring Initializr來生成我們的項目。我們的項目將有Spring MVC/web支持和Apache Kafka支持。
一旦你解壓縮了這個項目,你將會有一個非常簡單的結構。我將在本文的最後向您展示項目的外觀,以便您能夠輕鬆地遵循相同的結構。我將使用Intellij IDEA,但是你可以使用任何Java IDE。
步驟2:發布/讀取來自Kafka主題的消息
現在,你可以看到它是什麼樣的。讓我們繼續討論來自Kafka主題的發布/閱讀消息。
首先創建一個簡單的Java類,我們將使用它作為示例:package com.demo.models;
public class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
}
步驟3:通過應用程式配置Kafka.yml配置文件
接下來,我們需要創建配置文件。我們需要以某種方式配置我們的Kafka生產者和消費者,使他們能夠發布和從主題讀取消息。我們可以使用任意一個應用程式,而不是創建一個Java類,並用@Configuration注釋標記它。屬性文件或application.yml。Spring Boot允許我們避免過去編寫的所有樣板代碼,並為我們提供了更智能的配置應用程式的方法,如下所示:
server: port: 9000
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
如果您想了解更多關於Spring引導自動配置的信息,可以閱讀這篇簡短而有用的文章。有關可用配置屬性的完整列表,請參閱官方文檔。
步驟4:創建一個生產者
創建生產者將把我們的消息寫入主題。
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "users";
@Autowired
private KafkaTemplate
kafkaTemplate; public void sendMessage(String message) {
logger.info(String.format("#### -> Producing message -> %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
我們只是自動連接KafkaTemplate,並將使用此實例發布消息到主題——這就是生產者!
第五步:創造一個消費者
Consumer是負責根據您自己的業務邏輯的需要讀取消息並對其進行處理的服務。要設置它,請輸入以下內容:
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Producer.class);
@KafkaListener(topics = "users", groupId = "group_id")
public void consume(String message) throws IOException {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
}
在這裡,我們告訴我們的方法void consumption (String message)訂閱用戶的主題,並將每個消息發送到應用程式日誌。在實際的應用程式中,可以按照業務需要的方式處理消息。
步驟6:創建一個REST控制器
如果我們已經有了一個消費者,那麼我們就已經擁有了消費Kafka消息所需的一切。
為了完整地顯示我們創建的所有內容是如何工作的,我們需要創建一個具有單個端點的控制器。消息將被發布到這個端點,然後由我們的生產者進行處理。
然後,我們的使用者將以登錄到控制台的方式捕獲和處理它。
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
}
}
讓我們用cURL把信息發送給Kafka:
curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
基本上,這是它!在不到10個步驟中,您就了解了將Apache Kafka添加到Spring啟動項目是多麼容易。如果您遵循了這個指南,您現在就知道如何將Kafka集成到您的Spring Boot項目中,並且您已經準備好使用這個超級工具了!
對更感興趣嗎?
如果您想了解更多信息,可以下載Confluent平台,這是Apache Kafka的領先發行版。您還可以在GitHub上找到本文中的所有代碼。
這是Igor Kosandyak的一篇客座文章,他是Oril的一名Java軟體工程師,在各個開發領域都有豐富的經驗。
原文:https://www.confluent.io/blog/apache-kafka-spring-boot-application
本文:https://pub.intelligentx.net/how-work-apache-kafka-your-spring-boot-application
討論:請加入知識星球或者小紅圈【首席架構師圈】