Find Min And Max Range With A Combination Of Column Values In Pyspark

I have a pyspark dataframe like this, +----------+--------+----------+----------+ |id_ | p |d1 | d2 | +----------+--------+----------+----------+ | 1

Solution 1:

Update: Based on the OP's comments and update, any number of ranges may overlap, so I think a dataframe JOIN is probably the most straightforward approach. Below is a completely new solution that I tested on Spark 2.4.0 (array_join, transform, sequence, etc. require Spark 2.4+):

Update-2: Per the discussion in the comments/chat, I've added logic that sets the boundaries of each drange(d1, d2), i.e. how and when to adjust d1/d2. This requires a new flag field in df_drange. For details, see the "Set up boundaries" section below.

Update-3: Adjusted the code to handle the case d1 == d2 in df_drange. Such cases were originally removed.

Set up the data:

Note: I added df2 with d1 and d2 converted to DateType(), while the original df keeps both fields as StringType() because we need them for some string concatenation operations.

from pyspark.sql import Window
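# lit, concat and coalesce are used further down, so import them as well
from pyspark.sql.functions import (
    lead, expr, to_date, collect_set, array_sort, array_join, broadcast,
    lit, concat, coalesce,
)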

df = spark.createDataFrame([
      (1, 'A', '2018-09-26', '2018-10-26')
    , (2, 'B', '2018-06-21', '2018-07-19')
    , (2, 'C', '2018-06-27', '2018-07-07')
    , (2, 'A', '2018-07-02', '2019-02-27')
    , (2, 'A', '2019-03-28', '2019-06-25')
  ], ['id_', 'p', 'd1', 'd2'])

# convert d1, d2 to DateType() if they are StringType()
df2 = df.withColumn('d1', to_date('d1')).withColumn('d2', to_date('d2'))

df2.printSchema()
root
 |-- id_: long (nullable = true)
 |-- p: string (nullable = true)
 |-- d1: date (nullable = true)
 |-- d2: date (nullable = true)

Create a referencing dataframe: df_drange

df_drange contains all distinct dates from d1 and d2, plus a flag that is set to 1 when df_drange.d1 comes from df.d2 (in the original df) and 0 otherwise. Sort the dates and segment them into consecutive date intervals, then retrieve the fields d1, d2 and flag (for d1 only) and convert them to the proper DataType().

df_drange = df.select('id_', 'd1', lit(0).alias('flag')).union(df.select('id_', 'd2', lit(1))) \
    .groupby('id_') \
    .agg(array_sort(collect_set(concat('d1', lit('-'), 'flag'))).alias('dates')) \
    .withColumn('dates', expr("""
         explode(transform(sequence(0, size(dates)-2), i -> named_struct('d1', dates[i], 'd2', dates[i+1])))
       """)) \
    .selectExpr(
         'id_'
       , "to_date(substring_index(dates.d1, '-', 3)) as d1"
       , "to_date(substring_index(dates.d2, '-', 3)) as d2"
       , "boolean(substring_index(dates.d1, '-', -1)) as flag"
     )

df_drange.orderBy('id_','d1').show()
+---+----------+----------+-----+
|id_|        d1|        d2| flag|
+---+----------+----------+-----+
|  1|2018-09-26|2018-10-26|false|
|  2|2018-06-21|2018-06-27|false|
|  2|2018-06-27|2018-07-02|false|
|  2|2018-07-02|2018-07-07|false|
|  2|2018-07-07|2018-07-19| true|
|  2|2018-07-19|2019-02-27| true|
|  2|2019-02-27|2019-03-28| true|
|  2|2019-03-28|2019-06-25|false|
+---+----------+----------+-----+

df_drange.printSchema()
root
 |-- id_: long (nullable = true)
 |-- d1: date (nullable = true)
 |-- d2: date (nullable = true)
 |-- flag: boolean (nullable = true)
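
To make the segmentation step easier to follow, here is a minimal plain-Python sketch of the same idea for a single id_. It is not part of the Spark solution; the helper name build_segments is hypothetical, and it assumes the dates are ISO-formatted strings so that sorting the tagged strings sorts them chronologically.

```python
from datetime import date

def build_segments(rows):
    """rows: list of (d1, d2) ISO-date strings for one id_.

    Returns (start, end, flag) tuples, where flag is True when
    `start` was taken from an original d2.
    """
    # Tag every date like the Spark code: '<date>-0' for d1, '<date>-1' for d2.
    # The set union plays the role of collect_set, sorted() that of array_sort.
    tagged = sorted({f"{d1}-0" for d1, _ in rows} | {f"{d2}-1" for _, d2 in rows})
    segments = []
    # Pair each tagged date with its successor, like dates[i] and dates[i+1].
    for left, right in zip(tagged, tagged[1:]):
        start = date.fromisoformat(left.rsplit("-", 1)[0])
        end = date.fromisoformat(right.rsplit("-", 1)[0])
        segments.append((start, end, left.endswith("-1")))
    return segments

# The four rows with id_ == 2 from the sample data
rows_id2 = [
    ("2018-06-21", "2018-07-19"),
    ("2018-06-27", "2018-07-07"),
    ("2018-07-02", "2019-02-27"),
    ("2019-03-28", "2019-06-25"),
]
segments = build_segments(rows_id2)
for seg in segments:
    print(seg)
```

The seven tuples printed for id_ == 2 correspond to the seven id_ == 2 rows of df_drange shown above.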

Set up df1 with Join

Left join df_drange with the original df: for each id_, match the rows where (d1, d2) of df_drange overlaps (d1, d2) of the original df. Then group by (id_, d1, d2, flag) from df_drange and compute array_join(collect_set(p), ' '):

df1 = broadcast(df_drange).join(
      df2
    , (df2.id_ == df_drange.id_) & (
            ((df2.d1 < df_drange.d2) & (df2.d2 > df_drange.d1)) 
          | ((df_drange.d1 == df_drange.d2) & df_drange.d1.between(df2.d1, df2.d2)) 
      )
    , how ='left'
).groupby(df_drange.id_, df_drange.d1, df_drange.d2, df_drange.flag) \
 .agg(array_join(collect_set('p'), ' ').alias('q'))

df1.show()
+---+----------+----------+-----+-----+
|id_|        d1|        d2| flag|    q|
+---+----------+----------+-----+-----+
|  1|2018-09-26|2018-10-26|false|    A|
|  2|2018-06-21|2018-06-27|false|    B|
|  2|2018-06-27|2018-07-02|false|  C B|
|  2|2018-07-02|2018-07-07|false|C B A|
|  2|2018-07-07|2018-07-19| true|  B A|
|  2|2018-07-19|2019-02-27| true|    A|
|  2|2019-02-27|2019-03-28| true|     |
|  2|2019-03-28|2019-06-25|false|    A|
+---+----------+----------+-----+-----+
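
The join condition can be checked by hand with a small plain-Python sketch. This is only an illustration of the predicate, not the Spark code itself; the names overlaps and labels_for are hypothetical, and the second branch assumes, as in Spark SQL, that between is inclusive on both ends.

```python
from datetime import date

def overlaps(seg_d1, seg_d2, row_d1, row_d2):
    # Strict overlap: the ranges share more than a single boundary point.
    strict = row_d1 < seg_d2 and row_d2 > seg_d1
    # Zero-length segment (d1 == d2): keep it if the point lies in the row's range.
    point = seg_d1 == seg_d2 and row_d1 <= seg_d1 <= row_d2
    return strict or point

def labels_for(seg_d1, seg_d2, rows):
    # Plays the role of collect_set(p) for one segment.
    return {p for p, d1, d2 in rows if overlaps(seg_d1, seg_d2, d1, d2)}

# Rows with id_ == 2 from the sample data, as (p, d1, d2)
rows_id2 = [
    ("B", date(2018, 6, 21), date(2018, 7, 19)),
    ("C", date(2018, 6, 27), date(2018, 7, 7)),
    ("A", date(2018, 7, 2), date(2019, 2, 27)),
    ("A", date(2019, 3, 28), date(2019, 6, 25)),
]

busy = labels_for(date(2018, 7, 2), date(2018, 7, 7), rows_id2)
gap = labels_for(date(2019, 2, 27), date(2019, 3, 28), rows_id2)
print(sorted(busy), sorted(gap))
```

The segment 2018-07-02 to 2018-07-07 collects A, B and C, while 2019-02-27 to 2019-03-28 matches nothing, which is why q is empty for that row in df1.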

Set up boundaries

For df1, q == '' indicates a gap, and such rows should be removed. The boundaries of each drange are defined based on flag, next_flag and next_d1, as discussed in the comments/chat. Below is the pseudo-code showing the current logic for how and when to adjust d1/d2:

flag = (if d1 is from original_d2) ? true : false
both next_d1 and next_flag defined on WindowSpec-w1

# for df1.d1: if flag is true, add 1 day, otherwise keep as-is
d1 = IF(flag, date_add(d1,1), d1)

# for df1.d2: keep as-is when there is a gap with the next row or
# the next_flag is true, else minus 1 day
d2 = IF((next_d1 != d2) or next_flag, d2, date_sub(d2,1))

Actual code:

# WindowSpec to calculate next_d1
w1 = Window.partitionBy('id_').orderBy('d1')

# filter out gaps and calculate next_d1, next_flag and the adjusted d1 and d2
df_new = df1.where('q!= ""') \
            .withColumn('next_d1', lead('d1').over(w1)) \
            .withColumn('next_flag', coalesce(lead('flag').over(w1), lit(True))) \
            .selectExpr(
                    'id_'
                  , 'q'
                  , 'IF(flag, date_add(d1,1), d1) AS d1'
                  , 'IF((next_d1 != d2) or next_flag, d2, date_sub(d2,1)) AS d2'
             )

df_new.show()
+---+-----+----------+----------+
|id_|    q|        d1|        d2|
+---+-----+----------+----------+
|  1|    A|2018-09-26|2018-10-26|
|  2|    B|2018-06-21|2018-06-26|
|  2|  C B|2018-06-27|2018-07-01|
|  2|C B A|2018-07-02|2018-07-07|
|  2|  B A|2018-07-08|2018-07-19|
|  2|    A|2018-07-20|2019-02-27|
|  2|    A|2019-03-28|2019-06-25|
+---+-----+----------+----------+
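
As a sanity check on the boundary rules, the same adjustment can be sketched in plain Python for one id_. The function name adjust_boundaries is hypothetical, and the sketch assumes the input is already sorted by d1 with the gap rows removed. The last row has no successor, so next_flag defaults to True there, mirroring the coalesce(..., lit(True)) in the Spark code.

```python
from datetime import date, timedelta

ONE_DAY = timedelta(days=1)

def adjust_boundaries(segments):
    """segments: list of (d1, d2, flag, q) for one id_, sorted by d1, gaps removed."""
    adjusted = []
    for i, (d1, d2, flag, q) in enumerate(segments):
        if i + 1 < len(segments):
            next_d1, _, next_flag, _ = segments[i + 1]
        else:
            # No next row: treat next_flag as True so d2 is kept as-is.
            next_d1, next_flag = None, True
        # d1 came from an original d2: start one day later.
        new_d1 = d1 + ONE_DAY if flag else d1
        # Keep d2 when a gap follows or the next start came from an original d2.
        keep_d2 = next_flag or next_d1 != d2
        new_d2 = d2 if keep_d2 else d2 - ONE_DAY
        adjusted.append((q, new_d1, new_d2))
    return adjusted

# Rows of df1 for id_ == 2 with the gap row (q == '') already dropped
df1_id2 = [
    (date(2018, 6, 21), date(2018, 6, 27), False, "B"),
    (date(2018, 6, 27), date(2018, 7, 2), False, "C B"),
    (date(2018, 7, 2), date(2018, 7, 7), False, "C B A"),
    (date(2018, 7, 7), date(2018, 7, 19), True, "B A"),
    (date(2018, 7, 19), date(2019, 2, 27), True, "A"),
    (date(2019, 3, 28), date(2019, 6, 25), False, "A"),
]
result = adjust_boundaries(df1_id2)
for row in result:
    print(row)
```

The six tuples line up with the id_ == 2 rows of df_new above.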