Converting a Complex RDD to a Flattened RDD with PySpark
I have the following CSV (sample):
id    timestamp         routeid  creationdate      parameters
1000  21-11-2016 22:55  14       21-11-2016 22:55  RSRP=-102,
1002  21-11
Solution 1:
You'll need to define a UDF as follows and then select each field. I used the same data you did, with a tab separator.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# |  id|       timestamp|routeid|    creationdate|          parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+
Now let's define the function that the UDF will wrap:
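import re

def f_(s):
    # Capture "key=value" pairs; values are integers, optionally negative.
    pattern = re.compile(r"([^,=]+)=([0-9\-]+)")
    # Treat a null column value as an empty string.
    return dict(pattern.findall(s or ""))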
We can test the function directly on a "simple" sample:
f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}
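Before wiring the function into Spark, it may help to see how the pattern behaves on less tidy input. The sketch below restates the same regex under the hypothetical name `parse_parameters`; the `Mode` and `SINR` keys are made-up inputs for illustration and are not part of the sample data.

```python
import re

# Same pattern as f_: a key (anything except ',' and '='), then '=',
# then a run of digits and '-' characters.
PAIR = re.compile(r"([^,=]+)=([0-9\-]+)")

def parse_parameters(s):
    """Parse 'k1=v1,k2=v2,' into a dict of strings; None yields {}."""
    return dict(PAIR.findall(s or ""))

# A null column value gives an empty dict instead of raising.
print(parse_parameters(None))

# Values come back as strings, so cast them if numbers are needed.
parsed = parse_parameters("RSRP=-102,")
print({k: int(v) for k, v in parsed.items()})

# A pair whose value is not numeric is skipped silently (hypothetical 'Mode').
print(parse_parameters("Mode=QPSK,RSRP=-102,"))

# A decimal value is cut at the '.', since '.' is not in the value class
# (hypothetical 'SINR').
print(parse_parameters("SINR=12.5,"))
```

If the real data contains decimal or textual values, the value part of the pattern would need to be widened accordingly.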
OK, it's working. We can now register it for use in SQL:
spark.udf.register("f", f_, MapType(StringType(), StringType()))
spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# |                               Map(RA Req. SN ->...|
# +---------------------------------------------------+
But in your case, I think you'll want an actual UDF that you can apply to the column, pulling out each field with getItem:
extract = udf(f_, MapType(StringType(), StringType()))
df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# |  id|       timestamp|routeid|    creationdate|          parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|-102|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+
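The select above pulls out a single key. One possible way to extend it to every field is to gather the key names first and then add one getItem column per key. This is only a sketch: `parameter_keys` and `with_parameter_columns` are hypothetical helper names, and the second one assumes it is passed a DataFrame with a `parameters` column together with a map-returning UDF such as `extract`.

```python
import re

# Same "key=value" pattern used by f_.
PAIR = re.compile(r"([^,=]+)=([0-9\-]+)")

def parameter_keys(samples):
    """Return the sorted distinct keys found in an iterable of parameter strings."""
    keys = set()
    for s in samples:
        keys.update(k for k, _ in PAIR.findall(s or ""))
    return sorted(keys)

def with_parameter_columns(df, extract, keys):
    """Return df plus one column per key, each read from the parsed map.

    Assumes df has a 'parameters' column and extract is a UDF returning a map.
    """
    parsed = extract(df["parameters"])
    return df.select(df["*"], *[parsed.getItem(k).alias(k) for k in keys])

# Possible usage with the DataFrame from this answer (collect() brings every
# row to the driver, so this is only reasonable for a small sample):
# keys = parameter_keys(r["parameters"] for r in df1.select("parameters").collect())
# with_parameter_columns(df1, extract, keys).show()
```

Rows that lack a given key get null in that column, as with RSRP above. Some keys in the sample, such as "RA Req. SN", contain spaces and a dot, so the resulting column names need backtick quoting when referenced by name later.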