HDFS에서 데이터를 요청했는데 읽은 파일의 메타 데이터를 가져오고 싶습니다. 이를 통해 주어진 순간에 사용 가능한 데이터를 기반으로 한 보고서를 작성할 수 있습니다.
org.apache.hadoop.fs.FileSystem
모든 파일 목록을 얻는 데 사용할 솔루션을 찾았습니다 . 분할 규칙을 알고 있으며 row -> meta
수신 된 목록을 기반으로 매핑을 작성할 수 있습니다 .
그러나이 결정은 구현 및 지원하기가 어렵습니다. 동일한 결과를 얻을 수있는 더 간단한 방법이 있습니까?
가장 쉬운 방법은 spark udf input_file_name
입니다.
import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name()).repartition($"input_file_name")
def getMetadata(rdd: Iterator[Row]) = {
val map = Map[String, Long]()
val fs = FileSystem.get(new Configuration())
rdd.map(row => {
val path = row.getString(row.size -1)
if(! map.contains(path)){
map.put(path,fs.listStatus(new Path(path))(0).getModificationTime())
}
Row.fromSeq(row.toSeq ++ Array[Any](map(path)))
})
}
spark.createDataFrame(df.rdd.mapPartitions(getMetadata),df.schema.add("modified_ts", LongType)).show(10000,false)
다음 modified_ts
은 mtime
파일입니다.
데이터의 크기에 따라 조인을 사용할 수도 있습니다. 논리는 다음과 같습니다.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._
val mtime =(path:String)=> FileSystem.get(new Configuration()).listStatus(new Path(path)).head.getModificationTime
val mtimeUDF = udf(mtime)
val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name())
val metadata_df = df.select($"input_file_name").distinct().withColumn("mtime", mtimeUDF($"input_file_name"))
val rows_with_metadata = df.join(metadata_df , "input_file_name")
rows_with_metadata.show(false)
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다