spark

专为大规模数据处理而设计的快速通用的计算引擎

Posted by 果然 on December 30, 2021

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎,是伯克利分校的 AMP 实验室所开源的类 Hadoop MapReduce 的通用并行框架。
spark,拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 ——— Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此spark能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 算法。

区别

【spark 较 hadoop 快】
因为上述链接已给出了一些回答,下面简单总结下:

区别 spark mapreduce
内存 vs 磁盘 不需将计算中间结果写入内存(由于RDD和DAG) 计算的中间结果写入磁盘,还需读取磁盘,导致了频繁的磁盘IO
shuffle不同 仅在部分场景中才需排序,支持基于Hash的分布式聚合 花费大量时间进行排序
多进程模型 vs 多线程模型 复用线程池中的线程来减少启动关闭task所需开销,但会发生资源争用 采用多进程模型,会消耗一定的启动时间

单机安装

在centos7 中安装 spark,需下载:
scala-2.11.12.tgz
spark-2.4.4-bin-without-hadoop.tgz

scala 的安装

tar -zxvf scala-2.11.12.tgz -C /usr/local  
cd /usr/local/scala 
mv scala-2.11.12 scala
vim /etc/profile
  
# ADD scala path  
 
export SCALA_HOME=/usr/local/scala
export PATH=${SCALA_HOME}/bin 
source /etc/profile

# verfity
scala -version 

spark 的安装

tar -zxvf spark-2.4.4-bin-without-hadoop.tgz /usr/local
cd /usr/local/spark
mv spark-2.4.4-bin-without-hadoop spark 

vim /etc/profile
  
# ADD spark path 
export SPARK_HOME=/usr/local/spark
export PATH=${SPARK_HOME}/bin 
source /etc/profile

cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh

vim spark-env.sh  

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
export JAVA_HOME=/usr/java/jdk1.8.0_162
export HADOOP_HOME=/usr/local/hadoop
#export HIVE_CONF_DIR=/usr/local/hive/conf
export SPARK_HOME=/usr/local/spark
export SCALA_HOME=/usr/local/scala
export SPARK_MASTER_IP=aubin.com
export SPARK_WORKER_MEMORY=512M  

# verfity
./bin/spark-shell

dirver 和 executor的理解

d-e

spark模式比较

spark 主要有四种运行模式: Local、standalone、yarn、mesos。

模式 解释 sparkshell
local 在一台机器上,一般用于开发测试 ./spark-shell -master local[2]
standalone 完全独立的spark集群,不依赖其他集群 ./spark-shell –master spark://xxx:70077
yarn 依赖hadoop集群,yarn资源调度框架,将应用提交至yarn ./spark-shell –master yarn
mesos 类似yarn模式,运行在mesos集群上 ./spark-shell –master mesos

这里简单说明下,基于 Yarn-cluster 模式提交任务的原理图解。

./spark-submit --master yarn --deploy-mode cluster --class jar...

参见打包为jar的过程。

yarn-cluster

执行流程如下:

  • 客户机提交 application 应用程序,发送请求到RM(ResourceManager),请求启动AM(ApplicationMaster);
  • RM 接收到请求后随机在一台 NM(NodeManager)上启动 AM(相当于Driver 端);
  • AM 启动,AM发送请求到 RM,请求一批 container 用于启动 executor;
  • RM 返回一批 NM 节点给 AM;
  • AM 连接到NM,发送请求到 NM启动 executor;
  • executor 反向注册到 AM 所在节点的Driver。Driver发送 task 至 executor。

spark-submit 提交作业参数说明

注意:在集群中设置了 kerberos 网络权限协议,在提交spark作业时,需提供参数 –keytab 与 –principal,具体可见链接

参数 说明
–master 集群的 master 地址
–deploy-mode driver运行模式,client或cluster模式
–class 应用程序主类
-–executor-memory 每个executor 的执行内存
-–executor-cores 分配给每个executor的内核数
–num-executor executor的数量(仅yarn)
–keytab 包含keytab文件的全路径(仅yarn)
–principal 运行于secure hdfs 时用于登录到KDC的principal

spark 读取具有 HA 的HDFS数据

方法一:通过配置参数的方式访问

val spark = SparkSession.builder().appName("OfflineSpark")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
 
sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://cdhservice")
sc.hadoopConfiguration.set("dfs.nameservices", "cdhservice")
sc.hadoopConfiguration.set("dfs.ha.namenodes.cdhservice", "namenode36,namenode105")
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cdhservice.namenode36", "cdh1:8020")
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.cdhservice.namenode105", "cdh3:8020")
sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.cdhservice", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
 
val data = sc.textFile("hdfs://cdhservice/user/tescomm/data/install.log")
data.foreach(println)
val druiddata = spark.read.parquet("hdfs://cdhservice/user/tescomm/data/16th").limit(10)
druiddata.show()

方法二:通过配置文件访问:

 val spark = SparkSession.builder().appName("OfflineSpark")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.hadoopConfiguration.addResource("cdhservice/core-site.xml")
sc.hadoopConfiguration.addResource("cdhservice/hdfs-site.xml")
 
val data = sc.textFile("hdfs://cdhservice/user/tescomm/data/install.log")
data.foreach(println)
val druiddata = spark.read.parquet("hdfs://cdhservice/user/tescomm/data/16th").limit(10)
druiddata.show()

小结

spark 有三大引擎,spark core、sparkSQL、sparkStreaming
spark core 的关键抽象是 SparkContext、RDD
SparkSQL 的关键抽象是 SparkSession、DataFrame
sparkStreaming 的关键抽象是 StreamingContext、DStream

SparkSession 是 spark2.0 引入的概念,主要用在 sparkSQL 中,当然也可以用在其他场合,它可以代替 SparkContext; SparkSession 其实是封装了 SQLContext 和 HiveContext

bugs

  1. Error: A JNI error has occurred, please check your installation and try again
    原因】:缺少 hadoop 依赖包
    解决方法】:spark-env.sh 中添加 hadoop 的 classpath 至 SPARK_DIST_CLASSPATH中
    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
  2. Exception in thread “main” java.lang.NoSuchMethodError: jline.console.completer.CandidateListCompletionHandler.setPrintSpaceAfterFullCompletion(Z)V
    原因】:缺少 jline jar 依赖包
    解决方法】:将 scala 的jar文件中的 jline.jar 复制至 spark 所在的 jar 包中
  3. 其他问题

秋景