When handling the file read at line 11, the data files do not always arrive regularly, so the log file for some dates in the specified range may not exist; the exception needs to be handled here:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def readLog(sc: SparkContext, startDate: String, endDate: String, logNames: List[String]): RDD[String] = {
  // DateUtils is the project's own helper; it expands the range into yyyyMMdd strings.
  val dateLst = DateUtils.getDateListBetweenTwoDate(startDate, endDate)
  var logRdd = sc.makeRDD(List[String]())
  for (date <- dateLst) {
    val year = date.substring(0, 4)
    val month = date.substring(4, 6)
    val day = date.substring(6, 8)
    for (logName <- logNames) {
      logRdd = logRdd.union(
        // Guard only the read: a missing path contributes an empty RDD,
        // and the union/assignment still runs.
        try {
          sc.textFile(s"cosn://fuge/mid-data/fuge/ssp/bid-log/$year/$month/$day/${logName}*")
            .map(x => x.split("\\|", -1))
            .filter(x => x.length >= 2 && (x(1).trim == "6" || x(1).trim == "0"))
            .map(_.mkString("|")) // rejoin the filtered fields into one line
        } catch {
          case _: Exception => sc.makeRDD(List[String]())
        }
      )
    }
  }
  logRdd
}
```
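For reference, a minimal usage sketch; the date range and log-name prefixes below are hypothetical, not values from the original article:

```scala
// Hypothetical call: one week of "bid" and "win" logs. Days with missing files
// simply contribute an empty RDD instead of failing the whole job.
val weekLogs = readLog(sc, "20240101", "20240107", List("bid", "win"))
println(weekLogs.count())
```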
The mistake I made at first was putting the try/catch one level above the union statement: the exception was indeed caught, but the RDD was not assigned correctly.
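To make the difference concrete, here is one simplified reconstruction of the two placements; `somePath` and the surrounding variables are placeholders, not the original code:

```scala
var logRdd = sc.makeRDD(List[String]())
val somePath = "cosn://bucket/some/day/part-*" // placeholder path

// Problematic placement: the try wraps the whole union, so when the read fails
// the catch branch replaces the accumulated RDD with an empty one -- the
// exception is caught, but everything collected so far is lost.
logRdd = try {
  logRdd.union(sc.textFile(somePath))
} catch {
  case _: Exception => sc.makeRDD(List[String]())
}

// Working placement: only the read is guarded; a missing file contributes an
// empty RDD and the data already accumulated in logRdd is preserved.
logRdd = logRdd.union(
  try sc.textFile(somePath)
  catch { case _: Exception => sc.makeRDD(List[String]()) }
)
```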
Besides the approach above, you can also override the file-reading input format, so that the exception is caught and the RDD is still assigned properly:
```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.rdd.RDD

def textFileFromObjectStorage(path: String, numOfCoalesce: Int = 0): RDD[String] = {
  // EmptiableTextInputFormat is a custom InputFormat that treats a missing
  // input path as "no files" instead of throwing, so the read never fails here.
  val data = sc.newAPIHadoopFile(path,
      classOf[EmptiableTextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      sc.hadoopConfiguration)
    .map { x => x._2.toString }
  // Optionally shrink the number of partitions.
  numOfCoalesce match {
    case 0 => data
    case _ => data.coalesce(numOfCoalesce)
  }
}
```
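The definition of EmptiableTextInputFormat is not shown in the article. A minimal sketch of what such a class could look like, assuming it extends the new-API TextInputFormat and swallows the "input path does not exist" error so that an empty file list is returned:

```scala
import java.util.{Collections, List => JList}

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.{InvalidInputException, TextInputFormat}

// Sketch only: when the input path does not exist, report "no files" instead of
// failing, so newAPIHadoopFile yields an empty RDD rather than throwing.
class EmptiableTextInputFormat extends TextInputFormat {
  override protected def listStatus(job: JobContext): JList[FileStatus] = {
    try {
      super.listStatus(job)
    } catch {
      case _: InvalidInputException => Collections.emptyList[FileStatus]()
    }
  }
}
```

With an input format like this in place, textFileFromObjectStorage returns an empty RDD for a missing path instead of throwing, so no try/catch is needed at the call site.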
For details on newAPIHadoopFile, see: newAPIHadoopFile Doc