Programiranje

Zasnovan za sproten čas: sporočanje velikih podatkov z Apachejem Kafko, 2. del

V prvi polovici tega uvoda JavaWorld v Apache Kafka ste z uporabo Kafke razvili nekaj manjših aplikacij za proizvajalce / potrošnike. Iz teh vaj bi se morali seznaniti z osnovami sistema sporočil Apache Kafka. V tej drugi polovici boste izvedeli, kako uporabljati particije za porazdelitev obremenitve in vodoravno lestvico aplikacije, pri čemer lahko dnevno obdelate do milijone sporočil. Izvedeli boste tudi, kako Kafka uporablja odmike sporočil za sledenje in upravljanje zapletene obdelave sporočil ter kako zaščititi svoj sistem sporočil Apache Kafka pred okvaro, če potrošnik pade. Primer aplikacije iz 1. dela bomo razvili tako za primere uporabe objavi-naroči kot tudi od točke do točke.

Predelne stene v Apache Kafki

Teme v Kafki lahko razdelimo na particije. Na primer, med ustvarjanjem teme z imenom Demo lahko nastavite, da ima tri particije. Strežnik bi ustvaril tri dnevniške datoteke, po eno za vsako predstavitveno particijo. Ko je proizvajalec temo objavil sporočilo, bi temu sporočilu dodelil ID particije. Strežnik bi nato sporočilu dodal datoteko dnevnika samo za to particijo.

Če ste nato zagnali dva porabnika, lahko strežnik prvemu porabniku dodeli particije 1 in 2, drugemu pa particijo 3. Vsak potrošnik bi bral samo z dodeljenih particij. Na sliki 1 lahko vidite predstavitveno temo, konfigurirano za tri particije.

Če želite razširiti scenarij, si predstavljajte grozd Kafka z dvema posrednikoma, nameščenima v dveh strojih. Ko ste razdelili predstavitveno temo, bi jo nastavili tako, da ima dve particiji in dve kopiji. Za to vrsto konfiguracije bi strežnik Kafka dodelil dve particiji dvema posrednikoma v vaši gruči. Vsak posrednik bi bil vodja ene od particij.

Ko je producent objavil sporočilo, je šel do vodje razdelka. Vodja je vzel sporočilo in ga dodal dnevniški datoteki na lokalnem računalniku. Drugi posrednik bi ta dnevnik zapisov pasivno kopiral na svoj računalnik. Če bi vodja razdelka padel, bi drugi posrednik postal novi vodja in začel posluževati zahteve strank. Na enak način, ko potrošnik pošlje zahtevo na particijo, gre ta zahteva najprej vodji particije, ta pa vrne zahtevana sporočila.

Prednosti razdelitve

Upoštevajte prednosti razdelitve sistema sporočil na osnovi Kafke:

  1. Razširljivost: V sistemu z samo eno particijo so sporočila, objavljena v temi, shranjena v dnevniški datoteki, ki obstaja na enem računalniku. Število sporočil za temo mora ustrezati eni sami dnevniški datoteki odobritve, velikost shranjenih sporočil pa ne sme biti večja od prostora na disku te naprave. Z razdelitvijo teme lahko prilagodite sistem tako, da shranite sporočila na različne stroje v gruči. Če ste na primer želeli shraniti 30 gigabajtov (GB) sporočil za predstavitveno temo, lahko zgradite gručo Kafka iz treh naprav, vsaka z 10 GB prostora na disku. Nato bi temo nastavili tako, da bo imela tri particije.
  2. Uravnavanje obremenitve strežnika: Če imate več particij, lahko razširjate zahteve po sporočilih med posredniki. Če ste imeli na primer temo, ki je obdelala milijon sporočil na sekundo, jo lahko razdelite na 100 particij in v svojo gručo dodate 100 posrednikov. Vsak posrednik bi bil vodja posamezne particije, odgovoren za odziv na samo 10.000 zahtev strank na sekundo.
  3. Izravnava potrošniške obremenitve: Podobno kot uravnoteženje obremenitve strežnika tudi gostovanje več potrošnikov na različnih strojih omogoča širjenje potrošniške obremenitve. Recimo, da ste želeli porabiti milijon sporočil na sekundo iz teme s 100 particijami. Lahko ustvarite 100 potrošnikov in jih vzporedno vodite. Strežnik Kafka bi vsakemu potrošniku dodelil eno particijo, vsak potrošnik pa bi vzporedno obdelal 10.000 sporočil. Ker Kafka vsako particijo dodeli le enemu potrošniku, bi bilo znotraj particije vsako sporočilo porabljeno po vrstnem redu.

Dva načina ločevanja

Producent je odgovoren za odločitev, na katero particijo bo šlo sporočilo. Producent ima dve možnosti za nadzor te naloge:

  • Po meri particija: Lahko ustvarite razred, ki izvaja org.apache.kafka.clients.producer.Partitioner vmesnik. Ta navada Pregradna stena bo implementiral poslovno logiko, da se bo odločil, kam bodo poslana sporočila.
  • DefaultPartitioner: Če ne ustvarite razreda razdelilnika po meri, potem je privzeto org.apache.kafka.clients.producer.internals.DefaultPartitioner razred bo uporabljen. Privzeti razdelilnik je v večini primerov dovolj dober in ponuja tri možnosti:
    1. Priročnik: Ko ustvarite datoteko ProducerRecord, uporabite preobremenjeni konstruktor new ProducerRecord (ime teme, particijaId, sporočiloKey, sporočilo) da določite ID particije.
    2. Razprševanje (občutljivo na lokacijo): Ko ustvarite datoteko ProducerRecord, navedite a messageKey, s klicem new ProducerRecord (ime teme, sporočiloKey, sporočilo). DefaultPartitioner bo z zgoščenostjo ključa zagotovil, da bodo vsa sporočila istega ključa namenjena istemu proizvajalcu. To je najlažji in najpogostejši pristop.
    3. Škropljenje (naključno uravnoteženje obremenitve): Če ne želite nadzorovati, na katero sporočilo particije gredo, preprosto pokličite new ProducerRecord (ime teme, sporočilo) da ustvarite svoj ProducerRecord. V tem primeru bo razdelilnik pošiljal sporočila na vse particije na način, ki zagotavlja uravnoteženo obremenitev strežnika.

Razdelitev aplikacije Apache Kafka

Za preprost primer proizvajalec / potrošnik v 1. delu smo uporabili a DefaultPartitioner. Zdaj bomo namesto tega poskusili ustvariti particijo po meri. Za ta primer predpostavimo, da imamo spletno mesto za prodajo na drobno, s katerim lahko potrošniki naročajo izdelke kjer koli po svetu. Glede na uporabo vemo, da je večina potrošnikov v ZDA ali Indiji. Našo aplikacijo želimo razdeliti na pošiljanje naročil iz ZDA ali Indije njihovim lastnim potrošnikom, medtem ko bodo naročila od koder koli prejemala tretji potrošniki.

Za začetek bomo ustvarili CountryPartitioner ki izvaja org.apache.kafka.clients.producer.Partitioner vmesnik. Izvesti moramo naslednje metode:

  1. Kafka bo poklical configure () ko inicializiramo Pregradna stena razred, z a Zemljevid lastnosti konfiguracije. Ta metoda inicializira funkcije, specifične za poslovno logiko aplikacije, na primer povezavo z bazo podatkov. V tem primeru želimo dokaj splošen razdelilnik, ki ga potrebuje countryName kot lastnina. Nato lahko uporabimo configProperties.put ("partitions.0", "USA") preslikati tok sporočil na particije. V prihodnosti lahko s to obliko spreminjamo, katere države dobijo lastno particijo.
  2. The Producent Klici API particija () enkrat za vsako sporočilo. V tem primeru ga bomo uporabili za branje sporočila in razčlenitev imena države iz sporočila. Če je ime države v countryToPartitionMap, se bo vrnilo partitionId shranjene v Zemljevid. V nasprotnem primeru bo razpršil vrednost države in jo uporabil za izračun, na katero particijo naj gre.
  3. Mi kličemo zapri () da zaustavite razdelilnik. Uporaba te metode zagotavlja, da se med zaustavitvijo očistijo vsi viri, pridobljeni med inicializacijo.

Ko Kafka pokliče, upoštevajte configure (), bo proizvajalec Kafke vse lastnosti, ki smo jih konfigurirali za proizvajalca, prenesel na Pregradna stena razred. Bistveno je, da beremo samo tiste lastnosti, ki se začnejo predelne stene., jih razčlenite, da dobite partitionIdin shranite ID v countryToPartitionMap.

Spodaj je naša izvedba po meri Pregradna stena vmesnik.

Seznam 1. CountryPartitioner

 javni razred CountryPartitioner implementira Partitioner {private static Map countryToPartitionMap; javna void konfiguracija (konfiguracije zemljevidov) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = nov HashMap (); za (Map.Entry vnos: configs.entrySet ()) {if (entry.getKey (). startWith ("particije.")) {String keyName = entry.getKey (); Vrednost niza = (niz) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (value, paritionId); }}} javna int particija (tema niza, ključ objekta, bajt [] keyBytes, vrednost objekta, bajt [] valueBytes, grozd grozda) {Seznam particij = gruča.availablePartitionsForTopic (tema); String valueStr = (String) vrednost; Niz countryName = ((String) vrednost) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Če je država preslikana na določeno particijo, jo vrnite countryToPartitionMap.get (countryName); } else {// Če nobena država ni preslikana na določeno particijo, porazdeli med preostale particije int noOfPartitions = cluster.topics (). size (); vrnjena vrednost.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} javna void close () {}} 

The Producent razred v seznamu 2 (spodaj) je zelo podoben našemu preprostemu proizvajalcu iz 1. dela, pri čemer sta dve krepko označeni spremembi:

  1. Lastnost config nastavimo s ključem, ki je enak vrednosti ProducerConfig.PARTITIONER_CLASS_CONFIG, ki se ujema s polno kvalificiranim imenom našega CountryPartitioner razred. Tudi mi smo nastavili countryName do partitionId, s čimer preslikava lastnosti, ki jih želimo prenesti CountryPartitioner.
  2. Prenosimo primer razreda, ki izvaja org.apache.kafka.clients.producer.Callback vmesnik kot drugi argument produce.send () metoda. Stranka Kafke jo bo poklicala onCompletion () metoda, ko je sporočilo uspešno objavljeno, priloži a RecordMetadata predmet. S tem objektom bomo lahko ugotovili, na katero particijo je bilo poslano sporočilo, in odmik, dodeljen objavljenemu sporočilu.

Seznam 2. Razdeljeni proizvajalec

 javni razred Producer {zasebni statični skener v; public static void main (String [] argv) vrže izjemo {if (argv.length! = 1) {System.err.println ("Navedite 1 parameter"); System.exit (-1); } Niz temeName = argv [0]; in = nov optični bralnik (System.in); System.out.println ("Vnesite sporočilo (vnesite izhod za izhod)"); // Konfiguriranje lastnosti proizvajalca configProperties = nove lastnosti (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("particija.1", "ZDA"); configProperties.put ("particija.2", "Indija");  org.apache.kafka.clients.producer.Producer proizvajalec = nov KafkaProducer (configProperties); Vrstica niza = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = novo ProducerRecord (imeName, null, vrstica); produce.send (rec, nov povratni klic () {javna void onCompletion (metapodatki RecordMetadata, izjema izjema) {System.out.println ("Sporočilo poslano v temo ->" + metadata.topic () + ", particija->" + metadata.partition () + "shranjeno pri offset->" + metadata.offset ()); ; }}); vrstica = in.nextLine (); } in.close (); produce.close (); }} 

Dodelitev particij potrošnikom

Strežnik Kafka zagotavlja, da je particija dodeljena samo enemu potrošniku, s čimer zagotavlja vrstni red porabe sporočil. Particijo lahko dodelite ročno ali pa jo dodelite samodejno.

Če vaša poslovna logika zahteva več nadzora, boste morali ročno dodeliti particije. V tem primeru bi uporabili KafkaConsumer.assign () da posreduje seznam razdelkov, ki jih zanima vsak potrošnik, strežniku Kakfa.

Samodejna dodelitev particij je privzeta in najpogostejša izbira. V tem primeru bo strežnik Kafka vsakemu potrošniku dodelil particijo in novim uporabnikom dodelil particije po meri.

Recimo, da ustvarjate novo temo s tremi particijami. Ko zaženete prvega potrošnika za novo temo, bo Kafka vse tri particije dodelil istemu potrošniku. Če nato zaženete drugega potrošnika, bo Kafka prerazporedil vse particije, tako da bo eni particiji dodelil prvega, preostali dve particiji pa drugemu. Če dodate tretjega potrošnika, bo Kafka znova prerazporedil particije, tako da bo vsakemu potrošniku dodeljena ena particija. Če zaženete četrtega in petega potrošnika, bodo imeli trije uporabniki dodeljeno particijo, drugi pa ne bodo prejeli nobenega sporočila. Če se ena od začetnih treh particij zniža, bo Kafka z isto logiko particioniranja dodal particijo tega potrošnika enemu od dodatnih potrošnikov.

$config[zx-auto] not found$config[zx-overlay] not found