PySpark DataFrame 문제에서 시간 빈도를 사용하여 GroupBy

스택 테스트

저는 PySpark 초보자입니다.

집계 된 수를 얻기 위해 GroupBy 작업을 수행하려고합니다. 하지만 시간 빈도에 따라 groupBy를 수행 할 수 없습니다. "CAPTUREDTIME, NODE, CHANNEL, LOCATION, TACK"필드를 사용하여 "groupBy"를 수행해야합니다. 하지만이 groupBy에서는 "CAPTUREDTIME"필드를 사용하여 "시간별", "매일", "주별", "월별"을 기준으로 그룹화해야합니다.

아래 샘플 데이터를 찾으십시오.

-----------------+------+------+--------+----------+--------------

|CAPTUREDTIME|      NODE|       CHANNEL  |  LOCATION|    TACK

+-----------------+------+------+--------+----------+-------------

|20-05-09 03:06:21|   PUSC_RES|   SIMPLEX|  NORTH_AL|    UE220034

|20-05-09 04:33:04|   PUSC_RES|   SIMPLEX|  SOUTH_AL|    UE220034

|20-05-09 12:04:52|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-05-10 04:24:09|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-05-10 04:33:04|   PUSC_RES|   SIMPLEX|  SOUTH_AL|    UE220034

|20-04-09 10:57:48|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-04-09 12:12:26|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-04-09 03:26:33|   PUSC_RES|   SIMPLEX|  NORTH_AL|    UE220071

+-----------------+------+------+--------+----------+------------- 

아래 pyspark 코드를 사용했습니다.

df = df.groupby("CAPTUREDTIME", "NODE", "CHANNEL", "LOCATION", "TACK").agg(
    func.count("TACK").alias("count")
)

위의 코드를 'hourly', 'daily', 'weekly', 'monthly'로 그룹화하려면 어떻게해야합니까?

아래 형식의 출력이 필요합니다 (샘플 출력을 공유했습니다).

시간 :

| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트

| 20-05-09 03 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 2

| 20-05-09 04 : 00 : 00 | PUSC_RES | 단순 | SOUTH_AL | UE220034 | 2

매일 :

| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트

| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 1

| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | SOUTH_AL | UE220034 | 2

| 20-05-09 00 : 00 : 00 | TESC_RES | 단순 | NORTH_AL | UE220057 |

매주 :

| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트

| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 1

월간 :

| 캡처 시간 | 노드 | 채널 | 위치 | 압정 | 카운트

| 20-05-09 00 : 00 : 00 | PUSC_RES | 단순 | NORTH_AL | UE220034 | 1

교활한

문제에 답하는 방법에는 두 가지가 있습니다. 타임 스탬프를 그룹화하려는 날짜 단위로 캐스트하거나 (설명에서 언급했듯이) 원하는 간격으로 그룹화하는 SQL 창 함수를 사용합니다.

Spark의 창 SQL 함수를 통해 월별 집계가 불가능하다는 점만 알아 두십시오.

여기에서 코드를 볼 수 있습니다. 처음 세 가지 예제는 창 SQL 함수를 사용하고 마지막 예제는 매월 타임 스탬프를 캐스팅 한 다음 모든 열을 기준으로 그룹화합니다.

df = spark.createDataFrame(
    [
        ("20-05-09 03:06:21", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220034"),
        ("20-05-09 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
        ("20-05-09 12:04:52", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-05-10 04:24:09", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-05-10 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
        ("20-04-09 10:57:48", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-04-09 12:12:26", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-04-09 03:26:33", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220071")
    ],
    ['CAPTUREDTIME', 'NODE', 'CHANNEL', 'LOCATION', 'TACK']
)

from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year

매시간

나는 여전히 이것에 대한 창 논리를 유지하므로 Spark의 모든 가능성을 참조 할 수 있습니다. 데이터 프레임을 표시하기 전에 끝 부분의 창 시작 만 선택합니다.

hourly = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .groupBy(window(col("captured_time"), "1 hour").alias("captured_time"), "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*"))
    .withColumn("captured_time_hour", col("captured_time.start"))
    .drop("captured_time")
)
hourly.sort("captured_time_hour").show(100, False)

매일

date_trunc기능을 통해 요일 만 고려하여 타임 스탬프를자를 수 있습니다.

daily = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .withColumn("captured_time_day", date_trunc("day", col("captured_time")))
    .groupBy("captured_time_day", "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*"))
)
daily.sort("captured_time_day").show(100, False)

주간

이것은 좀 더 까다 롭습니다. 먼저 next_day월요일 기능을 사용 합니다. 일요일을주의 시작으로 생각한다면 그에 따라이 코드를 업데이트하십시오. 그러나 저는 월요일을주의 시작으로 간주합니다 (내가 믿는 SQL 방언과 지역에 따라 다름).

그런 다음 weekofyear원하는대로 주 번호를 검색 하는 함수를 추가 할 수도 있습니다.

weekly = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .withColumn("start_day", date_sub(next_day(col("captured_time"), "monday"), 7))
    .groupBy("start_day", "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*"))
    .withColumn("start_day", to_timestamp(col("start_day")))
    .withColumn("week_of_year", weekofyear(col("start_day")))
)
weekly.sort("start_day").show(100, False)

월간 간행물

타임 스탬프를 날짜로 포맷 한 다음 타임 스탬프로 다시 캐스팅합니다. 이것은 다른 방법을 보여주기위한 것입니다. 일일 사용 사례로 타임 스탬프를자를 수 있습니다. 또한 월 이름과 약어를 추출하는 두 가지 방법을 보여줍니다. Spark 3.0.0에서 테스트되었으므로 Spark 버전을 관리하십시오.

monthly = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .withColumn("captured_time_month", date_format(col('captured_time'), '1/M/yyyy'))
    .groupBy(col("captured_time_month"), "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*").alias("Count TACK"))
    .withColumn("captured_time_month", to_timestamp(col("captured_time_month"), '1/M/yyyy'))
    .withColumn("month", month(col("captured_time_month")))
    .withColumn("month_abbr", date_format(col("captured_time_month"),'MMM'))
    .withColumn("full_month_name", date_format(col("captured_time_month"),'MMMM'))
)
monthly.sort("captured_time_month").show(100, False)

챠오!

이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.

침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제

에서 수정
0

몇 마디 만하겠습니다

0리뷰
로그인참여 후 검토

관련 기사

분류에서Dev

Datastax 커넥터를 사용하여 Cassandra에 Pyspark 삽입 시간

분류에서Dev

agg를 사용하여 groupBy 뒤에 문자열을 연결하는 pyspark

분류에서Dev

UDF를 사용하여 PySpark Dataframe에서 중첩 된 XML 필드 구문 분석

분류에서Dev

시간 데이터에 GroupBy를 사용하여 데이터 프레임 리샘플링

분류에서Dev

PySpark DataFrame의 특정 창에 groupBy 및 집계 함수를 적용하는 방법은 무엇입니까?

분류에서Dev

시간을 클릭하여 사용자 빈 공간에서 모델 닫기를 제한하는 방법

분류에서Dev

pyspark를 사용하여 동시에 집계 및 특징 추출

분류에서Dev

특정 비즈니스 문제를 해결하기 위해 pyspark에서 groupBy, collect_list, arrays_zip 및 폭발을 함께 사용하는 방법

분류에서Dev

R에서 Openxlsx를 사용하여 빈 통합 문서 시작

분류에서Dev

빈 문자열에서 Add ToDo를 다시 클릭하면 메시지가 표시되도록 잠시 후 명확한 시간 초과 기능을 사용하여 udage를 제거했습니다.

분류에서Dev

Pyspark의 조건을 사용하는 Dataframe의 Groupby 기능

분류에서Dev

시퀀스를 키 인수로 사용하는 Spark Dataframe groupBy

분류에서Dev

groupby를 사용하여 빈도로 정렬

분류에서Dev

Python Pandas DataFrame : 경도를 사용하여 로컬 UTM 시간을 GMT로 변환

분류에서Dev

단일 DataFrame에서 두 개의 열을 병합하고 PySpark를 사용하여 발생 횟수 계산

분류에서Dev

Kafka Magic 도구를 사용하여 Kafka 주제에서 메시지를 검색 할 때 여러 시간 범위

분류에서Dev

제한 시간과 문제 ExecutorService를을 사용하여

분류에서Dev

TimerTrigger를 사용하여 연속 작업에서 Azure WebJob 제한 시간 사용

분류에서Dev

pyspark를 사용하여 RDD를 DataFrame으로 변환

분류에서Dev

Java를 사용하여 문자열 변수에서 빈 p 태그를 제거 하시겠습니까?

분류에서Dev

경도 및 위도 열을 PySpark의 입력으로 사용하여 TimezoneFinder ()에서 새 "시간대"열을 만듭니다.

분류에서Dev

pyspark를 사용하여 Dataframe의 모든 열에 대한 최대 날짜를 계산하는 방법

분류에서Dev

pyspark를 사용하여 Dataframe의 모든 열에 대한 최대 날짜를 계산하는 방법

분류에서Dev

스레드에서 더 높은 빈도의 데이터를 읽고 Tkinter를 사용하여 실시간으로 그래프 플로팅

분류에서Dev

내장 도구를 사용하여 Win 3.1 / DOS 6.22 시스템에서 빈 디스크 공간을 안전하게 지 웁니다.

분류에서Dev

PySpark Dataframe의 패턴을 기반으로 목록에서 일부 문자를 제거하는 방법

분류에서Dev

Python에서 시간 여행 디버깅-어떤 도구를 사용하도록 제안됩니까?

분류에서Dev

PySpark를 사용하여 Spark DataFrame에서 중첩 된 구조체 열의 이름을 모두 소문자로 바꿉니다.

분류에서Dev

awk를 사용하여 모든 빈번한 시간 간격 사이에 데이터를 읽는 방법

Related 관련 기사

  1. 1

    Datastax 커넥터를 사용하여 Cassandra에 Pyspark 삽입 시간

  2. 2

    agg를 사용하여 groupBy 뒤에 문자열을 연결하는 pyspark

  3. 3

    UDF를 사용하여 PySpark Dataframe에서 중첩 된 XML 필드 구문 분석

  4. 4

    시간 데이터에 GroupBy를 사용하여 데이터 프레임 리샘플링

  5. 5

    PySpark DataFrame의 특정 창에 groupBy 및 집계 함수를 적용하는 방법은 무엇입니까?

  6. 6

    시간을 클릭하여 사용자 빈 공간에서 모델 닫기를 제한하는 방법

  7. 7

    pyspark를 사용하여 동시에 집계 및 특징 추출

  8. 8

    특정 비즈니스 문제를 해결하기 위해 pyspark에서 groupBy, collect_list, arrays_zip 및 폭발을 함께 사용하는 방법

  9. 9

    R에서 Openxlsx를 사용하여 빈 통합 문서 시작

  10. 10

    빈 문자열에서 Add ToDo를 다시 클릭하면 메시지가 표시되도록 잠시 후 명확한 시간 초과 기능을 사용하여 udage를 제거했습니다.

  11. 11

    Pyspark의 조건을 사용하는 Dataframe의 Groupby 기능

  12. 12

    시퀀스를 키 인수로 사용하는 Spark Dataframe groupBy

  13. 13

    groupby를 사용하여 빈도로 정렬

  14. 14

    Python Pandas DataFrame : 경도를 사용하여 로컬 UTM 시간을 GMT로 변환

  15. 15

    단일 DataFrame에서 두 개의 열을 병합하고 PySpark를 사용하여 발생 횟수 계산

  16. 16

    Kafka Magic 도구를 사용하여 Kafka 주제에서 메시지를 검색 할 때 여러 시간 범위

  17. 17

    제한 시간과 문제 ExecutorService를을 사용하여

  18. 18

    TimerTrigger를 사용하여 연속 작업에서 Azure WebJob 제한 시간 사용

  19. 19

    pyspark를 사용하여 RDD를 DataFrame으로 변환

  20. 20

    Java를 사용하여 문자열 변수에서 빈 p 태그를 제거 하시겠습니까?

  21. 21

    경도 및 위도 열을 PySpark의 입력으로 사용하여 TimezoneFinder ()에서 새 "시간대"열을 만듭니다.

  22. 22

    pyspark를 사용하여 Dataframe의 모든 열에 대한 최대 날짜를 계산하는 방법

  23. 23

    pyspark를 사용하여 Dataframe의 모든 열에 대한 최대 날짜를 계산하는 방법

  24. 24

    스레드에서 더 높은 빈도의 데이터를 읽고 Tkinter를 사용하여 실시간으로 그래프 플로팅

  25. 25

    내장 도구를 사용하여 Win 3.1 / DOS 6.22 시스템에서 빈 디스크 공간을 안전하게 지 웁니다.

  26. 26

    PySpark Dataframe의 패턴을 기반으로 목록에서 일부 문자를 제거하는 방법

  27. 27

    Python에서 시간 여행 디버깅-어떤 도구를 사용하도록 제안됩니까?

  28. 28

    PySpark를 사용하여 Spark DataFrame에서 중첩 된 구조체 열의 이름을 모두 소문자로 바꿉니다.

  29. 29

    awk를 사용하여 모든 빈번한 시간 간격 사이에 데이터를 읽는 방법

뜨겁다태그

보관