<html>
<p>Background</p>
<p>Currently we are migrating our Hadoop application to Spark for better performance, our unified computing expression layer(like Apache Beam) have integrated with Hadoop and Spark RDD, so this part of user can migrate their application from Hadoop to Spark transparently. During this work, we found Spark performance shot of expectations in the ordering needed cases. I notice that this problem reported in <a href="https://issues.apache.org/jira/browse/SPARK-2926">https://issues.apache.org/jira/browse/SPARK-2926</a>, so this is the follow up working of SPARK-2926 over current code path and Spark 2.1 branch.</p>
<h2>Implementation</h2>
<p>Saisai post the SortBasedShuffleReader document in SPARK-2926, I just leave the changes during the code porting:</p>
<ol>
<li>For support Spark Streaming, Class `ShuffleBlockFetcherIterator` added some wrapping work for ManageBuffer, so here I changes ShuffleBlockFetcherIterator to get the ManagerBuffer, and do the wrapping work out of ShuffleBlockFetcherIterator</li>
<li>Class `ShuffleMmeoryManager` has been replaced by `TaskMemoryManager`, so I write a new class named ExternalMerger inherits from `Spillable[ArrayBuffer[MemoryShuffleBlock]]`, this class manage all files and in memory block during `SortShuffleReader.read()`</li>
<li>Add a tag named `canUseSortShuffleWriter` in `SortShuffleManager`, for the bug fix of Spark UT error in the scenario of using `UnsafeShuffleWriter` in shuffle write stage but using `SortShuffleReader` in shuffle read stage.</li>
<li>Add shuffle metrics of peakMemoryUsedBytes.</li>
<li>A Bug fix of data inconsistency in old patch. <a href="https://github.com/xuanyuanking/spark/blob/f07c939a25839a5b0f69c504afb9aa008b1b3c5d/core/src/main/scala/org/apache/spark/util/collection/ExternalMerger.scala#L97">Code Link</a></li>
</ol>
<h2>Conclusion</h2>
<p>SortBasedShuffleReader can bring 12x~30x boosting in task duration and reduce peak execution memory to 1/12 ~ 1/50</p>
<h2>Benchmark and data consistency checking details</h2>
<h3>1. Test hardware and software version</h3>
<p><strong>ItemDetails</strong>Cpu model nameIntel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHzSpark versioncurrent master 2.3.0-SNAPSHOT of <a href="https://github.com/xuanyuanking/spark/commit/c30d5cfc7117bdadd63bf730e88398139e0f65f4">c30d5cf</a> on on Oct 24, 2017 vs 2.3.0-SHUFFLE of my develop branchHadoop version2.7.2HDFS & YARN8 servers (128G, 20 core + 20 hyper threading)Test Data4964 parquet files transformed form TPCDS store_sales data, each file 130MBTest ApplicationA simple case of reduceByKey + sortByKey. For control the write file size, add more fields flexible in map function.</p>
<h3>2.Test Round1</h3>
<p>Test parameters and description:<strong>ItemDetails</strong>Spark Confspark.executor.cores 10spark.executor.memory 48Gspark.shuffle.sort.bypassMergeThreshold 1 # for close BypassMergeShuffleInput File80 input parquet file, 130MB for each. Ouput100 sorted resultsTest DataFrom 80 files in 4964 parquet files transformed form TPCDS store_sales data, each file 130MBTest ApplicationReduceBy the Int fields "quantity", aggregate all Double fields named "wholesale_cost", finally sort by key and write 100 output files.</p>
<pre><code>val in = spark.read.load("/app/dc/spark/tpcds_parquet/store_sales/part-011[1-8]*.parquet")<br>
case class Info(<br>
quantity: Int,<br>
wholesale_cost: Seq[Double])<br>
val pairs = in.rdd.map(row => {<br>
def getValue[T](index: Int, default: T): T = {<br>
if (!row.isNullAt(index)) {<br>
row.getAs[T](index)<br>
} else default<br>
}<br>
(getValue[Int](0, 0), <br>
Info(getValue[Int](10, 0),<br>
Seq(getValue[Double](11, 0.0))))<br>
})<br>
pairs.reduceByKey((left, right) => {<br>
Info(left.quantity + right.quantity, left.wholesale_cost ++ right.wholesale_cost)<br>
}).sortByKey().coalesce(100).saveAsTextFile("/app/dc/spark/liyuanjian/lyj-bench")</code></pre>
<p>Test conclusion:</p>
<ol>
<li>Test result has no diff, both outputs has no diff. (100 output files of two versions has no diff)</li>
<li>Shuffle sort write stage spend almost same time of original version and sort shuffle reader version. (10 times test, both of them is 7.3min~7.4min)</li>
<li>Shuffle sort read stage boosting 12x.(2min -> 9s)</li>
</ol>
<p>Test detail snapshot: </p>
<h3>2.Test Round2: Add more pressure for SortShuffleReader by coalesce</h3>
<p>Test parameters and description:<strong>ItemDetails</strong>Spark Confspark.executor.cores 10spark.executor.memory 48Gspark.shuffle.sort.bypassMergeThreshold 1 # for close BypassMergeShuffleInput File80 input parquet file, 130MB for each.Ouput1 sorted resultsTest DataFrom 80 files in 4964 parquet files transformed form TPCDS store_sales data, each file 130MBTest ApplicationReduceBy the Int fields "quantity", aggregate all Double fields named "wholesale_cost", finally sort by key and write 100 output files.</p>
<pre><code>val in = spark.read.load("/app/dc/spark/tpcds_parquet/store_sales/part-011[1-8]*.parquet")<br>
case class Info(<br>
quantity: Int,<br>
wholesale_cost: Seq[Double])<br>
val pairs = in.rdd.map(row => {<br>
def getValue[T](index: Int, default: T): T = {<br>
if (!row.isNullAt(index)) {<br>
row.getAs[T](index)<br>
} else default<br>
}<br>
(getValue[Int](0, 0), <br>
Info(getValue[Int](10, 0),<br>
Seq(getValue[Double](11, 0.0))))<br>
})<br>
pairs.reduceByKey((left, right) => {<br>
Info(left.quantity + right.quantity, left.wholesale_cost ++ right.wholesale_cost)<br>
}).sortByKey().coalesce(1).saveAsTextFile("/app/dc/spark/liyuanjian/lyj-bench")</code></pre>
<p>Test conclusion:</p>
<ol>
<li>Shuffle sort read stage boosting 14x.(2min -> 9s)</li>
<li>Peak execution memory reducting to 1/12. (7.9G -> 654.4MB)</li>
</ol>
<p>Test detail snapshot: </p>
<h3>3.Test Round3: Test file spill scenario of sort shuffle reader</h3>
<p>Test parameters and description:<strong>ItemDetails</strong>Spark Conf# reduce this just for simplify enviroment and easy to check the log.spark.executor.cores 1spark.executor.memory 16G# for close BypassMergeShufflespark.shuffle.sort.bypassMergeThreshold 1# when there's 3 file spilled, begin merging workspark.shuffle.maxMergeFactor 2 Input File8 input parquet file, 130MB for each.Ouput1 sorted resultsTest DataFrom 8 files in 4964 parquet files transformed form TPCDS store_sales data, each file 130MBTest ApplicationReduceBy the Int fields "quantity", but aggregate more fields than before. This is for add more data to the final results.Because of the file spill scenario is hard to produce, I produce this in hard code (@Saisai I <a href="https://github.com/xuanyuanking/spark/blob/f07c939a25839a5b0f69c504afb9aa008b1b3c5d/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala#L93">add the comment here</a>,do you have more good idea for produce this in integrate tests?)</p>
<pre><code>val in = spark.read.load("/app/dc/spark/tpcds_parquet/store_sales/part-0111[1-8].parquet")<br>
case class Info(<br>
quantity: Int,<br>
item_sk: Seq[Int],<br>
promo_sk: Seq[Int],<br>
wholesale_cost: Seq[Double])<br>
<br>
val pairs = in.rdd.map(row => {<br>
def getValue[T](index: Int, default: T): T = {<br>
if (!row.isNullAt(index)) {<br>
row.getAs[T](index)<br>
} else default<br>
}<br>
(getValue[Int](0, 0),<br>
Info(getValue[Int](10, 0),<br>
Seq(getValue[Int](2, 0)),<br>
Seq(getValue[Int](10, 0)),<br>
Seq(getValue[Double](11, 0.0))<br>
))<br>
})<br>
pairs.reduceByKey((left, right) => {<br>
Info(left.quantity + right.quantity,<br>
left.item_sk ++ right.item_sk,<br>
left.promo_sk ++ right.promo_sk,<br>
left.wholesale_cost ++ right.wholesale_cost<br>
)<br>
}).sortByKey().coalesce(1).saveAsTextFile("/app/dc/spark/liyuanjian/lyj-with-file-bench-with-merge")</code></pre>
<p>Test conclusion:</p>
<ol>
<li>All test result has no diff, which includes 4 scenario: (1) orignal version (2) sort shuffle reader version in memory (3) sort shuffle reader version with file spill (4) sort shuffle reader version with file spill and merge</li>
<li>Duration time for shuffle read of all 3 scenario of sort shuffle reader version is much faster(30x) than the original version (13s,16s,29s vs 16min)</li>
<li>Peak execution memory reducting to 1/14 and 1/50. (2028.8MB -> 142.6MB, 36.8MB, 21.6MB)</li>
</ol>
<p><strong>version and caseDurationPeak Execution MemoryShuffle Spill(Memory)Shuffle Spill(Disk)</strong>2.3.0-SNAPSHOT16min2028.8 MB--2.3.0-SHUFFLE13s142.6 MB--2.3.0-SHUFFLE with file spill25s36.8 MB142.6 MB142.7 MB2.3.0-SHUFFLE with file spill and file merge29s21.6 MB127.4 MB250.8 MB<br>
Test details:case 1 : 2.3.0-SNAPSHOTcase 2 : 2.3.0-SHUFFLEcase 3 : 2.3.0-SHUFFLE with file spillcase 4 : 2.3.0-SHUFFLE with file spill and file merge<br>
</p>
</html>