본문 바로가기

MLOps/Data

꼬리에 꼬리를 무는 Spark와 RDD, DataFrame, Dataset 이야기

 

꼬리에 꼬리를 무는 Spark와 RDD, DataFrame, Dataset 이야기 


서론

실습이 더 중요하지만... 이론적으로 공부했다. 노트북아 힘내렴


Spark에서 사용할 수 있는 데이터 형태에는 뭐가 있나?

  • Apache Spark에서는 RDD(Resilient Distributed Dataset), DataFrame 및 Dataset 3가지 데이터 형태를 사용 가능
  • Spark의 기본 컨셉이 RDD(Resilent Distributed Datasets) 임

3개 중에 뭐 사용?

데이터 특성에 따라 적합한 데이터 형태를 선택해야 함

1. RDD (Resilient Distributed Dataset) 

  • Spark에서 가장 초기에 개발된 데이터 형태
  • 불변성과 탄력성을 가지는 분산된 객체 컬렉션
  • RDD는 직접적으로 자바나 스칼라 같은 프로그래밍 언어로 작성된 코드로 조작 가능

불변성?

  • RDD에 저장된 데이터가 변경되지 않는다는 것을 의미
  • RDD는 수정이 불가능해서 변경하려면 새로운 RDD 생성해야 함
  • 불변이라는 특징 때문에 동시성 문제 피함

탄력성?

  • RDD가 손상되었을 때 자동으로 복구되는 능력
  • RDD 복제본 유지하고 클러스터의 다른 노드에 저장

동시성

  • 동시에 여러 작업을 수행하는 컴퓨팅 환경
  • 분산 시스템에서 여러 노드에서 병렬로 작업이 처리되므로 동시성은 중요함

 

불변 인 거랑 동시성 문제를 피하는 거랑 뭔 상관이니?

예를 들면

가정 :  여러 사용자가 스파크 대규모 데이터를 처리 수행하려고 함

  1. 각 사용자는 Spark에서 RDD 사용해서 데이터 처리 작업 수행
  2. 이때, RDD는 불변이기 때문에 여러 사용자가 동시에 RDD 써도, RDD 변경 불가
  3. 따라서 사용자 1이 RDD 사용해서 데이터 처리 작업 수행하고 있다면 -> 
    사용자 2는 동시에 RDD를 생성하고 사용해서 다른 작업 수행
  4. 아무 문제 발생 X, 이것이 바로 수정이 불가능한 불변 특징에서 오는 것

자동으로 복구된다는 게 뭐냐

  • RDD가 여러 파티션으로 나뉘어 클러스터 여러 노드에 분산 저장되고 각 파티션은 복제본을 가짐
  • 복제본은 다른 노드에 저장

예를 들어 

가정 : 클러스터의 한 노드에서 RDD의 한 파티션이 손상되었다고 가정

  1. Spark는 해당 파티션의 복제본 사용해서 데이터 복원함
  2. 이를 통해 RDD는 손상된 파티션 제외하고 전체적으로 안정적인 상태 유지
  3. 추가로 RDD 가 생성된 시점부터 모든 작업에 대한 정보를 저장하는 로그 파일을 유지함
  4. RDD 손상 -> 이전 작업에 대한 정보를 사용해 파티션 복원

모든 작업에 대한 정보를 저장하는 로그 파일은 어디에 저장함?

  • 로컬 파일 시스템 or HDFS 같은 파일시스템 내에 저장

그렇다면 파티션은 몇 개의 복제본을 가지나?

  • 클러스터 설정에 따라 다르고 아래 설정 값에서 지정 가능
spark.default.parallelism
  • 일반적으로 2개 복제본 생성
  • 만약 클러스터가 10개 노드로 구성되었으면, 20개의 복제본을 가짐

그럼 복제본이 모두 인-메모리에서 사용되는 건가?

  • 모두 메모리에 올라가는 건 아니고 디스크에 저장될 수 있는데, 이것도 물론 사용자 수준에서 설정 가능

파티션?

  • 데이터의 물리적인 블록
  • 스파크는 두 가지 유형의 변환 작업을 수행

1. Narrow transformation

  • 입력 파티션에 대해 하나의 출력 파티션 생성

2. Wide transformation

  • 여러 입력 파티션을 사용하여 출력 파티션 생성

 

하드 디스크 파티션 구조의 그 파티션 인가?

아님. 스파크 파티션은 하드 디스크 파티션 구조와 같은 물리적 구조 X

스파크에서 파티션이란 RDD/DataFrame/Dataset의 논리적인 구성 요소 일 뿐

 


하드 디스크 파티션 구조

1. MBR(Master Boot Record)

  • 하드 디스크 첫 번째 섹터에 위치한 부트 섹터
  • 디스크에 저장된 운영 체제를 로드하는데 필요한 정보와 파티션 테이블 포함
  • 최대 4개 주파티션 또는 3개 주 파티션과 1개의 확장 파티션 지원

2. Active Primary Partition

  • 하드 디스크에는 여러 개 주 파티션(primary)이 있을 수 있고 그중 하나는 활성 파티션으로 표시
  • 활성 파티션은 컴퓨터가 부팅할 때 운영체제를 로드하는 데 사용되는 파티션

3. Extended Partition

  • MBR 방식에서 확장 파티션은 주 파티션과 함께 사용되어 추가적인 파티션 생성하는 공간 제공

2. DataFrame 

  • 스키마를 가지는 분산된 데이터 집합, 효율적인 SQL 쿼리 및 일반적인 데이터 처리 작업 지원
  • RDD에 비해 성능이 더 높으며, 데이터 처리 및 조작에 대한 최적화 기능을 제공.
  • 스파크의 구조적 API를 사용하여 데이터 조작과 처리를 보다 쉽게 할 수 있음

효율적인 SQL 쿼리?

  • DataFrame은 SQL과 유사한 구문 사용가능 해서 SQL 쿼리를 효율적으로 수행하다고 표현함
  • 추가로, Dataframe은 Catalyst Optimizer와 같은 최적화 기술을 사용해 쿼리 실행 계획을 최적화함

Catalyst Optimizer

  • SQL 쿼리를 분석해서 불필요한 연산을 제거하고 최적화된 실행 계획을 생성

예를 들어 아래와 같은 쿼리가 있다고 가정함

SELECT * FROM sales WHERE year(sale_date) = 2023

이 SQL 쿼리는 'sales' 테이블에서 'sale_date' 컬럼이 '2023'인 로우의 개수를 세는 작업을 수행함

Catalyst Optimzer는 이 SQL 쿼리를 분석해서,

'sale_date' 컬럼이 인덱싱 되어 있을 경우 -> 인덱스 사용해서 더 빠르게 데이터 찾음

또한 'year(sale_date)' 연산이 매번 수행되지 않도록,

  1.  'sale_date' 칼럼을 먼저 추출하고 연도 계산
  2. 조건 검사

해서 SQL 쿼리 최적화함


데이터 처리 및 조작에 대한 최적화 기능을 제공한다는 게 뭔 소리여?

Spark 가 사용하는 최적화 기능들

1. Lazy Evaluation

  • 연산을 지연시켜 필요한 시점에서만 실행 -> 불필요한 연산 줄임 -> 메모리 사용량 최적화
  • 예를 들어, 여러 개 데이터 처리 및 조작 작업을 순차적으로 수행할 경우, (1) Spark는 하나로 실행 계획을 묶어 최적화함

2. In-Memory Computing

  • (2) In-memory computing 사용 -> 데이터 처리 및 조작 작업 빠르게 수행
  • 메모리에 데이터 로드 -> 디스크 I/O 줄임 -> 데이터 처리 및 조작 빠르게 수행

3. Partitoning

  • 데이터 분할
  • 분할된 데이터를 병렬로 처리 -> 속도 향상

4. Data Compression

  • (3) 데이터 압축 -> 디스크 I/O 줄임 -> 메모리 사용량 최적화

5. Caching

  • 자주 사용되는 데이터 메모리에 캐싱 -> 디스크 I/O 줄임

6. Code Generation

  • 데이터 처리 및 조작 작업 자동 생성

(1) Spark는 하나로 실행 계획을 묶어 최적화?

아래 코드가 4가지 단계를 수행했다고 하자

  1. 'data/sales.parquet 파일에서 데이터 읽고,
  2. 'product' 칼럼이 'TV'이고, 'sales_date' 컬럼이 2023인 로우 선택하고
  3. 'region' 컬럼 그룹화 하고 총액 계산 하고,
  4. 총 매출액 높은 순으로 정렬 후 출력
val df = spark.read.parquet("data/sales.parquet")
val filteredDf = df.filter($"product" === "TV" && year($"sale_date") === 2023)
val aggDf = filteredDf.groupBy("region").agg(sum("amount").alias("total_amount"))
val resultDf = aggDf.orderBy($"total_amount".desc)
resultDf.show()

 

스파크는 이 작업을 하나의 실행으로 묶어서 최적화함

예를 들어

  1. 'sales.parquet' 읽는 작업, 'product' 칼럼이 'TV' 이고, 'sales'_date' 컬럼이 2023년인 로우를 선택하는 작업이 함께 수행
  2. 또한, 'region' 칼럼을 그룹화, 총매출 계산 작업과 총 정렬하는 작업은 동일한 데이터 사용하므로 함께 수행

이러한 최적화를 통해 속도 향상


(2) In-memory computing 사용하면 메모리 많이 필요하겠네?

당연. 하지만 메모리 사용량을 최적화하는 다양한 방법이 있음

  1. Partioning을 사용해 데이터 분할 -> 분할된 데이터 병렬 처리해서 메모리 사용량 최소화
  2. 자주 사용되는 데이터 캐싱
  3. 데이터 압축해서 메모리 사용량 줄이기

(2-1) 분할된 데이터를 병렬로 처리하는 것과 분할되지 않은 데이터를 처리하는 것에서 메모리 사용량 차이가 왜 발생?

  • 파티션마다 필요한 메모리 공간이 작아지므로 전체적으로 메모리 사용량 줄어들 수 있음
  • 반면에 분할되지 않은 데이터 처리할 때는, 모든 데이터를 한 번에 로드하여 처리해야 하므로 메모리 사용량 크게 증가할 수 있음

(2-2) 분할된 파티션을 병렬로 처리를 한다고 가정해도 분할된 개수는 어차피 모든 데이터 크기와 같은 것 아님?

  • 맞음. 하지만 파티션 분할 하면, 각 파티션을 병렬로 처리할 수 있음
  • 예를 들어, 10GB 데이터 처리하는데, 10개 파티션으로 분할한다고 가정해 봄
  1. 각 파티션은 1GB 데이터 -> 병렬 처리가 가능
  2. 전체 적인 처리 속도 향상 -> 파티션마다 필요한 메모리 공간이 작아진다
  3. 전체적인 메모리 사용량 최적화 가능해진다
  4. 반면에, 분할 안 하고 전체 데이터 메모리에 로드하면 OOM 날 수도 ㅋㅋ

(3) 데이터 압축? 내가 아는 그 압축?

  • 맞다. 우리가 아는 그 압축
  • 일반적인 데이터 압축 방법 Gzip, Bzip2, Snappy 등등

3. Dataset

  • Dataset은 스파크 1.6 이후에 추가된 개념, DataFrame과 RDD 장점을 결합한 것
  • 구조적 API 사용하여 데이터 처리 및 조작을 보다 쉽게 할 수 있음

DataFrame과 RDD의 무슨 장점을 결합했나?

DataFrame의 구조적인 데이터 처리와 RDD의 탄력성과 불변성을 분산 데이터 처리의 장점

아래 코드를 보면

  • Dataset은 타입 체크를 수행해서 사전에 타입 에러 방지 가능함
  • 또한, filter() 메서드 사용해서 필터링하는 경우, DataFrame은 Catalyst Optimizer를 사용해 실행 계획 최적화
from pyspark.sql import SparkSession
from pyspark.sql.functions import year

spark = SparkSession.builder \
    .appName("DatasetExample") \
    .getOrCreate()

# 스키마 정의
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])

# csv 로드하고 데이터 변환
ds = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .schema(schema) \
    .csv("path/to/data.csv") \
    .as("person")

# 컬럼 선택하고 필터링
result = ds.select("name", "age", "gender").filter(year("sale_date") == 2023)
result.show()

4. 3개 중에 뭐 사용함?

1. RDD

기본적으로 분산된 데이터 처리를 위해 사용

1. 분산 데이터 처리를 수행해야 하는 경우

  • 병렬화된 웹 크롤링

2. 데이터 타입이 일치하지 않는 경우

  • 다양한 종류 로그 파일

3. 다양한 데이터 소스 처리 하는 경우

  • NoSQL, 실시간 스트리밍

4. 데이터 처리 복잡한 경우

  • 머신러닝 알고리즘

2. DataFrame

1. SQL 쿼리 기반 데이터 처리 수행 하는 경우

  • 테이블 형태 DB 처리

2. 칼럼 기반 데이터 처리 하는 경우

3. 데이터 타입 일치해야 하는 경우

4. 데이터 처리 복잡하지 않은 경우


3. Dataset

1. 타입 안정성을 보장해야 하는 경우

2. 데이터 타입 일치하지 않는 경우

3. 데이터 처리 복잡한 경우

4. SQL 쿼리 기반으로 데이터 처리하는 경우