I am reading a CSV file into a DataFrame with the inferSchema option enabled, using the command below.
val df2 = spark.read.options(Map("inferSchema" -> "true", "header" -> "true")).csv("s3://Bucket-Name/Fun/Map/file.csv")
df2.printSchema()
Output:
root
|-- CC|Fun|Head|Country|SendType: string (nullable = true)
Now I only want to store the above output in a CSV file that contains just these column names and their data types, like this:
column_name,datatype
CC,string
Fun,string
Head,string
Country,string
SendType,string
I tried writing it out as CSV with the call below, but that writes the entire data set to the file:
df2.coalesce(1).write.format("csv").mode("append").save("schema.csv")
Regards, mahi
Try something like the usage below: coalesce(1) together with .option("header","true") so the output is written with its header row.
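The coalesce(1) idea is not used in the full program that follows, so here is a minimal sketch of it first. It assumes spark and df2 from the question, and the output directory name is a placeholder:

import spark.implicits._

val schemaDF = df2.schema.fields
  .map(f => (f.name, f.dataType.typeName)) // (column_name, datatype) pairs
  .toSeq
  .toDF("column_name", "datatype")

schemaDF.coalesce(1)                  // one part-file instead of many
  .write.option("header", "true")     // emits the column_name,datatype header row
  .mode("overwrite")
  .csv("s3://Bucket-Name/Fun/Map/schema_out") // Spark writes a directory, not a bare .csv

Keep in mind Spark still creates a directory containing a single part-file; to get one file at an exact path, use the Hadoop FileSystem approach in the full example below, which writes the schema to a local file and then directly to a single S3 object: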
import java.io.FileWriter

import org.apache.spark.sql.SparkSession

object SparkSchema {

  def main(args: Array[String]): Unit = {
    // Append mode (true): reruns add to the existing file instead of overwriting it.
    val fw = new FileWriter("src/main/resources/csv.schema", true)
    fw.write("column_name,datatype\n")

    // The original answer used a project helper (Constant.getSparkSess); a plain
    // builder is substituted here so the example is self-contained.
    val spark = SparkSession.builder()
      .appName("SparkSchema")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Sample DataFrame standing in for the CSV read in the question.
    val df = List(("", "", "", 1L)).toDF("applicationName", "id", "requestId", "version")

    // Extract (column name, type name) pairs from the DataFrame's schema.
    val columnList: List[(String, String)] =
      df.schema.fields.map(field => (field.name, field.dataType.typeName)).toList

    try {
      val outString = columnList.map(col => col._1 + "," + col._2).mkString("\n")
      fw.write(outString)
    } finally fw.close()

    // Append one more (name, type) entry and write the result straight to S3.
    val newColumnList: List[(String, String)] = List(("newColumn", "integer"))
    val finalColList = columnList ++ newColumnList
    // s3a:// so the fs.s3a.* credentials set in writeToS3 actually apply.
    writeToS3("s3a://bucket/newFileName.csv", finalColList)
  }

  def writeToS3(s3FileNameWithpath: String, finalColList: List[(String, String)]): Unit = {
    // mkString("\n") here: the original had "\\n", which wrote a literal backslash-n.
    val outString = finalColList.map(col => col._1 + "," + col._2).mkString("\n")

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs._

    val conf = new Configuration()
    conf.set("fs.s3a.access.key", "YOUR ACCESS KEY")
    conf.set("fs.s3a.secret.key", "YOUR SECRET KEY")

    // Use the Hadoop FileSystem API to create a single file at the exact path
    // (a DataFrame write would instead produce a directory of part-files).
    val dest = new Path(s3FileNameWithpath)
    val fs = dest.getFileSystem(conf)
    val out = fs.create(dest, true) // overwrite = true
    out.write(outString.getBytes)
    out.close()
  }
}
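One design note: fs.create(dest, true) overwrites any existing object at that path, and in real use the fs.s3a access and secret keys would come from cluster or environment configuration (for example, an IAM role on EMR) rather than being hard-coded in the job.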