更新時間:2023-06-23 來源:黑馬程序員 瀏覽量:
SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle write。
task的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。
該模式下,數據會先寫入一個內存數據結構中(默認5M),此時根據不同的shuffle算子,可能選用不同的數據結構。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數據結構,一邊通過Map進行聚合,一邊寫入內存;如果是join這種普通的shuffle算子,那么會選用Array數據結構,直接寫入內存。
(2)接著,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
(3)排序
在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。
(4)溢寫
排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。
(5)merge
一個task將所有數據寫入內存數據結構的過程中,會發(fā)生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并成1個磁盤文件,這就是merge過程。由于一個task就只對應一個磁盤文件,也就意味著該task為Reduce端的stage的task準備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。
觸發(fā)條件
bypass運行機制的觸發(fā)條件如下: 1)shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold=200參數的值。 2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
bypass運行機制的觸發(fā)條件如下:
1)shuffle map task數量小spark.shuffle.sort.bypassMergeThreshold=200參數的值。
2)不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。
? 此時task會為每個reduce端的task都創(chuàng)建一個臨時磁盤文件,并將數據按key進行hash,然后根據key的hash值, 將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的 。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創(chuàng)建一個單獨的索引文件。
? 該過程的磁盤寫機制其實跟未經優(yōu)化的HashShuffleManager是一模一樣的,因為都要創(chuàng)建數量驚人的磁盤文件, 只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經優(yōu)化的 HashShuffleManager來說,shuffle read的性能會更好。
而該機制與普通SortShuffleManager運行機制的不同在于:
第一,磁盤寫機制不同;
第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行數據的排序操作, 也就節(jié)省掉了這部分的性能開銷。
總結:
SortShuffle也分為普通機制和bypass機制
普通機制在內存數據結構(默認為5M)完成排序,會產生2M個磁盤小文件。
而當shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值?;蛘咚阕硬皇蔷酆项惖膕huffle算子(比如reduceByKey)的時候會觸發(fā)SortShuffle的bypass機制,SortShuffle的bypass機制不會進行排序,極大的提高了其性能。