圖1 預(yù)計算工作流程
為了實現(xiàn)這個,你的系統(tǒng)需要:
能很容易存儲大的、不斷增長的數(shù)據(jù)集;能在數(shù)據(jù)集上可擴(kuò)展地計算查詢函數(shù)。
這樣的系統(tǒng)是存在的,即Hadoop。它是一個成熟的、經(jīng)歷了無數(shù)團(tuán)隊實戰(zhàn)檢驗過的系統(tǒng),同時擁有一個巨大的工具生態(tài)系統(tǒng)。它雖不完美,但是這里用來做批量處理的最好的一個工具。
許多人也許會告訴你,Hadoop只適用于那些“非結(jié)構(gòu)化”的數(shù)據(jù),這是完全錯誤的看法。Hadoop處理“結(jié)構(gòu)化”的數(shù)據(jù)也很不錯,通過使用像Thrift或者Protocol Buffers這樣的工具,你可以使用豐富的數(shù)據(jù)結(jié)構(gòu)存儲你的數(shù)據(jù)。
Hadoop由分布式文件系統(tǒng)HDFS和批處理框架MapReduce兩部分構(gòu)成。HDFS可以通過文件存儲大量數(shù)據(jù),MapReduce可以在這樣數(shù)據(jù)上進(jìn)行可擴(kuò)展計算。這個系統(tǒng)完全符合我們的要求。
我們將數(shù)據(jù)以文件形式存儲到HDFS中去。文件可以包括一個數(shù)據(jù)記錄序列。新增數(shù)據(jù)時,我們只需要在包括所有數(shù)據(jù)的文件夾中新增一個包含這條新記錄的文件即可。像這樣在HDFS存儲數(shù)據(jù)滿足了“能夠很容易存儲大的、不斷增長的數(shù)據(jù)集”這個要求。
預(yù)計算數(shù)據(jù)集上的查詢也很直觀,MapReduce是一個足夠復(fù)雜的框架,使得幾乎所有的函數(shù)都可以按照多個MapReduce任務(wù)這種方式實現(xiàn)。像Cascalog、Cascading和Pig這樣的工具使實現(xiàn)這些函數(shù)變得十分簡單。
最后,為了可以快速訪問這些預(yù)計算查詢結(jié)果,你需要對查詢結(jié)果進(jìn)行索引,這里有許多數(shù)據(jù)庫可以完成這個工作。ElephantDB和 Voldemort read-only可以通過從Hadoop中導(dǎo)出key/value數(shù)據(jù)來加快查詢速度。這些數(shù)據(jù)庫支持批量寫和隨機(jī)讀,同時不支持隨機(jī)寫。隨機(jī)寫使得數(shù)據(jù)庫變得復(fù)雜,所以通過不支持隨機(jī)寫,這些數(shù)據(jù)庫設(shè)計得特別簡潔,也就幾千行代碼而已。簡潔使得這些數(shù)據(jù)庫魯棒性變得非常好。
下面來看批量處理系統(tǒng)整體上是如何配合工作的。假設(shè)寫一個網(wǎng)站分析程序來跟蹤頁面訪問量,你需要能夠查詢到任意時間段的頁面訪問量,數(shù)據(jù)是以小時方式提供的。如圖2所示。
圖2 批處理工程流程示例(timestamp代表時間戳,count代表個數(shù))
實現(xiàn)這個很簡單,每一個數(shù)據(jù)記錄包括一個單一頁面的訪問量。這些數(shù)據(jù)通過文件形式存儲到HDFS中,一個函數(shù)通過實現(xiàn)MapReduce計算任務(wù),來計算一個URL下頁面每小時的訪問量。這個函數(shù)產(chǎn)生的是key/value對,其中[URL, hour]是key,value是頁面的訪問量。這些key/value對被導(dǎo)出到ElephantDB中去,使得應(yīng)用程序可以快速得到任意[URL, hour]對對應(yīng)的值。如果應(yīng)用程序想要知道某個時間范圍內(nèi)某個頁面的訪問量,它可以查詢ElephantDB中那段時間內(nèi)的數(shù)據(jù),然后把這些數(shù)據(jù)相加就可以得到這個訪問量數(shù)據(jù)了。
在數(shù)據(jù)滯后幾小時這個缺陷下,批量處理可以計算任意數(shù)據(jù)集上的任意函數(shù)。系統(tǒng)中的“任意性”是指這個系統(tǒng)可以處理任何問題。更重要的是,它很簡單,容易理解和完全可擴(kuò)展,你需要考慮的只是數(shù)據(jù)和查詢函數(shù),Hadoop會幫你處理并行的事情。
批處理系統(tǒng)、CAP定理和容忍人為錯誤
截至目前,我們的系統(tǒng)都很不錯,這個批處理系統(tǒng)是不是可以達(dá)到容忍人為錯誤的目標(biāo)呢?
讓我們從CAP定理開始。這個批處理系統(tǒng)總是最終一致的:寫入的數(shù)據(jù)總可以在幾小時后被查詢到。這個系統(tǒng)是一個很容易掌控的最終一致性系統(tǒng),使得你可以只用關(guān)注你的數(shù)據(jù)和針對數(shù)據(jù)的查詢函數(shù)。這里沒有涉及讀取修復(fù)、并發(fā)和其他一些需要考慮的復(fù)雜問題。
接下來看看這個系統(tǒng)對人為錯誤的容忍性。在這個系統(tǒng)中人們可能會犯兩個錯誤:部署了一個有Bug的查詢函數(shù)或者寫入了錯誤的數(shù)據(jù)。
如果部署了一個有Bug的查詢函數(shù),需要做的所有事情就是修正那個Bug,重新部署這個查詢函數(shù),然后在主數(shù)據(jù)集上重新計算它。這之所以能起作用是因為查詢只是一個函數(shù)而已。
另外,錯誤的數(shù)據(jù)有明確的辦法可以恢復(fù):刪除錯誤數(shù)據(jù),然后重新計算查詢。由于數(shù)據(jù)是不可變的,而且數(shù)據(jù)集只是往后添加新數(shù)據(jù),寫入錯誤的數(shù)據(jù)不會覆蓋或者刪除正確的數(shù)據(jù),這與傳統(tǒng)數(shù)據(jù)庫更新一個數(shù)據(jù)就丟掉舊的數(shù)據(jù)形成了鮮明的對比。
注意到MVCC和HBase類似的行版本管理并不能達(dá)到上面人為錯誤容忍級別。MVCC和HBase行版本管理不能永久保存數(shù)據(jù),一旦數(shù)據(jù)庫合并了這些版本,舊的數(shù)據(jù)就會丟失。只有不可變數(shù)據(jù)系統(tǒng)能夠保證你在寫入錯誤數(shù)據(jù)時可以找到一個恢復(fù)數(shù)據(jù)的方法。
實時層
上面的批量處理系統(tǒng)幾乎完全解決了在任意數(shù)據(jù)集上運(yùn)行任意函數(shù)的實時性需求。任何超過幾個小時的數(shù)據(jù)已經(jīng)被計算進(jìn)入了批處理視圖中,所以剩下來要做的就是處理最近幾個小時的數(shù)據(jù)。我們知道在最近幾小時數(shù)據(jù)上進(jìn)行查詢比在整個數(shù)據(jù)集上查詢要容易,這是關(guān)鍵點(diǎn)。
為了處理最近幾個小時的數(shù)據(jù),需要一個實時系統(tǒng)和批處理系統(tǒng)同時運(yùn)行。這個實時系統(tǒng)在最近幾個小時數(shù)據(jù)上預(yù)計算查詢函數(shù)。要計算一個查詢函數(shù),需要查詢批處理視圖和實時視圖,并把它們合并起來以得到最終的數(shù)據(jù)。
圖3 計算一個查詢
在實時層,可以使用Riak或者Cassandra這種讀寫數(shù)據(jù)庫,而且實時層依賴那些數(shù)據(jù)庫中對狀態(tài)更新的增量算法。
讓Hadoop模擬實時計算的工具是Storm。我寫Storm的目的是讓Hadoop可以健壯、可擴(kuò)展地處理大量的實時數(shù)據(jù)。Storm在數(shù)據(jù)流上運(yùn)行無限的計算,并且對這些數(shù)據(jù)處理提供了強(qiáng)有力的保障。
讓我們回到剛才那個根據(jù)某個URL查詢某個頁面在某個時間段內(nèi)頁面訪問量的例子,通過這個例子我將展示實時層是如何工作的。
批處理系統(tǒng)還是跟之前一樣:一個基于Hadoop和ElephantDB的批處理工作流,在幾個小時之前的數(shù)據(jù)上預(yù)計算查詢函數(shù)。剩下就是讓實時系統(tǒng)去處理最近幾小時數(shù)據(jù)了。
我們將最近幾小時的數(shù)據(jù)狀態(tài)存入Cassandra中,用Storm去處理頁面訪問量數(shù)據(jù)流并并行更新到數(shù)據(jù)庫中,針對每一個頁面訪問量,在 [URL, hour]所代表的key下,有一個計數(shù)器,這個計數(shù)器在Cassandra中實現(xiàn)。這就是所有的事情,Storm讓事情變得非常簡單。
圖4 批處理/實時架構(gòu)示例
批處理層+實時層、CAP定理和人為錯誤容忍性
貌似又回到一開始提出的問題上去了,訪問實時數(shù)據(jù)需要使用NoSQL數(shù)據(jù)庫和增量算法。這就說明回到了版本化數(shù)據(jù)、矢量時鐘和讀取修復(fù)這些復(fù)雜問題中來。但這是有本質(zhì)區(qū)別的。由于實時層只處理最近幾小時的數(shù)據(jù),所有實時層的計算都會被最終批處理層重新計算。所以如果犯了什么錯誤或者實時層出了問題,最終都會被批處理層更正過來,所有復(fù)雜的問題都是暫時的。
這并不意味著不需要關(guān)心實時層的讀取修復(fù)和最終一致性,你仍然需要實時層盡可能的一致。但當(dāng)犯了一個錯誤時,不會永久性地破壞數(shù)據(jù)。這便移除了許多你所需要面對的復(fù)雜問題。
在批處理層僅需要考慮數(shù)據(jù)和數(shù)據(jù)上的查詢函數(shù),批處理層因此很好掌控。在實時層,需要使用增量算法和復(fù)雜的NoSQL數(shù)據(jù)庫。把所有的復(fù)雜問題獨(dú)立到實時層中,對系統(tǒng)的魯棒性、可靠性做出了重大貢獻(xiàn)。
同樣的,實時層并沒有影響系統(tǒng)的人為錯誤容忍性,這個數(shù)據(jù)不可變和只追加的批處理系統(tǒng),仍然是整個系統(tǒng)的核心,所以所有的都可以像上面說的一樣被糾正過來。
我有一個類似的系統(tǒng):Hadoop和ElephantDB組成批處理系統(tǒng),Storm和Cassandra組成實時系統(tǒng)。由于缺乏監(jiān)控,某天當(dāng)我起床的時候發(fā)現(xiàn)Cassandra運(yùn)行滿負(fù)荷了,使得所有的數(shù)據(jù)請求都超時。這使得Storm計算失敗,一些數(shù)據(jù)又重新回到了等待隊列中,這個數(shù)據(jù)就一次次重復(fù)請求。
如果我沒有批處理層,那么我就需要擴(kuò)展和恢復(fù)Cassandra,這個很不容易。更糟的是,因為請求不斷的重復(fù),無法得到正確的數(shù)據(jù)。
幸運(yùn)的是,所有的復(fù)雜問題都被隔離到實時層中去了,我清空了所有的后臺請求隊列,把它們打到了批處理層上,同時重啟了Cassandra集群,過了幾個小時之后所有數(shù)據(jù)都恢復(fù)正常了。沒有錯誤數(shù)據(jù),請求中也沒有不準(zhǔn)確的地方。
垃圾回收
上面描述的所有東西都是建立在一個不可變的、不斷增長的數(shù)據(jù)集上的。如果數(shù)據(jù)集已經(jīng)很大,使得不可能用水平擴(kuò)展儲存所有時間的所有數(shù)據(jù),該如何處理呢?這是不是就推翻了我說的一切呢?是不是需要回到可變數(shù)據(jù)的系統(tǒng)上呢?
不。我們可以很容易地用“垃圾回收”對基本模型進(jìn)行擴(kuò)展來解決上面的問題。垃圾回收是一個在主數(shù)據(jù)集上的簡單函數(shù),返回的是一個過濾版本的主數(shù)據(jù)集。垃圾回收掉了舊數(shù)據(jù),可以選擇任意的垃圾回收策略??梢栽谝鬃兊南到y(tǒng)中只保留數(shù)據(jù)最新的一個值或者保留每個數(shù)據(jù)的歷史。比如,如果要處理位置數(shù)據(jù),可以保留每人每年的一個地點(diǎn)??勺冃允且粋€不是很靈活的垃圾回收形式(它跟CAP定理交互得也很糟糕)。
垃圾回收可以被實現(xiàn)成批處理的一個任務(wù),隔段時間運(yùn)行一下。由于它是作為離線批處理任務(wù)執(zhí)行的,所以不影響我們與CAP定理的交互。
總結(jié)
讓可擴(kuò)展的數(shù)據(jù)系統(tǒng)復(fù)雜的原因不是CAP系統(tǒng),而是數(shù)據(jù)增量算法和數(shù)據(jù)的可變狀態(tài)。最近由于分布式數(shù)據(jù)庫的興起導(dǎo)致了復(fù)雜度越來越不可控。前面講過,我將挑戰(zhàn)對傳統(tǒng)數(shù)據(jù)系統(tǒng)構(gòu)建方法的假設(shè)。我把CRUD變成了CR,把持久化層分成了批處理和實時兩個層,并且得到對人為錯誤容忍的能力。我花費(fèi)了多年來之不易的經(jīng)驗打破我對傳統(tǒng)數(shù)據(jù)庫的假設(shè),并得到了這些結(jié)論。
批處理/實時架構(gòu)有許多有趣的能力我并沒有提到,下面我總結(jié)了一些。
算法的靈活性。隨著數(shù)據(jù)量的增長,一些算法會越來越難計算。比如計算標(biāo)識符的數(shù)量,當(dāng)標(biāo)識符集合越來越大時,將會越來越難計算。批處理/實時分離系統(tǒng)給了你在批處理系統(tǒng)上使用精確算法和在實時系統(tǒng)上使用近似算法的靈活性。批處理系統(tǒng)計算結(jié)果會最終覆蓋實時系統(tǒng)的計算結(jié)果,所以最終近似值會被修正,而你的系統(tǒng)擁有了“最終精確性”。
數(shù)據(jù)結(jié)構(gòu)遷移變得很容易。數(shù)據(jù)結(jié)構(gòu)遷移的難題將一去不復(fù)返。由于批量計算是系統(tǒng)的核心,很容易在整個系統(tǒng)上運(yùn)行一個函數(shù),所以很容易更改你數(shù)據(jù)的結(jié)構(gòu)或者視圖。
簡單的Ad-Hoc網(wǎng)絡(luò)。由于批處理系統(tǒng)的任意性,使得你可以在數(shù)據(jù)上進(jìn)行任意查詢。由于所有的數(shù)據(jù)在一個點(diǎn)上都可以獲取,所以Ad-Hoc網(wǎng)絡(luò)變得簡單而且方便。
自我檢查。由于數(shù)據(jù)是不可變的,數(shù)據(jù)集就可以自我檢查。數(shù)據(jù)集記錄了它的數(shù)據(jù)歷史,對于人為錯誤容忍性和數(shù)據(jù)分析很有用。
我并沒有說我已經(jīng)“解決”了數(shù)據(jù)量過大的問題,但我已經(jīng)為解決大數(shù)據(jù)問題制訂了一個框架。批處理/實時架構(gòu)可以應(yīng)用到任何一個數(shù)據(jù)系統(tǒng)中去,“授人以魚,不如授人以漁”,我已經(jīng)告訴你了如何去構(gòu)建這樣的系統(tǒng)。
為了提高系統(tǒng)整體能力來解決大數(shù)據(jù)的問題,我們還有許多工作需要做。
擴(kuò)展數(shù)據(jù)模型,支持批量寫和隨機(jī)讀。不是每一個應(yīng)用程序都支持key/value的數(shù)據(jù)庫,這也是我們團(tuán)隊對擴(kuò)展ElephantDB,使得可以支持搜索、文檔數(shù)據(jù)庫、區(qū)間查詢感興趣的原因。更好的批處理原語。Hadoop并不是批處理的最終形態(tài),好多批處理計算Hadoop效率不高。Spark是一個有意思的擴(kuò)展MapReduce的項目。提升后的讀寫NoSQL數(shù)據(jù)庫。這里不同類型數(shù)據(jù)的數(shù)據(jù)庫還有很大的提升空間,隨著這些數(shù)據(jù)庫的成熟,它們將收獲很多。高層級的抽象。未來工作中最有意思的就是對批處理模塊和實時處理模塊的高層次抽象,在批處理和實時架構(gòu)下你沒有理由不擁有一個簡單的、描述性的、魯棒性好的語言。
許多人需要一個可擴(kuò)展的關(guān)系型數(shù)據(jù)庫,本文就是想讓你知道完全不需要那個。大數(shù)據(jù)量和NoSQL運(yùn)動使數(shù)據(jù)管理比RDBMS更加復(fù)雜。那僅僅是因為我對大數(shù)據(jù)的處理采用了跟RDBMS同樣的方法:把數(shù)據(jù)和視圖混為一談,并且依賴增量算法。大數(shù)據(jù)量需要采用完全不同的方式構(gòu)建數(shù)據(jù)系統(tǒng)。通過存儲持續(xù)增長的不可變數(shù)據(jù),并且系統(tǒng)核心采用預(yù)計算,大數(shù)據(jù)系統(tǒng)就可以變得比關(guān)系型數(shù)據(jù)庫更易掌控,并且可擴(kuò)展性很強(qiáng)。