join - Joining two spark dataframes on time (TimestampType) in python -


i have 2 dataframes , join them based on 1 column, caveat column timestamp, , timestamp has within offset (5 seconds) in order join records. more specifically, record in dates_df date=1/3/2015:00:00:00 should joined events_df time=1/3/2015:00:00:01 because both timestamps within 5 seconds each other.

i'm trying logic working python spark, , extremely painful. how people joins in spark?

my approach add 2 columns dates_df determine lower_timestamp , upper_timestamp bounds 5 second offset, , perform conditional join. , fails, more specifically:

joined_df = dates_df.join(events_df,      dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)  joined_df.explain() 

captures last part of query:

filter (time#6 < upper_timestamp#4)  cartesianproduct  .... 

and gives me wrong result.

do have full blown cartesian join each inequality, removing duplicates go along?

here full code:

from datetime import datetime, timedelta  pyspark import sparkcontext, sparkconf pyspark.sql import sqlcontext pyspark.sql.types import * pyspark.sql.functions import udf   master = 'local[*]' app_name = 'stackoverflow_join'  conf = sparkconf().setappname(app_name).setmaster(master) sc = sparkcontext(conf=conf)  sqlcontext = sqlcontext(sc)  def lower_range_func(x, offset=5):     return x - timedelta(seconds=offset)  def upper_range_func(x, offset=5):     return x + timedelta(seconds=offset)   lower_range = udf(lower_range_func, timestamptype()) upper_range = udf(upper_range_func, timestamptype())  dates_fields = [structfield("name", stringtype(), true), structfield("date", timestamptype(), true)] dates_schema = structtype(dates_fields)  dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) x in range(1,5)] dates_df = sqlcontext.createdataframe(dates, dates_schema)  dates_df.show()  # extend dates_df time ranges dates_df = dates_df.withcolumn('lower_timestamp', lower_range(dates_df['date'])).\            withcolumn('upper_timestamp', upper_range(dates_df['date']))   event_fields = [structfield("time", timestamptype(), true), structfield("event", stringtype(), true)] event_schema = structtype(event_fields)  events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')] events_df = sqlcontext.createdataframe(events, event_schema)  events_df.show()  # finally, join data joined_df = dates_df.join(events_df,      dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)      joined_df.show() 

i following output:

+-----+--------------------+ | name|                date| +-----+--------------------+ |day_1|2015-01-01 00:00:...| |day_2|2015-01-02 00:00:...| |day_3|2015-01-03 00:00:...| |day_4|2015-01-04 00:00:...| +-----+--------------------+  +--------------------+-------+ |                time|  event| +--------------------+-------+ |2015-01-03 00:00:...|meeting| +--------------------+-------+   +-----+--------------------+--------------------+--------------------+--------------------+-------+ | name|                date|     lower_timestamp|     upper_timestamp|                time|  event| +-----+--------------------+--------------------+--------------------+--------------------+-------+ |day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting| |day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting| +-----+--------------------+--------------------+--------------------+--------------------+-------+ 

i did spark sql query explain() see how done, , replicated same behavior in python. first here how same sql spark:

dates_df.registertemptable("dates") events_df.registertemptable("events") results = sqlcontext.sql("select * dates inner join events on dates.lower_timestamp < events.time ,  events.time < dates.upper_timestamp") results.explain() 

this works, question how in python, solution seems plain join, followed 2 filters:

joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp) 

joined_df.explain() yields same query sql spark results.explain() assume how things done.


Comments

Popular posts from this blog

javascript - Bootstrap Popover: iOS Safari strange behaviour -

Website Login Issue developed in magento -

Can the constants be defined inside a model file of a framework in PHP? -