저는 사용자 ID와 사용자가 본 제품 / 아이템을 포함하는 웹 사이트 데이터로 작업하고 있습니다. 다음과 같은 pyspark 데이터 프레임을 만들었습니다.
+--------+----------+-------+----------+---------+
| UserId| productA| itemB| articleC| objectD|
+--------+----------+-------+----------+---------+
| user1| 1| 1| null| null|
| user2| 1| 1| null| null|
| user3| null| 1| 1| null|
| user4| null| null| null| 1|
+--------+----------+-------+----------+---------+
여기서 1은 사용자가 해당 제품을 한 번 이상 본 것을 나타내고 null은 사용자가 해당 제품을 보지 않았 음을 나타냅니다. 수백 개의 제품 / 항목과 수백만 명의 사용자가 있습니다 (이는 단순한 예입니다).
다음과 같은 DataFrame을 얻기 위해 pyspark에서 작업을 수행하고 싶습니다.
+-----------+----------+-------+----------+---------+
| | productA| itemB| articleC| objectD|
+-----------+----------+-------+----------+---------+
| productA| 2| 2| 0| 0|
| itemB| 2| 3| 1| 0|
| articleC| 0| 1| 1| 0|
| objectD| 0| 0| 0| 1|
+-----------+----------+-------+----------+---------+
이 데이터 프레임은 사용자가 한 제품 / 항목을 본 경우 다른 항목도 본 사용자 수를 보여줍니다. 분명히이 Dataframe의 대각선은 각 제품을 본 사용자 수이지만 흥미로운 부분은 대칭이 아닌 대각선 값입니다. 이 단순화 된 예에서는 productA를 본 모든 사용자가 itemB를 보았지만 itemB를 본 3 명의 사용자에 대해서는 2 명만이 제품 A를 본 것을 볼 수 있습니다.
나는 이것을 계산하기 위해 매우 비효율적 인 루틴을 만들었지 만 데이터 세트의 크기로 완료하는 데 ~ 22 시간이 걸립니다. 아래 계산을 더 빠르게 실행하기 위해 pyspark의 기능을 어떻게 활용할 수 있습니까?
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
# df_pivot is the name of the first Dataframe in my explanation above
columns = [c for c in df_pivot.columns]
cols = columns[1:]
net = pd.DataFrame(np.zeros((len(cols), len(cols))), index=cols, columns=cols)
for i in range(len(cols)):
c = cols[i]
cs = cols[i:]
print(f'{i + 1}: {c}')
sum_row = df_pivot.where(F.col(c).isNotNull())\
.select(*cs)\
.groupBy().sum().collect()[0]\
.asDict()
sum_row = {k.replace('sum(', '')[:-1]: v for k, v in sum_row.items()}
values = [sum_row[x] for x in cs]
net.loc[c, cs] = values
net.loc[cs, c] = values
net.head()
동료와 이야기하면서 데이터를 scipy csc_matrix 로 변환 한 다음 다음 과 같이 행렬 의 gramian 을 가져 와서 ( 메모리 오류없이 pandas DataFrame으로 데이터를 가져올 수있는 경우)이를 수행하는 방법을 찾았습니다 .
gramian = sp_csc.transpose().dot(sp_csc)
sp_csc
scipy "Compressed Sparse Column matrix"는 어디에 있습니까 ?
pyspark DataFrame을 pandas로 강제 적용하는 것은 데이터 크기에 따라 여전히 제한적인 것처럼 보입니다. pyspark에서 gramian (pyspark DataFrame과 pyspark DataFrame 자체의 전치의 내적)을 계산하는 더 좋은 방법이 있습니까?
원래 코드 / 루프를 훨씬 빠르게 실행할 수있는 방법을 찾았습니다. 루프 전에 명령 을 사용하여 df_pivot
데이터 프레임 을 캐시해야했습니다 df_pivot.cache()
. pyspark의 지연 계산으로 인해 루프로 인해 pyspark가 각 루프 동안 모든 이전 계산을 다시 계산했습니다. 이것은 이것을 충분히 빨리 계산해야 할 즉각적인 요구를 해결하지만, 누군가가 pyspark parallelize
에서 map
, 및 reduce
루틴을 사용하여 이것을 어떻게 수행 할 수 있는지 여전히 관심이 있습니다 .
IIUC, 원래 데이터 프레임 df_pivot의 피벗을 해제 하고 거기에서 사용하여 자체 완전 외부 조인을 userId
만든 다음 다시 피벗 할 수 있습니다.
from pyspark.sql import functions as F
# list of columns to do pivot
cols = df_pivot.columns[1:]
# normalize the df_pivot to userId vs target
df1 = df_pivot.select(
'userId',
F.explode(F.split(F.concat_ws('|', *[F.when(F.col(c).isNotNull(), F.lit(c)) for c in cols]),'\|')).alias('target')
)
#df1.show()
#+------+--------+
#|userId| target|
#+------+--------+
#| user1|productA|
#| user1| itemB|
#| user2|productA|
#| user2| itemB|
#| user3| itemB|
#| user3|articleC|
#| user4| objectD|
#+------+--------+
# self full-outer join
df2 = df1.join(df1.withColumnRenamed('target','target_1'),'userId','full')
# pivot
df_new = df2.groupby('target') \
.pivot('target_1', cols) \
.agg(F.countDistinct('userId')) \
.fillna(0, subset=cols)
#+--------+--------+-----+--------+-------+
#| target|productA|itemB|articleC|objectD|
#+--------+--------+-----+--------+-------+
#|productA| 2| 2| 0| 0|
#| itemB| 2| 3| 1| 0|
#|articleC| 0| 1| 1| 0|
#| objectD| 0| 0| 0| 1|
#+--------+--------+-----+--------+-------+
참고 : 실제 요구 사항에 따라 최종 집계 F.count('*')
대신 필요할 수도 있습니다 F.countDistinct('userId')
.
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다