-
Spark Join Tuning & Key Salting / Part 3Data Engineering/Apache Spark 2022. 3. 5. 15:40반응형
-
- 1. Sort Merge Join
- 1-1. 개요
- 1-2. 이상적인 성능을 발휘하려면
- 2. Broadcast Join
- 2-1. 개요
- 3. Shuffle Hash Join
- 3-1. 개요
- 효율적인 Join을 방해하는 것들
- Data Skewness를 해결하려면?
- 1. Sort Merge Join
- Key Salting
- Key Salting 이란?
- Salting Method 예제
- Reference
Spark에서 join을 수행하는 경우는 크게 두 가지로 나눌 수 있다. (1) 큰 테이블과 작은 테이블을 조인 또는 (2) 큰 테이블과 큰 테이블을 조인. Spark은 join을 수행하기 위해 Sort Merge Join, Broadcast Join, Shuffle Hash Join 등의 방법을 제공한다.
- 핵심 키워드 : sort merge join, shuffle hash join, broadcast join, straggler
1. Sort Merge Join
1-1. 개요
- 모든 노드 간의 all-to-all communication 방식이다.
- 다음과 같이 두 단계로 수행된다.
- (1) 먼저 실제 join 작업을 수행하기 전에 파티션들을 정렬한다. (이 작업만으로도 비용이 크다)
- (2) 정렬된 데이터들을 병합하면서 join key가 같은 row들을 join한다.
- Sort Merge Join은 Shuffle Hash Join과 비교할 때, 클러스터 내 데이터 이동이 더 적은 경향이 있다.
- Spark 2.3부터 디폴트 join 알고리즘으로 사용되고 있다. (spark.sql.join.perferSortMergeJoin=true)
1-2. 이상적인 성능을 발휘하려면
- Join될 파티션들이 최대한 같은 곳에 위치해야 한다. 그렇지 않으면 파티션들을 이동시키기 위해 대량의 shuffle이 발생한다.
- DataFrame의 데이터가 클러스터에 균등하게 분배되어 있어야 한다. 그렇지 않으면 특정 노드에 부하가 집중되고 연산 속도가 느려진다.
- 병렬처리가 이뤄지려면 일정한 수의 고유키가 존재해야 한다.
2. Broadcast Join
2-1. 개요
- join할 두 테이블 중 작은 것을 모든 executor에 복사(broadcast)한다.
- 따라서 all-to-all communication 방법으로 shuffle할 필요가 없다.
- 각 executor에선 보유하고 있는 큰 테이블의 일부와 broadcast된 테이블을 join한다.
- 코드 샘플
import org.apache.spark.sql.functions.broadcast val joinDF = bigDF.join(broadcast(smallDF), "joinKey")
3. Shuffle Hash Join
3-1. 개요
- map-reduce에 기반한 join 방식이다.
- 맵 단계에선 join 칼럼을 기준으로 DataFrame을 매핑하고, 리듀스 단계에서 DataFrame을 shuffle하여 join key가 같은 것끼리 join을 수행한다.
- Spark은 디폴트로 Sort Merge Join을 사용하므로 Shuffle Hash Join을 사용하려면 spark.sql.join.perferSortMergeJoin 옵션을 false로 변경해야 한다.
효율적인 Join을 방해하는 것들
- Data Skewness : join key가 클러스터에 균일하게 분포해 있지 않으면 특정 파티션이 매우 커질 수 있다. 이는 Spark이 parallel하게 연산을 수행하는 것을 방해한다.Limited executor memory
- All-to-all communication : broadcast join이 아닐 경우, 두 DF의 데이터 모두에서 대규모 shuffle이 발생한다.
Data Skewness를 해결하려면?
- Repartitioning : 단순히 repartition을 수행하는 것으로 데이터를 파티션들에 더 골고루 분배할 수 있다.
- Key Salting : 근본적으로 파티셔닝되는 칼럼 키값에 salting을 적용하여 키가 고르게 분배될 수 있도록 할 수 있다.
Key Salting
Key Salting 이란?
한쪽으로 키 값이 몰려있는 데이터를 random Int 값을 데이터 뒤에 붙여서, 키 값 자체를 재 분배하는 기술
우리는 2개의 Table Join 관점에서 데이터 왜곡 (Data Skew)를 이해하려고 노력해야 합니다.
- 두 개의 테이블 A, B가 있다고 가정해 보겠습니다. 즉, 특정 열과 키를 기반으로 조인하려고 합니다.
- join 및 기타 집계의 경우 Spark는 단일 키의 다양한 레코드를 단일 파티션에 함께 배치해야 합니다.
- 특정 키의 Record는 항상 단일 파티션에 있습니다.
- 마찬가지로 다른 키와 해당 레코드는 다른 파티션에 배포됩니다.
이제 키에 다른 키에 비해 더 많은 레코드가 있다고 상상해 보십시오. 따라서 해당 파티션은 다른 파티션과 비교하여 매우 커지거나 기울어 집니다. 따라서 특정 파티션을 처리하는 실행기는 처리하는 데 비교적 더 많은 시간이 필요합니다.
- 이로 인해 전체 Spark 작업이 정지되고 CPU 사용률이 낮고 때로는 메모리 문제가 발생합니다.
- 이것은 데이터 왜곡(Data Skew)가 무엇이며 스파크의 낮은 성능에 어떤 영향을 미치는지 간단히 설명합니다.
이제 데이터 왜곡 문제를 해결하는 방법을 살펴보겠습니다.
첫 번째 기술은 Salting or Key-Salting 입니다. 아이디어는 기존 키를 수정하여 데이터를 균일하게 배포하는 것입니다.
- Table A – Large Table
- Extend the Existing Key by adding Some-Character + Random No. from some Range
Existing-Key + "_" + Range(1,10)
- Table B – Medium Table
- Use Explode Operation on the Key as shown below
Explode(Existing-Key , Range(1,10)) -> x_1, x_2, .............,x_10
키 값=x에 대해 더 많은 레코드가 있다고 가정합니다(키 y 및 z와 비교).
Before Salting - ("=" represents records - Notice key x has more no. of records) x ====================================== y ====== z ======
이로 인해 데이터가 왜곡됩니다. 어떤 노드나 실행 프로그램이 key=x 와 관련된 레코드를 처리해야 하므로 더 많은 레코드, 더 많은 데이터를 처리해야 하므로 비교할 때 훨씬 더 많은 시간이 걸립니다. 키 y & z를 처리하는 노드 또는 실행기는 이러한 키의 레코드가 적기 때문에 더 빠르게 처리됩니다.
이를 처리하기 위해 Salting에서 key=x를 x_1, x_2 … (이 작업이 수행되는 방법은 아래 코드 참조). 이제 JUST ONE KEY=x 와 관련된 모든 레코드가 분할됩니다. KEYS(x_1, x_2…).
- 이 분할은 Random 값(이 경우 1에서 10까지)을 key=x에 추가하여 수행되므로 키 "x"는 x_1, x_2, x_3 …이것은 기본적으로 "x" 대신 10개의 새 키가 있음을 의미합니다. 따라서 단일 키 "x"와 관련된 모든 레코드는 10개의 새 키 세트에 분산됩니다. 이렇게 하면 데이터가 더 분산됩니다
- 키-값 "x"가 더 큰 숫자로 인한 데이터 스큐의 원인이었기 때문입니다. 키 "x"에 속하는 레코드 수. 이제 Random 값(이 경우 1에서 10까지)을 추가하여 키를 분해했으므로 키 "x"는 x_1, x_2, x_3 …x_10과 같이 더 많은 종류를 갖게 됩니다.
After Salting - ("=" represents records - Notice the Uniformity now across all keys) x_1 ========= x_2 ========= x_3 ========= x_4 ========= x_5 ========= x_6 ========= x_7 ========= x_8 ========= x_9 ========= x_10 ========= y ========== z ==========
이제 Spark에서 데이터를 처리할 때(Join 또는 기타 작업) 데이터 왜곡이 제거됩니다.
Salting Method 예제
# SALTING TECHNIQUE - SKEWNESS REMOVAL CODE import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand} def removeDataSkew(bigLeftTable: DataFrame, Col_X: String, rightTable: DataFrame) = { var df1 = bigLeftTable .withColumn(Col_X, concat( bigLeftTable.col(Col_X), lit("_"), lit(floor(rand(10000) * 10)))) var df2 = rightTable .withColumn("newExplodedCol", explode(array((0 to 10).map(lit(_)): _ *))) (df1, df2) ===> THESE ARE THE NEW DFs FROM WHERE SKEWNESS IS REMOVED }
# CALLING THE SALTING LOGIC TO REMOVE ORIGINAL DATA SKEW # After removing Skewness, we get the Un-skewed New Dfs val (newDf1, newDf2) = removeDataSkew(oldDf1, "_1", oldDf2) ==> "_1" represents the left column(the key)
# NEW JOIN AFTER REMOVING DATA SKEWNESS(THROUGH SALTING TECHNIQUE) newDf1.join(newDf2, newDf1.col("<col_name>")<=> newDf2.col("<col_name>") ) .show(10,false)
끝.
Reference
https://dhkdn9192.github.io/apache-spark/spark-join-strategy/
https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18
https://gankrin.org/fix-data-skewness-in-spark-salting-method/
반응형'Data Engineering > Apache Spark' 카테고리의 다른 글
Spark Partitions Tuning / Part 02 (0) 2022.03.04 Spark 기본기(Executor Tuning) / Part 01 (0) 2022.03.03 spark-submit 하드웨어 옵션 체크하기 (0) 2022.02.07 -