본문 바로가기
데이터 엔지니어링

Apache Spark : RDD, Transformation, Action, Persist

by 내기록 2024. 2. 27.
반응형

목차 LIST

     

    RDD

    스파크는 연산 과정을 클러스터 전체에 걸쳐 자동으로 병렬화하여 분산 배치된 연산 작업들의 모음으로 표현한다. 이 모음은 RDD(Resilient Distributed Dataset, 탄력적인 분산 데이터세트)라고 부른다. RDD는 분산 데이터와 연산을 위한 스파크의 핵심 개념이다. 즉, RDD는 분산되어 존재하는 데이터 요소들의 모음이다.

     

     

    RDD 기초

    RDD는 쉽게 말하면 '분산되어 있는 변경 불가능한 객체 모음'이다. 각 RDD는 클러스터의 서로 다른 노드들에서 연산이 가능하도록 여러 개의 파티션으로 나뉜다. RDD는 외부 데이터세트를 로드하거나 드라이버 프로그램에서 객체 컬렉션(ex. list, set)을 분산시키는 두 가지 방법으로 생성할 수 있다. 아래는 SparkContext, textFile()을 사용하여 텍스트 파일을 문자열 RDD로 로딩하는 것이다.

     

    >> lines = sc.textFile("README.md")

     

    생성된 RDD는 두 가지 타입의 연산을 지원하는데, 트랜스포메이션(transformation)과 액션(action)이다.

     

    트랜스포메이션(transformation)

    스파크에서 transformation은 RDD와 같은 분산 데이터 컬렉션에 적용하여 새로운 분산 데이터셋을 생성하는 오퍼레이션이다.

    transformation의 예로는 map, filter, groupBy, join 등이 있다. RDD나 데이터프레임에서 transformation을 호출할 때 스파크는 바로 연산 작업을 진행하지 않는다. 대신, 논리적 실행 계획만 짜놓는데, 이것은 실행될 transformation의 시퀀스를 보여주는 DAG(Directed Acyclic Graph)이다. Action을 하기 전까지 transformation은 일어나지 않는다.

     

    트랜스포메이션은 연산 결과로 신규 RDD를 생성한다. 즉, 기존의 inputRDD를 변경하지 않는다는 것을 명심하자. 트렌스포메이션의 예로는 표현식과 일치하는 데이터를 걸러내는 filter이 있다.

    아래 코드는 Python이라는 단어를 포함한 문자열만 가진 새로운 RDD(pythonLines)를 생성한다.

    pythonLines = lines.filter(lambda line: "Python" in line)

     

    좁은 트랜스포메이션(Narrow Transformation)

    좁은 트랜스포메이션은 입력 파티션에 대한 연산이 독립적으로 이루어지며, 연산의 결과로 생성되는 출력 파티션은 오직 해당 입력 파티션의 데이터에만 의존한다. 이는 연산을 수행할 때, 다른 파티션의 데이터를 참조하거나 필요로 하지 않는 다는 것을 의미한다. 이러한 트랜스포메이션은 셔플(데이터 재분배 과정)이 필요하지 않기 때문에, 대부분의 경우 다른 파티션의 데이터를 읽을 필요가 없다.

     

    다시 말해 각 입력 파티션을 독립적으로 처리하고, 그 결과로 새로운 파티션을 생성하는 연산을 의미한다. 그리고 결과로 생성되는 새로운 데이터는 원본 데이터가 위치한 같은 노드에 저장될 수 있기 때문에 다른 노드로의 데이터 전송이 필요하지 않으므로 네트워크 트래픽 또한 발생하지 않는다.

     

    예를 들어, map, filter와 같은 연산이 여기에 해당된다.

    map 연산을 사용하여 모둔 데이터에 동일한 함수를 적용하는 경우를 생각해보면, 하나의 입력 파티션에 있는 데이터에 이 함수를 적용하고 이 연산의 결과는 오직 이 입력 파티션의 데이터에만 기반하며 다른 파티션의 데이터와는 무관하게 출력 파티션을 생성한다.

    좁은 트랜스포메이션은 실행 비용이 상대적으로 낮고, 성능이 좋기 때문에 빠른 처리가 가능하다.

     

    📌 체크포인팅과 off_heap 영속화

    좁은 트랜스포메이션의 경우 대부분의 작업이 메모리 내에서 빠르게 처리되지만 일부 작업은 GC 오버헤드나 메모리 부담이 클 수 있다. 이런 경우 체크포인팅이나 off_heap(힙 외부) 영속화 기법을 사용할 수 있다. 체크포인팅은 계산 중에 데이터셋의 중간 상태를 디스크와 같은 안정적인 저장소에 저장함으로써 실패 시 복구 비용을 줄이고 처리 과정의 안정성을 높이는 방법이다.
    off_heap 영속화는 JVM 힙 메모리 외부에 데이터를 저장하여 GC 오버헤드를 줄이는 방법으로 메모리 사용 효율성을 개선한다.

     

    📌 heap과 off_heap

    메모리 관리에서 "힙(Heap)"과 "오프-힙(Off-Heap)"은 데이터를 저장하는 두 가지 방법이다. 이 두 영역은 메모리 사용과 관련하여 서로 다른 특성과 용도를 가지고 있습니다. 메모리는 힙, 스택(Stack), 그리고 다른 영역들(예: 코드 영역, 데이터 영역 등)으로 구분될 수 있기 때문에 메모리가 힙과 오프-힙으로만 구성되는 것은 아니다. 

    1.  힙(Heap) 영역은 동적 메모리 할당을 위해 사용되는 메모리의 한 부분으로 프로그램이 실행되는 동안 필요에 따라 메모리를 할당하고 해제할 수 있다. 예를 들어, 자바에서는 객체가 힙에 할당되며, 가비지 컬렉터가 더 이상 사용되지 않는 객체를 자동으로 정리한다.

    2. 오프-힙(Off-Heap) 영역은 자바 가상 머신(JVM)의 힙 메모리와 달리 가비지 컬렉터(GC)의 관리를 받지 않는다. 
    - GC가 큰 오버헤드를 유발하거나 지연 시간이 중요한 애플리케이션에서는 오프-힙 메모리를 사용하여 대량의 데이터를 저장하고 처리할 수 있다.
    - JVM 밖의 메모리를 직접 사용하기 때문에 C/C++ 같은 네이티브 코드나 라이브러리와 상호작용 하는 경우 유용하다. 예를 들어, 네이티브 코드를 통해 할당된 메모리를 자바 코드에서 직접 접근하거나, 반대로 자바에서 할당한 오프-힙 메모리를 네이티브 코드에서 사용할 수 있다.
    - GC로 인한 지연이나 메모리 부족 문제를 더 잘 관리할 수 있다. 큰 메모리 할당이 필요한 애플리케이션에서는 오프-힙 메모리를 통해 JVM의 힙 크기 제한을 우회하고, 애플리케이션의 메모리 사용량을 직접 제어할 수 있다.

     

    넓은 트랜스포메이션(Wide Transformation)

    넓은 트랜스포메이션은 입력 데이터의 여러 파티션 간에 데이터가 재분배되어야 하는 경우를 말한다. 이러한 트랜스포메이션은 셔플을 필요로 하며, groupBy, reduceByKey, join과 같은 연산이 여기에 속한다. 넓은 트랜스포메이션은 종종 네트워크를 통한 대량의 데이터 이동을 발생시키므로 결과적으로 실행 시간이 더 오래 걸리며 리소스 사용량이 더 많다.

     

     

    액션(action)

    액션은 기존에 있는 RDD로 결과 값을 계산하고 결과를 돌려주거나 외부 스토리지(ex. HDFS)에 저장하는 오퍼레이션이다. 액션의 예로는 count, collect, saveAsTextFile 등이 있다. 액션을 호출할 때, 스파크는 트랜스포메이션을 통해 계획된 전체 로지컬 실행 계획(DAG)를 실행한다. 그리고 이것을 실행하기 전에 실행 계획을 최적화한다.

     

    아래 코드는 첫 번째 요소를 되돌려주는 fist()를 사용한 예제이다.

    >> pythonLines.first()

     

    새로운 액션을 호출할 때마다 전체 RDD가 처음부터(from scratch) 계산된다는 것은 중요한 점이다. 이런 비효율성을 피하기 위해 중간 결과를 영속화(persist)하는 방법을 쓸 수 있다.

     

    Lazy Evaluation

    Lazy Evaluation은 스파크가 데이터 트랜스포메이션과 액션이 정의됐을때 즉시 실행하는 것이 아닌 지연 방식으로 실행하는 것을 말한다.

    즉, 스파크가 액션을 만나기 전까지는 실제로 트랜스포메이션을 처리하지 않는다는 것을 뜻한다. 

    RDD에 대한 트랜스포메이션을 호출할 때(ex. map()) 그 연산이 즉시 수행되는 것은 아니라는 것을 말한다. 대신 내부적으로 스파크는 메타데이터에 이러한 연산이 요청되었다는 사실만 기록한다. 그러므로 RDD가 실제로 어떤 특정 데이터를 갖고 있는게 아닌 트랜스포메이션들이 생성한 데이터를 어떻게 계산할지에 대한 명령어를 갖고 있다고 생각하는 것이 구조를 이해하기에 더 쉽다.

    RDD에 데이터를 로드하는 것 또한 트랜스포메이션과 마찬가지로 여유롭게 수행된다. 그러므로 sc.textFile()을 호출했더라도 실제로 필요한 시점이 되기 전까지는 데이터를 로딩하지 않는다. 

     

    이 방식은 최적화와 성능에서 몇 가지의 이점을 제공한다. 

    스파크의 lazy evaluation은 특정 시점에 실행된다. 논리적인 실행 계획은 즉시 실행되지 않고, 스파크는 액션이 호출될 때까지 계산을 미룬다.

     

     

    영속화(캐싱)

    앞에서 논의된 것처럼 스파크 RDD는 lazy evaluation. 즉, 여유로운 방식으로 수행되지만, 때로는 동일한 RDD를 여러 번 사용하고 싶을 때도 있을 것이다. 이를 고려하지 않으면 스파크는 RDD에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 된다. 이는 데이터를 여러 번 스캔하는 반복 알고리즘들에 사용되면 매우 무거운 작업이 될 수 있다.

     

    간단한 예제를 보면, RDD에 대한 동일한 연산이 액션이 실행될 때마다 동일하게 수행된다.

    val result = input.map(x => x*x) # 트랜스포메이션
    
    println(result.count()) # 액션을 통해 연산 수행
    println(result.collect().mkString(",")) # 액션을 통해 연산 수행

     

    RDD를 여러 번 반복 연산하는 것을 피하라면 스파크에 데이터 영속화를 사용한다. RDD 영속화에 대한 요청을 하면 RDD가 계산한 노드들은 그 파티션을 저장하고 있게 된다. 영속화된 데이터를 갖고 있는 노드에 장애가 생기면 스파크는 필요 시 유실된 데이터 파티션을 재연산한다. 

     

    스파크는 RDD 재사용을 위해 persistence, caching, checkpointing이라는 옵션을 제공한다.

    큰 규모의 데이터셋은 영속화 비용이 크게 높아지므로 오히려 재연산을 유도하는게 좋을 수 있다. 따라서 무조건 영속화를 하는 것이 좋은 것은 아니다.

    어떤 케이스에서 RDD 재사용을 통해 퍼포먼스를 향상시킬 수 있을까?

     

    1. 반복적인 연산
      : 동일한 데이터셋애 대해 반복적으로 연산을 수행한다면 이 데이터셋을 영속화하는 것이 좋다. 매번 연산할 때마다 데이터셋이 메모리 내에 존재하고 있는 것이 보장되므로 성능 향상을 기대할 수 있다.
    2. 동일 RDD에 대해 여러 번의 액션 호출
    3. 각 파티션의 연산 비용이 너무 큰 경우
      : 데이터셋이 여러번 사용되는 경우가 아니어도 중간 결과를 저장하여 실패 시 비용을 줄일 수 있다. 위에서 언급했던 것처럼 좁은 트랜스포메이션이 클러스터의 executor의 처리량보다 더 큰 GC 오버헤드나 메모리 부담이 발생하면 체크포인팅이나 off_heap 영속화를 사용할 수 있다.

     

    메모리 영속화는 메모리 공간이 필요하며, 직렬화와 역직렬화를 위한 시간도 필요하다.

     

    • 메모리 영속화나 연산은 모두 spark executor JVM 안에서 이루어진다. 따라서 메모리를 많이 차지하는 메모리 영속화는 메모리에 부담을 주어 재연산보다 비용이 더 높은 결과를 가져올 수 있다.
    • 디스크 영속화나 checkpointing은 읽기와 쓰기 비용이 높아지므로 맵리듀스의 단점을 그대로 가진다.
    • 일반적으로 연산의 규모가 작업 규모에 비해 크다면 재연산보다 RDD의 재사용을 하는 것이 더 좋다.
    • 작업이 GC나 메모리 부족 오류로 실패한다면, 또는 클러스터에 다른 잡들이 많다면 checkpointing이나 off_heap 영속화가 도움이 될 수 있다.

     

     

     

    References

    https://spark.apache.org/docs/latest/cluster-overview.html

    러닝 스파크 구버전

    https://medium.com/@think-data/mastering-lazy-evaluation-a-must-know-for-pyspark-pros-ac855202495e

    https://jaemunbro.medium.com/apache-spark-rdd-재사용을-위한-영속화

     

    반응형

    댓글