Leg de aggregatiefunctionaliteit in Spark uit (met Python en Scala)

Ik ben op zoek naar een betere uitleg van de geaggregeerde functionaliteit die beschikbaar is via spark in python.

Het voorbeeld dat ik heb is als volgt (met pyspark van Spark 1.2.0-versie)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Uitvoer:

(10, 4)

Ik krijg het verwachte resultaat (10,4)dat de som is van 1+2+3+4en 4 elementen. Als ik de beginwaarde die aan de aggregatiefunctie is doorgegeven, verander in (1,0)van (0,0)krijg ik het volgende resultaat

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Uitvoer:

(19, 4)

De waarde neemt toe met 9. Als ik het verander in (2,0), gaat de waarde naar (28,4)enzovoort.

Kan iemand mij uitleggen hoe deze waarde wordt berekend? Ik verwachtte dat de waarde met 1 zou stijgen, niet met 9, verwacht dat ik (11,4)zou zien in plaats daarvan zie ik (19,4).


Antwoord 1, autoriteit 100%

Ik was niet helemaal overtuigd van het geaccepteerde antwoord, en het antwoord van JohnKnight hielp, dus hier is mijn standpunt:

Laten we eerst uitleggen aggregate()in mijn eigen woorden:

Prototype:

aggregaat(zeroValue, seqOp, combOp)

Beschrijving:

Met

aggregate()kun je een RDD nemen en een enkele waarde genereren die van een ander type is dan wat in de oorspronkelijke RDD was opgeslagen.

Parameters:

  1. zeroValue: de initialisatiewaarde, voor uw resultaat, in de gewenste
    formaat.
  2. seqOp: de bewerking die u wilt toepassen op RDD-records. Loopt eenmalig voor
    elk record in een partitie.
  3. combOp: bepaalt hoe de resulterende objecten (één voor elke partitie),
    wordt gecombineerd.

Voorbeeld:

Bereken de som van een lijst en de lengte van die lijst. Retourneer het resultaat in een paar van (sum, length).

In een Spark-shell heb ik eerst een lijst gemaakt met 4 elementen, met 2 partities:

listRDD = sc.parallelize([1,2,3,4], 2)

toen definieerde ik mijn seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

en mijn combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

en toen verzamelde ik:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

Zoals je kunt zien, heb ik beschrijvende namen gegeven aan mijn variabelen, maar laat me het verder uitleggen:

De eerste partitie heeft de sublijst [1, 2]. We zullen de seqOp toepassen op elk element van die lijst en dit zal een lokaal resultaat opleveren, een paar (sum, length), dat het resultaat lokaal zal weerspiegelen, alleen in die eerste partitie.

Dus, laten we beginnen: local_resultwordt geïnitialiseerd met de parameter zeroValuewaarmee we de aggregate()hebben voorzien, dwz (0, 0) en list_elementis het eerste element van de lijst, dwz 1. Het resultaat is wat er gebeurt:

0 + 1 = 1
0 + 1 = 1

Het lokale resultaat is (1, 1), dat betekent dat tot dusver, voor de 1e partitie, na verwerking van alleen het eerste element, de som 1 is en de lengte 1. Merk op dat local_resultwordt bijgewerkt van (0, 0) naar (1, 1).

1 + 2 = 3
1 + 1 = 2

en nu is het lokale resultaat (3, 2), wat het uiteindelijke resultaat zal zijn van de 1e partitie, aangezien ze geen andere elementen zijn in de sublijst van de 1e partitie.

Als we hetzelfde doen voor de 2e partitie, krijgen we (7, 2).

Nu passen we de combOp toe op elk lokaal resultaat, zodat we het uiteindelijke, globale resultaat als volgt kunnen vormen: (3,2) + (7,2) = (10, 4)


Voorbeeld beschreven in ‘figuur’:

           (0, 0) <-- zeroValue
[1, 2]                  [3, 4]
0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1
1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Geïnspireerd door dit geweldige voorbeeld.


Dus als de zeroValueniet (0, 0), maar (1, 0) is, zou je verwachten dat (8 + 4, 2 + 2) = (12, 4) , wat niet verklaart wat je ervaart. Zelfs als we het aantal partities van mijn voorbeeld wijzigen, kan ik dat niet meer krijgen.

De sleutel hier is het antwoord van JohnKnight, waarin staat dat de zeroValueniet alleen analoog is aan het aantal partities, maar vaker kan worden toegepast dan u verwacht.


Antwoord 2, autoriteit 32%

Uitleg met Scala

Met Aggregate kun je de waarden van de RDD naar believen transformeren en combineren.

Het gebruikt twee functies:

De eerste transformeert en voegt de elementen van de originele collectie [T] toe in een lokaal aggregaat [U] en neemt de vorm aan: (U,T) => U. Je kunt het zien als een vouw en daarom heeft het ook een nul nodig voor die operatie. Deze bewerking wordt lokaal op elke partitie parallel toegepast.

Hier ligt de sleutel van de vraag: de enige waarde die hier moet worden gebruikt, is de NUL-waarde voor de reductiebewerking.
Deze bewerking wordt lokaal op elke partitie uitgevoerd, dus als u iets toevoegt aan die nulwaarde, wordt het resultaat vermenigvuldigd met het aantal partities van de RDD.

De tweede bewerking neemt 2 waarden van het resultaattype van de vorige bewerking [U] en combineert deze tot één waarde. Deze operatie zal de gedeeltelijke resultaten van elke partitie verminderen en het werkelijke totaal produceren.

Bijvoorbeeld:
Gegeven een RDD van Strings:

val rdd:RDD[String] = ???

Stel dat u de lengte van de strings in die RDD wilt optellen, dus u zou het volgende doen:

  1. De eerste bewerking transformeert strings naar size (int) en verzamelt de waarden voor size.

    val stringSizeCummulator: (Int, String) => Int = (totaal, tekenreeks) => totaal + string.lengte`

  2. geef de NUL op voor de optelbewerking (0)

    waarde NUL = 0

  3. een bewerking om twee gehele getallen bij elkaar op te tellen:

    val toevoegen: (Int, Int) => Int = _ + _

Alles bij elkaar:

rdd.aggregate(ZERO, stringSizeCummulator, add)

met Spark 2.4 en hogere versie

rdd.aggregate(ZERO)(stringAccumulator,add)

Dus, waarom is de NUL nodig?
Wanneer de cummulatorfunctie wordt toegepast op het eerste element van een partitie, is er geen lopend totaal. NUL wordt hier gebruikt.

Bijv. Mijn RDD is:

  • Partitie 1: [“Spring”, “over”]
  • Partitie 2: [“de”, “muur”]

Dit resulteert in:

P1:

  1. stringSizeCummulator(ZERO, “Jump”) = 4
  2. stringSizeCummulator(4, “over”) = 8

P2:

  1. stringSizeCummulator(ZERO, “the”) = 3
  2. stringSizeCummulator(3, “muur”) = 7

Verminderen: toevoegen(P1, P2) = 15


Antwoord 3, autoriteit 17%

Ik heb niet genoeg reputatiepunten om commentaar te geven op het vorige antwoord van Maasg.
Eigenlijk zou de nulwaarde ‘neutraal’ moeten zijn ten opzichte van de seqop, wat betekent dat het het seqop-resultaat niet zou verstoren, zoals 0 voor optellen, of 1 voor *;

Probeer het NOOIT met niet-neutrale waarden, omdat deze willekeurig kunnen worden toegepast.
Dit gedrag is niet alleen gebonden aan het aantal partities.

Ik heb hetzelfde experiment geprobeerd als vermeld in de vraag.
bij 1 partitie werd de nulwaarde 3 keer toegepast.
met 2 partities, 6 keer.
met 3 partities, 9 keer en dit gaat maar door.


Antwoord 4, autoriteit 2%

Je kunt de volgende code (in scala) gebruiken om precies te zien wat aggregatedoet. Het bouwt een boomstructuur van alle optel- en samenvoegbewerkingen:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]
val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

En dan, in de schaal:

scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

We hebben dus deze 3 partities: [4], [1,2] en [3].

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

U kunt het resultaat weergeven als een boom:

+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

Je kunt zien dat een eerste nul-element wordt gemaakt op het stuurprogrammaknooppunt (links van de boom), en dan worden de resultaten voor alle partities één voor één samengevoegd. Je ziet ook dat als je 0 door 1 vervangt, zoals je deed in je vraag, het 1 optelt bij elk resultaat op elke partitie en ook 1 optelt bij de beginwaarde op de driver. Het totale aantal keren dat de waarde nuldie u geeft, wordt gebruikt, is dus:

number of partitions + 1.

Dus in jouw geval het resultaat van

aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

zal zijn:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)

De implementatie van aggregateis vrij eenvoudig. Het is gedefinieerd in RDD.scala, regel 1107:

 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
}

Antwoord 5

Geweldige uitleg, het heeft me echt geholpen om de onderliggende werking van de aggregatiefunctie te begrijpen. Ik heb er een tijdje mee gespeeld en kwam erachter zoals hieronder.

  • als u de acc als (0,0) gebruikt, verandert het resultaat van de uitvoer van de functie niet.

  • Als de initiële accumulator wordt gewijzigd, zal het resultaat als volgt worden verwerkt

[ som van RDD-elementen + acc beginwaarde * Aantal RDD-partities +
acc beginwaarde ]

voor de vraag hier, zou ik willen voorstellen om de partities te controleren, aangezien het aantal partities 8 zou moeten zijn, zoals ik heb begrepen, aangezien elke keer dat we de seq op een partitie van RDD verwerken, het begint met de initiële som van acc resultaat en ook wanneer het de comb Op gaat doen, zal het opnieuw de acc initiële waarde eenmaal gebruiken.

voor bijv.
Lijst (1,2,3,4) & acc (1,0)

Verkrijg partities in scala door RDD.partitions.size

als Partities 2 & aantal elementen is 4 dan => [ 10 + 1 * 2 + 1 ] => (13,4)

als Partitie 4 & aantal elementen is 4 dan => [ 10 + 1 * 4 + 1 ] => (15,4)

Ik hoop dat dit helpt, je kunt hiervoor uitleg. Bedankt.


Antwoord 6

Met dank aan gsamaras.

Mijn weergave is als volgt,
voer hier de afbeeldingsbeschrijving in


Antwoord 7

Voor mensen die op zoek zijn naar Scala Equivalent-code voor het bovenstaande voorbeeld – hier is het. Zelfde logica, zelfde invoer/resultaat.

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21
scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)
scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)

Antwoord 8

Ik probeer veel experimenten met deze vraag. Het is beter om het aantal partities voor aggregaat in te stellen. de seqOp zal elke partitie verwerken en de initiële waarde toepassen, bovendien zal combOp ook de initiële waarde toepassen wanneer alle partities worden gecombineerd.
Dus presenteer ik het formaat voor deze vraag:

final result = sum(list) + num_Of_Partitions * initial_Value + 1

Other episodes