Spark java.lang.OutOfMemoryError: Java-heapruimte

Mijn cluster: 1 master, 11 slaves, elk knooppunt heeft 6 GB geheugen.

Mijn instellingen:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Dit is het probleem:

Eerstlees ik enkele gegevens (2,19 GB) van HDFS naar RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Ten tweede, doe iets op deze RDD:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Laatste, uitvoer naar HDFS:

res.saveAsNewAPIHadoopFile(...)

Als ik mijn programma start, wordt het volgende weergegeven:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Er zijn te veel taken?

PS: Alles is in orde als de invoergegevens ongeveer 225 MB zijn.

Hoe kan ik dit probleem oplossen?


Antwoord 1, autoriteit 100%

Ik heb een paar suggesties:

  • Als je nodes zo zijn geconfigureerd dat ze maximaal 6g hebben voor Spark (en een beetje overlaten voor andere processen), gebruik dan 6g in plaats van 4g, spark.executor.memory=6g. Zorg ervoor dat je zoveel mogelijk geheugen gebruiktdoor de gebruikersinterface te controleren (er staat hoeveel geheugen je gebruikt)
  • Probeer meer partities te gebruiken, je zou 2 – 4 per CPU moeten hebben. IME het aantal partities vergroten is vaak de gemakkelijkste manier om een ​​programma stabieler (en vaak sneller) te maken. Voor enorme hoeveelheden gegevens heb je misschien veel meer dan 4 per CPU nodig, ik heb in sommige gevallen 8000 partities moeten gebruiken!
  • Verklein het gedeelte van het geheugen dat is gereserveerd voor caching, met behulp van spark.storage.memoryFraction. Als u cache()of persistniet in uw code gebruikt, kan dit net zo goed 0 zijn. De standaardwaarde is 0,6, wat betekent dat u slechts 0,4 * 4g geheugen krijgt voor je hoop. IME die de mem-fractie vermindert, zorgt er vaak voor dat OOM’s verdwijnen. UPDATE:Vanaf spark 1.6 hoeven we blijkbaar niet meer met deze waarden te spelen, spark bepaalt ze automatisch.
  • Vergelijkbaar met hierboven, maar shuffle geheugenfractie. Als uw taak niet veel shuffle-geheugen nodig heeft, stelt u deze in op een lagere waarde (dit kan ertoe leiden dat uw shuffles op de schijf terechtkomen, wat een catastrofale invloed kan hebben op de snelheid). Soms, wanneer het een shuffle-bewerking is die OOMing is, moet je het tegenovergestelde doen, d.w.z. stel het in op iets groots, zoals 0.8, of zorg ervoor dat je shuffles naar de schijf kunnen lopen (het is de standaard sinds 1.0.0).
  • Pas op voor geheugenlekken, deze worden vaak veroorzaakt door het per ongeluk sluiten van objecten die je niet nodig hebt in je lambda’s. De manier om te diagnosticeren is om te kijken naar de “taak geserialiseerd als XXX bytes” in de logs. Als XXX groter is dan een paar k of meer dan een MB, heb je mogelijk een geheugenlek. Zie https://stackoverflow.com/a/25270600/1586965
  • In verband met hierboven; gebruik uitzendvariabelenals je echt grote objecten nodig hebt.
  • Als je grote RDD’s in de cache plaatst en wat toegangstijd kunt verspillen, overweeg dan om de RDD te serialiseren http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage. Of ze zelfs op schijf te cachen (wat soms niet zo erg is als je SSD’s gebruikt).
  • (Geavanceerd) Gerelateerd aan het bovenstaande, vermijd Stringen zwaar geneste structuren (zoals Mapen geneste hoofdlettertypen). Probeer indien mogelijk alleen primitieve typen te gebruiken en indexeer alle niet-primitieven, vooral als u veel duplicaten verwacht. Kies waar mogelijk WrappedArrayboven geneste structuren. Of implementeer zelfs uw eigen serialisatie – U heeft de meeste informatie over hoe u uw gegevens efficiënt in bytes kunt back-uppen, GEBRUIK HET!
  • (bit hacky) Nogmaals, overweeg bij het cachen een Datasette gebruiken om je structuur in de cache op te slaan, omdat deze efficiëntere serialisatie zal gebruiken. Dit moet als een hack worden beschouwd in vergelijking met het vorige opsommingsteken. Door uw domeinkennis in uw algo/serialisatie op te nemen, kan geheugen/cache-ruimte met 100x of 1000x worden geminimaliseerd, terwijl een Datasetwaarschijnlijk slechts 2x – 5x geheugen en 10x gecomprimeerd (parket) op schijf is.

http://spark.apache.org/docs/1.2.1/ configuratie.html

EDIT: (zodat ik mezelf makkelijker kan googlen) Het volgende wijst ook op dit probleem:

java.lang.OutOfMemoryError : GC overhead limit exceeded

Antwoord 2, autoriteit 17%

Om hier een use case aan toe te voegen die vaak niet wordt besproken, zal ik een oplossing bieden bij het indienen van een Spark-aanvraag via spark-submitin localmodus.

Volgens het gitbook Mastering Apache Sparkdoor Jacek Laskowski:

Je kunt Spark in de lokale modus uitvoeren. In deze niet-gedistribueerde single-JVM-implementatiemodus spawnt Spark alle uitvoeringscomponenten – stuurprogramma, uitvoerder, backend en master – in dezelfde JVM. Dit is de enige modus waarin een stuurprogramma wordt gebruikt voor uitvoering.

Als u dus OOM-fouten ervaart met de heap, volstaat het om het driver-memoryaan te passen in plaats van de executor-memory.

Hier is een voorbeeld:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

Antwoord 3, autoriteit 8%

U moet de offHeap-geheugeninstellingen configureren zoals hieronder weergegeven:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

Geef het stuurprogrammageheugen en het uitvoerdergeheugen volgens de RAM-beschikbaarheid van uw machine. Je kunt de offHeap-grootte vergroten als je nog steeds met het OutofMemory-probleem wordt geconfronteerd.


Antwoord 4, autoriteit 4%

U moet het stuurprogrammageheugen vergroten. In je $SPARK_HOME/conf map zou je het bestand spark-defaults.confmoeten vinden, bewerken en de spark.driver.memory 4000minstellen, afhankelijk van het geheugen op je master, I denk.
Dit is wat het probleem voor mij heeft opgelost en alles verloopt soepel


Antwoord 5, autoriteit 4%

Bekijk de opstartscriptsdaar is een Java-heapgrootte ingesteld, het lijkt erop dat u dit niet instelt voordat u Spark worker uitvoert.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

Je kunt hierde documentatie vinden om scripts te implementeren.


Antwoord 6, autoriteit 3%

Ik had veel last van dit probleem bij het gebruik van dynamische brontoewijzing. Ik had gedacht dat het mijn clusterbronnen zou gebruiken om het beste bij de toepassing te passen.

Maar de waarheid is dat de dynamische toewijzing van bronnen het stuurprogrammageheugen niet instelt en op de standaardwaarde houdt, namelijk 1G.

Ik heb dit probleem opgelost door spark.driver.memoryin te stellen op een nummer dat past bij het geheugen van mijn chauffeur (voor 32GB ram heb ik dit ingesteld op 18G).

Je kunt het als volgt instellen met het spark-submit-commando:

spark-submit --conf spark.driver.memory=18g

Heel belangrijke opmerking, er wordt geen rekening gehouden met deze eigenschap als je deze instelt vanuit code, volgens Spark-documentatie – Spark-eigenschappen dynamisch laden:

Spark-eigenschappen kunnen hoofdzakelijk in twee soorten worden verdeeld: de ene is gerelateerd aan deployment, zoals “spark.driver.memory”, “spark.executor.instances”, dit soort eigenschappen wordt mogelijk niet beïnvloed wanneer deze programmatisch wordt ingesteld via SparkConf in runtime, of het gedrag is afhankelijk van de clustermanager en de implementatiemodus die u kiest, dus het wordt aangeraden om via configuratiebestand of spark-submit-opdrachtregelopties in te stellen; een andere is voornamelijk gerelateerd aan Spark-runtime-controle, zoals “spark.task.maxFailures”, dit soort eigenschappen kan op beide manieren worden ingesteld.


Antwoord 7

In het algemeen kan het Spark Executor JVM-geheugen in twee delen worden verdeeld. Spark-geheugen en gebruikersgeheugen. Dit wordt geregeld door eigenschap spark.memory.fraction– de waarde ligt tussen 0 en 1.
Als u met afbeeldingen werkt of geheugenintensieve verwerking uitvoert in Spark-toepassingen, kunt u overwegen de spark.memory.fractionte verlagen. Hierdoor komt er meer geheugen beschikbaar voor uw toepassingswerk. Spark kan morsen, dus het werkt nog steeds met minder geheugenaandeel.

Het tweede deel van het probleem is de werkverdeling. Deel uw gegevens indien mogelijk op in kleinere brokken. Kleinere gegevens hebben mogelijk minder geheugen nodig. Maar als dat niet mogelijk is, offer je compute op voor geheugen. Gewoonlijk zal een enkele uitvoerder meerdere kernen uitvoeren. Het totale geheugen van uitvoerders moet voldoende zijn om de geheugenvereisten van alle gelijktijdige taken af ​​te handelen. Als het vergroten van het geheugen van de uitvoerder geen optie is, kun je het aantal cores per uitvoerder verkleinen, zodat elke taak meer geheugen krijgt om mee te werken.
Test met 1 core-uitvoerders die het grootst mogelijke geheugen hebben dat u kunt geven en blijf dan het aantal cores verhogen totdat u het beste aantal cores vindt.


Antwoord 8

Heb je je master gc-log gedumpt? Dus ik ontmoette een soortgelijk probleem en ik ontdekte dat SPARK_DRIVER_MEMORY alleen de Xmx-heap instelde. De initiële heapgrootte blijft 1G en de heapgrootte wordt nooit opgeschaald naar de Xmx-heap.

Het doorgeven van “–conf “spark.driver.extraJavaOptions=-Xms20g” lost mijn probleem op.

ps aux | grep java en je ziet het volgende logboek:=

24501 30,7 1,7 41782944 2318184 pts/0 Sl+ 18:49 0:33 /usr/java/latest/bin/java-cp /opt/spark/conf /:/opt/spark/jars/* -Xmx30g -Xms20g


Antwoord 9

De locatie om de geheugenheapgrootte in te stellen (tenminste in spark-1.0.0) is in conf/spark-env.
De relevante variabelen zijn SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY.
Meer documenten staan ​​in de implementatiehandleiding

Vergeet ook niet om het configuratiebestand naar alle slave-knooppunten te kopiëren.


Antwoord 10

Ik heb weinig suggesties voor de bovengenoemde fout.

● Controleer of het geheugen van de uitvoerder dat is toegewezen als uitvoerder te maken kan krijgen met partities die meer geheugen nodig hebben dan is toegewezen.

● Probeer te zien of meer shuffles live zijn als shuffles dure bewerkingen zijn, omdat ze disk I / O, gegevensserialisatie en netwerk I / O

betrekken

● Gebruik de uitzending van de verbinding

● Vermijd het gebruik van GroupByKeys en probeer het te vervangen door DrillbyKey

● Vermijd het gebruik van enorme JAVA-objecten waar het opnieuw schuifelen


11

Heap-ruimtevouten optreden in het algemeen als gevolg van het brengen van te veel gegevens terug naar de bestuurder of de uitvoerder.
In uw code lijkt het niet alsof u iets terugbrengt naar de bestuurder, maar in plaats daarvan kunt u de uitvoerders overbelasten die een ingangsrecord / rij naar een andere mapporten in kaart brengen met behulp van de methode van het thermreconstruction (). Ik weet niet zeker wat er in de methode-definitie bevindt, maar dat veroorzaakt absoluut deze overbelasting van de executeur.
Nu heb je 2 opties,

  1. Bewerk uw code om de 3D-reconstructie op een efficiëntere manier te doen.
  2. Nee bewerken code, maar geef meer geheugen aan uw executors, evenals geef meer geheugen-overhead. [Spark.Executor. Memory of Spark.Driver.Memoryoverhead]

Ik zou adviseren om voorzichtig te zijn met de toename en gebruik alleen zoveel als u nodig hebt. Elke taak is uniek in termen van zijn geheugenvereisten, dus ik zou empirisch proberen verschillende waarden te proberen elke keer toenemen door een vermogen van 2 (256m, 512m, 1g .. enzovoort)

U komt tot een waarde voor het uitvoeringsgeheugen dat werkt. Probeer de taak opnieuw uit te voeren met deze waarde 3 of 5 keer voordat u voor deze configuratie wordt gereserveerd.

Other episodes