推荐阅读文章列表
背景
先来看一个简单的case——匹配任意字符开头的单词,scala代码逻辑如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SerializableStudy {
def main(args: Array[String]): Unit = {
val conf = newSparkConf().setAppName("Serializable").setMaster("local")
val sc = newSparkContext(conf)
sc.setLogLevel("ERROR")
val words = List("spark", "scala", "java", "hadoop")
val inputRDD = sc.parallelize(words, 2)
val query = newQuery("s")
query.matchWord(inputRDD)
}
}
class Query(val name: String) {
def matchWord(inputRDD: RDD[String]): Unit = {
inputRDD.filter(
word => word.startsWith(name)
).collect().foreach(println)
}
}
执行后报错如下:Caused by: java.io.NotSerializableException: Query
问题分析
为什么会报错呢?
因为在Executor端执行filter函数时,用到了Driver端的name变量,此时涉及到网络传输,必须进行序列化;name变量是String,已经实现了序列化接口,那为什么仍然会报错呢?
进一步分析发现,这个name实际上是Query对象的成员变量,因此必须对Query对象进行序列化
解决方案
让Query类继承序列化特质即可(extends Serializable)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SerializableStudy {
def main(args: Array[String]): Unit = {
val conf = newSparkConf().setAppName("Serializable").setMaster("local")
val sc = newSparkContext(conf)
sc.setLogLevel("ERROR")
val words = List("spark", "scala", "java", "hadoop")
val inputRDD = sc.parallelize(words, 2)
val query = newQuery("s")
query.matchWord(inputRDD)
}
}
class Query(val name: String) extends Serializable {
def matchWord(inputRDD: RDD[String]): Unit = {
inputRDD.filter(
word => word.startsWith(name)
).collect().foreach(println)
}
}
Spark默认使用 Java 序列化(JavaSerializer)方式来进行数据的序列化和反序列化,它是一种内置的机制,允许开发者将对象的状态保存到文件系统或通过网络传输。尽管这种方式简单且易于使用,但它也有一些缺点,比如生成的序列化数据体积较大,序列化性能不是最优的。同时,Spark还提供了一种性能更高的序列化方式:Kryo 序列化(KryoSerializer)
通常,我们会在Spark代码中引入Kryo序列化的方式,来提升任务执行效率
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
总结
RDD算子逻辑代码(比如filter、map、groupby、collect等算子)在Executor端执行,其他的代码都是在Driver端执行
写在最后
V7.0笔记获取方式
公众号回复:大数据面试笔记

