Sparkは初めてで、値の集計に関してサポートが必要です。
+--------------------+--------------------+-----+
| amount| transaction_code|Total|
+--------------------+--------------------+-----+
|[10, 20, 30, 40, ...|[buy, buy, sell, ...|210.0|
+--------------------+--------------------+-----+
このデータフレームに新しい列を追加する必要があります。transaction_codeに「buy」が表示されている場合は、金額に存在する値を追加します。たとえば、transaction_codeが「buy」であるため、10と20を追加します。
私はそれらを完全に集約する方法を知っています。以下は私が書いたコードです。
df2extract = df2extract.select(
'amount',
'transaction_code',
F.expr('AGGREGATE(amount, cast(0 as float), (acc, x) -> acc + x)').alias('Total')
).show()
if関数を使用できることがわかりましたが、それらを初期化する方法と量を追跡する方法を決定できません。この件で私を助けてください。どうもありがとう!
あなたが使用することができるarray_zip
とfilter
。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName('SO')\
.getOrCreate()
sc= spark.sparkContext
df = sc.parallelize([
([10, 20, 30, 40], ["buy", "buy", "sell"])]).toDF(["amount", "transaction_code"])
df.show()
# +----------------+----------------+
# | amount|transaction_code|
# +----------------+----------------+
# |[10, 20, 30, 40]|[buy, buy, sell]|
# +----------------+----------------+
df1 = df.withColumn("zip", F.arrays_zip(F.col('amount'),F.col('transaction_code')))
df2 = df1.withColumn("buy_filter", F.expr('''filter(zip, x-> x.transaction_code == 'buy')'''))
df3 = df2.select("amount", "transaction_code", F.col("buy_filter.amount").alias("buy_values"))
df3.select("amount", "transaction_code", F.expr('AGGREGATE(buy_values, cast(0 as float), (acc, x) -> acc + x)').alias('total')).show()
# +----------------+----------------+-----+
# | amount|transaction_code|total|
# +----------------+----------------+-----+
# |[10, 20, 30, 40]|[buy, buy, sell]| 30.0|
# +----------------+----------------+-----+
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加