A Case of Exception Handling in Scala

When reading files with sc.textFile below, the data files arrive irregularly, so the log file for a given date in the range may not exist. The exception needs to be handled:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def readLog(sc: SparkContext, startDate: String, endDate: String, logNames: List[String]): RDD[String] = {
  val dateLst = DateUtils.getDateListBetweenTwoDate(startDate, endDate)

  var logRdd = sc.makeRDD(List[String]())
  for (date <- dateLst) {
    val year = date.substring(0, 4)
    val month = date.substring(4, 6)
    val day = date.substring(6, 8)
    for (logName <- logNames) {
      logRdd = logRdd.union( // reassign, rather than shadow with a new val
        try {
          sc.textFile(s"cosn://fuge/mid-data/fuge/ssp/bid-log/$year/$month/$day/${logName}*")
            .map(x => x.split("\\|", -1))
            .filter(x => x.length >= 2 && (x(1).trim == "6" || x(1).trim == "0")) // 0 and 6 are the status codes for successful requests
            .map(_.mkString("|")) // re-join the fields into a line; Array's own toString would yield a useless object reference
        } catch {
          case _: Exception => sc.makeRDD(List[String]()) // the day's log may be missing; fall back to an empty RDD
        }
      )
    }
  }
  logRdd
}

The mistake I made at first was placing the try/catch one level above the union statement, so the exception was indeed caught, but the RDD was not assigned correctly.
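For contrast, here is a minimal sketch of that broken placement, reconstructed from the description above rather than taken from the original code. Because the try wraps the whole union, a failing sc.textFile sends control to the catch, which reassigns logRdd to a fresh empty RDD and throws away everything accumulated in earlier iterations:

// Broken placement (sketch): the try/catch sits above the union
logRdd = try {
  logRdd.union(
    sc.textFile(s"cosn://fuge/mid-data/fuge/ssp/bid-log/$year/$month/$day/${logName}*")
      .map(x => x.split("\\|", -1))
      .filter(x => x.length >= 2 && (x(1).trim == "6" || x(1).trim == "0"))
      .map(_.mkString("|"))
  )
} catch {
  // The exception is caught, but logRdd is reassigned to an empty RDD,
  // discarding everything unioned in earlier loop iterations
  case _: Exception => sc.makeRDD(List[String]())
}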

Besides the approach above, you can also catch the exception while still assigning the RDD correctly by overriding the file-reading interface:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.rdd.RDD

def textFileFromObjectStorage(path: String, numOfCoalesce: Int = 0): RDD[String] = {
  // EmptiableTextInputFormat returns an empty split list instead of throwing
  // when the path matches no files, so the RDD is always assigned correctly
  val data = sc.newAPIHadoopFile(path,
      classOf[EmptiableTextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      sc.hadoopConfiguration)
    .map(x => x._2.toString) // keep the line content, drop the byte-offset key
  numOfCoalesce match {
    case 0 => data // 0 means keep the original partitioning
    case _ => data.coalesce(numOfCoalesce)
  }
}
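EmptiableTextInputFormat is a custom class that the post does not show. A common way to write it, and an assumption here rather than the author's confirmed implementation, is to extend Hadoop's new-API TextInputFormat and return an empty file list instead of letting listStatus throw when the path matches nothing:

import java.util.{Collections, List => JList}

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.{InvalidInputException, TextInputFormat}

// Sketch: an input format that yields zero splits, rather than an
// InvalidInputException, when the input path matches no files
class EmptiableTextInputFormat extends TextInputFormat {
  override protected def listStatus(job: JobContext): JList[FileStatus] =
    try {
      super.listStatus(job)
    } catch {
      case _: InvalidInputException => Collections.emptyList[FileStatus]
    }
}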

For details on newAPIHadoopFile, see: newAPIHadoopFile Doc
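With this helper, the loop in readLog could drop the inline try/catch entirely; a hypothetical usage, assuming the same path layout as above:

// Hypothetical usage inside readLog's inner loop: a missing file now
// simply contributes an empty RDD, so no try/catch is needed
logRdd = logRdd.union(
  textFileFromObjectStorage(s"cosn://fuge/mid-data/fuge/ssp/bid-log/$year/$month/$day/${logName}*")
    .map(x => x.split("\\|", -1))
    .filter(x => x.length >= 2 && (x(1).trim == "6" || x(1).trim == "0"))
    .map(_.mkString("|"))
)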