Problem reading CSV in S3 on Spark

UserBird
Dataiker

Hi,



We've encountered the following issue while trying to process some large CSV files (200 GB) in S3.




[21:10:41] [INFO] [dku.utils] - [2017/05/21-21:10:41.591] [Thread-3] [WARN] [dku.spark.dataset.s3] - Dataset XXX is not compatible with direct Spark S3 interface: non-empty header


After that, the Spark job seems to hang (I'm guessing it's trying to handle each file within a single task). The first stage consists of a union plus a map.




[01:15:35] [INFO] [dku.utils] - [2017/05/22-01:15:35.821] [dag-scheduler-event-loop] [INFO] [org.apache.spark.scheduler.DAGScheduler] - waiting: Set(ResultStage 2)
[01:15:35] [INFO] [dku.utils] - [2017/05/22-01:15:35.822] [dag-scheduler-event-loop] [INFO] [org.apache.spark.scheduler.DAGScheduler] - failed: Set()
[01:40:35] [INFO] [dku.utils] - [2017/05/22-01:40:35.345] [dispatcher-event-loop-2] [INFO] [org.apache.spark.storage.BlockManagerInfo] - Removed broadcast_1_piece0 on 172.31.30.96:39920 in memory (size: 39.5 KB, free: 413.9 MB)
[01:40:35] [INFO] [dku.utils] - [2017/05/22-01:40:35.368] [dispatcher-event-loop-4] [INFO] [org.apache.spark.storage.BlockManagerInfo] - Removed broadcast_1_piece0 on 172.31.9.77:39238 in memory (size: 39.5 KB, free: 2.8 GB)


I've tried skipping the first lines and unchecking 'Parse next line as column headers' in the input datasets, but I'm still seeing the same message and behavior.



Do you have any idea what is causing this and how we could fix it? We don't have many format choices in S3, and Avro doesn't seem usable either.



http://support.dataiku.com/support/tickets/3233



Thanks!



 



 



 

Léopold_Boudard

Following up on this: I could finally get a little further by unchecking 'Parse next line as column headers' and not skipping the first lines (which is a bit weird to me). With this option, I no longer got the warning and the dataset seemed to be processed properly by Spark.



However, I then hit multiple exceptions:




[2017/05/22-08:01:40.326] [Exec-42] [INFO] [dku.utils] - : org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 1.0 failed 20 times, most recent failure: Lost task 38.19 in stage 1.0 (TID 6918, 172.31.6.51, executor 25): java.lang.IndexOutOfBoundsException: Index: 10, Size: 10
[2017/05/22-08:01:40.326] [Exec-42] [INFO] [dku.utils] - at java.util.ArrayList.rangeCheck(ArrayList.java:653)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at java.util.ArrayList.get(ArrayList.java:429)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at com.dataiku.dip.shaker.mrimpl.formats.CSVInputFormatAdapter$InternalRecordReader.nextKeyValue(CSVInputFormatAdapter.java:146)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at com.dataiku.dip.shaker.mrimpl.formats.UniversalFileInputFormat$1.nextKeyValue(UniversalFileInputFormat.java:153)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:199)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
[2017/05/22-08:01:40.327] [Exec-42] [INFO] [dku.utils] - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)


 



My guess is that in some cases the Dataiku reader couldn't properly parse some bad lines, given the complex schema (which has nested structures). I selected the 'Discard and emit warning' option on read, but it doesn't seem to solve this issue.
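(For comparison, here is a minimal sketch of how bad rows can be dropped when reading the files directly with Spark's own CSV reader rather than the DSS format layer. It assumes Spark 2.x; the bucket path, column names and column count are placeholders.)

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("csv-bad-lines-sketch").getOrCreate()

# Placeholder schema: the error above suggests rows with ~10 fields
schema = StructType([StructField("col_%d" % i, StringType()) for i in range(10)])

df = (spark.read
      .schema(schema)                    # explicit schema, since the files carry no usable header
      .option("mode", "DROPMALFORMED")   # drop rows that don't match the schema instead of failing
      .csv("s3a://my-bucket/path/to/files/"))  # placeholder path

print(df.count())
```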



Do you know of another workaround for this?



Thanks!

AdrienL
Dataiker
Unfortunately, Spark cannot read CSV files that have formatting issues or multiline cells.
If you want to process this data with Spark, you can sync this dataset to HDFS beforehand. Multiline CSV cells are not really supported by Hadoop though.
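For illustration, once the dataset has been synced to HDFS, a Spark recipe can read it as a DataFrame along these lines. This is a rough sketch using the usual DSS PySpark recipe boilerplate; the dataset names are placeholders.

```python
# Sketch of a DSS PySpark recipe; dataset names are placeholders
import dataiku
import dataiku.spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Input: the HDFS dataset produced by the Sync recipe
input_ds = dataiku.Dataset("mydataset_hdfs")
df = dkuspark.get_dataframe(sqlContext, input_ds)

# ... transformations on df ...

# Output: write the result back to a managed dataset
output_ds = dataiku.Dataset("mydataset_processed")
dkuspark.write_with_schema(output_ds, df)
```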
AdrienL
Dataiker

Hello,



This warning indicates that the format is not compatible with the direct S3 interface, and the file will be streamed to Spark through Dataiku DSS, which is very slow, possibly giving the impression that the job is hanging.



For S3, CSV should be compatible with the direct Spark/S3 interface if it:




  • Does not have headers

  • Does not skip lines

  • Does not contain multiline cells



Avro is also compatible, although the schema detection might not work if the structure is too complex or too big, in which case you can set the schema manually or programmatically.
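If you do want to set the schema programmatically, a possible sketch using the DSS public API client is shown below (host, API key, project key, dataset name and columns are all placeholders, and the exact schema format may differ across DSS versions):

```python
import dataikuapi

# Placeholders: host, API key, project key and dataset name
client = dataikuapi.DSSClient("http://dss-host:11200", "YOUR_API_KEY")
dataset = client.get_project("MYPROJECT").get_dataset("my_s3_dataset")

# Set the schema explicitly instead of relying on detection
dataset.set_schema({
    "columns": [
        {"name": "id", "type": "bigint"},
        {"name": "payload", "type": "string"},
        {"name": "created_at", "type": "date"},
    ]
})
```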
