Spark Window-functies – bereik tussen datums

Ik heb een Spark SQL DataFramemet gegevens en wat ik probeer te krijgen zijn alle rijen die voorafgaan aan de huidige rij in een bepaald datumbereik. Dus ik wil bijvoorbeeld alle rijen van 7 dagen terug voorafgaand aan de gegeven rij hebben. Ik kwam erachter dat ik een Window Functionmoet gebruiken zoals:

Window \
    .partitionBy('id') \
    .orderBy('start')

en hier komt het probleem. Ik wil een rangeBetween7 dagen hebben, maar in de Spark-documenten kan ik hierover niets vinden. Biedt Spark zelfs zo’n optie? Voor nu krijg ik alleen alle voorgaande rijen met:

.rowsBetween(-sys.maxsize, 0)

maar zou graag iets willen bereiken als:

.rangeBetween("7 days", 0)

Als iemand me hierbij kan helpen, ben ik u zeer dankbaar. Bij voorbaat dank!


Antwoord 1, autoriteit 100%

Spark >= 2,3

Sinds Spark 2.3 is het mogelijk om intervalobjecten te gebruiken met SQL API, maar de DataFrameAPI-ondersteuning is werk in uitvoering.

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()
## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Spark < 2.3

Voor zover ik weet is het niet direct mogelijk, noch in Spark noch in Hive. Beide vereisen dat de ORDER BY-clausule die wordt gebruikt met RANGEnumeriek is. Het dichtstbijzijnde wat ik vond, is conversie naar tijdstempel en werken op seconden. Ervan uitgaande dat de kolom startdatebevat, typt u:

from pyspark.sql import Row
row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

Een kleine hulp en vensterdefinitie:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

Eindelijk een vraag:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))
df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Verre van mooi, maar werkt.


* Hive-taalhandleiding, typen


Antwoord 2, autoriteit 4%

Fantastische oplossing @zero323, als je met minuten wilt werken in plaats van dagen zoals ik moet, en je hoeft niet te partitioneren met id, dus je hoeft alleen maar een eenvoudig onderdeel aan te passen van de code zoals ik laat zien:

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp) 
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()

Other episodes