티스토리 뷰

Spark

RDD Programing

문타리 2019. 5. 14.

RDD (Resilient Distributed Dataset)

  • Dataset 메모리나 디스크에 분산 저장된 변경 불가능한 데이터 객체들의 모음
  • Distributed RDD에 있는 데이터는 클러스터에 자동 분배 및 병렬 연산 수행
  • Resilient 클러스터의 한 노드가 실패하더라도 다른 노드가 작업처리(fail over)
  • Immutable 수정 X, 수정하려면 새로운 RDD 생성해야 함

RDD Operation APIs

  • Transformations(데이터 변형) map,filter, groupBy, join
  • Actions(연산결과 리턴 or 저장) count, collect, save

Lazy Execution

실제 Action이 Call될 때 작업이 수행됨

Caching 및 Persistence

RAM/Disk에 캐시 가능, 중간 단계 결과를 저장해서 후속 연산에 사용가능하기 때문에 반복 연산에 유리

Spark 언어 지원

  • Standalone Programs : Scala, Java, Python, R
  • Interactive Shells : Scala, Python, R
  • Performance : Java > Scala > Python
//Scala
val lines = sc.textFile("log.txt")
lines.filter(x => x.contains("ERROR")).count()
//Java
JavaRDD<String> lines = sc.textFile("log.txt");
lines.filter(new Function<String, Boolean>(){
    Boolean call(String x){
      return x.contains("ERROR");
  }
}).count();
#Python
lines = sc.textFile("log.txt")
lines.filter(lambda x: "ERROR" in x).count()

Spark Program Architecture (로그 분석 프로그램)

Spark-Shell 사용하기

Scala용 Spark-Shell 실행

$> cd /tcl/spark-2.4.0-bin-hadoop2.7
$> bin/spark-shell

scala> sc
scala> spark
scala> sc.master

SparkSubmit 프로세스 확인

  • jar로 패키징 한 파일 실행
$> jps

드라이버 프로그램용 웹 UI 확인

http://192.168.56.XXX:4040

RDD 생성

scala> val data = 1 to 10000
scala> val distData = sc.parallelize(data)
scala> distData.filter(_ < 10).collect()

(클러스터의 코어 수에 따라) 정해진 파티션 수만큼 새로운 RDD가 작성되고, 콜렉션의 요소가 RDD로 복사됨

Lazy Execution

scala> val data = sc.textFile("README.txt") # 잘못된 파일명
scala> val distData = data.map(r => r + "_map")
scala> distData.count # InvalidInputException 발생

scala> val data = sc.textFile("README.md") # 파일명 수정
scala> val distData = data.map(r => r + "_map")
scala> distData.count

Lineage 확인, Partition 개수 확인

  • Lineage : RDD로 만들어지는 과정이 기록된 계보
  • Partition : a logical chunk of a large data set
scala> distData.toDebugString
scala> distData.getNumPartitions

Shuffle by Repartition

scala> val data = sc.textFile("README.md") 
scala> data.getNumPartitions
scala> val distData = data.map(r => "line_start : " + r)
scala> distData.getNumPartitions # 파티션 개수 변화 없음
scala> val newData = distData.repartition(10)
scala> newData.getNumPartitions # 파티션 개수 변경됨
scala> newData.toDebugString # Lineage 확인
scala> newData.count
scala> newData.collect.foreach(println)

# Partition 별로 원본 파일 내용 유지되어 있음
scala> data.saveAsTextFile("dataREADME.md")
# Partition 별로 원본 파일 내용 섞여 있음
scala> newData.saveAsTextFile("newData")

Word Count

scala> val f = sc.textFile("README.md")
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> wc.saveAsTextFile("wc_out.txt")
scala> wc.collect.foreach(println) # Stage Skip 확인

Shuffle

데이터를 집계, 조인 또는 다른 방식으로 분산시키기 위해 파티션 간에 데이터를 재분배하는 프로세스

  • partition 수 변경, reduceByKey 등을 사용할 경우 Shuffle이 일어남
  • local file-system에 중간 결과물 저장
  • 중간 결과물 파티셔닝 (repartitioning)
  • Serialization/Deserialization, Network I/O

참고 사이트

Overview - Spark 2.4.3 Documentation
Spark Tutorial – Learn Spark Programming

'Spark' 카테고리의 다른 글

Spark 소개 및 Cluster Mode 이해하기  (2) 2019.03.25
Virtual Box를 활용한 Spark 실습 환경 구축  (0) 2019.01.22
댓글