본문 바로가기
데이터 엔지니어링

Spark RDD, DAG란?

by 내기록 2022. 8. 12.
반응형

Apache Spark는 빅 데이터 워크로드에 주로 사용되는 오픈 소스 분산 처리 시스템이다. Apache Spark는 빠른 성능을 위해 인 메모리 캐싱과 최적화된 실행을 사용하며, 일반 배치 처리, 스트리밍 분석, 기계 학습, 그래프 데이터베이스 및 임시 쿼리를 지원한다.

 

MapReduce는 데이터의 중간 과정을 HDFS에 저장하기 때문에 오버헤드가 발생한다. Spark는 이러한 문제점을 In-Memory 처리로 해결할 수 있다. 하지만 메모리 특성상 중간에 오류가 나면 데이터가 모두 사라지게되어 처음부터 다시 연산해야 하는 단점이 생긴다.

Spark는 메모리에서 데이터가 유실되면 RDD의 Lineage 기록에 따라 유실되었던 RDD를 다시 생성한다. 이 내용은 뒤에서 다시 한번 언급된다.

 

Apache Spark는 인메모리 처리 엔진과 풍부한 included libraries (그래프 처리용 GraphX, Machine Learning용 MLib, 미니 배치 스트리밍용 Spark Streaming, Spark SQL) 및 SDK(스칼라, 파이썬, 자바, R)를 가진다. 이러한 라이브러리는 분산 처리를 위한 것이므로 분산 그래프 처리, 분산 기계 학습 등에 사용할 수 있다.

 

기능 및 장점

빠른 성능

Apache Spark는 방향성 비순환 그래프(DAG) 실행 엔진을 사용함으로써 데이터 변환에 대한 효율적인 쿼리 계획을 생성할 수 있다. 또한, Apache Spark는 입력, 출력 및 중간 데이터를 인 메모리에 RDD(Resilient Distributed Dataset)로 저장하므로, I/O 비용 없이 반복 또는 대화형 워크로드를 빠르게 처리하고 성능을 높일 수 있다.

 

다양한 언어를 제공함으로써 애플리케이션을 신속하게 개발 할 수 있다.

Apache Spark는 기본적으로 Java, Scala 및 Python을 지원하므로, 애플리케이션을 구축할 수 있도록 다양한 언어를 제공한다. 또한, Spark SQL 모듈을 사용하여 SQL 또는 HiveQL 쿼리를 Apache Spark에 제출할 수 있다. 애플리케이션을 실행하는 것 외에도, Apache Spark API를 Python과 대화식으로 사용하거나 클러스터의 Apache Spark 셸에서 Scala를 직접 사용할 수 있다. Zeppelin을 사용하여 데이터 탐색과 시각화를 위한 대화형 협업 노트북을 생성할 수도 있다.

 

다양한 워크플로우 생성

Apache Spark에는 기계 학습(MLlib), 스트림 처리(Spark Streaming) 및 그래프 처리(GraphX)용 애플리케이션을 구축하는 데 도움이 되는 몇 가지 라이브러리가 포함되어 있다. 이러한 라이브러리는 Apache Spark 에코시스템과 긴밀하게 통합되며, 다양한 사용 사례를 해결하는 데 바로 활용할 수 있다.

 

 

사용 사례

Amazon Kinesis나 Apache Kafka에서 또는 Spark Streaming의 다른 데이터 스트림에서 실시간 데이터를 사용하고 처리한다. 내결함성이 지원되는 방식으로 스트리밍 분석을 수행하고, Amazon S3 또는 HDFS에 결과를 작성한다.

기계 학습

Apache Spark의 MLlib으로 다양한 확장 가능한 기계 학습 알고리즘을 사용할 수 있으며, 사용자의 자체 라이브러리를 사용할 수도 있다. Spark는 작업 중에 데이터 세트를 In-Memory에 저장함으로써 기계 학습 워크로드에서 흔히 발생하는 반복 쿼리에 뛰어난 성능을 나타낸다.

SQL 또는 HiveQL과의 지연 시간이 짧은 대화형 쿼리를 위해 Spark SQL을 사용한다. Apache Spark는 Amazon S3에 있는 데이터 세트에 임의 액세스할 수 있다. 또한, ODBC 또는 JDBC 연결을 통해 Zeppelin 노트북이나 BI 도구를 사용할 수 있다.

 

 

 

RDDs

RDD(Resilient, Distributed, Dataset)는 변경 불가능한 분산 컬렉션이다.(immutable distributed collection of objects)
: 여러 노드에 흩어져 있으며 병렬 처리될 수 있는 아이템들의 모음을 표현한다.

RDD는 클러스터의 여러 서버에 걸쳐 파티션된(partitioned) 데이터 세트의 논리적 참조이다. 

** spark partition은 제일 아래에서 다시 설명한다.

val rdd = sc.textFile("/some_file",3)  
val lines = sc.parallelize(List("this is","an example"))

sc.testFile을 호출할 때 사용되는 argument '3'이 파티션의 수이다.

 

 

RDDs Operations(Transformations and Actions)

RDD에서 수행할 수 있는 작업에는 두 가지 유형이 있다 : Transformations and Actions


Transformation은 RDD에 일부 기능을 적용하고 새 RDD를 생성한다. 데이터셋은 읽기 전용이므로 데이터에 어떠한 변형을 가할 때 계속 새로운 RDD를 만드는 방법 밖에 없다. 또한 새 RDD는 상위 RDD에 대한 포인터를 유지한다.

이러한 RDD의 생성 순서를 Lineage라고 하며 Lineage는 DAG(Directed Acyclic Graph)의 형태를 가진다.
이곳에는 모든 RDD 생성 과정이 기록되어 있기 때문에 메모리에서 데이터가 유실되면 Lineage 기록에 따라 유실되었던 RDD를 생성할 수 있다. 

Transformations는 하나 이상의 새로운 RDD를 생성하는 lazy operations로 Action을 호출할 때까지 실행되지 않는다.
e.g. map,filter, reduceByKey, join, cogroup, randomSplit

 

RDD에 적용될 수 있는 두 가지 transformations(변환)에는 narrow transformation과 wide transformation이 있다.

wide transformation은 기본적으로 stage boundaries를 야기한다.

 

Narrow transformation - 파티션 간에 데이터를 섞을(shuffled) 필요가 없다.

for example, Map, filter etc..

 

wide transformation - 데이터를 섞을 필요가 있다.

for example, reduceByKey etc..

 

Transformations를 적용하면 최종 RDD(s)의 모든 상위-parent- RDD로 RDD 계보-lineage-를 구축한다.

Transformations는 게으르다-lazy-.

즉, 즉시 실행되지 않고 작업을 호출한 후에만 변환이 실행된다.

 

Action은 Lineage에 기록된 RDD 생성 과정을 실제로 생성하는 명령어이다. Spark에서 작업이 시작되면 Stage 단위로 분할되고 이것은 다시 여러 개의 Task로 나누어 실행된다. 각 노드에 있는 데이터 분포 등에 따라 최적의 RDD 생성 경로를 찾는데, 이를 Lazy-Execution이라 한다.

최적의 RDD 생성 경로는 각 노드에 저장된 데이터의 셔플이 최소한으로 일어나는 것을 말한다. 노드 간의 셔플이 많이 일어나는 경우를 넓은 의존성이라 하고 적게 일어나는 경우를 좁은 의존성이라 한다.

넓은 의존성의 경우 데이터 간 셔플이 많이 일어나기 때문에 성능이 저하된다.

 

val rdd = sc.textFile("spam.txt")
val filtered = rdd.filter(line => line.contains("money"))
filtered.count()

 

sc.textFile()과 sc.filter()은 즉시 실행되지 않는다. RDD에서 action-여기서는 filtered.count()-를 호출한 후에만 실행된다.

Action은 결과를 특정 위치에 저장하거나 표시하는데 사용된다.

또한, filter.toDebugString 명령을 사용하여 RDD lineage 정보를 확인할 수도 있다(위 예제에서 filtered는 RDD이다).

 

RDD는 실행되어야 하는 명령어 세트로 생각할 수 있다. 첫번째 명령어는 load instruction이다-여기서는 textFile-.

 

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

 

New RDD is created after every transformation.(DAG graph)

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

 

DAG(Directed Acyclic Graph),Stages and Tasks

RDD의 생성 순서를 Lineage라고 하는데 Lineage는 DAG(Directed Acyclic Graph)의 형태를 가진다.

즉, DAG는 RDD에 적용된 operations의 track을 보유하는 그래프이다.

https://techvidvan.com/tutorials/apache-spark-dag-directed-acyclic-graph/

 

DAGScheduler는 단계 지향 스케줄링을 구현하는 Apache Spark의 스케줄링 계층이다.
논리적 실행 계획(RDD transformations을 사용하여 생성된 RDD lineage)을 물리적 실행 계획(using stages)으로 변환한다.

 

DAGScheduler Transforming RDD Lineage(DAG) Into Stage DAG(Physical Execution Plan)

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

 

아래에서 위로 가면서 stage를 생성한다.

앞서 언급된 것처럼 DAG 스케줄러는 그래프를 여러 단계로 분할하고 각 단계는 transfomation(변환)을 기반으로 생성된다.

narrow transformation은 single stage로 함께 그룹화된다(pipelined).

 

val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
                        .map(words => (words(0), 1))
                        .reduceByKey{(a,b) => a + b}

위 예제는 2개의 stage를 생성한다.

 

Shuffled RDD is created by reduceByKey wide transforamtion

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

 

Shuffle Operation 또는 wide transformation은 stage의 경계가 된다. 즉, 이미지의 stage는 두 번의 셔플 작업으로 구분된다.

그런 다음 DAG 스케줄러는 task scheduler(작업 스케줄러)에 stage를 제출한다. 제출된 작업 수는 파일 텍스트에 있는 파티션 수에 따라 달라진다. 

예를 들어, 이 예에서 파티션이 4개라고 가정하면 slaves/cores가 충분할 경우 tasks가 4개의 세트로 생성되어 병렬로 제출된다.

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

 

상호 의존적이지 않은 단계는 병렬 실행을 위해 클러스터에 제출될 수 있다. 이렇게 하면 클러스터에서 병렬화 기능을 최대화할 수 있다.

 

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

 

위 이미지를 보면 Stage0과 Stage1은 상호 의존적이지 않기 때문에 병렬로 실행된다.

2단계(Join 연산)는 0단계와 1단계에 의존하므로 두 단계가 모두 완료된 후에 실행된다.

 

결합된 데이터에 대한 후속 작업은 순차적으로 진행해야 하므로 동일한 단계(Stage2)에서 수행하며, 이전 작업이 완료될 때까지 작업을 시작할 수 없다.

 

Spark Partitions

RDD는 단일 노드에 들어갈 수 없는 경우 다양한 노드에 걸쳐 분할되어야 하는 다양한 데이터의 집합이다. 즉, 파티션의 수가 많을수록 병렬성이 높아진다. RDD의 이러한 파티션은 네트워크의 모든 노드에 분산된다.

Partition은 RDDs나 Dataset을 구성하고 있는 최소 단위 객체이다. 각 Partition은 서로 다른 노드에서 분산처리된다.

Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리된다.

또한, 하나의 Task는 하나의 Core가 연산 처리한다.

 

1Core = 1Task = 1Partition

 

설정된 Partition의 수에 따라 각 Partition의 크기가 결정된다. 이 Partition의 크기가 결국 Core당 필요한 메모리 크기를 결정하게 된다.

- Partition 수 -> Core 수

- Partition 크기 -> 메모리 크기

 

따라서 Partition의 수와 크기가 Spark 성능에 큰 영향을 미친다. 통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고 수가 적을수록 Core가 더 필요하다.

- 적은 수의 Partition = 크기가 큰 Partition

- 많은 수의 Partition = 크기가 작은 Partition

 

즉, Partition의 수를 늘리면 Task당 필요한 메모리를 줄이고 병렬화 정도를 늘릴 수 있다.

 

Spark Partition의 종류

쓰이는 때에 따라 3가지로 구분할 수 있다.

- Input Partition

- Output Partition

- Shuffle Partition

 

1) Input Partition

관련 설정 : spark.sql.files.maxpartitionBytes

 

Input Partition은 처음 파일을 읽을 때 생성하는 partition이다. 관련 설정으로 Input Partition의 크기를 설정할 수 있으며 기본값은 128MB이다.

파일의 크기가 128MB보다 크면 Spark에서 128MB만큼 쪼개면서 파일을 읽는다.

파일의 크기가 128MB보다 작다면 그대로 읽어서 파일 하나당 하나의 Partition이 된다.

 

대부부의 경우, 필요한 컬럼만 뽑아서 쓰기 때문에 파일이 128MB보다 작지만 가끔씩 큰 파일을 다룰 경우에는 이 설정값을 조절해야 한다.

 

2) Output Partition

관련 설정 : df.repartition(cnt), df.coalesce(cnt)

 

Output Partition은 파일을 저장할 때 생성하는 Partition이다. 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정한다.

기본적으로 HDFS는 큰 파일을 다루도록 설계되어 있어 크기가 큰 파일로 저장하는 것이 좋다. (너무 작지 않게 너무 과하게 크지도 않게)

 

보통 HDFS Blocksize에 맞게 설정하면 되는데, 카카오 Hadoop 클러스터는 256MB로 설정되어 있어서 통상적으로 파일 하나의 크기를 256MB에 맞도록 Partition수를 설정한다고 한다.

 

Partition의 수는 관련 설정을 통해 설정한다. repartition과 coalesce를 이용해 Partition 수를 줄일 수 있다.

 

- 보통 groupBy 집계 후 저장할 때 데이터의 크기가 작아진다. 이때 파일의 크기를 늘리기 위해 위 설정들을 사용해 partition 수를 줄일 수 있다.

- df.where()를 통해 필터링을 하고 그대로 저장하면 파편호가 생긴다. 그래서 repartition(cnt)를 한 후 저장한다.

 

Shuffle Partition

관련 설정 : spark.sql.shuffle.partitions

 

 Spark 성능에 가장 크게 영향을 미치는 Partition으로 Join, groupBy 등의 연산을 수행할 때 Shuffle Partition이 쓰인다.

설정값에 따라 Join, groupBy 수행 시 Partition의 수(또는 Task의 수)가 결정된다.

 

이 설정값은 Core 수에 맞게 설정하라고 하지만 Partition의 크기에 맞춰 설정해야 한다.

이 Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 shuffle Spill이 발생하기 때문이다.

* Shuffle spill : 데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬화를 하고 연산을 재개한다

 

Shuffle spill이 일어나면 Task가 지연되고 에러가 발생할 수 있다. 또한 Hadoop 클러스터의 사용률이 높다면 Spark가 강제 종료될 수 있다.

 

Shuffle Spill은 Memory Limit Over처럼 메모리 부족으로 나타나는데 보통 이에 대한 대응은 Core당 메모리를 늘리는 것으로 해결한다. 하지만 무작정 코어를 올리기 전에 옵션을 우선적으로 고려해 설정해야 한다.

 

일반적으로 하나의 Shuffle Partition의 크기가 100~200MB가 될 수 있도록 spark.sql.shuffle.partitions 수를 조절하는 것이 최적이다.

 

* Spark partition 출처 : https://tech.kakao.com/2021/10/08/spark-shuffle-partition/

 

 

 

 

References

https://m.blog.naver.com/PostView.naver?isHttpsRedirect=true&blogId=cafesky7&logNo=221138921615 

https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454

https://aws.amazon.com/ko/elasticmapreduce/details/spark/

https://techvidvan.com/tutorials/apache-spark-dag-directed-acyclic-graph/

https://data-flair.training/blogs/dag-in-apache-spark/

반응형

댓글