
This page summarizes Spark, an in-memory cluster computing framework.

 
 
 

Spark Overview


 

Apache Spark is an open-source framework for large-scale distributed processing and analysis, created at UC Berkeley's AMPLab. It became an Apache top-level project in February 2014.

 
  • Ships with an interactive query engine ([[Shark|Shark]]), a large-scale graph processing and analysis engine ([[Bagel|Bagel]]), and a real-time stream processor ([Spark Streaming](Spark Streaming.md))

 
 

Spark Setup


Install Scala

 cd  /install
 wget  https://downloads.lightbend.com/scala/2.12.3/scala-2.12.3.tgz
 
 cd  /appl
 tar  -xvzf  /install/scala-2.12.3.tgz
 mv  scala-2.12.3  scala
 # export PATH=${PATH}:/appl/scala/bin
 

Install Spark

 cd  /install
 wget  http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
 
 cd  /appl
 tar  -xvzf  /install/spark-2.3.0-bin-hadoop2.7.tgz
 mv  spark-2.3.0-bin-hadoop2.7  spark
 
 cd  /appl/spark
 cd  conf
 cp  spark-env.sh.template      spark-env.sh
 cp  log4j.properties.template  log4j.properties
 
 vi  spark-env.sh
 
 vi  log4j.properties
     log4j.rootCategory=WARN, console
 
 # cd /appl/spark
 # sbin/start-master.sh
 # sbin/start-slave.sh  spark://localhost:7077
 # bin/pyspark  --master  spark://localhost:7077
 
 cd  /appl/spark
 sbin/start-all.sh
 bin/pyspark
 # bin/spark-shell
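 
Once bin/pyspark comes up, a quick smoke test confirms the standalone setup. A minimal sketch run inside the pyspark shell, assuming only the shell's predefined sc (SparkContext); the sample numbers are arbitrary:
 
 #--- smoke test inside the pyspark shell
 rdd = sc.parallelize([1, 2, 3, 4, 5])      # distribute a small in-memory list
 print(rdd.map(lambda x: x * x).sum())      # 1 + 4 + 9 + 16 + 25 = 55
 print(sc.version)                          # e.g. 2.3.0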
 
 
 

Directory Layout

 
  • R/

  • bin/

  • conf/

  • data/

  • examples/

  • jars/

  • kubernetes/

  • licenses/

  • python/

  • sbin/

  • yarn/

 
 

K-ICT Training


 

Install Spark

 cd  ~
 mkdir  install
 
 cd  ~/install
 wget  http://apache.mirror.cdnetworks.com/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
 
 cd  ~  
 tar  -xvzf  ~/install/spark-2.3.0-bin-hadoop2.7.tgz
 mv  spark-2.3.0-bin-hadoop2.7  spark
 
 cd  ~/spark
 cd  conf
 cp  spark-env.sh.template      spark-env.sh
 cp  log4j.properties.template  log4j.properties
 vi  spark-env.sh
     export LANG=ko_KR.UTF-8
 
     export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64
     export PATH=$PATH:$JAVA_HOME
 
     export HADOOP_INSTALL=/usr/local/hadoop
     export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
     export HADOOP_COMMON_HOME=$HADOOP_INSTALL
     export HADOOP_HDFS_HOME=$HADOOP_INSTALL
     export YARN_HOME=$HADOOP_INSTALL
     export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
     export PATH=$PATH:$HADOOP_INSTALL/sbin
     export PATH=$PATH:$HADOOP_INSTALL/bin
 
     export SPARK_DIST_CLASSPATH=$(hadoop classpath)
 
 vi  log4j.properties
     log4j.rootCategory=WARN, console
 	
 cd ~/spark
 sbin/start-all.sh
 bin/pyspark
 
 
 

Loading into a DataFrame

 

RDD (Resilient Distributed Dataset)
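 
An RDD is Spark's basic abstraction: an immutable, partitioned collection built up with lazy transformations (map, filter, ...) and only computed when an action (count, collect, ...) runs. A minimal sketch, assuming the pyspark shell's predefined sc and the people.txt sample shipped with Spark:
 
 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")   # transformation (lazy)
 names = lines.map(lambda l: l.split(",")[0])                                               # transformation (lazy)
 print(names.count())      # action: triggers the computation
 print(names.collect())    # action: returns the results to the driver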

 

Loading a JSON File

 df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
 df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
 

Loading a Text File

 from pyspark.sql import Row
 
 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") 
 parts = lines.map(lambda l: l.split(","))
 people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
 df = sqlContext.createDataFrame(people)
 

Loading a Text File with an Explicit Schema

 from pyspark.sql.types import *
 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
 
 parts = lines.map(lambda l: l.split(","))
 people = parts.map(lambda p: (p[0], p[1].strip()))
 
 schemaString = "name age"
 fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 schema = StructType(fields)
 
 schemaPeople = sqlContext.createDataFrame(people, schema)
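 
With the schema applied, both columns are typed as nullable strings; a quick check of the result (added here for illustration):
 
 schemaPeople.printSchema()   # name: string, age: string
 schemaPeople.show()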
 

Loading Parquet Data

 df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
 

Saving a DataFrame

 df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
 # df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")
 
 df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")
 

Registering a DataFrame as a Temp Table

 df.registerTempTable("people")
 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 teenagers.show()
 

pyspark Sample 1

 
 help(sqlContext)
     q        # press q to exit the help pager
 
 df = sqlContext.read.json("file:///home/eduuser/spark/examples/src/main/resources/people.json")
 df.show()
 df.printSchema()
 df.select("name").show()
 df.select(df['name'], df['age'] + 1).show()
 df.filter(df['age'] > 21).show()
 df.groupBy("age").count().show()
 quit()
 

pyspark Sample 2

 
 from pyspark.sql import Row
 
 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt") 
 parts = lines.map(lambda l: l.split(","))
 people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
 schemaPeople = sqlContext.createDataFrame(people)
 
 schemaPeople.registerTempTable("people")
 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 teenagers.show()
 
 # teenNames = teenagers.map(lambda p: "Name: " + p.name)
 # for teenName in teenNames.collect():
 #     print(teenName)
 quit()
 

pyspark Sample 3

 
 from pyspark.sql.types import *
 
 lines = sc.textFile("file:///home/eduuser/spark/examples/src/main/resources/people.txt")
 parts = lines.map(lambda l: l.split(","))
 people = parts.map(lambda p: (p[0], p[1].strip()))
 
 schemaString = "name age"
 fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 schema = StructType(fields)
 schemaPeople = sqlContext.createDataFrame(people, schema)
 
 schemaPeople.registerTempTable("people")
 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 teenagers.show()
 
 # teenNames = teenagers.map(lambda p: "Name: " + p.name)
 # for teenName in teenNames.collect():
 #    print(teenName)
 quit()
 

pyspark Sample 4

 
 #--- Load / save data
 df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/users.parquet")
 #--- Saving creates the directory /home/eduuser/namesAndFavColors.parquet/
 df.select("name", "favorite_color").write.save("file:///home/eduuser/namesAndFavColors.parquet")
 df = sqlContext.read.load("file:///home/eduuser/namesAndFavColors.parquet")
 
 df = sqlContext.read.load("file:///home/eduuser/spark/examples/src/main/resources/people.json", format="json")
 df.select("name", "age").write.save("file:///home/eduuser/namesAndAges.parquet", format="parquet")
 

Downloading the Dataset

 
 cd  ~
 cd  nia_kbig
 [eduuser@localhost nia_kbig]$ ./datasetDownload.sh
 Enter the code of the dataset to download:
     8h7k4
     ka988
     z24nt
 
 cd  view/basic
 ls  -alF
     -rwxr-xr-x. 1 eduuser eduuser  216 2014-12-14 08:46 01.move_data_file.sh*
     -rwxr-xr-x. 1 eduuser eduuser  283 2014-12-04 22:22 01.move_data_file.sh~*
     -rwxr-xr-x. 1 eduuser eduuser  270 2014-12-12 16:20 jeju_2010.csv*
     -rwxr-xr-x. 1 eduuser eduuser  273 2014-12-12 16:16 jeju_2011.csv*
     -rwxr-xr-x. 1 eduuser eduuser  278 2014-12-12 16:16 jeju_2012.csv*
     -rwxr-xr-x. 1 eduuser eduuser 1476 2015-01-11 22:59 view_basic_analysis.r*
 
 # hdfs dfs -mkdir /user
 # hdfs dfs -mkdir /user/eduuser/
 # hdfs dfs -put jeju* /user/eduuser/
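 
If the CSV files are also pushed into HDFS with the commented commands above, the same pyspark code can read them through an hdfs:// path instead of file:// (a sketch, assuming fs.defaultFS points at the training VM's local HDFS):
 
 lines = sc.textFile("hdfs:///user/eduuser/jeju_2010.csv")   # read from HDFS instead of the local filesystem
 print(lines.count())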
 

pyspark Sample 5

 from pyspark.sql import Row
 
 lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
 parts = lines.map(lambda l: l.split(','))
 jeju2010 = parts.map(lambda p: Row(IN=p[0], OUT=p[1], INCOME=p[2]))
 schema2010 = sqlContext.createDataFrame(jeju2010)
  
 schema2010.registerTempTable("jeju2010")
 schema2010.show()
 
 sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'").show()
 jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
 jeju2010.show()
 

pyspark Sample 6

 
 from pyspark.sql.types import *
 
 lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
 parts = lines.map(lambda l: l.split(','))
 jeju2011 = parts.map(lambda p: (p[0].strip(), p[1].strip(), p[2].strip()))
 schemaString = "IN OUT INCOME"
 fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 schema = StructType(fields)
 schema2011 = sqlContext.createDataFrame(jeju2011, schema)
 
 schema2011.registerTempTable('jeju2011')
 schema2011.show()
 

pyspark Sample 7

 
 from pyspark.sql.types import *
 
 #--- Load data
 lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2010.csv')
 parts = lines.map(lambda l: l.split(','))
 jeju2010 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
 schema2010 = sqlContext.createDataFrame(jeju2010)
 
 schema2010.registerTempTable('jeju2010')
 schema2010.show()
 
 lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2011.csv')
 parts = lines.map(lambda l: l.split(','))
 jeju2011 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
 schema2011 = sqlContext.createDataFrame(jeju2011)
 
 schema2011.registerTempTable('jeju2011')
 schema2011.show()
 
 lines = sc.textFile('file:///home/eduuser/nia_kbig/view/basic/jeju_2012.csv')
 parts = lines.map(lambda l: l.split(','))
 jeju2012 = parts.map(lambda p: Row(IN=p[0].strip(), OUT=p[1].strip(), INCOME=p[2].strip()))
 schema2012 = sqlContext.createDataFrame(jeju2012)
 
 schema2012.registerTempTable('jeju2012')
 schema2012.show()
  
 #--- Clean and reshape the data
 jeju2010 = sqlContext.sql("select * from jeju2010 where INCOME != 'INCOME'")
 jeju2011 = sqlContext.sql("select * from jeju2011 where INCOME != 'INCOME'")
 jeju2012 = sqlContext.sql("select * from jeju2012 where INCOME != 'INCOME'")
 
 tmp = jeju2010.rdd.map(lambda p: Row(YEAR=2010, IN=p[0], OUT=p[2], INCOME=p[1]))
 jeju2010 = sqlContext.createDataFrame(tmp)
 tmp = jeju2011.rdd.map(lambda p: Row(YEAR=2011, IN=p[0], OUT=p[2], INCOME=p[1]))
 jeju2011 = sqlContext.createDataFrame(tmp)
 tmp = jeju2012.rdd.map(lambda p: Row(YEAR=2012, IN=p[0], OUT=p[2], INCOME=p[1]))
 jeju2012 = sqlContext.createDataFrame(tmp)
 
 jeju = jeju2010.unionAll(jeju2011).unionAll(jeju2012)
 #--- Row fields are sorted alphabetically (IN, INCOME, OUT, YEAR); cast to numeric so the statistics below work
 tmp = jeju.rdd.map(lambda p: Row(YEAR=int(p[3]), IN=float(p[0]), OUT=float(p[2]), INCOME=float(p[1])))
 jeju2= sqlContext.createDataFrame(tmp)
 
 #--- Descriptive statistics
 jeju2.describe().show()
 jeju2.groupBy('YEAR').avg().show()
 jeju2.select('IN', 'INCOME', 'YEAR').groupBy('YEAR').mean().show()
 jeju2.select('IN', 'OUT', 'YEAR').groupBy('YEAR').mean().show()
 
 #--- Correlation coefficient
 jeju2.corr('IN', 'OUT')
 
 #--- Regression analysis
 import numpy as np
 import numpy.linalg as lin
 
 X = np.array(jeju2.select('IN', 'OUT').collect()) 
 Y = np.array(jeju2.select('INCOME').collect())
 
 Beta0 = np.dot(lin.inv(np.dot(X.T, X)), np.dot(X.T, Y))
 
 X1 = np.hstack([np.array([np.ones(36)]).T, X])
 Beta1 = np.dot(lin.inv(np.dot(X1.T, X1)), np.dot(X1.T, Y))
 
 #--- Coefficient of determination
 R0 = np.sum((np.dot(X, Beta0) - np.mean(Y))**2) / np.sum((Y - np.mean(Y))**2)
 R1 = np.sum((np.dot(X1, Beta1) - np.mean(Y))**2) / np.sum((Y - np.mean(Y))**2)
 
 adR0 = 1 - (1 - R0) * (36 - 1) / (36 - 2 - 1)
 adR1 = 1 - (1 - R1) * (36 - 1) / (36 - 3 - 1.)
 
 #--- Visualization
 import matplotlib.pylab as plt
 plt.scatter(X[:, 0], Y)
 plt.show()
 plt.scatter(X[:, 1], Y)
 plt.show()
 
 Beta_IN = np.dot(lin.inv(np.dot(X1[:, [0, 1]].T, X1[:, [0, 1]])), np.dot(X1[:, [0, 1]].T, Y))
 y = Beta_IN[0] + Beta_IN[1] * X1[:, 1]
 
 plt.scatter(X1[:, 1], Y)
 plt.plot(X1[:, 1], Y)
 plt.show()
 
 plt.scatter(X1[:, 1], Y, label = 'Raw Data')
 plt.plot(X1[:, 1], y, label = 'Fitted', color = 'red')
 plt.legend(loc = 'upper left')
 plt.show()
 
 from mpl_toolkits.mplot3d import Axes3D
 from matplotlib import cm
 
 fig = plt.figure()
 ax = Axes3D(fig)
 y = Beta1[0] + Beta1[1] * X[:, 0] + Beta1[2] * X[:, 1]
 XX1, XX2 = np.meshgrid(X[:, 0], X[:, 1])
 YY = Beta1[0] + Beta1[1] * XX1 + Beta1[2] * XX2
 ax.plot(X[:, 0], X[:, 1], y, linestyle = 'none', marker = 'o', markerfacecolor = 'blue')
 ax.plot_surface(XX1, XX2, YY, rstride = 1, cstride = 1, cmap = 'hot')
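 
The coefficients above solve the normal equations directly, Beta = inv(X'X) X'Y. As a cross-check (not part of the original sample), np.linalg.lstsq computes the same least-squares solution with better numerical stability:
 
 #--- sanity check: lstsq should reproduce Beta1
 Beta1_chk = np.linalg.lstsq(X1, Y)[0]
 print(np.allclose(Beta1, Beta1_chk))   # expected: True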
 

Last modified: 2024-09-30 12:26:18
