타임 스탬프 열을 기준으로 데이터 프레임을 여러 데이터 프레임으로 분할해야합니다. 따라서이 데이터 프레임이 포함해야하는 시간을 제공하고 각 데이터 프레임에 지정된 시간이 포함 된 데이터 프레임 집합을 가져옵니다.
메서드의 서명은 다음과 같습니다.
def splitDataframes(df: DataFrame, hoursNumber: Int): Seq[DataFrame]
어떻게 할 수 있습니까?
데이터 프레임의 스키마는 다음과 같습니다.
root
|-- date_time: integer (nullable = true)
|-- user_id: long (nullable = true)
|-- order_id: string (nullable = true)
|-- description: string (nullable = true)
|-- event_date: date (nullable = true)
|-- event_ts: timestamp (nullable = true)
|-- event_hour: long (nullable = true)
입력 df 필드 중 일부 :
event_ts, user_id
2020-12-13 08:22:00, 1
2020-12-13 08:51:00, 2
2020-12-13 09:28:00, 1
2020-12-13 10:53:00, 3
2020-12-13 11:05:00, 1
2020-12-13 12:19:00, 2
hoursNumber = 2 인 일부 출력 df 필드 :
df1 event_ts, user_id
2020-12-13 08:22:00, 1
2020-12-13 08:51:00, 2
2020-12-13 09:28:00, 1
df2 2020-12-13 10:46:00, 3
2020-12-13 11:05:00, 1
df3 2020-12-13 12:48:00, 2
타임 스탬프를 유닉스 타임 스탬프로 변환 한 다음 가장 이른 타임 스탬프와의 시간 차이를 사용하여 각 행의 ID를 계산합니다.
편집 : 시작 시간을 00:00:00부터 계산하면 솔루션이 더 간단합니다.
import org.apache.spark.sql.DataFrame
def splitDataframes(df: DataFrame, hoursNumber: Int): Seq[DataFrame] = {
val df2 = df.withColumn(
"event_unix_ts",
unix_timestamp($"event_ts")
).withColumn(
"grouping",
floor($"event_unix_ts" / (3600 * hoursNumber))
).drop("event_unix_ts")
val df_array = df2.select("grouping").distinct().collect().map(
x => df2.filter($"grouping" === x(0)).drop("grouping")).toSeq
return df_array
}
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다