저는 PySpark 초보자입니다.
집계 된 수를 얻기 위해 GroupBy 작업을 수행하려고합니다. 하지만 시간 빈도에 따라 groupBy를 수행 할 수 없습니다. "CAPTUREDTIME, NODE, CHANNEL, LOCATION, TACK"필드를 사용하여 "groupBy"를 수행해야합니다. 하지만이 groupBy에서는 "CAPTUREDTIME"필드를 사용하여 "시간별", "매일", "주별", "월별"을 기준으로 그룹화해야합니다.
아래 샘플 데이터를 찾으십시오.
-----------------+------+------+--------+----------+--------------
|CAPTUREDTIME| NODE| CHANNEL | LOCATION| TACK
+-----------------+------+------+--------+----------+-------------
|20-05-09 03:06:21| PUSC_RES| SIMPLEX| NORTH_AL| UE220034
|20-05-09 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
|20-05-09 12:04:52| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-05-10 04:24:09| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-05-10 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
|20-04-09 10:57:48| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-04-09 12:12:26| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-04-09 03:26:33| PUSC_RES| SIMPLEX| NORTH_AL| UE220071
+-----------------+------+------+--------+----------+-------------
아래 pyspark 코드를 사용했습니다.
df = df.groupby("CAPTUREDTIME", "NODE", "CHANNEL", "LOCATION", "TACK").agg(
func.count("TACK").alias("count")
)
위의 코드를 'hourly', 'daily', 'weekly', 'monthly'로 그룹화하려면 어떻게해야합니까?
아래 형식의 출력이 필요합니다 (샘플 출력을 공유했습니다).
시간 :
| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트
| 20-05-09 03 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 2
| 20-05-09 04 : 00 : 00 | PUSC_RES | 단순 | SOUTH_AL | UE220034 | 2
매일 :
| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트
| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 1
| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | SOUTH_AL | UE220034 | 2
| 20-05-09 00 : 00 : 00 | TESC_RES | 단순 | NORTH_AL | UE220057 | 삼
매주 :
| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트
| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 1
월간 :
| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트
| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 1
문제에 답하는 방법에는 두 가지가 있습니다. 타임 스탬프를 그룹화하려는 날짜 단위로 캐스트하거나 (설명에서 언급했듯이) 원하는 간격으로 그룹화하는 SQL 창 함수를 사용합니다.
Spark의 창 SQL 함수를 통해 월별 집계가 불가능하다는 점만 알아 두십시오.
여기에서 코드를 볼 수 있습니다. 처음 세 가지 예제는 창 SQL 함수를 사용하고 마지막 예제는 매월 타임 스탬프를 캐스팅 한 다음 모든 열을 기준으로 그룹화합니다.
df = spark.createDataFrame(
[
("20-05-09 03:06:21", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220034"),
("20-05-09 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-05-09 12:04:52", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:24:09", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-04-09 10:57:48", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 12:12:26", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 03:26:33", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220071")
],
['CAPTUREDTIME', 'NODE', 'CHANNEL', 'LOCATION', 'TACK']
)
from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year
나는 여전히 이것에 대한 창 논리를 유지하므로 Spark의 모든 가능성을 참조 할 수 있습니다. 데이터 프레임을 표시하기 전에 끝 부분의 창 시작 만 선택합니다.
hourly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.groupBy(window(col("captured_time"), "1 hour").alias("captured_time"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("captured_time_hour", col("captured_time.start"))
.drop("captured_time")
)
hourly.sort("captured_time_hour").show(100, False)
date_trunc
기능을 통해 요일 만 고려하여 타임 스탬프를자를 수 있습니다.
daily = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_day", date_trunc("day", col("captured_time")))
.groupBy("captured_time_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
)
daily.sort("captured_time_day").show(100, False)
이것은 좀 더 까다 롭습니다. 먼저 next_day
월요일 기능을 사용 합니다. 일요일을주의 시작으로 생각한다면 그에 따라이 코드를 업데이트하십시오. 그러나 저는 월요일을주의 시작으로 간주합니다 (내가 믿는 SQL 방언과 지역에 따라 다름).
그런 다음 weekofyear
원하는대로 주 번호를 검색 하는 함수를 추가 할 수도 있습니다.
weekly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("start_day", date_sub(next_day(col("captured_time"), "monday"), 7))
.groupBy("start_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("start_day", to_timestamp(col("start_day")))
.withColumn("week_of_year", weekofyear(col("start_day")))
)
weekly.sort("start_day").show(100, False)
타임 스탬프를 날짜로 포맷 한 다음 타임 스탬프로 다시 캐스팅합니다. 이것은 다른 방법을 보여주기위한 것입니다. 일일 사용 사례로 타임 스탬프를자를 수 있습니다. 또한 월 이름과 약어를 추출하는 두 가지 방법을 보여줍니다. Spark 3.0.0에서 테스트되었으므로 Spark 버전을 관리하십시오.
monthly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_month", date_format(col('captured_time'), '1/M/yyyy'))
.groupBy(col("captured_time_month"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*").alias("Count TACK"))
.withColumn("captured_time_month", to_timestamp(col("captured_time_month"), '1/M/yyyy'))
.withColumn("month", month(col("captured_time_month")))
.withColumn("month_abbr", date_format(col("captured_time_month"),'MMM'))
.withColumn("full_month_name", date_format(col("captured_time_month"),'MMMM'))
)
monthly.sort("captured_time_month").show(100, False)
챠오!
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다