М
Size: a a a
М
GP
ЕГ
ЕГ
GP
GP
ЕГ
GP
ЕГ
GP
GP
GP
GP
GP
ЕГ
GP
ЕГ
CO
data_df = spark.sql('''select 100500 user_id, 10 amount, to_date('2021-11-05') date_rep union all
select 100500 user_id, 20 amount, to_date('2021-11-10') date_rep union all
select 222666 user_id, 30 amount, to_date('2021-11-11') date_rep
''')
+-------+------+----------+хочу размножить эту таблицу + сделать коммулятив, что бы у меня данные были на каждый день
|user_id|amount|date_rep |
+-------+------+----------+
|100500 |10 |2021-11-05|
|100500 |20 |2021-11-10|
|222666 |30 |2021-11-11|
+-------+------+----------+
+----------+-------+---------------+Я делаю как в sql, но мне кажется эта глупая логика
| date|user_id| amount_cum |
+----------+-------+---------------+
|2021-11-05| 100500| 10|
|2021-11-06| 100500| 10|
|2021-11-07| 100500| 10|
|2021-11-08| 100500| 10|
|2021-11-09| 100500| 10|
|2021-11-10| 100500| 30|
|2021-11-11| 100500| 30|
|2021-11-12| 100500| 30|
|2021-11-13| 100500| 30|
|2021-11-14| 100500| 30|
|2021-11-15| 100500| 30|
|2021-11-11| 222666| 30|
|2021-11-12| 222666| 30|
|2021-11-13| 222666| 30|
|2021-11-14| 222666| 30|
|2021-11-15| 222666| 30|
+----------+-------+---------------+
data_df_cum = (data_df
.withColumn('amount_cum', F.sum('amount')
.over(Window.partitionBy('user_id')
.orderBy('date_rep').rowsBetween(-sys.maxsize, 0)))
)
data_df_cum_cross = data_df_cum.join(cal_df, cal_df['date']>=data_df_cum['date_rep'])
data_df_cum_cross = data_df_cum_cross.groupBy(F.col('date'), F.col('user_id')).agg({'amount_cum': 'max'})
data_df_cum_cross.sort(F.col('user_id'), F.col('date')).show()CO
cal_df = spark.sql("SELECT sequence(to_date('2021-11-01'), to_date('2021-11-15'), interval 1 day) as date").withColumn("date", F.explode(F.col("date")))