A téma hosszas kutatása után ezt találtam...
Az OnQueryProgress lekérdezések között találatot kap. Nem vagyok benne biztos, hogy ez a funkció szándékos-e vagy sem, de miközben egy fájlból adatfolyamot küldünk, az OnQueryProgress nem indul el.
Egy megoldást találtam az volt, hogy a foreach írói sinkre hagyatkoztam, és elvégeztem a saját teljesítményelemzésemet a folyamat funkción belül. Sajnos nem tudunk hozzáférni a futó lekérdezéssel kapcsolatos konkrét információkhoz. Vagy még nem jöttem rá, hogyan kell. Ezt implementáltam a sandboxomban a teljesítmény elemzéséhez:
val writer = new ForeachWriter[rawDataRow] {
def open(partitionId: Long, version: Long):Boolean = {
//We end up here in between files
true
}
def process(value: rawDataRow) = {
counter += 1
if(counter % 1000 == 0) {
val currentTime = System.nanoTime()
val elapsedTime = (currentTime - startTime)/1000000000.0
println(s"Records Written: $counter")
println(s"Time Elapsed: $elapsedTime seconds")
}
}
}
A mutatók lekérésének alternatív módja:
A futó lekérdezésekről való információszerzés másik módja a Spark által biztosított GET-végpont elérése.
http://localhost:4040/metrics
or
http://localhost:4040/api/v1/
A dokumentáció itt található: http://spark.apache.org/docs/latest/monitoring.html
2017. szeptember 2. számú frissítés: Szokásos spark streamingen tesztelve, nem strukturált adatfolyamon
Felelősség kizárása, előfordulhat, hogy ez nem vonatkozik a strukturált adatfolyamra. A megerősítéshez be kell állítanom egy tesztpadot. Mindazonáltal működik a rendszeres szikraközvetítéssel (ebben a példában a Kafka fogyasztása).
Úgy gondolom, hogy a 2.2-es spark streaming megjelenése óta új végpontok léteznek, amelyek több mérőszámot tudnak lekérni az adatfolyam teljesítményéről. Lehet, hogy ez a korábbi verziókban is létezett, és csak hiányoltam, de szerettem volna megbizonyosodni arról, hogy bárki más számára is dokumentálva van, aki ezt az információt keresi.
http://localhost:4040/api/v1/applications/{applicationIdHere}/streaming /statisztika
Ez az a végpont, amely úgy néz ki, mintha a 2.2-ben lett volna hozzáadva (Vagy már létezett, és csak a dokumentációhoz adták hozzá, nem vagyok benne biztos, nem ellenőriztem).
Mindenesetre metrikákat ad hozzá ebben a formátumban a megadott streaming alkalmazáshoz:
{
"startTime" : "2017-09-13T14:02:28.883GMT",
"batchDuration" : 1000,
"numReceivers" : 0,
"numActiveReceivers" : 0,
"numInactiveReceivers" : 0,
"numTotalCompletedBatches" : 90379,
"numRetainedCompletedBatches" : 1000,
"numActiveBatches" : 0,
"numProcessedRecords" : 39652167,
"numReceivedRecords" : 39652167,
"avgInputRate" : 771.722,
"avgSchedulingDelay" : 2,
"avgProcessingTime" : 85,
"avgTotalDelay" : 87
}
Ez lehetővé teszi számunkra, hogy saját egyéni metrika-/figyelőalkalmazásainkat építsük fel a Spark által elérhető REST-végpontok használatával.
08.12.2016
maxFilesPerTrigger
opcióval korlátozza az egyes kötegekben feldolgozandó fájlok számát. Használhatja kis kötegek generálására nagy köteg helyett. 25.12.2016