跳转至

15 内存视角(一):如何最大化内存的使用效率?

你好,我是吴磊。

上一讲我们说,想要提升CPU利用率,最重要的就是合理分配执行内存,但是,执行内存只是Spark内存分区的一部分。因此,想要合理分配执行内存,我们必须先从整体上合理划分好Spark所有的内存区域。

可在实际开发应用的时候,身边有不少同学向我抱怨:“Spark划分不同内存区域的原理我都知道,但我还是不知道不同内存区域的大小该怎么设置,纠结来、纠结去。最后,所有跟内存有关的配置项,我还是保留了默认值。”

这种不能把原理和实践结合起来的情况很常见,所以今天这一讲,我就从熟悉的Label Encoding实例出发,一步步带你去分析不同情况下,不同内存区域的调整办法,帮你归纳出最大化内存利用率的常规步骤。这样,你在调整内存的时候,就能结合应用的需要,做到有章可循、有的放矢。

从一个实例开始

我们先来回顾一下第5讲中讲过的Label Encoding。在Label Encoding的业务场景中,我们需要对用户兴趣特征做Encoding。依据模板中兴趣字符串及其索引位置,我们的任务是把千亿条样本中的用户兴趣转换为对应的索引值。模板文件的内容示例如下所示。

//模板文件
//用户兴趣
体育-篮球-NBA-湖人
军事-武器-步枪-AK47

实现的代码如下所示,注意啦,这里的代码是第5讲中优化后的版本。

/**
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)

//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")

下面,咱们先一起回顾一下代码实现思路,再来分析它目前存在的性能隐患,最后去探讨优化它的方法。

首先,findIndex函数的主体逻辑比较简单,就是读取模板文件和构建Map映射,以及查找用户兴趣并返回索引。不过,findIndex函数被定义成了高阶函数。这样一来,当以模板文件为实参调用这个高阶函数的时候,我们会得到一个内置了Map查找字典的标量函数partFunc,最后在千亿样本上调用partFunc完成数据转换。利用高阶函数,我们就避免了让Executor中的每一个Task去读取模板文件,以及从头构建Map字典这种执行低效的做法。

在运行时,这个函数在Driver端会被封装到一个又一个的Task中去,随后Driver把这些Task分发到Executor,Executor接收到任务之后,交由线程池去执行(调度系统的内容可以回顾第5讲)。这个时候,每个Task就像是一架架小飞机,携带着代码“乘客”和数据“行李”,从Driver飞往Executor。Task小飞机在Executor机场着陆之后,代码“乘客”乘坐出租车或是机场大巴,去往JVM stack;数据“行李”则由专人堆放在JVM Heap,也就是我们常说的堆内内存。

回顾Label encoding中的findIndex函数不难发现,其中大部分都是代码“乘客”,唯一的数据“行李”是名为searchMap的Map字典。像这样用户自定义的数据结构,消耗的内存区域就是堆内内存的User Memory(Spark对内存区域的划分内容可以回顾一下第7讲)。

User Memory性能隐患

回顾到这里,你觉得findIndex函数有没有性能隐患呢?你可以先自己思考一下,有了答案之后再来看我下面的分析。

答案当然是“有”。首先,每架小飞机都携带这么一份数据“大件行李”,自然需要消耗更多的“燃油”,这里的“燃油”指的是Task分发过程中带来的网络开销。其次,因为每架小飞机着陆之后,都会在Executor的“旅客行李专区”User Memory寄存上这份同样的数据“行李”,所以,User Memory需要确保有足够的空间可以寄存所有旅客的行李,也就是大量的重复数据

那么,User Memory到底需要准备出多大的内存空间才行呢?我们不妨来算一算。这样的计算并不难,只需要用飞机架次乘以行李大小就可以了。

用户自定义的数据结构往往是用于辅助函数完成计算任务的,所以函数执行完毕之后,它携带的数据结构的生命周期也就告一段落。因此,在Task的数量统计上,我们不必在意一个Executor总共需要处理多少个Task,只需要关注它在同一时间可以并行处理的Task数量,也就是Executor的线程池大小即可

我们说过,Executor线程池大小由spark.executor.cores和spark.task.cpus这两个参数的商(spark.executor.cores/spark.task.cpus)决定,我们暂且把这个商记作#threads。

接下来是估算数据“行李”大小,由于searchMap并不是分布式数据集,因此我们不必采用先Cache,再提取Spark执行计划统计信息的方式。对于这样的Java数据结构,我们完全可以在REPL中,通过Java的常规方法估算数据存储大小,估算得到的searchMap大小记为#size。

好啦!现在,我们可以算出,User Memory至少需要提供#threads * #size这么大的内存空间,才能支持分布式任务完成计算。但是,对于User Memory内存区域来说,使用#threads * #size的空间去重复存储同样的数据,本身就是降低了内存的利用率。那么,我们该怎么省掉#threads * #size的内存消耗呢?

性能调优

学习过广播变量之后,想必你头脑中已经有了思路。没错,咱们可以尝试使用广播变量,来对示例中的代码进行优化。

仔细观察findIndex函数,我们不难发现,函数的核心计算逻辑有两点。一是读取模板文件、创建Map映射字典;二是以给定字符串对字典进行查找,并返回查找结果。显然,千亿样本转换的核心需求是其中的第二个环节。既然如此,我们完全可以把创建好的Map字典封装成广播变量,然后分发到各个Executors中去。

有了广播变量的帮忙,凡是发往同一个Executor的Task小飞机,都无需亲自携带数据“行李”,这些大件行李会由“联邦广播快递公司”派货机专门发往各个Executors,Driver和每个Executors之间,都有一班这样的货运专线。思路说完了,优化后的代码如下所示。

/**
广播变量实现方式
*/
//定义广播变量
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
val bcSearchMap = sparkSession.sparkContext.broadcast(searchMap)

//在Dataset中访问广播变量
bcSearchMap.value.getOrElse("体育-篮球-NBA-湖人", -1)

上面代码的实现思路很简单:第一步还是读取模板文件、创建Map字典;第二步,把Map字典封装为广播变量。这样一来,在对千亿样本进行转换时,我们直接通过bcSearchMap.value读取广播变量内容,然后,通过调用Map字典的getOrElse方法来获取用户兴趣对应的索引值。

相比最开始的第一种实现方式,第二种实现方式的代码改动还是比较小的,那这一版代码对内存的消耗情况有什么改进呢?

我们发现,Task小飞机的代码“乘客”换人了!小飞机之前需要携带函数findIndex,现在则换成了一位“匿名的乘客”:一个读取广播变量并调用其getOrElse方法的匿名函数。由于这位“匿名的乘客”将大件行李托运给了“联邦广播快递公司”的专用货机,因此,Task小飞机着陆后,没有任何“行李”需要寄存到User Memory。换句话说,优化后的版本不会对User Memory内存区域进行占用,所以第一种实现方式中#threads * #size的内存消耗就可以省掉了。

Storage Memory规划

这样一来,原来的内存消耗转嫁到了广播变量身上。但是,广播变量也会消耗内存,这会不会带来新的性能隐患呢?那我们就来看看,广播变量消耗的具体是哪块内存区域。

回顾存储系统那一讲,我们说过,Spark存储系统主要有3个服务对象,分别是Shuffle中间文件、RDD缓存和广播变量。它们都由Executor上的BlockManager进行管理,对于数据在内存和磁盘中的存储,BlockManager利用MemoryStore和DiskStore进行抽象和封装。

那么,广播变量所携带的数据内容会物化到MemoryStore中去,以Executor为粒度为所有Task提供唯一的一份数据拷贝。MemoryStore产生的内存占用会被记入到Storage Memory的账上。因此,广播变量消耗的就是Storage Memory内存区域

接下来,我们再来盘算一下,第二种实现方式究竟需要耗费多少内存空间。由于广播变量的分发和存储以Executor为粒度,因此每个Executor消耗的内存空间,就是searchMap一份数据拷贝的大小。searchMap的大小我们刚刚计算过就是#size。

明确了Storage Memory内存区域的具体消耗之后,我们自然可以根据公式:(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction去有针对性地调节相关的内存配置项。

内存规划两步走

现在,咱们在两份不同的代码实现下,分别定量分析了不同内存区域的消耗与占用。对于这些消耗做到心中有数,我们自然就能够相应地去调整相关的配置项参数。基于这样的思路,想要最大化内存利用率,我们需要遵循两个步骤:

  • 预估内存占用
  • 调整内存配置项

我们以堆内内存为例,来讲一讲内存规划的两步走具体该如何操作。我们都知道,堆内内存划分为Reserved Memory、User Memory、Storage Memory和Execution Memory这4个区域。预留内存固定为300MB,不用理会,其他3个区域需要你去规划。

预估内存占用

首先,我们来说内存占用的预估,主要分为三步。

第一步,计算User Memory的内存消耗。我们先汇总应用中包含的自定义数据结构,并估算这些对象的总大小#size,然后用#size乘以Executor的线程池大小,即可得到User Memory区域的内存消耗#User

第二步,计算Storage Memory的内存消耗。我们先汇总应用中涉及的广播变量和分布式数据集缓存,分别估算这两类对象的总大小,分别记为#bc、#cache。另外,我们把集群中的Executors总数记作#E。这样,每个Executor中Storage Memory区域的内存消耗的公式就是:#Storage = #bc + #cache / #E

第三步,计算执行内存的消耗。学习上一讲,我们知道执行内存的消耗与多个因素有关。第一个因素是Executor线程池大小#threads,第二个因素是数据分片大小,而数据分片大小取决于数据集尺寸#dataset和并行度#N。因此,每个Executor中执行内存的消耗的计算公式为:#Execution = #threads * #dataset / #N

调整内存配置项

得到这3个内存区域的预估大小#User、#Storage、#Execution之后,调整相关的内存配置项就是一件水到渠成的事情(由公式(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction)可知),这里我们也可以分为3步。

首先,根据定义,spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到

同理,spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)

最后,对于Executor堆内内存总大小spark.executor.memory的设置,我们自然要参考4个内存区域的总消耗,也就是300MB + #User + #Storage + #Execution。不过,我们要注意,利用这个公式计算的前提是,不同内存区域的占比与不同类型的数据消耗一致

总的来说,在内存规划的两步走中,第一步预估不同区域的内存占比尤为关键,因为第二步中参数的调整完全取决于第一步的预估结果。如果你按照这两个步骤去设置相关的内存配置项,相信你的应用在运行时就能够充分利用不同的内存区域,避免出现因参数设置不当而导致的内存浪费现象,从而在整体上提升内存利用率。

小结

合理划分Spark所有的内存区域,是同时提升CPU与内存利用率的基础。因此,掌握内存规划很重要,在今天这一讲,我们把内存规划归纳为两步走。

第一步是预估内存占用。

  • 求出User Memory区域的内存消耗,公式为:#User=#size乘以Executor线程池的大小。
  • 求出每个Executor中Storage Memory区域的内存消耗,公式为:#Storage = #bc + #cache / #E。
  • 求出执行内存区域的内存消耗,公式为:#Execution = #threads * #dataset / #N。

第二步是调整内存配置项:根据公式得到的3个内存区域的预估大小#User、#Storage、#Execution,去调整(spark.executor.memory – 300MB)* spark.memory.fraction * spark.memory.storageFraction公式中涉及的所有配置项。

  • spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到。
  • spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)。
  • spark.executor.memory的设置,可以通过公式300MB + #User + #Storage + #Execution得到。

这里,我还想多说几句,内存规划两步走终归只是手段,它最终要达到的效果和目的,是确保不同内存区域的占比与不同类型的数据消耗保持一致,从而实现内存利用率的最大化

每日一练

  1. 你知道估算Java对象存储大小的方法有哪些吗?不同的方法又有哪些优、劣势呢?
  2. 对于内存规划的第一步来说,要精确地预估运行时每一个区域的内存消耗,很费时、费力,调优的成本很高。如果我们想省略掉第一步的精确计算,你知道有哪些方法能够粗略、快速地预估不同内存区域的消耗占比吗?

期待在留言区看到你的思考和答案,我们下一讲见!

精选留言(15)
  • 金角大王 👍(38) 💬(1)

    老师,为何UserMemory中自定义数据结构不能像bc那样在StorageMemory中只存一份?

    2021-04-16

  • Fendora范东_ 👍(27) 💬(1)

    还有个疑问,想请教下磊哥 文中说的数据集大小是内存中的数据集吧 文件落盘后数据集大小可以很方便查看,那内存中数据集大小怎么看呢

    2021-04-16

  • 静心 👍(20) 💬(1)

    老师,我发现14讲中,计算合理的并行度,依赖之一是在执行内存大小给定的前提下。而15讲中,计算执行内存大小,依赖之一是在并行度给定的前提下。所以到底如何破局呢?

    2021-05-12

  • zxk 👍(20) 💬(1)

    老师,这边想请教两个问题。 问题一:User Memory、Execution Memory、Storage Memory 是属于 Spark 自身对内存区域的划分,但 Spark 的 executor 实际上又是一个 JVM,假如我把 User Memory 设置的非常小,又自定义了一个很大的数据结构,此时 User Memory 不够用了,而 Execution Memory、Storage Memory 还有很大的空闲,那么这时候会不会 OOM?如果是 GC 又不太符合 JVM 的 GC 条件。 问题二:在使用 mapPartition 算子的时候,如果我在进入迭代前外部定义了一个 map,然后迭代中往这个 map 添加数据,那么这个 map 又是占用哪部分内存的?

    2021-04-18

  • 西南偏北 👍(16) 💬(2)

    老师讲的预估内存占用非常细,但就像老师给出的第二题中说的那样,如果Spark应用程序中的计算逻辑很多,这样预估自然是很精确,但是会花费大量时间,成本巨大!分享一下自己平时粗略估算内存占用的方法(如有不对,还望老师纠正): 1、Storage Memory估算:将要缓存到内存的RDD/Dataset/Dataframe或广播变量进行cache,然后在Spark WEBUI的Storage标签页下直接查看所有的内存占用,大致就对应Storage Memory。 2、Execution Memory估算:有了Storage Memory,因为默认情况下Execution Memory和Storage Memory占用Spark Memory的比例是相同的,这里可以将Execution Memory和Storage Memory设置为相同。 3、User Memory:如果应用中没有使用太多自定义数据类型,保持默认值即可;如果使用了很多自定义数据类型,按老师说的方式进行估算即可。 上面只是一个粗略的估算,可能需要根据任务的执行情况进行一些调整。

    2021-05-04

  • licl1008 👍(14) 💬(4)

    老师 课后思考中提到内存规划第一步很麻烦 您在留言回复中提到可以根据sparkUI估算'数据放大倍数',然后粗略估算内存。请问具体是如何操作? 根据您这个方法,可以同时得出 user execution storage三个空间的内存大小? 谢谢指导

    2021-05-10

  • 快跑 👍(14) 💬(1)

    REPL 中,通过 Java 的常规方法估算数据存储大小 老师,这个过程具体是怎么做呢。

    2021-04-19

  • Geek_d794f8 👍(4) 💬(2)

    老师,我有一个疑惑,对于有多个stage的任务,每个stage的内存预估的情况可能不一样。那样就无法给一个比较适合所有stage得内存配置?

    2021-04-18

  • licl1008 👍(3) 💬(1)

    老师 我是接着昨天的提问再问一下😂 execution mem内存估算中,#dataset不用算#bc过的数据集。但我的疑惑是:#bc只是把文件系统的数据搬到了内存,同样只是一个Relation而已,和执行要的内存似乎没什么联系。execution时,比如一些aggregate操作,不是同样也要把这个#bc relation的数据读出来,然后驻留在内存中处理吗。那岂不是也是要算入执行内存?或者我没懂算执行内存的理论依据到底是什么?我理解的执行内存是做aggregate sort等操作需要的内存空间,也是就pairedbuffer这之类的结构的大小 谢谢老师不吝赐教😄

    2021-05-20

  • 👍(2) 💬(9)

    老师,结合这一节内容和之前的,自定义数据结构其实和hive中的表关系不大,不涉及自定义,那么这时候是不是就可以把存储自定义这部分的内存匀出来给到统一内存身上

    2021-04-16

  • Geek_1564ee 👍(1) 💬(1)

    老师,你好,想问下shuffle map阶段和reduce阶段写入和读取缓冲区是消耗的storage memory内存区域么?另外spark.shuffle.file.buffer和spark.reducer.maxSizeInFlight是以task为粒度的么?

    2022-01-26

  • Geek_eb29a4 👍(1) 💬(1)

    利用高阶函数,我们就避免了让 Executor 中的每一个 Task 去读取模板文件,以及从头构建 Map 字典这种执行低效的做法。 这个原理 有什么参考资料可以看一下?

    2021-12-14

  • wow_xiaodi 👍(1) 💬(1)

    老师,比如一个filter算子里的自定义函数我用了一个自定义数据结构,但是filter算子是RDD里N条都要去执行的,那么这个自定义数据结构将会存在N份,直到GC回收为止。这里这个内存占用大小就很难估算了,算他是1份的占用明显不合理,但是我又不知道有多少条数据会调用,这时如何估算比较好呢?

    2021-08-06

  • licl1008 👍(1) 💬(1)

    老师 算execution mem时,比如两个数据集join的场景, 有一个小数据集broadcast了, 那#dataset要包括#bc吗? 还是只用算那个没有被广播的数据集的大小?

    2021-05-19

  • Geek_81beba 👍(1) 💬(1)

    如何配置spark的参数选项,让他像hive一样自动划分内存和cpu然后稳定的运行呢

    2021-05-07