ちょっと変わったものがあります。トロールする必要のあるログがたくさんあります。私はSparkでそれを成功させました、そして私はそれに満足しています。
ただし、データセンターであるデータフレームにもう1つのフィールドを追加する必要があります。
データセンター名を取得できる唯一の場所は、ディレクトリパスからです。
例えば:
/feedname/date/datacenter/another/logfile.txt
ログファイルのパスを抽出してデータフレームに挿入する方法は何ですか?そこから、いくつかの文字列分割を実行して、必要なビットを抽出できます。
私の現在のコード:
mpe_data = my_spark.read\
.option("header","false")\
.option("delimiter", "\t")\
.withColumn("Datacenter", input_file_name())\
.csv('hdfs://nameservice/data/feed/mpe/dt=20191013/*/*/*', final_structure)
mpe_data.printSchema()
mpe_data.createOrReplaceTempView("mpe")
Spark2.0以降の_input_file_name_を使用してファイルパスを取得できます
from pyspark.sql.functions import input_file_name
df.withColumn("Datacenter", input_file_name())
例としてコードを追加します。ファイルを読み取ったら、withcolumnを使用してfile_nameを取得します。
mpe_data = my_spark.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://nameservice/data/feed/mpe/dt=20191013/*/*/*', final_structure)
mpe_data.withColumn("Datacenter", input_file_name())
mpe_data.printSchema()
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加