深度解讀!阿里騰訊位元組首選,新一代大數據引擎Flink厲害在哪?附學習禮包

2020-09-15     大數據文摘

原標題:深度解讀!阿里騰訊位元組首選,新一代大數據引擎Flink厲害在哪?附學習禮包

大數據文摘出品

千呼萬喚,Apache Flink 1.11.0終於上新了!

作為備受矚目的新一代開源大數據計算引擎,Flink項目無疑已成為 Apache 基金會和 GitHub 最為活躍的項目之一。自2014年正式開源發展非常迅速,截止到2020年7月,社區的star數達到13600+,contributor 數達到718,有22989次commits,目前在GitHub上的訪問量在Apache項目中位居前三。

作為快速發展的新一代大數據引擎,Flink本身的架構優勢也吸引著越來越多的開源愛好者投入到社區的建設來。目前,Flink已經成為阿里巴巴、騰訊、位元組跳動、滴滴、美團點評等知名公司建設流處理平台的首選。

不過你可能要問,流計算引擎這麼多,為什麼Flink這麼受歡迎?

面對全量數據和增量數據,業界亟需用一套統一的大數據引擎技術來處理所有數據。

Apache Flink 被業界公認為最好的流計算引擎,其計算能力不僅僅局限於做流處理,而是一套兼具流、批、機器學習等多種計算功能的大數據引擎,用戶只需根據業務邏輯開發一套代碼,無論是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持。

Apache Flink 的系統架構

在近期發布的Apache Flink 1.11.0版本中,超過 200 名貢獻者參與了 Flink 1.11.0 的開發,提交了超過 1300 個修復或優化。這些修改極大的提高了 Flink 的可用性,並且增強了各個 API 棧的功能。

先附上 GitHub 下載地址

https://flink.apache.org/downloads.html#apache-flink-1110

為了讓大家更全面地了解 Apache Flink 背後的技術以及應用實踐,大數據文摘聯合Flink社區,為大家獻上超值學習福利!馬上下載,越早學習,越能抓住時代先機!

重磅福利1:Flink 年度學習資料大禮包!

這份大禮包里不僅有大數據實時計算及 Apache Flink 年度Flink 年度學習資料大禮包,還有300+頁實戰應用精華總結!

  • 零基礎入門,30 天成長為 Flink 大神的經典教程。
  • Apache Flink 核心貢獻者及阿里巴巴技術專家的一線實戰經驗總結。
  • 收錄來自 bilibili、美團點評、小米、OPPO、快手、Lyft、Netflix 等國內外一線大廠實時計算平台及實時數倉最佳實踐案例。

重磅福利2:阿里雲《實時計算Flink極客訓練營》免費開啟!

不過,一個人看學習資料容易半途而廢,配合這份「大禮包」,文摘菌在這裡也安利一波阿里雲下周開營的《實時計算Flink極客訓練營》

在5天的課程中你將獲得:

  • 實現從0到1了解 Flink和流計算概念,為大數據引擎學習打下基礎。
  • 通過實際案例,了解Flink SQL/Table的基礎概念,以及其是如何統一流處理和批處理的;
  • PyFlink簡介及上手,如何通過Flink於外部系統進行對接;
  • 課程內容側重於原理解析與基礎應用,從最實際的應用場景出發引導你深入了解Flink。

點擊下方連結,報名《實時計算Flink極客訓練營》,並加入配套釘釘群,即可免費獲取Flink年度學習大禮包!

https://developer.aliyun.com/learning/trainingcamp/sc/2?utm_content=g_1000181368

還有直播課程教你Flink正確打開方式,幫助你從 Flink 小白成長為 Flink 技術專家!

更重要的是,課程現在報名免費,名額有限,先到先得哦!

深度解讀 Flink 1.11:流批一體 Hive 數倉

最新版的Flink1.11流計算結合 Hive 批處理數倉自上線以來就廣受好評,我們最後也為大家帶來了阿里雲兩位技術專家李勁松(之信)和李銳(天離)Flink 1.11流批一體 Hive 數倉的深度解讀。

最新版的Flink 1.11 中流計算結合 Hive 批處理數倉,給離線數倉帶來 Flink 流處理實時且 Exactly-once 的能力。另外,Flink 1.11 完善了 Flink 自身的 Filesystem connector,大大提高了 Flink 的易用性。

數倉架構

離線數倉

傳統的離線數倉是由 Hive 加上 HDFS 的方案,Hive 數倉有著成熟和穩定的大數據分析能力,結合調度和上下游工具,構建一個完整的數據處理分析平台,流程如下:

  • Flume 把數據導入 Hive 數倉
  • 調度工具,調度 ETL 作業進行數據處理
  • 在 Hive 數倉的表上,可以進行靈活的 Ad-hoc 查詢
  • 調度工具,調度聚合作業輸出到BI層的資料庫中

這個流程下的問題是:

  • 導入過程不夠靈活,這應該是一個靈活 SQL 流計算的過程
  • 基於調度作業的級聯計算,實時性太差
  • ETL 不能有流式的增量計算

實時數倉

針對離線數倉的特點,隨著實時計算的流行,越來越多的公司引入實時數倉,實時數倉基於 Kafka + Flink streaming,定義全流程的流計算作業,有著秒級甚至毫秒的實時性。

但是,實時數倉的一個問題是歷史數據只有 3-15 天,無法在其上做 Ad-hoc 的查詢。如果搭建 Lambda 的離線+實時的架構,維護成本、計算存儲成本、一致性保證、重複的開發會帶來很大的負擔。

Hive 實時化

Flink 1.11 為解決離線數倉的問題,給 Hive 數倉帶來了實時化的能力,加強各環節的實時性的同時,又不會給架構造成太大的負擔。

Hive streaming sink

實時數據導入 Hive 數倉,你是怎麼做的?Flume、Spark Streaming 還是 Flink Datastream?千呼萬喚,Table / SQL 層的 streaming file sink 來啦,Flink 1.11 支持 Filesystem connector 和 Hive connector 的 streaming sink。

(註:圖中 StreamingFileSink 的 Bucket 概念就是 Table/SQL 中的 Partition)

Table/SQL 層的 streaming sink 不僅:

  • 帶來 Flink streaming 的實時/准實時的能力
  • 支持 Filesystem connector 的全部 formats(csv,json,avro,parquet,orc)
  • 支持 Hive table 的所有 formats
  • 繼承 Datastream StreamingFileSink 的所有特性:Exactly-once、支持HDFS, S3

而且引入了新的機制:Partition commit。

一個合理的數倉的數據導入,它不止包含數據文件的寫入,也包含了 Partition 的可見性提交。當某個 Partition 完成寫入時,需要通知 Hive metastore 或者在文件夾內添加 SUCCESS 文件。Flink 1.11 的 Partition commit 機制可以讓你:

  • Trigger:控制Partition提交的時機,可以根據Watermark加上從Partition中提取的時間來判斷,也可以通過Processing time來判斷。你可以控制:是想先儘快看到沒寫完的Partition;還是保證寫完Partition之後,再讓下游看到它。
  • Policy:提交策略,內置支持SUCCESS文件和Metastore的提交,你也可以擴展提交的實現,比如在提交階段觸發Hive的analysis來生成統計信息,或者進行小文件的合併等等。

一個例子:

-- 結合Hive dialect使用Hive DDL語法

SET table.sql-dialect=hive;

CREATE TABLE hive_table (

user_id STRING,

order_amount DOUBLE

) PARTITIONED BY (

dt STRING,

hour STRING

) STORED AS PARQUET TBLPROPERTIES (

-- 使用partition中抽取時間,加上watermark決定partiton commit的時機

'sink.partition-commit.trigger'='partition-time',

-- 配置hour級別的partition時間抽取策略,這個例子中dt欄位是yyyy-MM-dd格式的天,hour是0-23的小時,timestamp-pattern定義了如何從這兩個partition欄位推出完整的timestamp

'partition.time-extractor.timestamp-pattern'=』$dt $hour:00:00』,

-- 配置dalay為小時級,當 watermark > partition時間 + 1小時,會commit這個partition

'sink.partition-commit.delay'='1 h',

-- partitiion commit的策略是:先更新metastore(addPartition),再寫SUCCESS文件

'sink.partition-commit.policy.kind』='metastore,success-file'

SET table.sql-dialect=default;

CREATE TABLE kafka_table (

user_id STRING,

order_amount DOUBLE,

log_ts TIMESTAMP(3),

WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND

-- 可以結合Table Hints動態指定table properties [3]

INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

Hive streaming source

Hive 數倉中存在大量的 ETL 任務,這些任務往往是通過調度工具來周期性的運行,這樣做主要有兩個問題:

  1. 實時性不強,往往調度最小是小時級。
  2. 流程複雜,組件多,容易出現問題。

針對這些離線的 ETL 作業,Flink 1.11 為此開發了實時化的 Hive 流讀,支持:

  • Partition 表,監控 Partition 的生成,增量讀取新的 Partition。
  • 非 Partition 表,監控文件夾內新文件的生成,增量讀取新的文件。

你甚至可以使用10分鐘級別的分區策略,使用 Flink 的 Hive streaming source 和Hive streaming sink 可以大大提高 Hive 數倉的實時性到准實時分鐘級,在實時化的同時,也支持針對 Table 全量的 Ad-hoc 查詢,提高靈活性。

SELECT * FROM hive_table

/*+ OPTIONS('streaming-source.enable'=』true』,

'streaming-source.consume-start-offset'='2020-05-20') */;

實時數據關聯 Hive 表

在 Flink 與 Hive 集成的功能發布以後,我們收到最多的用戶反饋之一就是希望能夠將 Flink 的實時數據與離線的 Hive 表進行關聯。因此,在 Flink 1.11 中,我們支持將實時表與 Hive 表進行 temporal join。沿用 Flink 官方文檔中的例子,假定 Orders 是實時表,而 LatestRates 是一張 Hive 表,用戶可以通過以下語句進行temporal join:

SELECT

o.amout, o.currency, r.rate, o.amount * r.rate

FROM

Orders AS o

JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r

ON r.currency = o.currency

與 Hive 表進行 temporal join 目前只支持 processing time,我們會把 Hive 表的數據緩存到內存中,並按照固定的時間間隔去更新緩存的數據。用戶可以通過參數「lookup.join.cache.ttl」 來控制緩存更新的間隔,默認間隔為一個小時。

「lookup.join.cache.ttl」 需要配置到 Hive 表的 property 當中,因此每張表可以有不同的配置。另外,由於需要將整張 Hive 表加載到內存中,因此目前只適用於 Hive 表較小的場景。

Hive 增強

Hive Dialect 語法兼容

Flink on Hive 用戶並不能很好的使用 DDL,主要是因為:

  • Flink 1.10 中進一步完善了 DDL,但由於 Flink 與 Hive 在元數據語義上的差異,通過 Flink DDL 來操作 Hive 元數據的可用性比較差,僅能覆蓋很少的應用場景。
  • 使用 Flink 對接 Hive 的用戶經常需要切換到 Hive CLI 來執行 DDL。

針對上述兩個問題,我們提出了 FLIP-123,通過 Hive Dialect 為用戶提供 Hive語法兼容。該功能的最終目標,是為用戶提供近似 Hive CLI/Beeline 的使用體驗,讓用戶無需在 Flink 和 Hive 的 CLI 之間進行切換,甚至可以直接遷移部分 Hive 腳本到 Flink 中執行。

在 Flink 1.11中,Hive Dialect 可以支持大部分常用的 DDL,比如 CREATE/ALTER TABLE、CHANGE/REPLACE COLUMN、ADD/DROP PARTITION 等等。為此,我們為 Hive Dialect 實現了一個獨立的 parser,Flink 會根據用戶指定的 Dialect 決定使用哪個 parser 來解析 SQL 語句。用戶可以通過配置項「 table.sql-dialect 」 來指定使用的 SQL Dialect。它的默認值為 「default」,即 Flink 原生的 Dialect,而將其設置為 「hive」 時就開啟了 Hive Dialect。對於 SQL 用戶,可以在 yaml 文件中設置「table.sql-dialect」 來指定 session 的初始 Dialect,也可以通過 set 命令來動態調整需要使用的 Dialect,而無需重啟 session。

Hive Dialect 目前所支持的具體功能可以參考 FLIP-123 或 Flink 的官方文檔。另外,該功能的一些設計原則和使用注意事項如下:

  1. Hive Dialect 只能用於操作 Hive 表,而不是 Flink 原生的表(如 Kafka、ES 的表),這也意味著 Hive Dialect 需要配合 HiveCatalog 使用。
  2. 使用 Hive Dialect 時,原有的 Flink 的一些語法可能會無法使用(例如 Flink 定義的類型別名),在需要使用 Flink 語法時可以動態切換到默認的 Dialect。
  3. Hive Dialect 的 DDL 語法定義基於 Hive 的官方文檔,而不同 Hive 版本之間語法可能會有輕微的差異,需要用戶進行一定的調整。
  4. Hive Dialect 的語法實現基於 Calcite,而 Calcite 與 Hive 有不同的保留關鍵字。因此,某些在 Hive 中可以直接作為標識符的關鍵字(如 「default」 ),在Hive Dialect 中可能需要用「`」進行轉義。

向量化讀取

Flink 1.10中,Flink 已經支持了 ORC (Hive 2+) 的向量化讀取支持,但是這很局限,為此,Flink 1.11 增加了更多的向量化支持:

  • ORC for Hive 1.x
  • Parquet for Hive 1,2,3

也就是說已經補全了所有版本的 Parquet 和 ORC 向量化支持,默認是開啟的,提供開關。

簡化 Hive 依賴

Flink 1.10 中,Flink 文檔中列出了所需的 Hive 相關依賴,推薦用戶自行下載。但是這仍然稍顯麻煩,所以在1.11 中,Flink 提供了內置的依賴支持:

  • flink-sql-connector-hive-1.2.2_2.11-1.11.jar:Hive 1 的依賴版本。
  • flink-sql-connector-hive-2.2.0_2.11-1.11.jar:Hive 2.0 - 2.2 的依賴版本。
  • flink-sql-connector-hive-2.3.6_2.11-1.11.jar:Hive 2.3 的依賴版本。
  • flink-sql-connector-hive-3.1.2_2.11-1.11.jar:Hive 3 的依賴版本。

現在,你只需要單獨下一個包,再搞定 HADOOP_CLASSPATH,即可運行 Flink on Hive。

Flink 增強

除了 Hive 相關的 features,Flink 1.11 也完成了大量其它關於流批一體的增強。

Flink Filesystem connector

Flink table 在長久以來只支持一個 csv 的 file system table,而且它還不支持Partition,行為上在某些方面也有些不符合大數據計算的直覺。

在 Flink 1.11,重構了整個 Filesystem connector 的實現 :

  • 結合 Partition,現在,Filesystem connector 支持 SQL 中 Partition 的所有語義,支持 Partition 的 DDL,支持 Partition Pruning,支持靜態/動態 Partition 的插入,支持 overwrite 的插入。
  • 支持各種 Formats:
  • CSV
  • JSON
  • Aparch AVRO
  • Apache Parquet
  • Apache ORC.
  • 支持 Batch 的讀寫。
  • 支持 Streaming sink,也支持上述 Hive 支持的 Partition commit,支持寫Success 文件。

例子:

CREATE TABLE fs_table (

user_id STRING,

order_amount DOUBLE,

dt STRING,

hour STRING

) PARTITIONED BY (dt, hour) WITH (

』connector』=』filesystem』,

』path』=』...』,

』format』=』parquet』,

'partition.time-extractor.timestamp-pattern'=』$dt $hour:00:00』,

'sink.partition-commit.delay'='1 h',

『sink.partition-commit.policy.kind』='success-file')

-- stream environment or batch environment

INSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- 通過 Partition 查詢

SELECT * FROM fs_table WHERE dt=』2020-05-20』 and hour=』12』;

引入 Max Slot

Yarn perJob 或者 session 模式在 1.11 之前是無限擴張的,沒有辦法限制它的資源使用,只能用 Yarn queue 等方式來限制。但是傳統的批作業其實都是大並發,運行在局限的資源上,一部分一部分階段性的運行,為此,Flink 1.11 引入 Max Slot 的配置[11],限制 Yarn application 的資源使用。

slotmanager.number-of-slots.max

定義 Flink 集群分配的最大 Slot 數。此配置選項用於限制批處理工作負載的資源消耗。不建議為流作業配置此選項,如果沒有足夠的 Slot,則流作業可能會失敗。

想更進一步了解Flink?戳下方連結,報名《實時計算Flink極客訓練營》,獲取阿里雲大數據獨門絕學!

https://developer.aliyun.com/learning/trainingcamp/sc/2?utm_content=g_1000181368

文章來源: https://twgreatdaily.com/zh-cn/PgpAkXQBeElxlkkakYoo.html