我的公司正在尝试将服务从AWS迁移到GCP。我们面临一些问题。AWS Kinesis收集的数据是.gz
文件。我们使用GCP的Cloud Storage将这些文件传输到GCP平台,并使用Dataproc处理这些数据。所有这些数据都可以在AWS中正确处理,但不能由同一Spark作业正确读取。
参见最后抛出的异常。
我试图ABC.gz
在GCP Cloud Shell中解压缩这些文件之一,例如。在解压文件仍然结尾.gz
:ABC.gz
。我认为这是根本原因,因为Spark可能试图解压缩解压缩的文件。
如果我们通过删除.gz
后缀来重命名这些文件,则Spark可以正常运行。但是,重命名过程非常耗时,并且需要花费多个小时来处理一天的数据。
任何建议深表感谢。提前致谢。
Caused by: java.io.IOException: incorrect header check
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:190)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:6
31)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
没有更多细节,很难说出到底发生了什么,但是很可能您存储的.gz
文件未压缩或使用GCS解压缩转码。这意味着Spark读取的文件已经解压缩(如果使用GCS解压缩转码,它们首先不会被压缩,也不会被HTTP客户端库解压缩),这会导致失败,因为Hadoop / Spark也会自动尝试解压缩具有.gz
扩展名的文件。
如果上述情况成立,则似乎除了重命名这些文件以删除.gz
扩展名之外,没有其他选择。另外,请注意,在Spark / Hadoop中,Gzip压缩文件的处理效率很低,因为它们不可拆分。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句