Nice programing

pyspark 데이터 프레임 집계에 대한 열 이름 변경

nicepro 2021. 1. 9. 11:40
반응형

pyspark 데이터 프레임 집계에 대한 열 이름 변경


pyspark 데이터 프레임으로 일부 데이터를 분석하고 있으며 집계중인 데이터 프레임이 있다고 가정합니다 df.

df.groupBy("group")\
  .agg({"money":"sum"})\
  .show(100)

이것은 나에게 줄 것이다 :

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

집계는 잘 작동하지만 새 열 이름 "SUM (money # 2L)"이 마음에 들지 않습니다. 이 열의 이름을 .agg메서드 에서 사람이 읽을 수있는 이름으로 바꾸는 깔끔한 방법이 있습니까? 아마도에서 할 일과 더 비슷할 것입니다 dplyr.

df %>% group_by(group) %>% summarise(sum_money = sum(money))

여전히 dplyr 구문을 선호하지만 이 코드 조각은 다음을 수행합니다.

import pyspark.sql.functions as sf

df.groupBy("group")\
  .agg(sf.sum('money').alias('money'))\
  .show(100)

장황 해집니다.


withColumnRenamed트릭을해야합니다. 다음은 pyspark.sql API에 대한 링크 입니다.

df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")
  .show(100)

나는 일부 사람들을 도울 수있는 약간의 도우미 기능을 만들었습니다.

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

예 :

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
 .groupby("id")
 .agg({"rank": "mean",
       "*": "count",
       "rate": "mean", 
       "price": "mean", 
       "clicks": "mean", 
       })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

사람들이 너무 많이 타이핑하는 것을 막기 위해 적어도 조금은합니다.


다음과 같이 간단합니다.

 val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()

Use .as in agg to name the new row created.


df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
    start_index = column.find('(')
    end_index = column.find(')')
    if (start_index and end_index):
        df = df.withColumnRenamed(column, column[start_index+1:end_index])

The above code can strip out anything that is outside of the "()". For example, "sum(foo)" will be renamed as "foo".


import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']

df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
|  1| siva|    100|
|  2|siva2|    200|
|  3|siva3|    300|
|  4|siva4|    400|
|  5|siva5|    500|
+---+-----+-------+


**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+

Considering you have a dictionary columns_and_operations and, after the aggregation, want to do the renaming without the hardcoding, a simpler way would be:

from functools import reduce

columns_and_operations = {
        "rank": "mean",
        "*": "count",
        "rate": "mean", 
        "price": "mean", 
         "clicks": "mean"}

df = df.groupBy("group").agg(columns_and_operations)

old_names = ["{}({})".format(v, k) for k, v in columns_and_operations.items()]
new_names = list(columns_and_operations.keys())

df = reduce(lambda df, i: df.withColumnRenamed(old_names[i],
                                               new_names[i]),
            range(len(old_names)),
            df)

ReferenceURL : https://stackoverflow.com/questions/29988287/renaming-columns-for-pyspark-dataframes-aggregates

반응형