ЕГ
Size: a a a
ЕГ
ЕГ
ЕГ
CO
ЕГ
CO
ЕГ
CO
my_df = (my_df
.withColumn('date_to',F.lag('date_from',-1).over(Window.partitionBy('id').orderBy('date_from')))
.withColumn('date_to', F.date_add(F.col('date_to'),-1))
)
cur_date = F.from_utc_timestamp(F.current_timestamp(),"Europe/Kiev")
# df_not_null = my_df.na.fill(value=,subset=['date_to'])
nnn_df = my_df.join(df_calendar,
on=df_calendar['date_rep'].between(my_df['date_from'], my_df['date_to'])
& (n_my_df['date_from']<=df_calendar['date_rep'])CO
ЕГ
CO
ЕГ
ЕГ
ЕГ
CO
my_col = ['id', 'name', 'type_name', 'date_from', 'action']
my_data = [
['16763', 'p1', 'type1', '2021-01-01', 'action1'],
['16763', 'p1', 'type2', '2021-01-10', 'action1'],
['16763', 'p1', 'type3', '2021-01-15', 'action3'],
['19622', 'p2', 'type1', '2021-01-05', 'action1'],
['19799', 'p3', 'type2', '2021-01-02', 'action2'],
['19799', 'p3', 'type1', '2021-01-10', 'action1']
]
my_cal = spark.sql("SELECT explode(sequence(to_date('2021-01-01'), to_date('2021-01-20'))) date_rep")
my_df = spark.createDataFrame(my_data, my_col)
my_df = (my_df
.withColumn('date_to',F.lag('date_from',-1).over(Window.partitionBy('id').orderBy('date_from')))
.withColumn('date_to', F.date_add(F.col('date_to'),-1))
.withColumn('date_to', F.when(F.col('date_to').isNull(), F.to_date(F.lit('2021-01-20'))).otherwise(F.col('date_to')))
)
nnn_df = my_df.join(my_cal,
on=my_cal['date_rep'].between(my_df['date_from'], my_df['date_to'])
& (my_df['date_from']<=my_cal['date_rep'])
)
nnn_df.show()
Если получается что лаг, это легкая в отработке функцияCO
ЕГ
CO
ЕГ
ЕГ