Apache Spark: Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle by xuanyuan

<html>
<h2>Background</h2>
<p>We are currently migrating our Hadoop applications to Spark for better performance. Our unified computing-expression layer (similar to Apache Beam) is already integrated with Hadoop and the Spark RDD API, so these users can migrate their applications from Hadoop to Spark transparently. During this work we found that Spark's performance fell short of expectations in cases that require ordering. This problem was already reported in <a href="https://issues.apache.org/jira/browse/SPARK-2926">SPARK-2926</a>, so this is follow-up work on SPARK-2926 against both the current code path and the Spark 2.1 branch.</p>
<h2>Implementation</h2>
<p>Saisai posted the SortShuffleReader design document in SPARK-2926; here I only list the changes I made while porting the code:</p>
<ol>
  <li>To support Spark Streaming, class `ShuffleBlockFetcherIterator` added some wrapping work around `ManagedBuffer`, so I change `ShuffleBlockFetcherIterator` to return the `ManagedBuffer` directly and do the wrapping work outside of `ShuffleBlockFetcherIterator`.</li>
  <li>Class `ShuffleMemoryManager` has been replaced by `TaskMemoryManager`, so I wrote a new class named `ExternalMerger` that inherits from `Spillable[ArrayBuffer[MemoryShuffleBlock]]`; this class manages all spilled files and in-memory blocks during `SortShuffleReader.read()` (see the merge sketch after this list for the core idea).</li>
  <li>Add a flag named `canUseSortShuffleWriter` in `SortShuffleManager` to fix a Spark unit-test failure in the scenario where `UnsafeShuffleWriter` is used in the shuffle write stage but `SortShuffleReader` is used in the shuffle read stage.</li>
  <li>Add the shuffle metric `peakMemoryUsedBytes`.</li>
  <li>A bug fix for a data-inconsistency issue in the old patch. <a href="https://github.com/xuanyuanking/spark/blob/f07c939a25839a5b0f69c504afb9aa008b1b3c5d/core/src/main/scala/org/apache/spark/util/collection/ExternalMerger.scala#L97">Code Link</a></li>
</ol>
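<p>To make the merge step in change 2 concrete, the sketch below shows the k-way merge idea at the heart of a sort-based shuffle read: every in-memory or spilled block is already sorted, so a small priority queue over the block iterators yields a globally sorted stream. This is only an illustrative, standalone sketch with made-up names; it is not the actual `ExternalMerger` code, and it ignores spilling, `TaskMemoryManager`, and serialization.</p>
<pre><code>import scala.collection.mutable

// Standalone sketch: merge several already-sorted "blocks" (plain iterators
// here) into one globally sorted iterator using a priority queue.
object KWayMergeSketch {
  def mergeSorted[K, V](blocks: Seq[Iterator[(K, V)]])
                       (implicit ord: Ordering[K]): Iterator[(K, V)] = {
    // The queue holds one buffered iterator per non-empty block and is
    // ordered by each iterator's current head key (smallest key on top).
    val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
      Ordering.by[BufferedIterator[(K, V)], K](_.head._1)(ord).reverse)
    blocks.map(_.buffered).filter(_.hasNext).foreach(b =&gt; heap.enqueue(b))

    new Iterator[(K, V)] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): (K, V) = {
        val block = heap.dequeue()              // block with the smallest head key
        val record = block.next()
        if (block.hasNext) heap.enqueue(block)  // put it back if not exhausted
        record
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val blockA = Iterator(1 -&gt; "a", 4 -&gt; "d", 7 -&gt; "g")
    val blockB = Iterator(2 -&gt; "b", 3 -&gt; "c", 8 -&gt; "h")
    val blockC = Iterator(5 -&gt; "e", 6 -&gt; "f")
    // Prints the records in global key order: 1, 2, 3, ..., 8
    mergeSorted(Seq(blockA, blockB, blockC)).foreach(println)
  }
}</code></pre>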
<h2>Conclusion</h2>
<p>The sort-based SortShuffleReader brings a 12x~30x improvement in shuffle-read task duration and reduces peak execution memory to between 1/12 and 1/50 of the original.</p>
<h2>Benchmark and data consistency checking details</h2>
<h3>1. Test hardware and software version</h3>
<table>
  <tr><th>Item</th><th>Details</th></tr>
  <tr><td>CPU model name</td><td>Intel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHz</td></tr>
  <tr><td>Spark version</td><td>current master 2.3.0-SNAPSHOT at <a href="https://github.com/xuanyuanking/spark/commit/c30d5cfc7117bdadd63bf730e88398139e0f65f4">c30d5cf</a> (Oct 24, 2017) vs. 2.3.0-SHUFFLE from my development branch</td></tr>
  <tr><td>Hadoop version</td><td>2.7.2</td></tr>
  <tr><td>HDFS &amp; YARN</td><td>8 servers (128G memory, 20 cores + 20 hyper-threads each)</td></tr>
  <tr><td>Test data</td><td>4964 parquet files transformed from TPCDS store_sales data, 130MB each</td></tr>
  <tr><td>Test application</td><td>A simple case of reduceByKey + sortByKey. To control the written file size, more fields can be added flexibly in the map function.</td></tr>
</table>
<h3>2. Test Round 1</h3>
<p>Test parameters and description:</p>
<table>
  <tr><th>Item</th><th>Details</th></tr>
  <tr><td>Spark conf</td><td>spark.executor.cores 10<br>spark.executor.memory 48G<br>spark.shuffle.sort.bypassMergeThreshold 1  # to disable the bypass-merge shuffle path</td></tr>
  <tr><td>Input files</td><td>80 input parquet files, 130MB each</td></tr>
  <tr><td>Output</td><td>100 sorted result files</td></tr>
  <tr><td>Test data</td><td>80 of the 4964 parquet files transformed from TPCDS store_sales data, 130MB each</td></tr>
  <tr><td>Test application</td><td>reduceByKey on the Int field "quantity", aggregating all Double values of "wholesale_cost", then sort by key and write 100 output files.</td></tr>
</table>
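<p>As an illustration of how the configuration above could be applied in code (this is an assumed setup, not a copy of the actual benchmark submission, which may have passed these options via spark-submit):</p>
<pre><code>import org.apache.spark.sql.SparkSession

// Assumed, illustrative setup for the Round 1 configuration.
val spark = SparkSession.builder()
  .appName("sort-shuffle-reader-benchmark")   // hypothetical app name
  .config("spark.executor.cores", "10")
  .config("spark.executor.memory", "48g")
  // A threshold of 1 keeps shuffles with more than one reduce partition off
  // the bypass-merge path, so the sort-based shuffle path is always exercised.
  .config("spark.shuffle.sort.bypassMergeThreshold", "1")
  .getOrCreate()</code></pre>
<p>The test application:</p>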
<pre><code>val in = spark.read.load("/app/dc/spark/tpcds_parquet/store_sales/part-011[1-8]*.parquet")

case class Info(
    quantity: Int,
    wholesale_cost: Seq[Double])

val pairs = in.rdd.map(row =&gt; {
    def getValue[T](index: Int, default: T): T = {
        if (!row.isNullAt(index)) {
            row.getAs[T](index)
        } else default
    }
    (getValue[Int](0, 0),
        Info(getValue[Int](10, 0),
            Seq(getValue[Double](11, 0.0))))
})

pairs.reduceByKey((left, right) =&gt; {
    Info(left.quantity + right.quantity, left.wholesale_cost ++ right.wholesale_cost)
}).sortByKey().coalesce(100).saveAsTextFile("/app/dc/spark/liyuanjian/lyj-bench")</code></pre>
<p>Test conclusion:</p>
<ol>
  <li>The results show no diff: the 100 output files from the two versions are identical.</li>
  <li>The shuffle write stage takes almost the same time in the original version and the sort shuffle reader version (over 10 runs, both are 7.3min~7.4min).</li>
  <li>The shuffle read stage is about 12x faster (2min -&gt; 9s).</li>
</ol>
<p>Test detail snapshot:</p>
<h3>3. Test Round 2: Add more pressure on SortShuffleReader with coalesce(1)</h3>
<p>Test parameters and description:</p>
<table>
  <tr><th>Item</th><th>Details</th></tr>
  <tr><td>Spark conf</td><td>spark.executor.cores 10<br>spark.executor.memory 48G<br>spark.shuffle.sort.bypassMergeThreshold 1  # to disable the bypass-merge shuffle path</td></tr>
  <tr><td>Input files</td><td>80 input parquet files, 130MB each</td></tr>
  <tr><td>Output</td><td>1 sorted result file</td></tr>
  <tr><td>Test data</td><td>80 of the 4964 parquet files transformed from TPCDS store_sales data, 130MB each</td></tr>
  <tr><td>Test application</td><td>Same as Round 1 (reduceByKey on the Int field "quantity", aggregating all Double values of "wholesale_cost", then sort by key), except the result is coalesced into a single output file.</td></tr>
</table>
<pre><code>val in = spark.read.load("/app/dc/spark/tpcds_parquet/store_sales/part-011[1-8]*.parquet")

case class Info(
    quantity: Int,
    wholesale_cost: Seq[Double])

val pairs = in.rdd.map(row =&gt; {
    def getValue[T](index: Int, default: T): T = {
        if (!row.isNullAt(index)) {
            row.getAs[T](index)
        } else default
    }
    (getValue[Int](0, 0),
        Info(getValue[Int](10, 0),
            Seq(getValue[Double](11, 0.0))))
})

pairs.reduceByKey((left, right) =&gt; {
    Info(left.quantity + right.quantity, left.wholesale_cost ++ right.wholesale_cost)
}).sortByKey().coalesce(1).saveAsTextFile("/app/dc/spark/liyuanjian/lyj-bench")</code></pre>
<p>Test conclusion:</p>
<ol>
  <li>The shuffle read stage is about 14x faster (2min -&gt; 9s).</li>
  <li>Peak execution memory is reduced to about 1/12 (7.9G -&gt; 654.4MB).</li>
</ol>
<p>Test detail snapshot:</p>
<h3>4. Test Round 3: Test the file-spill scenario of the sort shuffle reader</h3>
<p>Test parameters and description:</p>
<table>
  <tr><th>Item</th><th>Details</th></tr>
  <tr><td>Spark conf</td><td># reduced just to simplify the environment and make the logs easy to check<br>spark.executor.cores 1<br>spark.executor.memory 16G<br># to disable the bypass-merge shuffle path<br>spark.shuffle.sort.bypassMergeThreshold 1<br># once 3 files have spilled, merging work begins<br>spark.shuffle.maxMergeFactor 2</td></tr>
  <tr><td>Input files</td><td>8 input parquet files, 130MB each</td></tr>
  <tr><td>Output</td><td>1 sorted result file</td></tr>
  <tr><td>Test data</td><td>8 of the 4964 parquet files transformed from TPCDS store_sales data, 130MB each</td></tr>
  <tr><td>Test application</td><td>reduceByKey on the Int field "quantity", but aggregating more fields than before to add more data to the final result. Because the file-spill scenario is hard to reproduce, I trigger it with a hard-coded change (@Saisai, I <a href="https://github.com/xuanyuanking/spark/blob/f07c939a25839a5b0f69c504afb9aa008b1b3c5d/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala#L93">added a comment here</a>; do you have a better idea for reproducing this in integration tests?)</td></tr>
</table>
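<p>For context on the `spark.shuffle.maxMergeFactor` setting above: it bounds how many spilled files are merged at once, in the spirit of MapReduce's merge factor. The snippet below is only a rough, self-contained sketch of that multi-pass idea, using in-memory vectors instead of the patch's real spill files and iterators; it is not the actual patch code.</p>
<pre><code>// Illustrative sketch only: cap the number of sorted runs (stand-ins for
// spilled files) that the final merge has to open, by running intermediate
// merge passes whenever more than maxMergeFactor runs exist.
object MergeFactorSketch {
  type SortedRun = Vector[Int]

  // Merge two sorted runs; the real reader streams file iterators instead.
  private def merge2(a: SortedRun, b: SortedRun): SortedRun = (a ++ b).sorted

  def reduceRuns(runs: Seq[SortedRun], maxMergeFactor: Int): Seq[SortedRun] = {
    require(maxMergeFactor &gt;= 2, "merge factor must allow at least 2-way merges")
    var current = runs
    while (current.size &gt; maxMergeFactor) {
      // Merge the smallest runs first so intermediate passes stay cheap.
      val (toMerge, rest) = current.sortBy(_.size).splitAt(maxMergeFactor)
      current = rest :+ toMerge.reduce(merge2)
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val spills = Seq(Vector(1, 5), Vector(2, 6), Vector(3, 7), Vector(4, 8))
    // With maxMergeFactor = 2 and 3+ spilled runs, intermediate merging kicks in.
    val remaining = reduceRuns(spills, maxMergeFactor = 2)
    println(remaining)                 // at most 2 sorted runs remain
    println(remaining.reduce(merge2))  // final, fully sorted output
  }
}</code></pre>
<p>The test application for this round:</p>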
<pre><code>val in = spark.read.load("/app/dc/spark/tpcds_parquet/store_sales/part-0111[1-8].parquet")

case class Info(
    quantity: Int,
    item_sk: Seq[Int],
    promo_sk: Seq[Int],
    wholesale_cost: Seq[Double])

val pairs = in.rdd.map(row =&gt; {
    def getValue[T](index: Int, default: T): T = {
        if (!row.isNullAt(index)) {
            row.getAs[T](index)
        } else default
    }
    (getValue[Int](0, 0),
        Info(getValue[Int](10, 0),
            Seq(getValue[Int](2, 0)),
            Seq(getValue[Int](10, 0)),
            Seq(getValue[Double](11, 0.0))))
})

pairs.reduceByKey((left, right) =&gt; {
    Info(left.quantity + right.quantity,
        left.item_sk ++ right.item_sk,
        left.promo_sk ++ right.promo_sk,
        left.wholesale_cost ++ right.wholesale_cost)
}).sortByKey().coalesce(1).saveAsTextFile("/app/dc/spark/liyuanjian/lyj-with-file-bench-with-merge")</code></pre>
<p>Test conclusion:</p>
<ol>
  <li>All test results show no diff across 4 scenarios: (1) the original version, (2) the sort shuffle reader version fully in memory, (3) the sort shuffle reader version with file spill, (4) the sort shuffle reader version with file spill and merge.</li>
  <li>The shuffle read duration in all 3 sort shuffle reader scenarios is much faster (about 30x) than the original version (13s, 16s, 29s vs 16min).</li>
  <li>Peak execution memory is reduced to between 1/14 and 1/50 (2028.8MB -&gt; 142.6MB, 36.8MB, 21.6MB).</li>
</ol>
<table>
  <tr><th>Version and case</th><th>Duration</th><th>Peak Execution Memory</th><th>Shuffle Spill (Memory)</th><th>Shuffle Spill (Disk)</th></tr>
  <tr><td>2.3.0-SNAPSHOT</td><td>16min</td><td>2028.8 MB</td><td>-</td><td>-</td></tr>
  <tr><td>2.3.0-SHUFFLE</td><td>13s</td><td>142.6 MB</td><td>-</td><td>-</td></tr>
  <tr><td>2.3.0-SHUFFLE with file spill</td><td>25s</td><td>36.8 MB</td><td>142.6 MB</td><td>142.7 MB</td></tr>
  <tr><td>2.3.0-SHUFFLE with file spill and file merge</td><td>29s</td><td>21.6 MB</td><td>127.4 MB</td><td>250.8 MB</td></tr>
</table>
<p>Test details: case 1: 2.3.0-SNAPSHOT; case 2: 2.3.0-SHUFFLE; case 3: 2.3.0-SHUFFLE with file spill; case 4: 2.3.0-SHUFFLE with file spill and file merge.</p>
</html>