목차 LIST
Join operation은 스파크에서 가장 대중적으로 사용되는 transformation이다. 스파크는 join을 어떻게 구현할지 선택하기 위한 몇가지 알고리즘을 사용한다. 이 알고리즘들은 스파크가 작업을 어떻게 처리할지 결정하는 단계에서 사용된다.
아래에서 스파크가 조인을 사용하기 위한 전략을 결정하는지 알아보자.
Spark Join 전략의 요소들
Data Size
Spark는 비용이 많이 드는 셔플과 정렬을 피할 수 있는 join 전략을 선택한다. 따라서, 데이터가 브로드캐스트 될 수 있다면 해시 기반 join 전략을 더 선호한다.
셔플 (shuffle)?
Apache Spark에서 shuffle은 데이터를 재분배하는 과정을 의미합니다. shuffle은 특정 작업을 수행하기 위해 다른 노드(executor) 간에 데이터를 재분배해야 할 때 발생합니다.
브로드캐스트?
크기가 작은 데이터 집합을 네트워크의 모든 노드에 전송하는 과정을 의미한다. 모든 노드는 같은 데이터에 접근할 수 있다.
해시 기반 조인 전략?
데이터를 합칠 때 사용되는 방법으로, 데이터의 key를 사용하여 해시 테이블을 만들고 이를 이용해 빠르게 매칭할 수 있는 데이터를 찾는다. 매우 효율적인 방법이기 때문에 큰 데이터 집합에서 특정 조건에 맞는 데이터를 빠르게 찾아내는데 사용된다.
Type of Join
비등가 조인("<,>,≥, ≤”)은 특정되지 않은 값의 범위를 비교하는 것이 필요하기 때문에 nested loop가 필요하다.
그러므로, 이러한 join types에 Spark는 브로드캐스트 nested loop join과 Cartesian product join을 지원한다.
내장 루프 조인 (nested loop join)?
DBMS에서 조인 연산을 수행할 때 사용되는 전략 중 하나로 두 테이블을 조인할 때 사용되는 가장 단순한 형태의 조인 방법이다.
이 방법은 한 테이블의 각 row를 다른 테이블의 각 row와 차례대로 비교하며 조인 조건에 맞는 경우 결과에 포함시키는 과정을 반복한다.
카테시안 프로덕트 조인 (Cartesian Product Join)?
가능한 모든 조합을 생성하여 조인을 수행하는 방식으로 비등가 조인을 수행할 때 필요한 조건을 만족하는 특정 조합을 찾기 위해 사용된다.
Join hints
Spark의 join 정책에 더 많은 제어를 원한다면 /*+ BROADCAST(table name)*/* 와 같은 Join hints를 사용하면 된다.
종류는 다음과 같다.
BROADCAST | BROADCASTJOIN | MAPJOIN
MERGE | SHUFFLE_MERGE | MERGEJOIN
SHUFFLE_HASH
SHUFFLE_REPLICATE_NL
여러 개의 join hint를 사용했다면, 우선순위는 아래와 같이 정해진다.
BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL
Spark Join strategies
위 이미지를 보면, 등가조인과 비등가조인일 때로 우선 분리된다.
그리고 사용자가 직접 언급한 hint 가 있는지 확인하고, 없다면 위 flow에 따라 Join 방식이 선택된다.
이제 flow에서 언급된 Join 들에 대해 하나씩 알아보자.
Broadcast Hash Join
Broadcast Hash Join(BHJ)에서는 작은 데이터셋의 조인 키에 대한 해시테이블을 생성하고, 이 해시 테이블을 더 큰 데이터셋의 파티션이 위치한 모든 노드로 브로드캐스팅 한다. 그 다음 큰 데이터셋의 특정 키에 해당하는 데이터가 해시 테이블 안에 있는지 조회(lookup)한다. 해시테이블 조회이기 때문에, 동등 조건만 지원된다. Spark는 작은 데이터셋의 크기 기준을 결정하기 위한 설정을 제공한다.
참고로, 해시 테이블은 각 '키'에 대한 고유 해시값을 생성하고, 이 값을 이용해 데이터를 저장하고 조회하기 때문에 주어진 '키'가 해시테이블에 존재하는지 여부는 빠르게 조회할 수 있지만 '키'의 범위를 기반으로 한 검색은 지원하지 않는다.
spark.sql.autoBroadcastJoinThreshold=10MB
이 설정값을 세팅할 때 명심해야 하는 것은, 해시테이블이 드라이버(Spark context의 Spark Driver) 내에 캐싱되어야 하기 때문에 그 크기가 드라이버에 맞아야 한다는 것이다. 이후 해당 해시테이블은 각 노드로 브로드캐스팅된다. 이 전략은 적어도 하나의 데이터셋이 드라이버로 수집될 만큼 충분히 작을 때 선호된다.
스파크 드라이버가 작은 데이터셋에 대한 해시테이블을 생성하고 이를 메모리에 저장한 후, 필요한 워커 노드들에게 이 데이터를 전송하여 효율적인 데이터 조인을 가능하게 한다는 말이다.
스파크가 자동으로 해당 조인을 선택하지 않도록 하려면 설정값을 -1로 설정하면 된다.
BHJ는 shuffle을 발생시키지도 않고, 어떠한 정렬 오퍼레이션을 발생시키지도 않는다. 대부분의 상황에서 가장 빠른 조인 전략으로 사용된다.
Shuffle Hash Join
이름으로 추측할 수 있듯이 이 조인은 먼저 데이터를 shuffle 한 후 hash를 적용한다. 조인에 사용되는 두 데이터셋 모두 shuffle 과정을 거쳐서 동일한 조인 키를 가진 데이터는 같은 executor로 전송된다. 즉, 각 executor는 두 데이터셋에서 조인 키가 일치하는 데이터의 부분 집합을 받게 된다.
이후, executor 내에서 두 데이터셋 중 상대적으로 작은 데이터셋의 조인키를 사용하여 해시 테이블을 생성한다. 그 후 다른 데이터셋(상대적으로 큰 데이터셋)에서 각 데이터 항목의 조인 키를 해시 테이블에서 검색(lookup)한다. 이 과정을 통해 조인 조건에 맞는 데이터를 찾아 결합한다.
순서는 다음과 같다.
partition → shuffle → Hashing → In memory Join → Result aggregation
shuffle 때문에 비용이 많이 드는 조인이기 때문에, spark는 이 조인의 사용 여부를 on/off 할 수 있는 설정을 제공한다.
# 참고로 이건 sortMergeJoin 사용여부를 체크하는 것이므로, false로 하면 shuffleHashJoin을 사용하겠다는 뜻이다.
# 자세한 내용은 위 spark join strategies flow를 확인하자
spark.sql.join.preferSortMergeJoin=false/true
이 조인 전략은 적어도 하나의 데이터셋이 해시테이블을 만들기에 충분히 작을 때 선호된다. (브로드캐스트 임계값과 셔플 파티션 수의 곱보다 작을 때) 또한, 가장 작은 데이터셋이 적어도 3배 작아야 하며, 그렇지 않으면 sort merge join이 사용된다.
Shuffle Sort Merge Join
3개의 과정으로 구성되는 조인 방법이다. 첫 번째로, shuffle 과정을 진행한다. : 동일한 조인 키를 가지는 데이터셋을 동일한 executor로 이동시킨다. 그런 다음 executor node에서 노드상의 데이터셋 파티션을 조인 키 기반으로 정렬하고 조인키 기반으로 병합한다.
이 과정에서 "정렬"이 적용되므로 조인 키들은 정렬이 가능한 데이터 타입이어야 한다.
shuffle 과정에서는 데이터가 네트워크를 통해 여러 노드 간에 재분배되며, 이 과정은 조인 연산이나 그룹화 같은 작업을 효율적으로 수행하기 위해 필요한 데이터의 재배치를 의미한다.
동일한 조인 키를 가진 데이터를 같은 executor에 위치하게 함으로써, 데이터 조인이나 그룹 연산을 더 효율적으로 수행할 수 있다.
"노드상의 데이터셋 파티션"
- 언제 파티셔닝 되는가?
분산 데이터 처리 시스템(ex. hadoop, spark 등)에서 데이터는 처음 시스템에 로드될 때 파티셔닝되기 시작한다. 데이터를 시스템에 입력하거나, 대규모 데이터셋을 처리하기 위해 저장하는 초기단계에서부터 이미 데이터는 논리적이거나 물리적인 단위로 나눠진다. 파티셔닝의 주 목적은 데이터를 효율적으로 저장하고 분산 처리를 용이하기 위함이다.
- 데이터가 파티션 되는 방식?
1) 자동 파티셔닝 : 대부분의 분산 처리 시스템은 데이터를 자동으로 파티션한다. 예를 들어, HDFS는 파일을 블록으로 나누고, 이 블록들을 클러스터의 다양한 노드에 분산하여 저장한다.
2) 사용자 정의 파티셔닝 : 시스템에 따라 사용자가 파티셔닝 방식을 정의하고 조절할 수 있다. 예를 들어, Spark는 데이터를 로드할 때 'partitionBy' 같은 함수를 사용하여 데이터를 특정 키에 따라 파티션할 수 있다.
데이터를 병합하기 위해 스파크는 각 테이블마다 한개씩 두 개의 포인터를 사용하고, 키가 일치하도록 포인터를 이동시킨다. 데이터셋이 정렬되어 있기 때문에, 키가 일치하지 않는 순간에 merge 또는 join 작업을 중단한다. 만약, 동일한 값을 가지는 여러 키가 존재할 경우, 이것을 그룹화하고 카테시안 곱을 수행한다.
동일한 값을 가지는 여러 키가 존재할 경우에 대한 예시
데이터셋 A (사용자 정보):
사용자ID (키), 사용자 이름 (값)(1, "Alice"), (2, "Bob"), (3, "Charlie")
데이터셋 B (사용자 취미 정보):
사용자ID (키), 취미 (값)(1, "등산"), (1, "수영"), (2, "독서"), (3, "수영"), (3, "요리")
"동일한 값을 가지는 여러 키"의 예시로 사용자 ID 1과 3이 두 개씩 있다.
카테시안 곱 적용 결과(조인 결과)
: (1, "Alice", "등산"), (1, "Alice", "수영"), (2, "Bob", "독서"), (3, "Charlie", "수영"), (3, "Charlie", "요리")
카테시안 곱?
그룹화된 데이터에 대해 카테시안 곱을 수행한다는 것은, 각 그룹 내의 모든 데이터 항목들 사이에서 가능한 모든 조합을 생성한다는 것을 의미한다. 카테시안 곱은 두 집합 사이의 모든 가능한 쌍을 만드는 연산으로, 데이터베이스의 조인 연산에서 자주 사용된다.
예를 들어, 한 집합에 A, B가 있고 다른 집합에 1, 2가 있다면 이들의 카테시안 곱은 (A,1)(A,2)(B,1)(B,2)가 된다.
Broadcast Nested Loop Join
스파크의 fallback join 전략(후순위)으로 만약 비등가 조인이고 힌트 타입이 명시되지 않은 경우 이 조인 방식이 사용된다. Broadcast Nested Loop Join은 한 데이터셋을 전체적으로 브로드캐스트하여 내장 루프를 사용해 데이터를 조인하는 방식으로 진행된다. 기본적으로 데이터셋 1의 모든 레코드가 데이터셋 2의 모든 레코드와 조인하는 방식이다.
for record_d1 in dataset_1:
for record_d2 in dataset_2:
do check for join
끝!
References
https://faun.pub/primer-on-spark-join-strategy-134e7340f7a6
'데이터 엔지니어링' 카테고리의 다른 글
Dataflow Windowing : watermark (0) | 2024.05.17 |
---|---|
NGINX TCP / UDP Load Balancing (0) | 2024.04.17 |
Apache Spark : RDD, Transformation, Action, Persist (0) | 2024.02.27 |
Apache Spark : SparkContext vs SparkSession (0) | 2024.02.26 |
Apache Spark : Spark 소개 및 구조 (0) | 2024.02.06 |
댓글