Pandas-utilising 함수를 PySpark로 변환하는 방법을 알아 내려고합니다.
다음과 같은 Pandas DataFrame이 있습니다.
+---+----+
|num| val|
+---+----+
| 1| 0.0|
| 2| 0.0|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9| 0.0|
| 10|49.0|
| 11| 0.0|
| 12| 0.0|
+---+----+
아래 스 니펫의 코드는 매우 간단합니다. 0이 아닌 값을 찾을 때까지 앞으로 나아갑니다. 그들 중 아무것도 없으면 같은 목적으로 뒤로 이동합니다.
def next_non_zero(data,i,column):
for j in range(i+1,len(data[column])):
res = data[column].iloc[j]
if res !=0:
return res
for j in range(i-1,0,-1):
res = data[column].iloc[j]
if res !=0:
return res
def fix_zero(data, column):
for i, row in data.iterrows():
if (row[column] == 0):
data.at[i,column] = next_non_zero(data,i,column)
따라서 결과적으로
+---+----+
|num| val|
+---+----+
| 1|48.6|
| 2|48.6|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9|49.0|
| 10|49.0|
| 11|49.0|
| 12|49.0|
+---+----+
따라서 PySpark에서 원하는 결과로 새 열을 만들고 예를 들어 withColumn ()을 사용하여 기존 열을 대체해야한다는 것을 이해합니다. 그러나 DataFrame을 올바르게 반복하는 방법을 이해하지 못합니다.
Window를 통해 기능을 사용하려고합니다.
my_window = Window.partitionBy().orderBy('num')
df = df.withColumn('new_val', F.when(df.val==0,F.lead(df.val).over(my_window)).
otherwise(F.lag(df.val).over(my_window))
분명히 한 번만 반복되므로 원하는 결과를 제공하지 않습니다. 그래서 나는 udf 재귀를 작성하려고했습니다.
def fix_zero(param):
return F.when(F.lead(param).over(my_window)!=0,F.lead(param).over(my_window)).
otherwise(fix_zero(F.lead(param).over(my_window)))
spark_udf = udf(fix_zero, DoubleType())
df = df.withColumn('new_val', F.when(df.val!=0, df.val).otherwise(fix_zero('val')))
나는 얻었다
RecursionError: maximum recursion depth exceeded in comparison
나는 이것이 행이 아니라 lead ()의 결과로 재귀를 전달하기 때문이라고 생각합니다. 어쨌든 지금은이 장애물에 완전히 갇혀 있으며 어떤 조언이라도 깊이 감사하겠습니다.
Window를 사용하여 널이 아닌 값에 도달 할 때까지 모든 선행 (또는 이후의 모든 행)을 통과하는 방법이 있습니다.
그래서 첫 번째 단계는 모든 0 값을 null로 바꾸는 것이 었습니다.
데이터 프레임 재생성 :
values = [
(1, 0.0),
(2,0.0),
(3,48.6),
(4,49.0),
(5,48.7),
(6,49.1),
(7, 74.5),
(8,48.7),
(9,0.0),
(10,49.0),
(11,0.0),
(12,0.0)
]
df = spark.createDataFrame(values, ['num','val'])
0을 null로 바꾸기
from pyspark.sql.functions import when, lit, col
df= df.withColumn('val_null', when(col('val') != 0.0,col('val')))
그런 다음 first 및 null과 결합 된 창을 정의하면 행 앞의 마지막 null이 아닌 값을 얻고 행 뒤에있는 첫 번째 null이 아닌 값을 얻을 수 있습니다.
from pyspark.sql import Window
from pyspark.sql.functions import last,first,coalesce
windowForward = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
ffilled_column = last(df['val_null'], ignorenulls=True).over(windowForward)
windowBackward = Window.rowsBetween(Window.currentRow,Window.unboundedFollowing)
bfilled_column = first(df['val_null'], ignorenulls=True).over(windowBackward)
# creating new columns in df
df =df.withColumn('ffill',ffilled_column).withColumn('bfill',bfilled_column)
# replace null with bfill if bfill is not null otherwise fill with ffill
df =df.withColumn('val_full',coalesce('bfill','ffill'))
이 기술을 사용하여 'val_full'열에 예상 출력에 도달합니다.
+---+----+--------+-----+-----+--------+
|num| val|val_null|ffill|bfill|val_full|
+---+----+--------+-----+-----+--------+
| 1| 0.0| null| null| 48.6| 48.6|
| 2| 0.0| null| null| 48.6| 48.6|
| 3|48.6| 48.6| 48.6| 48.6| 48.6|
| 4|49.0| 49.0| 49.0| 49.0| 49.0|
| 5|48.7| 48.7| 48.7| 48.7| 48.7|
| 6|49.1| 49.1| 49.1| 49.1| 49.1|
| 7|74.5| 74.5| 74.5| 74.5| 74.5|
| 8|48.7| 48.7| 48.7| 48.7| 48.7|
| 9| 0.0| null| 48.7| 49.0| 49.0|
| 10|49.0| 49.0| 49.0| 49.0| 49.0|
| 11| 0.0| null| 49.0| null| 49.0|
| 12| 0.0| null| 49.0| null| 49.0|
+---+----+--------+-----+-----+--------+
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다