join - Joining two Spark DataFrames on time (TimestampType) in Python
I have two DataFrames and would like to join them based on one column, with the caveat that this column is a timestamp, and the timestamps have to be within a certain offset (5 seconds) of each other in order to join records. More specifically, a record in dates_df with date=1/3/2015:00:00:00 should be joined with a record in events_df with time=1/3/2015:00:00:01 because both timestamps are within 5 seconds of each other.
I'm trying to get this logic working with Python Spark, and it is extremely painful. How do people do joins like this in Spark?
My approach is to add two extra columns to dates_df that determine the lower_timestamp and upper_timestamp bounds with a 5 second offset, and then perform a conditional join. This is where it fails, more specifically:

    joined_df = dates_df.join(events_df,
        dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain() captures only the last part of the query:

    Filter (time#6 < upper_timestamp#4)
     CartesianProduct
    ....

and it gives me a wrong result.
Do I really have to do a full-blown cartesian join for each inequality, removing the duplicates as I go along?
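
The plan above keeps only the upper bound, which is consistent with how Python evaluates a chained comparison: a < b < c means (a < b) and (b < c), and `and` returns its second operand whenever the first is truthy. The sketch below illustrates that with a hypothetical Expr class standing in for a column expression; it is not PySpark's Column, only a minimal model of an object that overloads < and &. In PySpark itself, conditions on columns are combined with & rather than with chaining, and newer releases, as far as I know, raise an error instead of silently dropping the first condition.

```python
class Expr:
    """Hypothetical stand-in for a column expression that records comparisons."""

    def __init__(self, text):
        self.text = text

    def __lt__(self, other):
        # Build a new expression instead of returning True or False.
        return Expr("(%s < %s)" % (self.text, other.text))

    def __and__(self, other):
        # `&` can be overloaded; the `and` keyword cannot.
        return Expr("(%s AND %s)" % (self.text, other.text))

    def __repr__(self):
        return self.text


lower = Expr("lower_timestamp")
time = Expr("time")
upper = Expr("upper_timestamp")

# Chained form: Python evaluates (lower < time) and (time < upper).
# The first Expr is truthy, so `and` returns only the second operand.
chained = lower < time < upper

# Explicit form: both comparisons are combined with the overloaded `&`.
combined = (lower < time) & (time < upper)

print(chained)   # (time < upper_timestamp)
print(combined)  # ((lower_timestamp < time) AND (time < upper_timestamp))
```

The parentheses around each comparison in the explicit form matter, because & binds more tightly than < in Python.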
Here is the full code:

    from datetime import datetime, timedelta

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark.sql.functions import udf

    master = 'local[*]'
    app_name = 'stackoverflow_join'

    conf = SparkConf().setAppName(app_name).setMaster(master)
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    def lower_range_func(x, offset=5):
        return x - timedelta(seconds=offset)

    def upper_range_func(x, offset=5):
        return x + timedelta(seconds=offset)

    lower_range = udf(lower_range_func, TimestampType())
    upper_range = udf(upper_range_func, TimestampType())

    dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
    dates_schema = StructType(dates_fields)
    dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
    dates_df = sqlContext.createDataFrame(dates, dates_schema)
    dates_df.show()

    # extend dates_df with time ranges
    dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
               withColumn('upper_timestamp', upper_range(dates_df['date']))

    event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
    event_schema = StructType(event_fields)
    events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
    events_df = sqlContext.createDataFrame(events, event_schema)
    events_df.show()

    # finally, join the data
    joined_df = dates_df.join(events_df,
        dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)
    joined_df.show()

I get the following output:

    +-----+--------------------+
    | name|                date|
    +-----+--------------------+
    |day_1|2015-01-01 00:00:...|
    |day_2|2015-01-02 00:00:...|
    |day_3|2015-01-03 00:00:...|
    |day_4|2015-01-04 00:00:...|
    +-----+--------------------+

    +--------------------+-------+
    |                time|  event|
    +--------------------+-------+
    |2015-01-03 00:00:...|meeting|
    +--------------------+-------+

    +-----+--------------------+--------------------+--------------------+--------------------+-------+
    | name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
    +-----+--------------------+--------------------+--------------------+--------------------+-------+
    |day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
    |day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
    +-----+--------------------+--------------------+--------------------+--------------------+-------+

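
To check which rows should have come back, the same sample data can be joined in plain Python, without Spark. This is only a sanity check under the assumption that the 5 second window uses strict inequalities on both sides, as in the join condition; join_names is a helper made up for this illustration.

```python
from datetime import datetime, timedelta

offset = timedelta(seconds=5)

# Same sample data as in the post: (name, date) and (time, event).
dates = [("day_%s" % x, datetime(year=2015, month=1, day=x)) for x in range(1, 5)]
events = [(datetime(year=2015, month=1, day=3, second=3), "meeting")]


def join_names(condition):
    """Return the names of dates paired with an event under `condition`."""
    return [name
            for name, date in dates
            for time, event in events
            if condition(date - offset, time, date + offset)]


# Only the upper bound is checked, as in the plan printed by explain().
upper_only = join_names(lambda lower, time, upper: time < upper)

# Both bounds are checked, which is the intended condition.
both_bounds = join_names(lambda lower, time, upper: lower < time and time < upper)

print(upper_only)   # ['day_3', 'day_4']
print(both_bounds)  # ['day_3']
```

Checking only the upper bound reproduces the two rows shown in the output, while checking both bounds leaves day_3 alone, which is the result the join was meant to give.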
I ran the Spark SQL query with explain() to see how it is done, and then replicated the same behavior in Python. First, here is how to do the same thing with Spark SQL:

    dates_df.registerTempTable("dates")
    events_df.registerTempTable("events")
    results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time AND events.time < dates.upper_timestamp")
    results.explain()

This works, but the question was how to do it in Python, so the solution seems to be just a plain join, followed by two filters:

    joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)
    joined_df.explain()

joined_df.explain() yields the same query plan as the Spark SQL results.explain(), so I assume this is how things are done.
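
As a rough model of what "a plain join followed by two filters" computes, here is the same pipeline in plain Python: every pairing of rows first, then one filter per inequality. It only illustrates the logic on the sample data from the post and says nothing about how Spark actually executes or optimizes the plan.

```python
from datetime import datetime, timedelta
from itertools import product

offset = timedelta(seconds=5)

# Rows mirror the extended dates_df: (name, lower_timestamp, upper_timestamp).
dates = [("day_%s" % x,
          datetime(2015, 1, x) - offset,
          datetime(2015, 1, x) + offset) for x in range(1, 5)]
# Rows mirror events_df: (time, event).
events = [(datetime(2015, 1, 3, 0, 0, 3), "meeting")]

# Step 1: plain join with no condition, i.e. every pairing of rows.
pairs = list(product(dates, events))

# Step 2: first filter, lower_timestamp < time.
after_lower = [(d, e) for d, e in pairs if d[1] < e[0]]

# Step 3: second filter, time < upper_timestamp.
after_upper = [(d, e) for d, e in after_lower if e[0] < d[2]]

result = [(d[0], e[1]) for d, e in after_upper]
print(len(pairs))  # 4
print(result)      # [('day_3', 'meeting')]
```

Applying the two filters one after the other keeps exactly the pairs that satisfy both inequalities, so the outcome is the same as a single condition with both bounds combined.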