數據源、storm應用、結果集。storm應用從數據源讀取數據

2018-02-28 17:12:29 ortotra 22

有贊使用storm已經有將近3年時間,穩定支撐著實時統計、數據同步、對賬、監控、風控等業務。訂單實時統計是其中一個典型的業務,對數據準確性、性能等方面都有較高要求,也是上線時間最久的一個實時計算應用。通過訂單實時統計,描述使用storm時,遇到的準確性、性能、可靠性等方面的問題。


訂單實時統計的演進

第一版:流程走通

在使用storm之前,顯示實時統計數據一般有兩種方案:


在數據庫里執行count、sum等聚合查詢,是簡單快速的實現方案,但容易出現慢查詢。

在業務代碼里對統計指標做累加,可以滿足指標的快速查詢,但統計邏輯耦合到業務代碼,維護不方便,而且錯誤數據定位和修正不方便。

既要解耦業務和統計,也要滿足指標快速查詢,基于storm的實時計算方案可以滿足這兩點需求。


一個storm應用的基本結構有三部分:數據源、storm應用、結果集。storm應用從數據源讀取數據,經過計算后,把結果持久化或發送消息給其他應用。




第一版的訂單實時統計結構如下圖。在數據源方面,最早嘗試在業務代碼里打日志的方式,但總有業務分支無法覆蓋,采集的數據不全。我們的業務數據庫是mysql,隨后嘗試基于mysql binlog的數據源,采用了阿里開源的canal,可以做到完整的收集業務數據變更。


在結果數據的處理上,我們把統計結果持久化到了mysql,并通過另一個后臺應用的RESTful API對外提供服務,一個mysql就可以滿足數據的讀寫需求。




為了提升實時統計應用吞吐量,需要提升消息的并發度。spout里設置了消息緩沖區,只要消息緩沖區不滿,就會源源不斷從消息源canal拉取數據,并把分發到多個bolt處理。


第二版:性能提升

第一版的性能瓶頸在統計結果持久化上。為了確保數據的準確性,把所有的統計指標持久化放在一個數據庫事務里。一筆訂單狀態更新后,會在一個事務里有兩類操作:


訂單的歷史狀態也在數據庫里存著,要與歷史狀態對比決定統計邏輯,并把最新的狀態持久化。storm的應用本身是無狀態的,需要使用存儲設備記錄狀態信息

當大家知道實時計算好用后,各產品都希望有實時數據,統計邏輯越來越復雜。店鋪、商品、用戶等多個指標的寫操作都是在一個事務里commit,這一簡單粗暴的方式早期很好滿足的統計需求,但是對于update操作持有鎖時間過長,嚴重影響了并發能力。

為此做了數據庫事務的瘦身:


去除歷史狀態的mysql持久化,而是通過單條binlog消息的前后狀態對比,決定統計邏輯,這樣就做到了統計邏輯上的無狀態。但又產生了新問題,如何保證消息有且只有處理一次,為此引入了一個redis用于保存最近24小時內已成功處理的消息binlog偏移量,而storm的消息分發機制又可以保證相同消息總是能分配到一個bolt,避免線程安全問題。

統計業務拆分,先是線上業務和公司內部業務分離,隨后又把線上業務按不同產品拆分。這個不僅僅是bolt級別的拆分,而是在spout就完全分開

隨著統計應用拆分,在canal和storm應用之間加上消息隊列。canal不支持多消費者,而實時統計業務也不用關系數據庫底層遷移、主從切換等維護工作,加上消息隊列能把底層數據的維護和性能優化交給更專業的團隊來做。

熱點數據在mysql里做了分桶。比如,通常一個店鋪天級別的統計指標在mysql里是一行數據。如果這個店鋪有突發的大量訂單,會出現多個bolt同時去update這行數據,出現數據熱點,mysql里該行數據的鎖競爭異常激烈。我們把這樣的熱點數據做了分桶,實驗證明在特定場景下可以有一個數量級吞吐量提升。

最終,第二版的訂單實時統計結構如下,主要變化在于引入了MQ,并使用redis作為消息狀態的存儲。而且由最初的一個應用,被拆成了多個應用。




第三版:準確性提升

經過第二版的優化,實時統計的吞吐量已經不成問題,但還是遇到了做大數據最重要的準確性的問題:


統計口徑是會變化的,同樣是GMV,一年前和現在的算法可能有變化。例如一筆貨到付款訂單,是買家下單算成交,還是賣家發貨成交,在不同的時期可能使用不同的算法。

實時統計只能按照當時的算法來做計算。有可能出現一段時間周期內的GMV,前一段是按舊算法來計算,后一段按新算法來計算,提供的數據就不準確了。

實時統計難免會出現bug,有不準確的結果,修復錯誤數據是個難題。

為了解決這個問題,凡是涉及到兩天以前數據的,一律由離線計算提供,最終展示給用戶的數據,就是歷史離線統計數據,并上今日昨日實時統計數據。為什么是今日昨日實時統計呢?因為離線統計有數據準備、建模、統計的過程,要花費幾個小時,每天的凌晨很可能還得不到前一天的離線統計結果。


一旦統計口徑有變化,只需要重跑離線統計任務就可修復歷史數據,做到了冷熱數據分離。




實時計算的常見問題

通過訂單實時統計的案例,可以抽象出一些基于storm實時計算的共性問題。


消息狀態管理

storm不提供消息狀態管理,而且為了達到水平擴展,最好是消息之間無狀態。對于大數據量、低精度的應用,需要做到無狀態。而像訂單實時統計這樣數據量不算太大,但精度要求極高的場景,需要記錄消息處理狀態。而為了應付重啟、分布式擴展的場景,往往需要額外的介質來存儲狀態。狀態信息往往是kv形式的讀寫,我們在實際的應用中,使用過redis、HBase作為存儲。


消息不丟失、不重復、不亂序

對于準確性要求高的場景,需要保證數據正確的只消費一次。storm的有三種消息處理模式:


at most once,若不實現ack和fail方法,無論后續處理結果如何,消息只會發送一次,必定不能滿足高準確性;

at least once,若實現了ack和fail方法,只有調用了ack方法才會任務處理成功,否則會重試。可能會出現消息重復,在并發場景下重復又意味著可能出現亂序;

exactly once,trident每個micro batch作為整體只成功處理一次,但也是無法保證消息真的只正確的處理一次,比如數據已經處理完畢并持久化,但向數據源ack時失敗,就可能會有重試。

對于消息重復、亂序的場景,不是簡單的消息冪等能解決,有以下的處理思路:


使用前面提到的狀態管理的辦法,識別出重復、亂序的數據;

業務邏輯中,兼容重復、亂序數據,比如維護一個業務狀態機,把異常數據剔除。

對于時序判斷,盡量不用使用時間戳,因為在分布式系統里,各服務器時間不一致是很常見的問題。


我們會嘗試在運行過程中重啟消息源、storm應用、存儲/MQ等下游系統,或者制造網絡丟包、延遲等異常,手工觸發可能的消息丟失、重復、亂序場景,來驗證我們的應用能否對應這些異常情況。


復雜拓撲

在storm的文檔里,有很多類似下圖的復雜應用。




對于需要消息可靠處理的場景,是不適合這樣復雜拓撲的,部分失敗如何回滾,是否要全部bolt處理完畢才ack,是需要面對的問題。過長的拓撲鏈路,里面的慢速邏輯會拖慢整體性能。


可以考慮使用更簡化的拓撲,不同的邏輯之間盡量解耦,需要使用bolt的結果時,可以把數據持久化或者推送到MQ。




監控

生產環境少不了監控,除了服務器的基礎監控,還加了不少storm特有的監控:


消息延遲:消息在業務系統的時間戳與storm應用的當前時間戳對比,大于一定閾值則告警,不同應用的閾值會不同;

消息處理時長、fail數:這兩個都可以由storm的接口獲取,數值偏大很可能是出了問題;

應用TPS:記錄應用的emit、ack、fail數的變化趨勢,幫助分析應用的運行情況;

任務級監控:每臺服務器的worker、executor數量,這也可以通過storm接口獲取。

除此之外,會有各類應用特有的監控,一般都是離線計算的結果與實時計算結果對比。對于數據同步類的應用,數據量比較大,可能會使用采樣的方式做校驗。


電話咨詢
郵件咨詢
在線地圖
时时彩龙虎和有规律吗