Programiranje

Zasnovan za sproten čas: sporočanje velikih podatkov z Apachejem Kafko, 1. del

Ko se je začelo gibanje velikih podatkov, je bilo večinoma osredotočeno na serijsko obdelavo. Orodja za porazdeljeno shranjevanje podatkov in poizvedbe, kot so MapReduce, Hive in Pig, so bila zasnovana za obdelavo podatkov v serijah in ne neprekinjeno. Podjetja bi vsako noč izvajala več opravil za pridobivanje podatkov iz baze podatkov, nato analizirala, preoblikovala in na koncu shranila podatke. V zadnjem času so podjetja odkrila moč analiziranja in obdelave podatkov in dogodkov ko se zgodijo, ne le enkrat na nekaj ur. Večina tradicionalnih sistemov za sporočanje pa se v realnem času ne poveča za obdelavo velikih podatkov. Inženirji pri LinkedInu so zgradili Apache Kafka z odprtim virom: okvir za porazdeljeno sporočanje, ki ustreza potrebam po velikih podatkih s skaliranjem na osnovni strojni opremi.

V zadnjih nekaj letih se je pojavil Apache Kafka, ki je rešil različne primere uporabe. V najpreprostejšem primeru bi lahko šlo za preprost medpomnilnik za shranjevanje dnevnikov aplikacij. V kombinaciji s tehnologijo, kot je Spark Streaming, se lahko uporablja za sledenje spremembam podatkov in ukrepanje v zvezi s temi podatki, preden jih shrani na končni cilj. Kafkin način predvidevanja je močno orodje za odkrivanje prevar, na primer preverjanje veljavnosti transakcije s kreditno kartico, ko se zgodi, in ne čakanje na paketno obdelavo kasneje.

Ta dvodelna vadnica predstavlja Kafko, začenši s tem, kako jo namestiti in zagnati v svojem razvojnem okolju. Dobili boste pregled arhitekture Kafke, ki ji bo sledil uvod v razvijanje modrega sistema sporočil Apache Kafka. Na koncu boste zgradili aplikacijo proizvajalca / potrošnika po meri, ki pošilja in porablja sporočila prek strežnika Kafka. V drugi polovici vadnice boste izvedeli, kako razdeliti in razvrstiti sporočila ter kako nadzirati, katera sporočila bo porabil potrošnik Kafke.

Kaj je Apache Kafka?

Apache Kafka je sistem za pošiljanje sporočil, zasnovan tako, da meri za velike podatke. Podobno kot Apache ActiveMQ ali RabbitMq tudi Kafka omogoča, da aplikacije, zgrajene na različnih platformah, komunicirajo prek asinhronega prenosa sporočil. Toda Kafka se od teh bolj tradicionalnih sistemov za sporočanje razlikuje na ključne načine:

  • Zasnovan je za horizontalno lestvico z dodajanjem več blagovnih strežnikov.
  • Zagotavlja veliko večjo pretočnost tako za proizvajalce kot za potrošnike.
  • Uporablja se lahko za podporo paketnim primerom in primerom uporabe v realnem času.
  • Ne podpira JMS, Javinega API-ja za vmesno programsko opremo, usmerjenega v sporočila.

Arhitektura Apacheja Kafke

Preden bomo raziskali Kafkino arhitekturo, morate poznati njeno osnovno terminologijo:

  • A proizvajalec je postopek, ki lahko objavi sporočilo temi.
  • a potrošnik je postopek, ki se lahko naroči na eno ali več tem in porabi sporočila, objavljena na temah.
  • A kategorija teme je ime vira, v katerem so objavljena sporočila.
  • A posrednik je postopek, ki se izvaja na enem stroju.
  • A grozd je skupina posrednikov, ki sodelujejo.

Arhitektura Apache Kafke je zelo preprosta, kar lahko privede do boljše zmogljivosti in zmogljivosti v nekaterih sistemih. Vsaka tema v Kafki je kot preprosta dnevniška datoteka. Ko proizvajalec objavi sporočilo, ga strežnik Kafka doda na konec dnevniške datoteke za dano temo. Strežnik dodeli tudi odmik, ki je številka, ki se uporablja za trajno identifikacijo vsakega sporočila. Ko število sporočil raste, se vrednost vsakega odmika poveča; na primer, če proizvajalec objavi tri sporočila, lahko prvo dobi odmik 1, drugo odmik 2 in tretje odmik 3.

Ko se potrošnik Kafke prvič zažene, bo strežniku poslal zahtevo za vlečenje, v kateri bo zahteval, da pridobi katero koli sporočilo za določeno temo z vrednostjo odmika večjo od 0. Strežnik bo preveril datoteko dnevnika za to temo in vrnil tri nova sporočila . Potrošnik bo sporočil obdelal, nato pa z zamikom poslal zahtevo za sporočila višje kot 3 itd.

V Kafki je odjemalec odgovoren za zapomnitev števila odmikov in pridobivanje sporočil. Strežnik Kafka ne spremlja in ne upravlja porabe sporočil. Privzeto strežnik Kafka sporočilo hrani sedem dni. Niti v ozadju strežnika preveri in izbriše sporočila, ki so stara sedem dni ali več. Potrošnik lahko dostopa do sporočil, dokler so na strežniku. Sporočilo lahko prebere večkrat in celo prebere sporočila v obratnem vrstnem redu prejema. Če pa potrošnik sporočila ne uspe pridobiti, preden poteče sedem dni, ga bo pogrešal.

Merila Kafka

Proizvodna uporaba LinkedIna in drugih podjetij je pokazala, da lahko Apache Kafka z ustrezno konfiguracijo obdela dnevno stotine gigabajtov podatkov. Leta 2011 so trije inženirji LinkedIn s testiranjem primerjalnih preizkusov dokazali, da lahko Kafka doseže veliko večjo pretočnost kot ActiveMQ in RabbitMQ.

Hitra namestitev in predstavitev Apache Kafka

V tej vadnici bomo sestavili aplikacijo po meri, vendar začnimo z namestitvijo in preizkušanjem primerka Kafke z zunanjim proizvajalcem in potrošnikom.

  1. Obiščite stran za prenos Kafke, da namestite najnovejšo različico (0.9 od tega pisanja).
  2. Izvlecite binarne datoteke v programska oprema / kafka mapo. Za trenutno različico je programska oprema / kafka_2.11-0.9.0.0.
  3. Spremenite svoj trenutni imenik tako, da kaže na novo mapo.
  4. Zaženite strežnik Zookeeper z izvajanjem ukaza: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Zaženite strežnik Kafka z izvajanjem: bin / kafka-server-start.sh config / server.properties.
  6. Ustvarite preskusno temo, ki jo lahko uporabite za testiranje: bin / kafka-topics.sh --create --zookeeper localhost: 2181 - faktor replikacije 1 - particije 1 --top javaworld.
  7. Zaženite preprostega potrošnika konzole, ki lahko porabi sporočila, objavljena na določeno temo, na primer javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 --top javaworld --od začetka.
  8. Zaženite preprosto konzolo proizvajalca, ki lahko objavi sporočila na preskusno temo: bin / kafka-console-producer.sh --broker-list localhost: 9092 --topic javaworld.
  9. Poskusite vtipkati eno ali dve sporočili v konzolo proizvajalca. Vaša sporočila naj se prikažejo v potrošniški konzoli.

Primer aplikacije z Apachejem Kafko

Videli ste, kako deluje Apache Kafka. Nato razvijemo aplikacijo proizvajalca / potrošnika po meri. Proizvajalec bo s konzole pridobil uporabniške vnose in vsako novo vrstico poslal kot sporočilo strežniku Kafka. Potrošnik bo poiskal sporočila za določeno temo in jih natisnil na konzolo. V tem primeru so komponente proizvajalca in potrošnika vaše lastne izvedbe kafka-console-producer.sh in kafka-console-consumer.sh.

Začnimo z ustvarjanjem Producer.java razred. Ta odjemalski razred vsebuje logiko za branje uporabniškega vnosa s konzole in pošiljanje tega vnosa kot sporočila strežniku Kafka.

Proizvajalca konfiguriramo tako, da iz datoteke java.util.Lastnosti razreda in nastavitev njegovih lastnosti. Razred ProducerConfig definira vse različne razpoložljive lastnosti, vendar privzete vrednosti Kafke zadostujejo za večino uporab. Za privzeto konfiguracijo moramo nastaviti le tri obvezne lastnosti:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) nastavi seznam parov gostitelj: vrata, ki se uporabljajo za vzpostavljanje začetnih povezav do gruče Kakfa v gostitelj1: vrata1, gostitelj2: vrata2, ... format. Tudi če imamo v svoji skupini Kafka več kot enega posrednika, moramo določiti le vrednost prvega posrednika gostitelj: vrata. Odjemalec Kafka bo to vrednost uporabil za odkritje klica posrednika, ki bo vrnil seznam vseh posrednikov v grozdu. Dobro je, da v. Navedete več kot enega posrednika BOOTSTRAP_SERVERS_CONFIG, tako da bo stranka, če ta prvi posrednik ne deluje, lahko preizkusila druge posrednike.

Strežnik Kafka pričakuje sporočila v bajt [] ključ, bajt [] vrednost format. Namesto pretvorbe vsakega ključa in vrednosti nam Kafkina odjemalska knjižnica omogoča uporabo prijaznejših vrst, kot je Vrvica in int za pošiljanje sporočil. Knjižnica jih bo pretvorila v ustrezen tip. Primer vzorčne aplikacije nima ključa za sporočilo, zato bomo uporabili nič za ključ. Za vrednost bomo uporabili a Vrvica, kar so podatki, ki jih uporabnik vnese na konzolo.

Če želite konfigurirati tipko za sporočila, smo nastavili vrednost KEY_SERIALIZER_CLASS_CONFIG na org.apache.kafka.common.serialization.ByteArraySerializer. To deluje, ker nič ni treba pretvoriti v bajt []. Za vrednost sporočila, smo nastavili VALUE_SERIALIZER_CLASS_CONFIG na org.apache.kafka.common.serialization.StringSerializer, ker ta razred ve, kako pretvoriti a Vrvica v a bajt [].

Predmeti po meri / vrednosti po meri

Podoben StringSerializer, Kafka ponuja serializatorje za druge primitive, kot je int in dolga. Če želite uporabiti predmet po meri za svoj ključ ali vrednost, bi morali ustvariti razred, ki izvaja org.apache.kafka.common.serialization.Serializer. Nato bi lahko dodali logiko za serializacijo razreda bajt []. V svoji potrošniški kodi bi morali uporabiti tudi ustrezen deserializator.

Proizvajalec Kafka

Po polnjenju Lastnosti razreda z potrebnimi konfiguracijskimi lastnostmi, ga lahko uporabimo za ustvarjanje predmeta KafkaProducer. Kadar koli po tem želimo poslati sporočilo na strežnik Kafka, bomo ustvarili objekt ProducerRecord in pokličite KafkaProducerje pošlji () s tem zapisom poslati sporočilo. The ProducerRecord ima dva parametra: ime teme, v katero naj bo objavljeno sporočilo, in dejansko sporočilo. Ne pozabite poklicati Producer.close () metoda, ko končate z uporabo producenta:

Seznam 1. KafkaProducer

 javni razred Producer {zasebni statični skener v; public static void main (String [] argv) vrže izjemo {if (argv.length! = 1) {System.err.println ("Navedite 1 parameter"); System.exit (-1); } Niz temeName = argv [0]; in = nov optični bralnik (System.in); System.out.println ("Vnesite sporočilo (vnesite izhod za izhod)"); // Konfiguriranje lastnosti proizvajalca configProperties = nove lastnosti (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer proizvajalec = nov KafkaProducer (configProperties); Vrstica niza = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = novo ProducerRecord (imeName, vrstica); produce.send (rec); vrstica = in.nextLine (); } in.close (); produce.close (); }} 

Konfiguriranje potrošnika sporočila

Nato bomo ustvarili preprostega potrošnika, ki se bo naročil na temo. Kadar koli je novo sporočilo objavljeno v temi, ga bo prebralo in natisnilo na konzolo. Potrošniška koda je precej podobna kodi proizvajalca. Začnemo z ustvarjanjem predmeta java.util.Lastnosti, nastavitev njegovih lastnosti, specifičnih za potrošnika, in nato uporaba za ustvarjanje novega predmeta KafkaConsumer. Razred ConsumerConfig definira vse lastnosti, ki jih lahko nastavimo. Obstajajo samo štiri obvezne lastnosti:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Tako kot smo to storili za produkcijski razred, bomo uporabili BOOTSTRAP_SERVERS_CONFIG za konfiguriranje parov gostitelj / vrata za potrošniški razred. Ta konfiguracija nam omogoča vzpostavitev začetnih povezav z grozdom Kakfa v gostitelj1: vrata1, gostitelj2: vrata2, ... format.

Kot sem že omenil, strežnik Kafka pričakuje sporočila v bajt [] tipko in bajt [] vrednosti in ima lastno izvedbo za serializacijo različnih vrst v bajt []. Tako kot pri proizvajalcu bomo tudi za potrošnika za pretvorbo morali uporabiti prilagojevalnik deserializatorjev bajt [] nazaj v ustrezen tip.

V primeru primera vemo, da proizvajalec uporablja ByteArraySerializer za ključ in StringSerializer za vrednost. Na strani stranke moramo torej uporabljati org.apache.kafka.common.serialization.ByteArrayDeserializer za ključ in org.apache.kafka.common.serialization.StringDeserializer za vrednost. Nastavitev teh razredov kot vrednosti za KEY_DESERIALIZER_CLASS_CONFIG in VALUE_DESERIALIZER_CLASS_CONFIG bo potrošniku omogočil deserializacijo bajt [] kodirane vrste, ki jih pošlje proizvajalec.

Končno moramo nastaviti vrednost GROUP_ID_CONFIG. To bi moralo biti ime skupine v obliki niza. Več o tej konfiguraciji bom razložil čez minuto. Za zdaj si oglejte samo potrošnika Kafke s štirimi obveznimi lastnostmi:

$config[zx-auto] not found$config[zx-overlay] not found