Python/Spark

Spark 독학하기 3일차 – 분산 처리가 뭐야..

오늘은강박사갈거야~~ 2025. 5. 11. 00:47
반응형

🎯 목표

  • Spark의 분산/병렬 처리 개념을 실제 코드 흐름 속에서 이해해 보기
  • Job → Stage → Task → Partition 구조를 익히는 게 목표
  • 실무에서 Spark를 언제, 왜, 어떻게 써야 하는지도 감 잡는 것

전체 요약

Spark는 데이터를 잘게 나눠서(Task), 여러 작업자(Executor)가 병렬로 처리하게 만든 분산 처리 프레임워크다.
우리가 코드를 작성하면, 내부적으로 실행 계획(DAG)이 쌓이고, .show() 같은 Action이 호출되는 순간 Job이 실행된다.
Job 안에서 중간에 Shuffle이 발생하면 Stage가 나뉘고, 각 Stage는 Partition 개수만큼 Task가 만들어진다.
이 구조 덕분에 대용량 데이터를 효율적으로 처리할 수 있지만, 중간 Shuffle 비용은 항상 조심해야 한다.


본문

1. Lazy Execution: Spark는 언제 실행되는가?

Spark는 코드 한 줄 한 줄마다 바로 실행되지 않는다.
filter, groupBy, agg 같은 연산은 실행 계획(DAG)에만 쌓여 있고,
.show(), .collect() 같은 Action 함수가 호출되는 시점에 비로소 실행된다.

 
df = spark.read.csv("data.csv", header=True)
df_filtered = df.filter("age > 30").groupBy("gender").agg({"salary": "avg"})
df_filtered.show()
 

이 코드에서 .show()가 실행되기 전까지는 실제 연산은 하나도 발생하지 않는다. 이런 방식을 Lazy Execution이라고 부른다.

코드 여러 줄 썼는데, Spark는 언제 진짜로 실행되는 건가..?

Transformation은 실행 안 되고 DAG에만 쌓이고, Action이 호출되는 시점에 실행된다고 한다. 그게 Lazy Execution이다.

 


2. Transformation vs Action

Spark에서 연산은 두 가지로 나뉜다: TransformationAction

  • Transformation은 .filter(), .map()처럼 새로운 DataFrame을 정의하는 작업이다.
    실제로 연산이 즉시 실행되지는 않고, 실행 계획(DAG)에 기록만 된다.
  • Action은 .show(), .collect(), .count(), .write()처럼 실제 계산을 트리거하는 작업이다.
    이 시점에서 비로소 Spark가 DAG를 실행해서 결과를 만든다.
Transformation과 Action을 왜 나누는 거지..

 찾아보니, Transformation을 미리 실행하지 않고 쌓아두는 구조(Lazy Execution)를 통해 Spark는 전체 연산 흐름을 최적화해서 한 번에 실행할 수 있게 만든다고 한다. 중간마다 실행되면 불필요한 작업이 반복될 수도 있는데, 이걸 방지하고 최적화하려고 구분돼 있다고 한다.

 


Transformation과 Action 비교

구분 Transformation Action
역할 연산 정의 (계획만 세움) 연산 실행 (계획을 실제 실행함)
예시 .select(), .filter(), .map() .show(), .collect(), .count()
실행 시점 호출해도 즉시 실행 안 됨 호출 시 바로 실행됨
목적 DAG에 쌓기 위한 처리 DAG 실행을 트리거함
성능 최적화 관점 여러 Transformation을 묶어 한 번에 실행함 실행 트리거 포인트, 너무 많으면 느려짐

 

3. DAG: 실행 흐름을 계획으로 만든다

Spark는 .show(), .collect() 같은 Action이 호출되기 전까지는 실제로 아무 연산도 하지 않는다.

대신 Transformation들을 하나의 실행 계획으로 쌓아둔다. 이 실행 계획이 바로 DAG (Directed Acyclic Graph, 방향성 비순환 그래프) 구조로, DAG는 말 그대로 “방향성 있고, 순환하지 않는 그래프”라고 한다.

 

Spark 입장에서는 “어떤 연산을 어떤 순서로, 어떤 데이터 단위로 수행할 것인지”를 표현하는 흐름도라고 보면 된다.

이 구조 덕분에 Spark는 쓸데없는 중간 연산을 피하고, 전체 작업을 묶어서 한 번에 최적화 실행할 수 있다.

DAG 구성 예시

df = spark.read.csv("data.csv", header=True)
df_filtered = df.filter("age > 30")
df_grouped = df_filtered.groupBy("gender").agg({"salary": "avg"})
df_grouped.show()
 

이 코드를 Spark는 이렇게 DAG로 해석한다:

  1. read.csv() → 입력
  2. filter() → 첫 Transformation
  3. groupBy() + agg() → Shuffle 발생 지점
  4. show() → Action (실행 트리거)

→ 이 DAG를 기반으로 Job이 생성되고,
→ 그 안에서 Stage와 Task가 쪼개진다 (이건 다음 단계에서 설명)

 

 

4. Partition: 데이터 처리의 최소 단위

Spark에서 병렬 처리를 하려면, 전체 데이터를 조각조각 나눠서 각각 따로 처리해야 한다.
이때 사용되는 단위가 바로 Partition이다.

  • Partition은 하나의 Task가 처리할 데이터 범위다.
  • Spark는 기본적으로 파일을 읽는 시점에 데이터를 자동으로 Partition으로 나눈다.
  • 예를 들어 1GB짜리 CSV 파일이라면, 보통 128MB 단위로 나뉘고, Partition은 약 8개가 된다.
  • Task 수는 Partition 수와 1:1이다. Partition이 8개면 Task도 8개.
 
근데 Partition은 코드 어디서 생기는 거지..


Spark는 .read() 같은 입력 연산 시점에 자동으로 Partition을 만든다고 한다. 예를 들어 큰 CSV 파일을 읽으면 내부적으로 잘게 나누는데, 이 단위가 Partition이고, Task는 이걸 처리하는 단위라고 한다. 참고로 .repartition(n)이나 .coalesce(n)으로 Partition 수를 수동으로 조절할 수도 있다고 한다.

df = spark.read.csv("data.csv", header=True) # 이 시점에 Partition 자동 생성

5. Executor: 실제로 Task를 처리하는 주체

Executor는 Partition(Task)을 받아서 실제 연산을 처리하는 Spark의 작업자다.

  • Spark는 애플리케이션을 실행할 때 여러 개의 Executor를 띄운다.
  • 각 Executor는 JVM 프로세스 하나이며, 하나 이상의 Task를 실행한다.
  • Executor는 자체 메모리를 가지고 있고, 처리 중인 데이터를 캐시하거나 중간 결과를 저장할 수도 있다.

로컬에서 실행하면 Executor는 하나지만, 클러스터 환경에서는 여러 노드에 분산되어 동작한다.

 

근데 Executor 수는 코드에서 어디서 정하는거지..-_-


SparkSession 안에서는 Executor 수를 정하지 않고, 외부 실행 명령어 (spark-submit)에서 설정한다고 한다.
로컬 개발 시엔 .master("local[4]")처럼 코어 수를 흉내 내고,
실제 클러스터에서는 --num-executors, --executor-cores 같은 옵션으로 제어한다고 한다.
# 로컬 개발 예시
spark = SparkSession.builder.master("local[4]").getOrCreate()

 

# 클러스터 실행 예시
spark-submit \ --num-executors 4 \ --executor-cores 2 \ --executor-memory 4g \ your_spark_app.py

6. Shuffle: Stage를 나누고 성능을 잡아먹는 주범

filter, select 같은 연산은 Partition 내부에서만 작업하면 되기 때문에 병렬 처리에 적합하고, 성능도 잘 나온다.
하지만 아래 연산들은 문제가 생긴다:

  • .groupBy()
  • .join()
  • .distinct()
  • .reduceByKey() 등

이런 연산들은 같은 키를 가진 데이터들을 한 곳에 모아야 하므로, Partition 간에 데이터를 재분배해야 한다. 이 과정을 Shuffle이라고 부른다.

Shuffle이 발생하면:

  • 기존 데이터를 디스크에 임시 저장했다가
  • 네트워크로 분산시켜 다른 Executor에 보내고
  • 정렬하거나 그룹핑해서 새 Partition을 만든다

이 과정은 디스크 I/O + 네트워크 전송 + 정렬 연산까지 포함되기 때문에 가장 비싼 작업이라네. 
결국 이 작업을 최소화 하는게 좋겠네. 이게 뭔가 키포인트 같은데

 


7. 최적화 전략: 필터 먼저, 그룹바이 나중에

Shuffle을 줄이는 가장 대표적인 전략 중 하나는 불필요한 데이터를 일찍 제거하는 것이다.
예를 들어 아래 두 연산 순서를 비교해 보자:

# 비효율적인 순서
df.groupBy("user_id").agg(...).filter("age > 30")

# 더 나은 순서
df.filter("age > 30").groupBy("user_id").agg(...)
 

앞의 방식은 전체 데이터를 먼저 그룹핑하고 그다음에 필터링하므로, Shuffle 전에 데이터가 너무 많아질 수 있다.

뒤의 방식은 먼저 데이터를 줄여놓고 그걸 그룹핑하므로 Shuffle 비용을 줄일 수 있다.


 

다음 계획

  • DAG 구조를 실제로 시각화해 보기
  • Shuffle이 발생하는 예제 분석
  • Spark UI를 통한 병목 탐색 연습
반응형