반응형
스파크 : RDD
RDD는 스파크에서 가장 기본적인 추상적 부분입니다. RDD에는 세 가지의 핵심 특성이 있습니다.
- 의존성 dependency
- 결과를 새로 만들어야 하는 경우에 스파크는 이 의존성 정보를 참고하고 연산을 다시 반복하면서 RDD를 다시 만들 수 있습니다.
- 파티션(지역성 정보 포함)
- 스파크에게 작업을 나눠서 executor들에 분산하여 파티션별로 병렬 연산을 할 수 있는 능력을 부여합니다.
- 연산 함수: Partition => Iteratior[T]
- RDd에 저장되는 데이터를 Iterator[T] 형태로 만들어 주는 연산함수를 갖고 있습니다.
이러한 모델은 연산식 자체가 스파크에 투명하지 않았습니다. 예를들어 사용자가 연산 함수 안에서 조인, 필터링, 선택, 집계 등에서 스파크에서는 람다 표현식으로 보였습니다. 이에따라 연산이나 표현식을 검사하지 못하다 보니 최적화 할 방법이 없었습니다.
이러한 불투명함은 스파크 구조확립을 바꿨습니다.
스파크의 구조 확립
스파크 2.x는 구조확립을 위한 핵심 개념들을 도입했습니다. 하나는 데이터 분석을 통해 찾는 일상적이 패턴들을 사용하여 연산을 표현했습니다. 이런 구분은 DSL에서 일반적인 연산 집합을 사용함으로써 더 좁혀졌습니다.
DSL에서 이런 연산들을 사용하면서 지원 언어인 Java, Python, scala, SQL에서의 API사용이 가능해지고 이런 작업들은 시각적으로 무엇을 원하는지 알수 있게 되어 효율적인 플랜 작성이 가능해졌습니다.
예시1-RDD API 패턴
# (name,age) 형태의 튜플로 된 RDD를 생성한다.
dataRDd = sc.parallelize([("Brroke",20),("Denny",31),("Jules",30)])
# 집계와 평균을 위한 람다 표현식과 함께 map과 redcueByKey 트랜스포메이션을 사용한다.
agesRDD = (dataRDD
.map(lambda x: (x[0],(x[1],1))
.reduceByKey(lambda x, y: (x[0]+y[0],x[1]+y[1]))
.map(lambda x: x[0], x[1][0]/x[1][1])))
예시2-DSL 연산들과 데이터프레임 API 패턴
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# SparkSession으로부터 데이터 프레임을 만듬
spark = (SparkSession
.builder
.appName("AuthorsAges")
.getOrCreate())
# 데이터프레임 생성
data_df = spark.createDataFrame([("Brooke",20),("Denny",31),("Jules",30)],["name","age"])
# 동일한 이름으로 그룹화 하여 나이별로 계산해 평균을 구한다.
avg_df = data_df.groupBy("name").agg(avg("age"))
# 최종결과 보여줌
avg_df.show()
결과 - 예시
name | avg(age) |
---|---|
Brooke | 22.5 |
Jules | 30.0 |
TD | 35.0 |
Denny | 31.0 |
'Apache > Apache Spark' 카테고리의 다른 글
SQL 테이블과 뷰 (0) | 2023.08.20 |
---|---|
Spark SQL과 데이터 프레임 (0) | 2023.08.20 |
Spark DDL을 사용하여 dataframe 생성하기 (0) | 2023.08.04 |
스파크 애플리케이션 개념의 이해 (0) | 2023.07.31 |
Apache Spark란? (0) | 2022.07.20 |