Dataproc无法解压缩AWS Kinesis压缩的.gz文件

Goleo8

我的公司正在尝试将服务从AWS迁移到GCP。我们面临一些问题。AWS Kinesis收集的数据是.gz文件。我们使用GCP的Cloud Storage将这些文件传输到GCP平台,并使用Dataproc处理这些数据。所有这些数据都可以在AWS中正确处理,但不能由同一Spark作业正确读取。

参见最后抛出的异常。

我试图ABC.gz在GCP Cloud Shell中解压缩这些文件之一,例如在解压文件仍然结尾.gzABC.gz我认为这是根本原因,因为Spark可能试图解压缩解压缩的文件。

如果我们通过删除.gz后缀来重命名这些文件,则Spark可以正常运行。但是,重命名过程非常耗时,并且需要花费多个小时来处理一天的数据。

任何建议深表感谢。提前致谢。

Caused by: java.io.IOException: incorrect header check
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
  at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
  at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
  at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:151)
  at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:191)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:190)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:6
31)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
伊戈尔·德沃扎克(Igor Dvorzhak)

没有更多细节,很难说出到底发生了什么,但是很可能您存储的.gz文件未压缩或使用GCS解压缩转码这意味着Spark读取的文件已经解压缩(如果使用GCS解压缩转码,它们首先不会被压缩,也不会被HTTP客户端库解压缩),这会导致失败,因为Hadoop / Spark也会自动尝试解压缩具有.gz扩展名的文件

如果上述情况成立,则似乎除了重命名这些文件以删除.gz扩展名之外,没有其他选择另外,请注意,在Spark / Hadoop中,Gzip压缩文件的处理效率很低,因为它们不可拆分。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章