spark 持久化

通过持久化存储,提升spark应用性能,以更好地满足实际需求。

Posted by 果然 on June 8, 2022

持久化存储是spark非常重要的一个特性,所谓的spark持久化存储,就是将一个RDD持久化至内存或磁盘中,以便重用该RDD,省去重新计算的环节,以空间换时间。RDD持久化,是一个分布式的过程,其内部的每个partition各自缓存到所在的计算节点上,根据复用的需求再来读取。

spark:RDD持久化的目的

在spark中有时候很多地方都会用到同一个RDD,按照常规的做法的话,每个地方遇到action操作的时候都会对同一个算子计算多次,这样会造成效率低下的问题。
持久化之后的复用,复用的逻辑不会再重头执行了,性能提升明显。

spark 的集中持久化级别

  • Memory_only:将RDD以反序列化java对象的形式存储在JVM中。若内存空间不足,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • Memory_AND_DISK:若内存空间不足,将未缓存的数据分区存储在磁盘中,在需要使用这些分区时从磁盘中读取。
  • Memory_ONLY_SER:将RDD以序列化的java对象的形式进行存储(每个分区为一个Byte数组),这种方式会比反序列化对象的方式节省很多空间,但在读取时会增加CPU的计算负担。
  • MEMORY_AND_DISK_SER
  • DISK_ONLY:只在磁盘上缓存RDD
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等:与上面的级别功能相同,只不过每个分区在集群中每个节点上建立副本。

如何选择存储级别

核心问题实在内存使用率和CPU效率之间进行取舍。

  • 若使用默认的存储级别(memory_only),存储在内存中的RDD 没有发生溢出。默认存储级别可以最大程度的提升CPU的效率,可以使在RDD上的操作以最快的速度运行。
  • 若内存不能全部存储RDD,那么使用MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过流大量数据的情况下,尽量不要将溢出的数据存储在磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 若想快速还原故障,建议使用多副本存储级别。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

删除数据

spark 自动监控各个节点上的缓存使用了,并以最近最少使用的方式(LRU)将旧数据移除内存。若想手动移除一个RDD,而不是等待该RDD被spark自动移除,可以使用RDD.unpersist()

RDD 的checkpoint机制

除了cache和persist之外,spark还提供了另外一种持久化:checkpoint,它能将RDD写入分布式文件系统,提供类似于数据库快照的功能。
它提供了一种相对而言更加可靠的数据持久化方式,把数据保存在分布式文件系统,如HDFS。这里就是利用了hdfs高可用性,高容错性(多副本)来最大程度保证数据的安全性。

RDD 持久化中cache|persist|checkpoint之间区别

-1. cache

  • 将数据临时存放入内存中进行数据重用
  • 会在血缘关系中添加新的依赖,一旦出现问题,可以重头读取数据
  • 如果任务执行完毕,临时保存的数据文件就会丢失

-2. persist

  • 将数据临时存放在内存或磁盘中进行数据重用
  • 涉及到磁盘io,性能较低,但是数据安全
  • 如果作业执行完毕,临时保存的数据文件就会丢失

-3. checkpoint

  • 将数据长久的保存在磁盘文件中进行数据重用
  • 涉及到磁盘io,性能较低,但是数据安全
  • 为了保证数据安全,一般会独立执行作用
  • 为了能提高效率,一般情况,需要与cache联合使用
  • 在执行过程中,会切断血缘关系,重新建立新的血缘关系
  • checkpoint等同于改变数据源

DStream 持久化

DStream 也是支持持久化的,同样是使用persist()与cache()方法。
持久化通常在有状态的算子中使用,如窗口操作,默认情况下,虽然没有显性地调用持久化方法,但是底层已经帮用户做了持久化操作。与RDD的持久化不同,DStream的默认持久性级别将数据序列化在内存中。