pyspark 데이터 프레임 집계에 대한 열 이름 변경
pyspark 데이터 프레임으로 일부 데이터를 분석하고 있으며 집계중인 데이터 프레임이 있다고 가정합니다 df
.
df.groupBy("group")\
.agg({"money":"sum"})\
.show(100)
이것은 나에게 줄 것이다 :
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
집계는 잘 작동하지만 새 열 이름 "SUM (money # 2L)"이 마음에 들지 않습니다. 이 열의 이름을 .agg
메서드 에서 사람이 읽을 수있는 이름으로 바꾸는 깔끔한 방법이 있습니까? 아마도에서 할 일과 더 비슷할 것입니다 dplyr
.
df %>% group_by(group) %>% summarise(sum_money = sum(money))
여전히 dplyr 구문을 선호하지만 이 코드 조각은 다음을 수행합니다.
import pyspark.sql.functions as sf
df.groupBy("group")\
.agg(sf.sum('money').alias('money'))\
.show(100)
장황 해집니다.
withColumnRenamed
트릭을해야합니다. 다음은 pyspark.sql API에 대한 링크 입니다.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
나는 일부 사람들을 도울 수있는 약간의 도우미 기능을 만들었습니다.
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names `avg(colname)`
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
예 :
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
사람들이 너무 많이 타이핑하는 것을 막기 위해 적어도 조금은합니다.
다음과 같이 간단합니다.
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
Use .as
in agg to name the new row created.
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
start_index = column.find('(')
end_index = column.find(')')
if (start_index and end_index):
df = df.withColumnRenamed(column, column[start_index+1:end_index])
The above code can strip out anything that is outside of the "()". For example, "sum(foo)" will be renamed as "foo".
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']
df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
| 1| siva| 100|
| 2|siva2| 200|
| 3|siva3| 300|
| 4|siva4| 400|
| 5|siva5| 500|
+---+-----+-------+
**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+
Considering you have a dictionary columns_and_operations
and, after the aggregation, want to do the renaming without the hardcoding, a simpler way would be:
from functools import reduce
columns_and_operations = {
"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean"}
df = df.groupBy("group").agg(columns_and_operations)
old_names = ["{}({})".format(v, k) for k, v in columns_and_operations.items()]
new_names = list(columns_and_operations.keys())
df = reduce(lambda df, i: df.withColumnRenamed(old_names[i],
new_names[i]),
range(len(old_names)),
df)
ReferenceURL : https://stackoverflow.com/questions/29988287/renaming-columns-for-pyspark-dataframes-aggregates
'Nice programing' 카테고리의 다른 글
오른쪽에 아래쪽 삼각형이있는 스피너 주위의 테두리와 같은 사용자 지정 스피너를 만드는 방법은 무엇입니까? (0) | 2021.01.09 |
---|---|
git push heroku master가 "모든 최신 정보"라고 말하지만 앱이 최신 상태가 아닙니다. (0) | 2021.01.09 |
ES6지도 객체를 정렬 할 수 있습니까? (0) | 2021.01.09 |
해시 테이블의 기초? (0) | 2021.01.09 |
Visual Studio의 메서드에 대한 코드 조각 (0) | 2021.01.09 |