V prvi polovici tega uvoda JavaWorld v Apache Kafka ste z uporabo Kafke razvili nekaj manjših aplikacij za proizvajalce / potrošnike. Iz teh vaj bi se morali seznaniti z osnovami sistema sporočil Apache Kafka. V tej drugi polovici boste izvedeli, kako uporabljati particije za porazdelitev obremenitve in vodoravno lestvico aplikacije, pri čemer lahko dnevno obdelate do milijone sporočil. Izvedeli boste tudi, kako Kafka uporablja odmike sporočil za sledenje in upravljanje zapletene obdelave sporočil ter kako zaščititi svoj sistem sporočil Apache Kafka pred okvaro, če potrošnik pade. Primer aplikacije iz 1. dela bomo razvili tako za primere uporabe objavi-naroči kot tudi od točke do točke.
Predelne stene v Apache Kafki
Teme v Kafki lahko razdelimo na particije. Na primer, med ustvarjanjem teme z imenom Demo lahko nastavite, da ima tri particije. Strežnik bi ustvaril tri dnevniške datoteke, po eno za vsako predstavitveno particijo. Ko je proizvajalec temo objavil sporočilo, bi temu sporočilu dodelil ID particije. Strežnik bi nato sporočilu dodal datoteko dnevnika samo za to particijo.
Če ste nato zagnali dva porabnika, lahko strežnik prvemu porabniku dodeli particije 1 in 2, drugemu pa particijo 3. Vsak potrošnik bi bral samo z dodeljenih particij. Na sliki 1 lahko vidite predstavitveno temo, konfigurirano za tri particije.
Če želite razširiti scenarij, si predstavljajte grozd Kafka z dvema posrednikoma, nameščenima v dveh strojih. Ko ste razdelili predstavitveno temo, bi jo nastavili tako, da ima dve particiji in dve kopiji. Za to vrsto konfiguracije bi strežnik Kafka dodelil dve particiji dvema posrednikoma v vaši gruči. Vsak posrednik bi bil vodja ene od particij.
Ko je producent objavil sporočilo, je šel do vodje razdelka. Vodja je vzel sporočilo in ga dodal dnevniški datoteki na lokalnem računalniku. Drugi posrednik bi ta dnevnik zapisov pasivno kopiral na svoj računalnik. Če bi vodja razdelka padel, bi drugi posrednik postal novi vodja in začel posluževati zahteve strank. Na enak način, ko potrošnik pošlje zahtevo na particijo, gre ta zahteva najprej vodji particije, ta pa vrne zahtevana sporočila.
Prednosti razdelitve
Upoštevajte prednosti razdelitve sistema sporočil na osnovi Kafke:
- Razširljivost: V sistemu z samo eno particijo so sporočila, objavljena v temi, shranjena v dnevniški datoteki, ki obstaja na enem računalniku. Število sporočil za temo mora ustrezati eni sami dnevniški datoteki odobritve, velikost shranjenih sporočil pa ne sme biti večja od prostora na disku te naprave. Z razdelitvijo teme lahko prilagodite sistem tako, da shranite sporočila na različne stroje v gruči. Če ste na primer želeli shraniti 30 gigabajtov (GB) sporočil za predstavitveno temo, lahko zgradite gručo Kafka iz treh naprav, vsaka z 10 GB prostora na disku. Nato bi temo nastavili tako, da bo imela tri particije.
- Uravnavanje obremenitve strežnika: Če imate več particij, lahko razširjate zahteve po sporočilih med posredniki. Če ste imeli na primer temo, ki je obdelala milijon sporočil na sekundo, jo lahko razdelite na 100 particij in v svojo gručo dodate 100 posrednikov. Vsak posrednik bi bil vodja posamezne particije, odgovoren za odziv na samo 10.000 zahtev strank na sekundo.
- Izravnava potrošniške obremenitve: Podobno kot uravnoteženje obremenitve strežnika tudi gostovanje več potrošnikov na različnih strojih omogoča širjenje potrošniške obremenitve. Recimo, da ste želeli porabiti milijon sporočil na sekundo iz teme s 100 particijami. Lahko ustvarite 100 potrošnikov in jih vzporedno vodite. Strežnik Kafka bi vsakemu potrošniku dodelil eno particijo, vsak potrošnik pa bi vzporedno obdelal 10.000 sporočil. Ker Kafka vsako particijo dodeli le enemu potrošniku, bi bilo znotraj particije vsako sporočilo porabljeno po vrstnem redu.
Dva načina ločevanja
Producent je odgovoren za odločitev, na katero particijo bo šlo sporočilo. Producent ima dve možnosti za nadzor te naloge:
- Po meri particija: Lahko ustvarite razred, ki izvaja
org.apache.kafka.clients.producer.Partitioner
vmesnik. Ta navadaPregradna stena
bo implementiral poslovno logiko, da se bo odločil, kam bodo poslana sporočila. - DefaultPartitioner: Če ne ustvarite razreda razdelilnika po meri, potem je privzeto
org.apache.kafka.clients.producer.internals.DefaultPartitioner
razred bo uporabljen. Privzeti razdelilnik je v večini primerov dovolj dober in ponuja tri možnosti:- Priročnik: Ko ustvarite datoteko
ProducerRecord
, uporabite preobremenjeni konstruktornew ProducerRecord (ime teme, particijaId, sporočiloKey, sporočilo)
da določite ID particije. - Razprševanje (občutljivo na lokacijo): Ko ustvarite datoteko
ProducerRecord
, navedite amessageKey
, s klicemnew ProducerRecord (ime teme, sporočiloKey, sporočilo)
.DefaultPartitioner
bo z zgoščenostjo ključa zagotovil, da bodo vsa sporočila istega ključa namenjena istemu proizvajalcu. To je najlažji in najpogostejši pristop. - Škropljenje (naključno uravnoteženje obremenitve): Če ne želite nadzorovati, na katero sporočilo particije gredo, preprosto pokličite
new ProducerRecord (ime teme, sporočilo)
da ustvarite svojProducerRecord
. V tem primeru bo razdelilnik pošiljal sporočila na vse particije na način, ki zagotavlja uravnoteženo obremenitev strežnika.
- Priročnik: Ko ustvarite datoteko
Razdelitev aplikacije Apache Kafka
Za preprost primer proizvajalec / potrošnik v 1. delu smo uporabili a DefaultPartitioner
. Zdaj bomo namesto tega poskusili ustvariti particijo po meri. Za ta primer predpostavimo, da imamo spletno mesto za prodajo na drobno, s katerim lahko potrošniki naročajo izdelke kjer koli po svetu. Glede na uporabo vemo, da je večina potrošnikov v ZDA ali Indiji. Našo aplikacijo želimo razdeliti na pošiljanje naročil iz ZDA ali Indije njihovim lastnim potrošnikom, medtem ko bodo naročila od koder koli prejemala tretji potrošniki.
Za začetek bomo ustvarili CountryPartitioner
ki izvaja org.apache.kafka.clients.producer.Partitioner
vmesnik. Izvesti moramo naslednje metode:
- Kafka bo poklical configure () ko inicializiramo
Pregradna stena
razred, z aZemljevid
lastnosti konfiguracije. Ta metoda inicializira funkcije, specifične za poslovno logiko aplikacije, na primer povezavo z bazo podatkov. V tem primeru želimo dokaj splošen razdelilnik, ki ga potrebujecountryName
kot lastnina. Nato lahko uporabimoconfigProperties.put ("partitions.0", "USA")
preslikati tok sporočil na particije. V prihodnosti lahko s to obliko spreminjamo, katere države dobijo lastno particijo. - The
Producent
Klici API particija () enkrat za vsako sporočilo. V tem primeru ga bomo uporabili za branje sporočila in razčlenitev imena države iz sporočila. Če je ime države vcountryToPartitionMap
, se bo vrnilopartitionId
shranjene vZemljevid
. V nasprotnem primeru bo razpršil vrednost države in jo uporabil za izračun, na katero particijo naj gre. - Mi kličemo zapri () da zaustavite razdelilnik. Uporaba te metode zagotavlja, da se med zaustavitvijo očistijo vsi viri, pridobljeni med inicializacijo.
Ko Kafka pokliče, upoštevajte configure ()
, bo proizvajalec Kafke vse lastnosti, ki smo jih konfigurirali za proizvajalca, prenesel na Pregradna stena
razred. Bistveno je, da beremo samo tiste lastnosti, ki se začnejo predelne stene.
, jih razčlenite, da dobite partitionId
in shranite ID v countryToPartitionMap
.
Spodaj je naša izvedba po meri Pregradna stena
vmesnik.
Seznam 1. CountryPartitioner
javni razred CountryPartitioner implementira Partitioner {private static Map countryToPartitionMap; javna void konfiguracija (konfiguracije zemljevidov) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = nov HashMap (); za (Map.Entry vnos: configs.entrySet ()) {if (entry.getKey (). startWith ("particije.")) {String keyName = entry.getKey (); Vrednost niza = (niz) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (value, paritionId); }}} javna int particija (tema niza, ključ objekta, bajt [] keyBytes, vrednost objekta, bajt [] valueBytes, grozd grozda) {Seznam particij = gruča.availablePartitionsForTopic (tema); String valueStr = (String) vrednost; Niz countryName = ((String) vrednost) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Če je država preslikana na določeno particijo, jo vrnite countryToPartitionMap.get (countryName); } else {// Če nobena država ni preslikana na določeno particijo, porazdeli med preostale particije int noOfPartitions = cluster.topics (). size (); vrnjena vrednost.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} javna void close () {}}
The Producent
razred v seznamu 2 (spodaj) je zelo podoben našemu preprostemu proizvajalcu iz 1. dela, pri čemer sta dve krepko označeni spremembi:
- Lastnost config nastavimo s ključem, ki je enak vrednosti
ProducerConfig.PARTITIONER_CLASS_CONFIG
, ki se ujema s polno kvalificiranim imenom našegaCountryPartitioner
razred. Tudi mi smo nastavilicountryName
dopartitionId
, s čimer preslikava lastnosti, ki jih želimo prenestiCountryPartitioner
. - Prenosimo primer razreda, ki izvaja
org.apache.kafka.clients.producer.Callback
vmesnik kot drugi argumentproduce.send ()
metoda. Stranka Kafke jo bo poklicalaonCompletion ()
metoda, ko je sporočilo uspešno objavljeno, priloži aRecordMetadata
predmet. S tem objektom bomo lahko ugotovili, na katero particijo je bilo poslano sporočilo, in odmik, dodeljen objavljenemu sporočilu.
Seznam 2. Razdeljeni proizvajalec
javni razred Producer {zasebni statični skener v; public static void main (String [] argv) vrže izjemo {if (argv.length! = 1) {System.err.println ("Navedite 1 parameter"); System.exit (-1); } Niz temeName = argv [0]; in = nov optični bralnik (System.in); System.out.println ("Vnesite sporočilo (vnesite izhod za izhod)"); // Konfiguriranje lastnosti proizvajalca configProperties = nove lastnosti (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("particija.1", "ZDA"); configProperties.put ("particija.2", "Indija"); org.apache.kafka.clients.producer.Producer proizvajalec = nov KafkaProducer (configProperties); Vrstica niza = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = novo ProducerRecord (imeName, null, vrstica); produce.send (rec, nov povratni klic () {javna void onCompletion (metapodatki RecordMetadata, izjema izjema) {System.out.println ("Sporočilo poslano v temo ->" + metadata.topic () + ", particija->" + metadata.partition () + "shranjeno pri offset->" + metadata.offset ()); ; }}); vrstica = in.nextLine (); } in.close (); produce.close (); }}
Dodelitev particij potrošnikom
Strežnik Kafka zagotavlja, da je particija dodeljena samo enemu potrošniku, s čimer zagotavlja vrstni red porabe sporočil. Particijo lahko dodelite ročno ali pa jo dodelite samodejno.
Če vaša poslovna logika zahteva več nadzora, boste morali ročno dodeliti particije. V tem primeru bi uporabili KafkaConsumer.assign ()
da posreduje seznam razdelkov, ki jih zanima vsak potrošnik, strežniku Kakfa.
Samodejna dodelitev particij je privzeta in najpogostejša izbira. V tem primeru bo strežnik Kafka vsakemu potrošniku dodelil particijo in novim uporabnikom dodelil particije po meri.
Recimo, da ustvarjate novo temo s tremi particijami. Ko zaženete prvega potrošnika za novo temo, bo Kafka vse tri particije dodelil istemu potrošniku. Če nato zaženete drugega potrošnika, bo Kafka prerazporedil vse particije, tako da bo eni particiji dodelil prvega, preostali dve particiji pa drugemu. Če dodate tretjega potrošnika, bo Kafka znova prerazporedil particije, tako da bo vsakemu potrošniku dodeljena ena particija. Če zaženete četrtega in petega potrošnika, bodo imeli trije uporabniki dodeljeno particijo, drugi pa ne bodo prejeli nobenega sporočila. Če se ena od začetnih treh particij zniža, bo Kafka z isto logiko particioniranja dodal particijo tega potrošnika enemu od dodatnih potrošnikov.