WebHU - Programozási kérdések és válaszok

PySpark OutOfMemoryErrors sok adatkeret-csatlakozás végrehajtásakor

Sok bejegyzés van erről a témáról, de egyik sem válaszolt a kérdésemre.

OutOfMemoryErrors-be futok a PySparkban, miközben sok különböző adatkeretet próbálok összekapcsolni.

A helyi gépem 16 GB memóriával rendelkezik, és a Spark konfigurációimat így állítottam be:

class SparkRawConsumer:

    def __init__(self, filename, reference_date, FILM_DATA):
        self.sparkContext = SparkContext(master='local[*]', appName='my_app')
        SparkContext.setSystemProperty('spark.executor.memory', '3g')
        SparkContext.setSystemProperty('spark.driver.memory', '15g')

Nyilvánvalóan sok-sok SO bejegyzés található a Spark OOM-hibáiról, de alapvetően a legtöbbjük azt mondja, hogy növelje a memória tulajdonságait.

Lényegében 50-60 kisebb adatkeretből hajtok végre összekapcsolásokat, amelyeknek két oszlopa van: uid és data_in_the_form_of_lists (általában Python karakterláncok listája). A fő adatkeretem, amelyhez csatlakozom, körülbelül 10 oszlopból áll, de tartalmaz egy uid oszlopot is (amelyhez csatlakozom).

Csak 1500 adatsort próbálok egyesíteni. Azonban gyakran találkozom OutOfMemory hibákkal, amikor ezek az adatok egyértelműen elférnek a memóriában. Ezt úgy erősítem meg, hogy megnézem a SparkUI-mben a Tárhelyemet:

Spark UI képernyőkép

A kódban a csatlakozásaim így néznek ki:

# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
                on="gid_value")

metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
                on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")

Ahol metric1, metric2 és metric3 azok az RDD-k, amelyeket az összekapcsolás előtt adatkeretekké alakítok át (ne feledje, hogy valójában 50 ilyen kisebb metric df-ből csatlakozom).

Felhívom a metric.count()-t, hogy erőltessem az értékelést, mert úgy tűnt, hogy segít megelőzni a memóriahibákat (különben sokkal több illesztőprogram-hibát kapnék a végső gyűjtésnél).

A hibák nem determinisztikusak. Nem látom, hogy a csatlakozásaim egy adott pontján következetesen előfordulnának, és néha úgy tűnik, hogy az utolsó metrics_df.collect() hívásom során, néha pedig a kisebb csatlakozások során.

Nagyon gyanítom, hogy vannak problémák a feladatok szerializálásával/deserializálásával. Például, amikor megnézem az esemény idővonalát egy tipikus szakaszra vonatkozóan, azt látom, hogy annak nagy részét a feladat deszerializálása teszi ki:

Spark UI képernyőkép sorozatosítása

Azt is észreveszem, hogy a szemétszállítási idők nagy száma van:

Spark UI képernyőkép szemétgyűjtés

A szemétgyűjtés okozza a memóriahibákat? Vagy ez feladatsorosítás?

Szerkessze a megjegyzésekkel kapcsolatos kérdések megválaszolásához

A Spark-feladatot egy nagyobb PyCharm-projekt részeként futtattam (ezért került a spark-kontextus egy osztály köré). Átdolgoztam a kódot, hogy szkriptként fusson, a következő spark submit használatával:

spark-submit spark_consumer.py \
  --driver-memory=10G \
  --executor-memory=5G \
  --conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'

  • Nem vagyok benne biztos, hogy ez az oka, de nem tudja beállítani az illesztőprogram-memóriát az alkalmazásból. Az alkalmazások indítása előtt be kell állítania (spark-env vagy spark-submit). 12.07.2018
  • Meg tudod osztani a használt spark-submit parancsot? A magas GC idő a memória nyomását jelzi, még OOM hiba nélkül is. Figyelje meg, hogy a Storage Memory oszlop azt írja, hogy 384MB áll rendelkezésre, ez arra utal, hogy @hamzatuna helyesen mondta, hogy nem a megfelelő memóriakiosztással indítja el az illesztőprogramot. 12.07.2018
  • @TravisHegner A fenti szerkesztésemben válaszoltam kérdésére 12.07.2018
  • A SpartContext spark inicializálása során a beküldött konfigurációk el lesznek vetve. Érdemes a SparkContext().getOrCreate() metódust. 12.07.2018
  • @hamzatuna A kódomat self.sparkContext = SparkContext.getOrCreate()-re változtattam... de még mindig azt látom, hogy kevesebb, mint 400 MB szabad memóriám van. 12.07.2018
  • Beállítottál valami konfigurációt a kódon? 12.07.2018
  • @hamzatuna. Nem – az általam megadott kód az. A megjegyzésed alapján eltávolítottam a SparkContext.setSystemProperty('spark.executor.memory', '3g')-emet. 12.07.2018
  • Megpróbálhatja = operátor nélkül is a spark-submitben. Kipróbálhatod így --driver-memory 10g 12.07.2018
  • @hamzatuna sajnos a rendelkezésre álló memóriám továbbra is ugyanaz marad. 12.07.2018
  • Hozzátettem, hogyan oldottam meg az esetemben. Sikerült, esetleg más oka van? Bármilyen végrehajtója vagy klasztere van. 12.07.2018
  • Most fut, és azt látom, hogy a memóriaterhelésem megnőtt. Meglátjuk sikeresen tud-e futni! 12.07.2018

Válaszok:


1

Hasonló problémával szembesültem, és működött a következőkkel:
Spark Submit:

spark-submit --driver-memory 3g\
            --executor-memory 14g\
            *.py

Kód:

sc = SparkContext().getOrCreate()
12.07.2018
Új anyagok

A rádiógomb ellenőrzött eseményének használata a jQueryben
Ebben a cikkben látni fogjuk, hogyan kell dolgozni a jquery választógombbal ellenőrzött eseményeivel. A választógombok HTML gombok, amelyek segítenek kiválasztani egyetlen értéket egy csoportból...

Körkörös függőségek megoldása terraformban adatforrásokkal – lépésről lépésre
Mi az a körkörös függőségek Dolgozzunk egy egyszerű eseten, amikor az SQS-sor és az S3-vödör közötti körkörös függőség problémája van egy egymástól függő címkeérték miatt. provider..

Miért érdemes elkezdeni a kódolást 2023-ban?
01100011 01101111 01100100 01100101 — beep boop beep boop Világunk folyamatosan fejlődik a technológia körül, és naponta fejlesztenek új technológiákat a valós problémák megoldására. Amint..

🎙 Random Noise #2  – Örökbefogadás és hit
az analitika íratlan világának gondozása Szeretné, hogy ezek a frissítések a postaládájába kerüljenek? Iratkozzon fel itt . "Ha önvezető autókat gyártanak, akkor mi miért ne..

A legrosszabb politika és prediktív modellek májátültetésre jelöltek számára az Egyesült Államokban
A máj (vagy óangolul lifer) az emberi test legnehezebb belső szervére utal, amely csendesen működik a nap 24 órájában. Mit csinál a máj? 500 feladatot hajt végre a szervezet egészségének..

5 webhely, amely 2022-ben fejleszti front-end fejlesztői készségeit
Frontendmentor.io A tényleges projektek létrehozásával a Frontendmentor.io segítséget nyújt a front-end kódolási képességeinek fejlesztésében. A kódolást azután kezdheti meg, hogy..

Mikor kell használni a Type-t az interfészhez képest a TypeScriptben?
A TypeScript a JavaScript gépelt szuperkészlete, amely statikus gépelést ad a nyelvhez. Ez megkönnyíti a robusztus és karbantartható kód írását azáltal, hogy a hibákat a fordítási időben..