亞馬遜云科技Amazon MSK是Amazon云平臺提供的托管Kafka服務。在系統(tǒng)升級或遷移時,用戶常常需要將一個Amazon MSK集群中的數(shù)據(jù)導出(備份),然后在新集群或另一個集群中再將數(shù)據(jù)導入(還原)。通常,Kafka集群間的數(shù)據(jù)復制和同步多采用Kafka MirrorMaker,但是,在某些場景中,受環(huán)境限制,兩個于Kafka集群之間的網(wǎng)絡可能無法連通,或者兩個亞馬遜云科技賬號相互隔離,亦或是需要將Kafka的數(shù)據(jù)沉淀為文件存儲以備他用。此時,基于Kafka Connect S3 Source/Sink Connector的方案會是一種較為合適的選擇,本文就將介紹一下這一方案的具體實現(xiàn)。
?數(shù)據(jù)的導出、導入、備份、還原通常都是一次性操作,為此搭建完備持久的基礎設施并無太大必要,省時省力,簡單便捷才是優(yōu)先的考量因素。為此,本文將提供一套開箱即用的解決方案,方案使用Docker搭建Kafka Connect,所有操作均配備自動化Shell腳本,用戶只需設置一些環(huán)境變量并執(zhí)行相應腳本即可完成全部工作。這種基于Docker的單體模式可以應對中小型規(guī)模的數(shù)據(jù)同步和遷移,如果要尋求穩(wěn)定、健壯的解決方案,可以考慮將Docker版本的Kafka Connect遷移到Kubernetes或Amazon MSK Connect,實現(xiàn)集群化部署。
?整體架構
?首先介紹一下方案的整體架構。導出/導入和備份/還原其實是兩種高度類似的場景,但為了描述清晰,我們還是分開討論。先看一下導出/導入的架構示意圖:
?在這個架構中,Source端的MSK是數(shù)據(jù)流的起點,安裝了S3 Sink Connector的Kafka Connect會從Source端的MSK中提取指定Topic的數(shù)據(jù),然后以Json或Avro文件的形式存儲到S3上;同時,另一個安裝了S3 Source Connector的Kafka Connect會從S3上讀取這些Json或Avro文件,然后寫入到Sink端MSK的對應Topic中。如果Source端和Sink端的MSK集群不在同一個Region,可以在各自的Region分別完成導入和導出,然后在兩個Region之間使用S3的Cross-Rejion Replication進行數(shù)據(jù)同步。
?該架構只需進行簡單的調整,即可用于MSK集群的備份/還原,如下圖所示:先將MSK集群的數(shù)據(jù)備份到S3上,待完成集群的升級、遷移或重建工作后,再從S3上將數(shù)據(jù)恢復到新建集群即可。
?預設條件
?本文聚焦于Kafka Connect的數(shù)據(jù)導出/導入和備份/還原操作,需要提前準備:
?一臺基于Amazon Linux2的EC2實例(建議新建純凈實例),本文所有的實操腳本都將在該實例上執(zhí)行,該實例也是運行Kafka Connect Docker Container的宿主機。
?兩個MSK集群,一個作為Source,一個作為Sink;如果只有一個MSK集群也可完成驗證,該集群將既作Source又作Sink。
?為聚焦Kafka Connect S3 Source/Sink Connector的核心配置,預設MSK集群沒有開啟身份認證(即認證類型為Unauthenticated),數(shù)據(jù)傳輸方式為PLAINTEXT,以便簡化Kafka Connect的連接配置。
?網(wǎng)絡連通性上要求EC2實例能訪問S3、Source端MSK集群、Sink端MSK集群。如果在實際環(huán)境中無法同時連通Source端和Sink端,則可以在兩臺分屬于不同網(wǎng)絡的EC2上進行操作,但它們必須都能訪問S3。如果是跨Region或賬號隔離,則另需配置S3 Cross-Region Replication或手動拷貝數(shù)據(jù)文件。
?全局配置
?由于實際操作將不可避免地依賴到具體的亞馬遜云科技賬號以及本地環(huán)境里的各項信息(如AKSK,服務地址,各類路徑,Topic名稱等),為了保證本文給出的操作腳本具有良好的可移植性,將所有與環(huán)境相關的信息抽離出來,以全局變量的形式在實操前集中配置。以下就是全局變量的配置腳本,讀者需要根據(jù)個人環(huán)境設定這些變量的取值:
?為了便于演示和解讀,本文將使用下面的全局配置,其中前6項配置與賬號和環(huán)境強相關,仍需用戶自行修改,腳本中給出的僅為示意值,而后5項配置與MSK數(shù)據(jù)的導入導出息息相關,不建議修改,因為后續(xù)的解讀將基于這里設定的值展開,待完成驗證后,您可再根據(jù)需要靈活修改后5項配置以完成實際的導入導出工作。
?回到操作流程,登錄準備好的EC2實例,修改下面腳本中與賬號和環(huán)境相關的前6項配置,然后執(zhí)行修改后的腳本。此外,需要提醒注意的是:在后續(xù)操作中,部分腳本執(zhí)行后將不再返回,而是持續(xù)占用當前窗口輸出日志或Kafka消息,因此需要新開命令行窗口,每次新開窗口都需要執(zhí)行一次這里的全局配置腳本。
?關于上述腳本中的后5項配置,有如下詳細說明:
?我們就以腳本中設定的值為例,解讀一下這5項配置聯(lián)合起來將要實現(xiàn)的功能,同時也是本文將演示的主要內容:
?在Source端的MSK集群上存在兩個名為source-topic-1和source-topic-2的Topic,通過安裝有S3 Sink Connector的Kafka Connect(Docker容器)將兩個Topic的數(shù)據(jù)導出到S3的指定存儲桶中,然后再通過安裝有S3 Source Connector的Kafka Connect(Docker容器,可以和S3 Source Connector共存為一個Docker容器)將S3存儲桶中的數(shù)據(jù)寫入到Sink端的MSK集群上,其中原source-topic-1的數(shù)據(jù)將被寫入sink-topic-1,原source-topic-2的數(shù)據(jù)將被寫入sink-topic-2。
?特別地,如果是備份/還原場景,需要保持導出/導入的Topic名稱一致,此時,可直接刪除S3 Source Connector中以transforms開頭的4項配置(將在下文中出現(xiàn)),或者將下面兩項改為:
?如果只有一個MSK集群,同樣可以完成本文的驗證工作,只需將SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同時設置為該集群即可,這樣,該集群既是Source端又是Sink端,由于配置中的Source Topics和Sink Topics并不同名,所以不會產生沖突。
?環(huán)境準備
?安裝工具包
?在EC2上執(zhí)行以下腳本,安裝并配置jq,yq,docker,jdk,kafka-console-client五個必須的軟件包,可以根據(jù)自身EC2的情況酌情選擇安裝全部或部分軟件。建議使用純凈的EC2實例,完成全部的軟件安裝:
?創(chuàng)建S3存儲桶
?整個方案以S3作為數(shù)據(jù)轉儲媒介,為此需要在S3上創(chuàng)建一個存儲桶。Source端MSK集群的數(shù)據(jù)將會導出到該桶中并以Json文件形式保存,向Sink端MSK集群導入數(shù)據(jù)時,讀取的也是存儲在該桶中的Json文件。
?在源MSK上創(chuàng)建Source Topics
?為了確保Topics數(shù)據(jù)能完整備份和還原,S3 Source Connector建議Sink Topics的分區(qū)數(shù)最好與Source Topics保持一致,如果讓MSK自動創(chuàng)建Topic,則很有可能會導致Source Topics和Sink Topics的分區(qū)數(shù)不對等,所以,選擇手動創(chuàng)建Source Topics和Sink Topics,并確保它們的分區(qū)數(shù)一致。以下腳本將創(chuàng)建source-topic-1和source-topic-2兩個Topic,各含9個分區(qū):
?在目標MSK上創(chuàng)建Sink Topics
?原因同上,以下腳本將創(chuàng)建:sink-topic-1和sink-topic-2兩個Topic,各含9個分區(qū):
?制作Kafka Connect鏡像
?接下來是制作帶S3 Sink Connector和S3 Source Connector的Kafka Connect鏡像,鏡像和容器均以kafka-s3-syncer命名,以下是具體操作:
?配置并啟動Kafka Connect
?鏡像制作完成后,就可以啟動了Kafka Connect了。Kafka Connect有很多配置項,需要提醒注意的是:在下面的配置中,使用的是Kafka Connect內置的消息轉換器:JsonConverter,如果你的輸入/輸出格式是Avro或Parquet,則需要另行安裝對應插件并設置正確的Converter Class。
?上述腳本執(zhí)行后,命令窗口將不再返回,而是會持續(xù)輸出容器日志,因此下一步操作需要新開一個命令行窗口。
?
?配置并啟動S3 Sink Connector
?在第5節(jié)的操作中,已經將S3 Sink Connector安裝到了Kafka Connect的Docker鏡像中,但是還需要顯式地配置并啟動它。新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1):全局配置》,聲明全局變量,然后執(zhí)行以下腳本:
?配置并啟動S3 Source Connector
?同上,在第5節(jié)的操作中,已經將S3 Source Connector安裝到了Kafka Connect的Docker鏡像中,同樣需要顯式地配置并啟動它:
?至此,整個環(huán)境搭建完畢,一個以S3作為中轉媒介的MSK數(shù)據(jù)導出、導入、備份、還原鏈路已經處于運行狀態(tài)。
?
?測試
?現(xiàn)在,來驗證一下整個鏈路是否能正常工作。首先,使用kafka-console-consumer.sh監(jiān)控source-topic-1和sink-topic-1兩個Topic,然后使用腳本向source-topic-1持續(xù)寫入數(shù)據(jù),如果在sink-topic-1看到了相同的數(shù)據(jù)輸出,就說明數(shù)據(jù)成功地從source-topic-1導出然后又導入到了sink-topic-1中,相應的,在S3存儲桶中也能看到“沉淀”的數(shù)據(jù)文件。
?打開Source Topic
?新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控source-topic-1中的數(shù)據(jù):
?打開Sink Topic
?新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控sink-topic-1中的數(shù)據(jù):
?向Source Topic寫入數(shù)據(jù)
?新開一個命令行窗口,先執(zhí)行一遍《實操步驟(1:全局配置》,聲明全局變量,然后使用如下命令向source-topic-1中寫入數(shù)據(jù):
?現(xiàn)象與結論
?執(zhí)行上述寫入操作后,從監(jiān)控source-topic-1的命令行窗口中可以很快看到寫入的數(shù)據(jù),這說明Source端MSK已經開始持續(xù)產生數(shù)據(jù)了,隨后(約1分鐘),即可在監(jiān)控sink-topic-1的命令行窗口中看到相同的輸出數(shù)據(jù),這說明目標端的數(shù)據(jù)同步也已開始正常工作。此時,打開S3的存儲桶會發(fā)現(xiàn)大量Json文件,這些Json是由S3 Sink Connector從source-topic-1導出并存放到S3上的,然后S3 Source Connector又讀取了這些Json并寫入到了sink-topic-1中,至此,整個方案的演示與驗證工作全部結束。
??清理
?在驗證過程中,可能需要多次調整并重試,每次重試最好恢復到初始狀態(tài),以下腳本會幫助清理所有已創(chuàng)建的資源:
?小結
?本方案主要定位于輕便易用,在S3 Sink Connector和S3 Source Connector中還有很多與性能、吞吐量相關的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等,可以在實際需要自行調整,此外,Kafka Connect也可以方便地遷移到Kubernetes或Amazon MSK Connect中以實現(xiàn)集群化部署。