
Converting a Complex RDD to a Flattened RDD with PySpark

I have the following CSV (sample):

id      timestamp           routeid   creationdate        parameters
1000    21-11-2016 22:55    14        21-11-2016 22:55    RSRP=-102,
1002    21-11

Solution 1:

You'll need to define a UDF as follows and then select each field. I used the same data you did, with a tab separator.

from pyspark.sql.functions import udf
from pyspark.sql.types import *

df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# |  id|       timestamp|routeid|    creationdate|          parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+

Now let's define our UDF as mentioned above:

import re

def f_(s):
    # Capture "key=value" pairs, where the value is a (possibly negative) number
    pattern = re.compile(r"([^,=]+)=([0-9\-]+)")
    return dict(pattern.findall(s or ""))

We can test the function directly on a "simple" sample:

f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}
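The regex returns every value as a string. If you'd rather work with numeric values downstream, a small variant (the name `f_int` is mine, not from the original answer) casts each value to `int`:

```python
import re

def f_int(s):
    """Parse 'key=value,' pairs like f_, but cast values to int."""
    pattern = re.compile(r"([^,=]+)=([0-9\-]+)")
    return {k: int(v) for k, v in pattern.findall(s or "")}

f_int("RA Req. SN=134,RX Antennas=-91,")
# {'RA Req. SN': 134, 'RX Antennas': -91}
```

If you go this route, register it with `MapType(StringType(), IntegerType())` instead of a string-valued map.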

OK, it's working. We can now register it for use in SQL:

spark.udf.register("f", f_, MapType(StringType(), StringType()))

spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# |                               Map(RA Req. SN ->...|
# +---------------------------------------------------+

But in your case, I think you'll be interested in an actual UDF to extract each field:

extract = udf(f_, MapType(StringType(), StringType()))

df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# |  id|       timestamp|routeid|    creationdate|          parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|-102|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+
