首先,我们对比一下foreachPartition
和foreach
两个方法的实现,有什么不同的地方:
1 | def foreach(f: T => Unit): Unit = withScope { |
2个方法,参数都是一个函数文本,不同的是foreach当中,函数文本希望的参数是T,也就是RDD当中的元素类型;foreachPartition当中,函数文本希望的参数是Iterator[T]
,也就是一个partition。
而在内部实现上,其实是大同小异的。对于foreachPartition而言,直接在各个partition上运行传入的函数文本;而对于foreach而言,是把传入的函数文本,交给各个partition的foreach去执行。
我们查看一些spark性能优化指南,会提到用foreachPartition替代foreach,有助于性能的提高
。那么我们要怎样来理解这句话呢?看看下面这段代码:
1 | rdd.foreach { x => { |
在上面这段代码当中,针对RDD当中的每一条数据,都会new一个db client,这样的效率,显然是无比底下的。正确的写法应该是这个样子的:
1 | rdd.foreachPartition { part => { |
那么这种写法究竟好在哪里,还是要从spark的核心概念开始讲起,我们都知道spark是一个分布式的实时计算系统,而RDD是分布式计算的基础,而partition分区又是这个当中的关键,比如我们搭建一个3*4core
的spark集群,对于一个大任务而言,我们往往是希望有12个线程一起来完成这个任务,用下面的代码来构建rdd就能够达到我们的目的:
1 | val rdd = sc.textFile("hdfs://master:9000/woozoom/mavlink1.log", 12) |
注意第二个参数12,代表着构建出来的rdd的分区数量。之后,rdd.foreachPartition
,spark集群会把12个分区分别交给12个线程来分别进行处理。结合上面的代码,dbClient 会在每个线程当中分别构建,会有12个db client被构建。
那么有没有另一种可能性,我们只构建一个db client,12个线程都用这一个db client来执行数据库操作,像下面这样:
1 | val dbClient = new DBClient |
要这么写,需要有2个前提:
- dbClient 是线程安全的
- dbClient 实现了java的序列化接口。而在很多情况下,例如在对hbase进行访问的时候,这两个条件都是不满足的。
Partition数量影响及调整
下面我们讨论一下Partition数量的影响以及合适的值
Partition数量的影响
- Partition数量太少
- 太少的影响显而易见,就是资源不能充分利用,例如local模式下,有16core,但是Partition数量仅为8的话,有一半的core没利用到。
- Partition数量太多
- 太多,资源利用没什么问题,但是导致task过多,task的序列化和传输的时间开销增大。
Partition调整
- repartition
- reparation是coalesce(numPartitions, shuffle = true),repartition不仅会调整Partition数,也会将Partitioner修改为hashPartitioner,产生shuffle操作。
- coalesce
- coalesce函数可以控制是否shuffle,但当shuffle为false时,只能减小Partition数,无法增大。