WebHU - Programozási kérdések és válaszok

A strukturált adatfolyam figyelése

Beállítottam egy strukturált adatfolyamot, amely jól működik, de reméltem, hogy figyelni fogom, miközben fut.

Felépítettem egy EventCollector-t

class EventCollector extends StreamingQueryListener{
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.queryStatus.prettyJson)
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }

Felépítettem egy EventCollector-t, és hozzáadtam a hallgatót a spark munkamenetemhez

val listener = new EventCollector()
spark.streams.addListener(listener)

Aztán lekapcsolom a lekérdezést

val query = inputDF.writeStream
  //.format("console")
  .queryName("Stream")
  .foreach(writer)
  .start()

query.awaitTermination()

Az onQueryProgress-t azonban soha nem éri találat. az onQueryStarted igen, de reméltem, hogy bizonyos időközönként megkapom a lekérdezés előrehaladását, hogy figyelemmel kísérhessem a lekérdezések teljesítését. Tud valaki ebben segíteni?


  • A legutóbbi Spark Summit után. Megtudtuk, hogy az általunk keresett információk egy része megtalálható a szikraellenőrző fájlokban. 14.02.2017

Válaszok:


1

A téma hosszas kutatása után ezt találtam...

Az OnQueryProgress lekérdezések között találatot kap. Nem vagyok benne biztos, hogy ez a funkció szándékos-e vagy sem, de miközben egy fájlból adatfolyamot küldünk, az OnQueryProgress nem indul el.

Egy megoldást találtam az volt, hogy a foreach írói sinkre hagyatkoztam, és elvégeztem a saját teljesítményelemzésemet a folyamat funkción belül. Sajnos nem tudunk hozzáférni a futó lekérdezéssel kapcsolatos konkrét információkhoz. Vagy még nem jöttem rá, hogyan kell. Ezt implementáltam a sandboxomban a teljesítmény elemzéséhez:

val writer = new ForeachWriter[rawDataRow] {
    def open(partitionId: Long, version: Long):Boolean = {
        //We end up here in between files
        true
    }
    def process(value: rawDataRow) = {
        counter += 1

        if(counter % 1000 == 0) {
            val currentTime = System.nanoTime()
            val elapsedTime = (currentTime - startTime)/1000000000.0

            println(s"Records Written:  $counter")
            println(s"Time Elapsed: $elapsedTime seconds")
        }
     }
}

A mutatók lekérésének alternatív módja:

A futó lekérdezésekről való információszerzés másik módja a Spark által biztosított GET-végpont elérése.

http://localhost:4040/metrics

or

http://localhost:4040/api/v1/

A dokumentáció itt található: http://spark.apache.org/docs/latest/monitoring.html

2017. szeptember 2. számú frissítés: Szokásos spark streamingen tesztelve, nem strukturált adatfolyamon

Felelősség kizárása, előfordulhat, hogy ez nem vonatkozik a strukturált adatfolyamra. A megerősítéshez be kell állítanom egy tesztpadot. Mindazonáltal működik a rendszeres szikraközvetítéssel (ebben a példában a Kafka fogyasztása).

Úgy gondolom, hogy a 2.2-es spark streaming megjelenése óta új végpontok léteznek, amelyek több mérőszámot tudnak lekérni az adatfolyam teljesítményéről. Lehet, hogy ez a korábbi verziókban is létezett, és csak hiányoltam, de szerettem volna megbizonyosodni arról, hogy bárki más számára is dokumentálva van, aki ezt az információt keresi.

http://localhost:4040/api/v1/applications/{applicationIdHere}/streaming /statisztika

Ez az a végpont, amely úgy néz ki, mintha a 2.2-ben lett volna hozzáadva (Vagy már létezett, és csak a dokumentációhoz adták hozzá, nem vagyok benne biztos, nem ellenőriztem).

Mindenesetre metrikákat ad hozzá ebben a formátumban a megadott streaming alkalmazáshoz:

{
  "startTime" : "2017-09-13T14:02:28.883GMT",
  "batchDuration" : 1000,
  "numReceivers" : 0,
  "numActiveReceivers" : 0,
  "numInactiveReceivers" : 0,
  "numTotalCompletedBatches" : 90379,
  "numRetainedCompletedBatches" : 1000,
  "numActiveBatches" : 0,
  "numProcessedRecords" : 39652167,
  "numReceivedRecords" : 39652167,
  "avgInputRate" : 771.722,
  "avgSchedulingDelay" : 2,
  "avgProcessingTime" : 85,
  "avgTotalDelay" : 87
}

Ez lehetővé teszi számunkra, hogy saját egyéni metrika-/figyelőalkalmazásainkat építsük fel a Spark által elérhető REST-végpontok használatával.

08.12.2016
  • Mit értesz azon, hogy az OnQueryProgress találatot kap a lekérdezések között? Tájékoztatásul, a következő Spark 2.1-ben ezek az események akkor kerülnek közzétételre, amikor egy köteg fut, és 10 másodpercenként, még akkor is, ha nem érkezik új adat. 13.12.2016
  • Bocsánat, arra gondoltam, hogy az OnQueryProgress függvény a lekérdezések között meghívásra kerül. Azt akartuk, hogy az OnQueryProgress meghívást kapjon egy lekérdezés közben. Így boncolgathatjuk, hogyan teljesít. Ez még lehetséges, de még nem jöttem rá. 24.12.2016
  • Látom. Ha nagy tétel fut, hosszú ideig tarthat a befejezés. A FileStreamSource maxFilesPerTrigger opcióval korlátozza az egyes kötegekben feldolgozandó fájlok számát. Használhatja kis kötegek generálására nagy köteg helyett. 25.12.2016
  • Kíváncsi vagyok, hogy a strukturált streamingnek miért nincs ugyanaz a szép streamelési felhasználói felülete, mint a spark micro-match streaming... tudja valaki miért? 10.11.2018
  • Új anyagok

    A rádiógomb ellenőrzött eseményének használata a jQueryben
    Ebben a cikkben látni fogjuk, hogyan kell dolgozni a jquery választógombbal ellenőrzött eseményeivel. A választógombok HTML gombok, amelyek segítenek kiválasztani egyetlen értéket egy csoportból...

    Körkörös függőségek megoldása terraformban adatforrásokkal – lépésről lépésre
    Mi az a körkörös függőségek Dolgozzunk egy egyszerű eseten, amikor az SQS-sor és az S3-vödör közötti körkörös függőség problémája van egy egymástól függő címkeérték miatt. provider..

    Miért érdemes elkezdeni a kódolást 2023-ban?
    01100011 01101111 01100100 01100101 — beep boop beep boop Világunk folyamatosan fejlődik a technológia körül, és naponta fejlesztenek új technológiákat a valós problémák megoldására. Amint..

    🎙 Random Noise #2  – Örökbefogadás és hit
    az analitika íratlan világának gondozása Szeretné, hogy ezek a frissítések a postaládájába kerüljenek? Iratkozzon fel itt . "Ha önvezető autókat gyártanak, akkor mi miért ne..

    A legrosszabb politika és prediktív modellek májátültetésre jelöltek számára az Egyesült Államokban
    A máj (vagy óangolul lifer) az emberi test legnehezebb belső szervére utal, amely csendesen működik a nap 24 órájában. Mit csinál a máj? 500 feladatot hajt végre a szervezet egészségének..

    5 webhely, amely 2022-ben fejleszti front-end fejlesztői készségeit
    Frontendmentor.io A tényleges projektek létrehozásával a Frontendmentor.io segítséget nyújt a front-end kódolási képességeinek fejlesztésében. A kódolást azután kezdheti meg, hogy..

    Mikor kell használni a Type-t az interfészhez képest a TypeScriptben?
    A TypeScript a JavaScript gépelt szuperkészlete, amely statikus gépelést ad a nyelvhez. Ez megkönnyíti a robusztus és karbantartható kód írását azáltal, hogy a hibákat a fordítási időben..