Sok bejegyzés van erről a témáról, de egyik sem válaszolt a kérdésemre.
OutOfMemoryError
s-be futok a PySparkban, miközben sok különböző adatkeretet próbálok összekapcsolni.
A helyi gépem 16 GB memóriával rendelkezik, és a Spark konfigurációimat így állítottam be:
class SparkRawConsumer:
def __init__(self, filename, reference_date, FILM_DATA):
self.sparkContext = SparkContext(master='local[*]', appName='my_app')
SparkContext.setSystemProperty('spark.executor.memory', '3g')
SparkContext.setSystemProperty('spark.driver.memory', '15g')
Nyilvánvalóan sok-sok SO bejegyzés található a Spark OOM-hibáiról, de alapvetően a legtöbbjük azt mondja, hogy növelje a memória tulajdonságait.
Lényegében 50-60 kisebb adatkeretből hajtok végre összekapcsolásokat, amelyeknek két oszlopa van: uid
és data_in_the_form_of_lists
(általában Python karakterláncok listája). A fő adatkeretem, amelyhez csatlakozom, körülbelül 10 oszlopból áll, de tartalmaz egy uid
oszlopot is (amelyhez csatlakozom).
Csak 1500 adatsort próbálok egyesíteni. Azonban gyakran találkozom OutOfMemory hibákkal, amikor ezek az adatok egyértelműen elférnek a memóriában. Ezt úgy erősítem meg, hogy megnézem a SparkUI-mben a Tárhelyemet:
A kódban a csatlakozásaim így néznek ki:
# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
on="gid_value")
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
Ahol metric1
, metric2
és metric3
azok az RDD-k, amelyeket az összekapcsolás előtt adatkeretekké alakítok át (ne feledje, hogy valójában 50 ilyen kisebb metric
df-ből csatlakozom).
Felhívom a metric.count()
-t, hogy erőltessem az értékelést, mert úgy tűnt, hogy segít megelőzni a memóriahibákat (különben sokkal több illesztőprogram-hibát kapnék a végső gyűjtésnél).
A hibák nem determinisztikusak. Nem látom, hogy a csatlakozásaim egy adott pontján következetesen előfordulnának, és néha úgy tűnik, hogy az utolsó metrics_df.collect()
hívásom során, néha pedig a kisebb csatlakozások során.
Nagyon gyanítom, hogy vannak problémák a feladatok szerializálásával/deserializálásával. Például, amikor megnézem az esemény idővonalát egy tipikus szakaszra vonatkozóan, azt látom, hogy annak nagy részét a feladat deszerializálása teszi ki:
Azt is észreveszem, hogy a szemétszállítási idők nagy száma van:
A szemétgyűjtés okozza a memóriahibákat? Vagy ez feladatsorosítás?
Szerkessze a megjegyzésekkel kapcsolatos kérdések megválaszolásához
A Spark-feladatot egy nagyobb PyCharm-projekt részeként futtattam (ezért került a spark-kontextus egy osztály köré). Átdolgoztam a kódot, hogy szkriptként fusson, a következő spark submit használatával:
spark-submit spark_consumer.py \
--driver-memory=10G \
--executor-memory=5G \
--conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'