Kotlin kódpéldákkal

Az első alkalommal, amikor minden új Kafka mérnök valami bonyolultabbat próbál megtenni, mint egy egyszerű olvasás-gyors folyamat-írás, elkerülhetetlenül meg kell kerülnie azt a tényt, hogy a KafkaConsumer nem szálbiztos ( lásd: doc).

A Kafka felépítése természetesen vízszintes méretezésre kényszeríti Önt azáltal, hogy több fogyasztót ad hozzá egy csoporthoz. A Kafkában egy témakör összes rekordja partíciókba van csoportosítva, amelyekre egymástól függetlenül lehet írni és ki lehet olvasni anélkül, hogy az adatok minősége romlana.

Tehát, ha nagyobb feldolgozási teljesítményt akarunk, több partícióra van szükségünk, és legalább annyi fogyasztót kell előfizetnünk egy csoportban, ahol minden fogyasztó külön szál. (A fogyasztók száma lehet nagyobb vagy egyenlő a partíciók számával, az esetleges többletfogyasztók csak készenlétben állnak, ha valamelyik megszakad.)

Ez azonban nem az a többszálú, amelyet megvitatunk. A szál-fogyasztónkénti modell a legegyszerűbb. Ez azonban azon a feltételezésen alapul, hogy a legtöbb használati eset egy egyszerű és gyors feldolgozási lépést tartalmaz az iratok felhasználása után. A való életben néha többet kell tennünk, mint egy-két mezőt hozzáfűzni az üzenetekhez, lehet, hogy néhány I/O műveletet, összetett számítási műveletet kell végeznünk, harmadik fél API-t kell hívnunk, vagy koordinálni kell az alkalmazás más bitjeivel.

Ezután létre kell hoznunk valami egyedit, hogy megkerüljük a könyvtár korlátait. Mint minden más mérnöki probléma, az első megközelítés, ami eszünkbe jut, nem az, hogy mihez akarunk jutni, hanem kezdjük vele, és kövessük a természetes folyamatot.

1. megközelítés – csak használjon zászlókat (Duh..)

Tegyük fel, hogy van egy fogyasztónk, akit értesíteni kell, hogy mikor kell bezárni. Természetesen hozzáadunk egy egyszerű zászlót, és egy ciklusban ellenőrizzük, nem?

...
private final closed = new AtomicBoolean(false);
...
// this runs in a separate thread than the main 
try {
   consumer.subscribe(Arrays.asList("topic"));
   while (!closed.get()) {
      val records = consumer.poll(Duration.ofSeconds(1));
      // processing..
   }
} catch (e: Exception) {
   // Ignore exception if closing
   if (!closed.get()) throw e;
} finally {
   consumer.close();
}

Most tegyük fel, hogy olyan alkalmazással dolgozunk, ahol a feldolgozásnak sikeresen be kell fejeződnie a véglegesítés előtt, tehát kikapcsoltuk az automatikus véglegesítést, és manuálisan vezéreljük.

Ezen túlmenően szükségünk van arra, hogy az egyik téma adatait feldolgozzuk egy másik témában lévő bitek és darabok felhasználásával, vagy el kell küldenünk azokat egy harmadik fél API-nak átalakítani, és meg kell várnunk az eredményt.

Tehát ezzel a nehéz feladattal a kezünkben megyünk tovább, és... természetesen további zászlókat adunk hozzá.

...
private final closed = new AtomicBoolean(false);
private final readyToCommit = new AtomicBoolean(false);
...
try {
   consumer.subscribe(Arrays.asList("topic"));
   while (!closed.get()) {
      val records = consumer.poll(Duration.ofSeconds(1));
      // start processing in another thread - call API, stitch data from this topic with another etc..
     while (!readyToCommit.get()) {
      // idle loop?,  maybe add Thread.sleep() or delay()
     }
     
   }
} catch (e: Exception) {
   // Ignore exception if closing
   if (!closed.get()) throw e;
} finally {
   consumer.close();
}

Ez nem néz ki jól, igaz? Mi van, ha több érintkezési pontunk van más szálakkal? Mi a teendő, ha van egy esemény, amely szünetet,egyállapotot vagy időszakos statisztikákat igényel egy fájlba, esetleg bezárja a fogyasztót, de a szál leállítása nélkül hogy később újra csatlakozzon egy másik eseménnyel? Ez annyival több zászlót jelentene, sokkal több hibalehetőséget jelentene.

Azt hisszük, mindannyian egyetértünk abban, hogy egyszerű logikai értékek használata nem optimális és nem méretezhető. Térjünk át a következő lehetőségre.

2. megközelítés – alakítsa át a zászlókat állapotgéppé

Hogyan lenne jobb? Nos, ha jól csinálják, akkor egyetlen irányítási, hozzáférési és meghibásodási pontot kaphat. Olvashatóbbá tenné a kódot, egy kicsit könnyebbé tenné a hibakezelést, és egy kicsit elviselhetőbbé a hibakeresést (a többszálú soha nem az).

A következő állapotokat vesszük figyelembe:

enum class State {
   SUBSCRIBING,
   CONSUMING
   PAUSING,
   PAUSED,
   RESUMING,
   CLOSING
   CLOSED,
   TERMINATING,
   IN_ERROR;
   fun isHealthy(): Boolean {
      return this != IN_ERROR
   }  
   fun shouldRun(): Boolean {
      return this != TERMINATING
   }     
}

Fontos világos és szigorú szabályokat meghatározni arra vonatkozóan, hogy mely átmenetek érvényesek, mi fordulhat elő többszálú esetben, de elfogadhatók, és mi a probléma jele, azaz mi a normál áramlás, mi tekinthető figyelmeztetésnek, és minek kellene riasztást kiváltania.

A fenti állapotok esetében a következő folyamatban tudunk megegyezni:

Ez lefordítható magában a StateMachine-ban lévő szabályokra, például:

class StateMachine {
    companion object {
       private val rules = mapOf(
          SUBSCRIBING to Predicate<State> { it == RESUMING || it == CLOSED },
          CONSUMING to Predicate<State> { it == SUBSCRIBING },
          PAUSING to Predicate<State> { it == CONSUMING },
          PAUSED to Predicate<State> { it == PAUSING },
          RESUMING to Predicate<State> { it == PAUSED },
          CLOSING to Predicate<State> { it == CONSUMING },
          CLOSED to Predicate<State> { it == CLOSING },
          TERMINATING to Predicate<State> { it == SUBSCRIBING ||
   it == CONSUMING || it == PAUSING || it == PAUSED || it == RESUMING || it == CLOSING || it == CLOSED || it == IN_ERROR }
          IN_ERROR to Predicate<State> { it == SUBSCRIBING ||
   it == CONSUMING || it == PAUSING || it == PAUSED || it == RESUMING || it == CLOSING || it == CLOSED }  
        )
        fun transitionRuleFor(state: State) {
           return rules[state]|| 
       }
    }
    private val state = AtomicReference(State.CLOSED)
    fun transitionTo(newState: State){
         state.getAndUpdate{
            if(transitionRuleFor(newState).test(oldState)) {
               newState 
            } else {
               // handle invalid transitions...
             }
         }
    } 
}

A fentiek felhasználásával a fogyasztói szál while ciklusa a következőképpen módosítható:

...
val state = AtomicReference<StateMachine>(StateMachine())
...
while (state.get().shouldRun()) {
   when(state.get()) {
     SUBSCRIBING -> subscribe()
     CONSUMING -> processRecords()
     PAUSING -> pause()
     PAUSED -> doNothing() //well almost, will discuss later 
     RESUMING -> resume()
     CLOSING -> close()
     CLOSED -> doNothing() //maybe add delays if right for the app
     IN_ERROR - > logAndAlert() // any other error handling and either break or attempt a recovery
   }
}
// handle successful termination like app TERM on deploy or else

A when kifejezésben meghívott egyedi metódusoknak a lehetséges kivételekkel és a következő állapotba való átmenettel kell foglalkozniuk.

Ennek a kódrészletnek egy olyan osztályban kell élnie, amely a tényleges KafkaConsumer-példányt csomagolja, és külön korutinként indul el. Ugyanilyen jól működne egy vagy több fogyasztóból álló csoportnál.

3. megközelítés — színészek használata.. az Állapotgép tetején

A 2. megközelítés elegendő, ha a fogyasztói osztály csak egy másik szálból kap értesítést az eseményekről, vagy egy menedzser több fogyasztói osztálynak küld értesítést küldés és elfelejt stratégiával.

Ha azonban több szál is szolgálhat az értesítések forrásaként, amelyeket egymás után kell feldolgozni, vagy a fogyasztóknak visszajelzést kell küldeniük egy menedzsernek, akkor fel kell tűrnünk az ingujjat, és valami kifinomultabbat kell használni. Attól függően, hogy az első vagy a második esettel van dolgunk, lehet, hogy ezt csak a menedzser oldalon vagy a fogyasztói oldalon kell megvalósítani.

Ehhez a State osztályon kívül létre kell hoznunk egy hierarchiát az üzenetosztályokból, amelyeket a csatornán keresztül küldünk. Lehetne egy ControlMessage osztály, amelyet a menedzser használ a fogyasztói osztályok munkafolyamatának vezérlésére, és a FeedbackMessage osztály, amelyet a fogyasztói osztályok használnak, hogy értesítsék a vezetőt a kért művelet végrehajtásáról. vagy hiba történt.

sealed class Message {
   sealed class ControlMessage: Message() {
      class SubscribeMessage(topics: List<String>): ControlMessage()
      object CloseMessage: ControlMessage()
      object PauseMessage: ControlMessage()
      object ResumeMessage: ControlMessage()
      object TerminateMessage: ControlMessage()
   }
sealed class FeedbackMessage(consumer: Consumer): Message() {
      class SubscribedMessage(c: Consumer): FeedbackMessage(c)
      class ClosedMessage(c: Consumer): FeedbackMessage(c)
      class PausedMessage(c: Consumer): FeedbackMessage(c)
      class ConsumingMessage(c: Consumer): FeedbackMessage(c)
      class TerminatedMessage(c: Consumer): FeedbackMessage(c)
      class InErrorMessage(c: Consumer, e: Exception): FeedbackMessage(c)
   }
}

Ha több implementációra van szükségünk, mint fentebb említettük, jó lenne, ha lenne egy gyár a csatorna létrehozásához.

class ChannelFactory {
   fun createChannel(consumer: Consumer<Message>): SendChannel<Message> {
      return object: CoroutineScope {
         override val coroutineContext = Dispatchers.Unconfined + Job()
         val channel = actor<Message> {
            for(message in channel) {
                consumer.accept(message)
            }
         }
       }.channel
   }
}

Az üzenetfeldolgozás a csatornákban így fog kinézni a fogyasztói osztálynál (és analógok a menedzsernél, de a FeedbackMessage hierarchiával).

val consumer = Consumer<Message> { message ->
   when (message) {
       is SubscribeMessage -> subscribe(message.topics)
       is CloseMessage -> close()
       is PauseMessage -> pause()
       is ResumeMessage -> resume()
       is TerminateMessage -> terminate()
    }
}
val channel = channelFactory.createChannel(consumer)

A trükkös dolog itt az egészet inicializálni kell, mielőtt bármilyen üzenetet küldene, mivel a fogyasztói osztályoknak hozzá kell férniük a menedzser csatornájához, hogy visszajelzést küldhessenek, és fordítva – a menedzsernek hozzáféréssel kell rendelkeznie a fogyasztói osztályok csatornáihoz. .

Ha odafigyelt az általunk megbeszélt forgatókönyvre, máris felmerülhet néhány kérdés. Az egyik valószínűleg az, hogy – hogyan lehet megakadályozni, hogy a fogyasztók hosszú időn át, kötelezettségek nélkül, Kafkának küldött szívdobbanásaival kiszoruljanak a csoportból?

Három kafka fogyasztói konfigurációt kell megértenünk, mielőtt foglalkoznánk ezzel:

  • session.timeout.ms— meghatározza azt a maximális időt, amelyet a bróker vár, mielőtt inaktívnak tekintené az ügyfelet. Ezt követően az ügyfél eltávolításra kerül a csoportból, és újraegyensúlyozásra kerül sor. Az alapértelmezett 45 mp, de minden értéknek group.min.session.timeout.ms és group.max.session.timeout.ms között kell lennie, amelyek a közvetítő oldali konfigurációk.
  • heartbeat.inteval.ms — meghatározza, hogy milyen gyakran kell szívveréseket küldeni. Az alapértelmezett érték 3 sde bármely értéknek kisebbnek kell lennie session.timeout.ms-nél, és nem lehet nagyobb ennek 1/3-ánál.
  • max.poll.interval.ms —meghatározza a maximális késleltetést a poll() meghívása között. Ha ennyi idő elteltével a rendszer nem kér le új rekordot, a fogyasztó sikertelennek minősül, és újraegyensúlyozás indul, hogy a partíciókat a csoport egy másik tagjához hozzárendeljék. Az alapértelmezett 5 perc. Azok a fogyasztók, akik nem nulla group.instance.id értéket használnak, amikor eléri az időtúllépést, a partíciók nem kerülnek azonnal újra kiosztásra, csak a session.timeout.ms lejárta után. Ez egy statikus fogyasztó viselkedését tükrözi, amely leállt.

Tehát lényegében minden heartbeat.interval.ms -ben el kell küldeni a szívverést, különben session.timeout.ms után a bróker halottnak tekinti az ügyfelet, kirúgja, egyensúlyba hozza és továbbmegy.

A Java Library-nél van egy szál, amely ezt kezeli a háttérben, de csak akkor, ha a max.poll.interval.ms -on belül szavaz, azaz amíg a feldolgozás gyors és az új rekordok elég gyakran lekérdeznek, a szívverés miatt nem kell aggódni. egyáltalán.

A fenti forgatókönyv azonban hosszú távú, összetett feldolgozást feltételez, amely meghaladhatja az 5 perces időtúllépést.

Vannak kézenfekvő megoldások, mint például a kód optimalizálása vagy a konfigurációs idő növelése. Előfordulhat azonban, hogy az első lehetetlen vagy nem elég, az utóbbi pedig meglehetősen veszélyes, mert megakadályozza, hogy elég gyorsan észleljük a hibákat.

Ne feledje, hogy azt is mondtuk, hogy meg kell győződnünk arról, hogy a feldolgozás sikeresen megtörtént, mielőtt elkötelezi magát. Tehát más lehetőségre van szükségünk.

A jó hír az, hogy már félúton vagyunk – SZÜNETELTETT állapotunk és/vagy Szünetüzenetünk van. A megoldás az, hogy szünetet tart, miközben egy külön szálban végzi el a régóta futó feldolgozási feladatot. Szüneteltetve, a fogyasztói szálban folytassa biztonságosan a lekérdezést (és ezzel együtt folytassa a szívverések küldését), és ténylegesen nem küldenek vissza rekordokat. Igen, ez még egy szálat ad hozzá, de megéri.

A második, állapotgépes megközelítésünkhöz most a következő az eredmény:

...
while (state.get().shouldRun()) {
   when(state.get()) {
     ...
     CONSUMING -> pollStartThreadToProcessAndSetToPausing()
     PAUSING -> pause()
     PAUSED -> keepPollingAndVerifyNothingPolled()  
     ...
   }
}
...

Alternatív megoldásként a rekordok pufferelhetők, amíg a menedzser szünetet nem tart, hogy elindítsa a hosszan tartó feldolgozást.

...
while (state.get().shouldRun()) {
   when(state.get()) {
     ...
     CONSUMING -> pollAndBufferRawOrTransformedRecords()
     PAUSING -> pauseAndStartThreadForProcessing() 
     PAUSED -> keepPollingAndVerifyNothingPolled()  
     ...
   }
}
...

Ha ezen felül van az aktor réteg, akkor a változtatás egyszerű, mivel a nagy terhelést az állapotgép szakasz végzi, és csak értesítenünk kell a fogyasztói osztályt arról, hogy milyen állapotba kell lépnie (megjegyzendő, hogy ez csak pufferelési esetre vonatkozik).

Nos, nem volt ez olyan szórakoztató? Tudassa velem, hogy kipróbálta-e, és milyen könnyen vagy nehezen találta meg. Milyen problémákkal találkozott?

Boldog kódolást!