ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark Join Tuning & Key Salting / Part 3
    Data Engineering/Apache Spark 2022. 3. 5. 15:40
    반응형
      • 1. Sort Merge Join
        • 1-1. 개요
        • 1-2. 이상적인 성능을 발휘하려면
      • 2. Broadcast Join
        • 2-1. 개요
      • 3. Shuffle Hash Join
        • 3-1. 개요
      • 효율적인 Join을 방해하는 것들
      • Data Skewness를 해결하려면?
    • Key Salting
      • Key Salting 이란?
      • Salting Method 예제
    • Reference

     

    Spark에서 join을 수행하는 경우는 크게 두 가지로 나눌 수 있다. (1) 큰 테이블과 작은 테이블을 조인 또는 (2) 큰 테이블과 큰 테이블을 조인. Spark은 join을 수행하기 위해 Sort Merge Join, Broadcast Join, Shuffle Hash Join 등의 방법을 제공한다.

    • 핵심 키워드 : sort merge join, shuffle hash join, broadcast join, straggler

     

    1. Sort Merge Join

    1-1. 개요

    • 모든 노드 간의 all-to-all communication 방식이다.
    • 다음과 같이 두 단계로 수행된다.
      • (1) 먼저 실제 join 작업을 수행하기 전에 파티션들을 정렬한다. (이 작업만으로도 비용이 크다)
      • (2) 정렬된 데이터들을 병합하면서 join key가 같은 row들을 join한다.
    • Sort Merge Join은 Shuffle Hash Join과 비교할 때, 클러스터 내 데이터 이동이 더 적은 경향이 있다.
    • Spark 2.3부터 디폴트 join 알고리즘으로 사용되고 있다. (spark.sql.join.perferSortMergeJoin=true)

    1-2. 이상적인 성능을 발휘하려면

    • Join될 파티션들이 최대한 같은 곳에 위치해야 한다. 그렇지 않으면 파티션들을 이동시키기 위해 대량의 shuffle이 발생한다.
    • DataFrame의 데이터가 클러스터에 균등하게 분배되어 있어야 한다. 그렇지 않으면 특정 노드에 부하가 집중되고 연산 속도가 느려진다.
    • 병렬처리가 이뤄지려면 일정한 수의 고유키가 존재해야 한다.

     

    2. Broadcast Join

    2-1. 개요

    • join할 두 테이블 중 작은 것을 모든 executor에 복사(broadcast)한다.
    • 따라서 all-to-all communication 방법으로 shuffle할 필요가 없다.
    • 각 executor에선 보유하고 있는 큰 테이블의 일부와 broadcast된 테이블을 join한다.
    • 코드 샘플
    import org.apache.spark.sql.functions.broadcast
      
    val joinDF = bigDF.join(broadcast(smallDF), "joinKey")

     

    3. Shuffle Hash Join

    3-1. 개요

    • map-reduce에 기반한 join 방식이다.
    • 맵 단계에선 join 칼럼을 기준으로 DataFrame을 매핑하고, 리듀스 단계에서 DataFrame을 shuffle하여 join key가 같은 것끼리 join을 수행한다.
    • Spark은 디폴트로 Sort Merge Join을 사용하므로 Shuffle Hash Join을 사용하려면 spark.sql.join.perferSortMergeJoin 옵션을 false로 변경해야 한다.

     

    효율적인 Join을 방해하는 것들

    • Data Skewness : join key가 클러스터에 균일하게 분포해 있지 않으면 특정 파티션이 매우 커질 수 있다. 이는 Spark이 parallel하게 연산을 수행하는 것을 방해한다.Limited executor memory
    • All-to-all communication : broadcast join이 아닐 경우, 두 DF의 데이터 모두에서 대규모 shuffle이 발생한다.

     

    Data Skewness를 해결하려면?

    • Repartitioning : 단순히 repartition을 수행하는 것으로 데이터를 파티션들에 더 골고루 분배할 수 있다.
    • Key Salting : 근본적으로 파티셔닝되는 칼럼 키값에 salting을 적용하여 키가 고르게 분배될 수 있도록 할 수 있다.

    Key Salting

     

    Key Salting 이란?

    한쪽으로 키 값이 몰려있는 데이터를 random Int 값을 데이터 뒤에 붙여서, 키 값 자체를 재 분배하는 기술

    우리는 2개의 Table Join 관점에서 데이터 왜곡 (Data Skew)를 이해하려고 노력해야 합니다.

    • 두 개의 테이블 A, B가 있다고 가정해 보겠습니다. 즉, 특정 열과 키를 기반으로 조인하려고 합니다.
    • join 및 기타 집계의 경우 Spark는 단일 키의 다양한 레코드를 단일 파티션에 함께 배치해야 합니다.
    • 특정 키의 Record는 항상 단일 파티션에 있습니다.
    • 마찬가지로 다른 키와 해당 레코드는 다른 파티션에 배포됩니다.

    이제 키에 다른 키에 비해 더 많은 레코드가 있다고 상상해 보십시오. 따라서 해당 파티션은 다른 파티션과 비교하여 매우 커지거나 기울어 집니다. 따라서 특정 파티션을 처리하는 실행기는 처리하는 데 비교적 더 많은 시간이 필요합니다.

    • 이로 인해 전체 Spark 작업이 정지되고 CPU 사용률이 낮고 때로는 메모리 문제가 발생합니다.
    • 이것은 데이터 왜곡(Data Skew)가 무엇이며 스파크의 낮은 성능에 어떤 영향을 미치는지 간단히 설명합니다.

     

    이제 데이터 왜곡 문제를 해결하는 방법을 살펴보겠습니다.

    첫 번째 기술은 Salting or Key-Salting 입니다. 아이디어는 기존 키를 수정하여 데이터를 균일하게 배포하는 것입니다.

    • Table A – Large Table
      • Extend the Existing Key by adding Some-Character + Random No. from some Range
    Existing-Key + "_" + Range(1,10)
    • Table B – Medium Table
      • Use Explode Operation on the Key as shown below
    Explode(Existing-Key , Range(1,10))  -> x_1, x_2, .............,x_10

    키 값=x에 대해 더 많은 레코드가 있다고 가정합니다(키 y 및 z와 비교).

    Before Salting - ("=" represents records - Notice key x has more no. of records)
    
    x ======================================
    y ======
    z ======

    이로 인해 데이터가 왜곡됩니다. 어떤 노드나 실행 프로그램이 key=x 와 관련된 레코드를 처리해야 하므로 더 많은 레코드, 더 많은 데이터를 처리해야 하므로 비교할 때 훨씬 더 많은 시간이 걸립니다. 키 y & z를 처리하는 노드 또는 실행기는 이러한 키의 레코드가 적기 때문에 더 빠르게 처리됩니다.

     

    이를 처리하기 위해 Salting에서 key=x를 x_1, x_2 … (이 작업이 수행되는 방법은 아래 코드 참조). 이제 JUST ONE KEY=x 와 관련된 모든 레코드가 분할됩니다. KEYS(x_1, x_2…).

    • 이 분할은 Random 값(이 경우 1에서 10까지)을 key=x에 추가하여 수행되므로 키 "x"는 x_1, x_2, x_3 …이것은 기본적으로 "x" 대신 10개의 새 키가 있음을 의미합니다. 따라서 단일 키 "x"와 관련된 모든 레코드는 10개의 새 키 세트에 분산됩니다. 이렇게 하면 데이터가 더 분산됩니다
    • 키-값 "x"가 더 큰 숫자로 인한 데이터 스큐의 원인이었기 때문입니다. 키 "x"에 속하는 레코드 수. 이제 Random 값(이 경우 1에서 10까지)을 추가하여 키를 분해했으므로 키 "x"는 x_1, x_2, x_3 …x_10과 같이 더 많은 종류를 갖게 됩니다.
    After Salting - ("=" represents records - Notice the Uniformity now across all keys)
    
    x_1 =========
    x_2 =========
    x_3 =========
    x_4 =========
    x_5 =========
    x_6 =========
    x_7 =========
    x_8 =========
    x_9 =========
    x_10 =========
    y ==========
    z ==========

     

    이제 Spark에서 데이터를 처리할 때(Join 또는 기타 작업) 데이터 왜곡이 제거됩니다.

     

    Salting Method 예제

    # SALTING TECHNIQUE - SKEWNESS REMOVAL CODE 
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{array, concat, explode, floor, lit, rand}
    
    
    def removeDataSkew(bigLeftTable: DataFrame, Col_X: String, rightTable: DataFrame) = {
        var df1 = bigLeftTable
                 .withColumn(Col_X, concat(
                             bigLeftTable.col(Col_X), lit("_"), lit(floor(rand(10000) * 10))))
        var df2 = rightTable
                 .withColumn("newExplodedCol",
                              explode(array((0 to 10).map(lit(_)): _ *)))
    (df1, df2)   ===> THESE ARE THE NEW DFs FROM WHERE SKEWNESS IS REMOVED
    }

     

    # CALLING THE SALTING LOGIC TO REMOVE ORIGINAL DATA SKEW
    # After removing Skewness, we get the Un-skewed New Dfs
    
    val (newDf1, newDf2) = removeDataSkew(oldDf1, "_1", oldDf2)  ==> "_1" represents the left column(the key)
    # NEW JOIN AFTER REMOVING DATA SKEWNESS(THROUGH SALTING TECHNIQUE)
     
    newDf1.join(newDf2,
    newDf1.col("<col_name>")<=> newDf2.col("<col_name>")
    )
    .show(10,false)

    끝.

     


    Reference

    https://dhkdn9192.github.io/apache-spark/spark-join-strategy/

     

    효율적인 Spark Join 전략

    Spark에서 join을 수행하는 경우는 크게 두 가지로 나눌 수 있다. (1) 큰 테이블과 작은 테이블을 조인 또는 (2) 큰 테이블과 큰 테이블을 조인. Spark은 join을 수행하기 위해 Sort Merge Join, Broadcast Join, Sh

    dhkdn9192.github.io

    https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18

     

    Salting Your Spark to Scale

    Has your Spark job every crashed due to data skew? Curious to know what that means and explore a way to fix it, then read on to learn…

    medium.com

    https://gankrin.org/fix-data-skewness-in-spark-salting-method/

     

    How To Fix - Data Skewness in Spark (Salting Method) - Gankrin

    Fix - Data Skewness in Spark using Salting Method.Data skew problem is basically related to an Uneven or Non-Uniform Distribution of data .

    gankrin.org

     

    반응형
Designed by Tistory.