Programiranje

Kako z Apache Flink zgraditi aplikacije za pretakanje s stanjem

Fabian Hueske je zavezanec in član PMC projekta Apache Flink in soustanovitelj Data Artisans.

Apache Flink je okvir za izvajanje aplikacij za obdelavo tokov s stanjem in njihovo izvajanje v obsegu na računski gruči. V prejšnjem članku smo preučili, kaj je obdelava tokov s stanjem, katere primere uporabe obravnava in zakaj bi morali svoje programe za pretakanje implementirati in zagnati z Apache Flink.

V tem članku bom predstavil primere dveh pogostih primerov obdelave tokov s stanjem in razpravljal o tem, kako ju je mogoče uporabiti s Flinkom. Prvi primer uporabe so aplikacije, ki jih vodijo dogodki, tj. Aplikacije, ki zaužijejo neprekinjene tokove dogodkov in na njih uporabljajo nekaj poslovne logike. Drugi je primer uporabe pretočne analitike, kjer bom predstavil dve analitični poizvedbi, izvedeni s Flink-ovim API-jem SQL, ki v realnem času združijo pretočne podatke. Pri podjetju Data Artisans izvorno kodo vseh naših primerov ponujamo v javnem skladišču GitHub.

Preden se poglobimo v podrobnosti primerov, vam predstavim tok dogodkov, ki ga prevzamejo primeri aplikacij, in razložim, kako lahko zaženete kodo, ki jo nudimo.

Tok prireditev s taksijem

Naši primeri temeljijo na javnem nizu podatkov o vožnjah s taksijem, ki so se zgodile v New Yorku leta 2013. Organizatorji DEBS (ACM International Conference on Distributed Event-Based Systems) 2015 so preuredili prvotni nabor podatkov in ga pretvorili v ena datoteka CSV, iz katere beremo naslednjih devet polj.

  • Medaljon - MD5 vsota taksija
  • Hack_license - ID vsote MD5 dovoljenja za taksi
  • Pickup_datetime - čas prevzema potnikov
  • Dropoff_datetime - čas, ko so potniki odložili
  • Pickup_ Longitude - zemljepisna dolžina mesta prevzema
  • Pickup_latitude - zemljepisna širina mesta prevzema
  • Dropoff_longitude - dolžina mesta odlaganja
  • Dropoff_latitude - zemljepisna širina mesta odlaganja
  • Total_amount - skupno plačano v dolarjih

Datoteka CSV zapise shrani v naraščajočem vrstnem redu njihovega atributa čas odlaganja. Tako lahko datoteko obravnavamo kot urejeni dnevnik dogodkov, ki so bili objavljeni ob koncu potovanja. Če želite zagnati primere, ki jih ponujamo na GitHub, morate iz Googla Drive prenesti nabor podatkov izziva DEBS.

Vsi primeri aplikacij zaporedno berejo datoteko CSV in jo zaužijejo kot tok dogodkov vožnje s taksijem. Od takrat naprej aplikacije obdelujejo dogodke tako kot kateri koli drug tok, tj. Kot tok, ki ga prevzame sistem za objavo in naročanje, ki temelji na dnevniku, kot sta Apache Kafka ali Kinesis. Pravzaprav je branje datoteke (ali katere koli druge vrste trajnih podatkov) in obravnava kot tok temelj Flinkovega pristopa k poenotenju obdelave paketov in tokov.

Zagon primerov Flink

Kot smo že omenili, smo izvorno kodo naših primerov aplikacij objavili v skladišču GitHub. Priporočamo vam, da shrambo razstavite in klonirate. Primere lahko enostavno izvedete znotraj izbranega IDE; za njihovo zagon vam ni treba nastaviti in konfigurirati grozda Flink. Najprej uvozite izvorno kodo primerov kot projekt Maven. Nato izvedite glavni razred aplikacije in kot programski parameter navedite mesto shranjevanja podatkovne datoteke (glejte zgoraj povezavo za prenos podatkov).

Ko zaženete aplikacijo, bo zagnala lokalni, vdelani primerek Flink znotraj procesa JVM aplikacije in oddala aplikacijo za njegovo izvajanje. Med zagonom Flink in razporejanjem nalog opravila boste videli kopico zapisov dnevnika. Ko se aplikacija zažene, se njen izpis zapiše v standardni izhod.

Izdelava aplikacije, ki jo vodijo dogodki v Flinku

Zdaj pa razpravljajmo o našem prvem primeru uporabe, ki je aplikacija, ki temelji na dogodkih. Aplikacije, ki jih vodijo dogodki, prevzamejo tokove dogodkov, izvajajo izračune, ko so dogodki prejeti, in lahko oddajajo nove dogodke ali sprožijo zunanja dejanja. Več aplikacij, ki jih vodijo dogodki, je mogoče sestaviti tako, da jih povežete prek sistemov dnevnika dogodkov, podobno kot lahko velike sisteme sestavljajo mikro storitve. Aplikacije, dnevniki dogodkov in posnetki stanja aplikacij, ki jih vodijo dogodki (v Flinku so znane kot točke shranjevanja), vsebujejo zelo močan vzorec načrtovanja, saj lahko ponastavite njihovo stanje in predvajate njihov vnos, da se obnovijo po okvari, odpravijo napako ali preselijo aplikacijo v drugo skupino.

V tem članku bomo preučili aplikacijsko vodeno aplikacijo, ki podpira storitev, ki nadzira delovni čas taksistov. Leta 2016 se je komisija za taksije in limuzine v New Yorku odločila, da bo delovni čas taksistov omejila na 12-urne izmene in zahtevala vsaj osemurni odmor pred začetkom naslednje izmene. Izmena se začne z začetkom prve vožnje. Od takrat naprej lahko voznik nove vožnje začne v roku 12 ur. Naša aplikacija spremlja vožnje voznikov, označuje končni čas njihovega 12-urnega okna (tj. Čas, ko lahko začnejo zadnjo vožnjo) in označuje vožnje, ki kršijo predpis. Celotno izvorno kodo tega primera najdete v našem skladišču GitHub.

Naša aplikacija je implementirana s Flink-ovim API-jem DataStream in a KeyedProcessFunction. API DataStream je funkcionalen API in temelji na konceptu tipiziranih podatkovnih tokov. A DataStream je logična predstavitev toka dogodkov tipa T. Tok se obdela z uporabo funkcije, ki ustvari drug podatkovni tok, morda druge vrste. Flink vzporedno obdeluje tokove z distribucijo dogodkov na pretočne particije in za vsako particijo uporablja različne primerke funkcij.

Naslednji delček kode prikazuje pretok naše aplikacije za spremljanje na visoki ravni.

// zaužijemo tok voženj s taksijem.

DataStream vožnja = TaxiRides.getRides (env, inputPath);

DataStream obvestila = vožnja

// tok particije z ID-jem vozniškega dovoljenja

.keyBy (r -> r.licenseId)

// spremljanje dogodkov vožnje in ustvarjanje obvestil

.process (nov MonitorWorkTime ());

// tiskanje obvestil

notifications.print ();

Aplikacija začne vnašati tok dogodkov vožnje s taksijem. V našem primeru so dogodki prebrani iz besedilne datoteke, razčlenjeni in shranjeni v TaxiRide POJO predmeti. Resnična aplikacija običajno zajema dogodke iz čakalne vrste sporočil ali dnevnika dogodkov, na primer Apache Kafka ali Pravega. Naslednji korak je vnos TaxiRide dogodki s strani licenceId voznika. The keyBy operacija razdeli tok na deklarirano polje, tako da vse dogodke z istim ključem obdela isti vzporedni primerek naslednje funkcije. V našem primeru ločimo na licenceId polje, ker želimo spremljati delovni čas vsakega posameznega voznika.

Nato uporabimo MonitorWorkTime funkcija na razdeljeni TaxiRide dogodkov. Funkcija sledi vožnji na voznika in spremlja njihove izmene in čas odmora. Oddaja dogodke tipa Tuple2, kjer vsak nabor predstavlja obvestilo, sestavljeno iz ID-ja voznika in sporočila. Na koncu naša aplikacija odda sporočila tako, da jih natisne na standardni izhod. Resnična aplikacija bi obvestila zapisala v zunanje sporočilo ali sistem za shranjevanje, kot je Apache Kafka, HDFS ali sistem zbirke podatkov, ali pa bi sprožila zunanji klic, ki bi jih takoj potisnil ven.

Zdaj, ko smo razpravljali o celotnem toku aplikacije, si oglejmo MonitorWorkTime funkcija, ki vsebuje večino dejanske poslovne logike aplikacije. The MonitorWorkTime funkcija je stanje KeyedProcessFunction ki jih zaužije TaxiRide dogodke in oddaje Tuple2 zapisov. The KeyedProcessFunction Vmesnik ima dva načina za obdelavo podatkov: processElement () in onTimer (). The processElement () za vsak prihodnji dogodek. The onTimer () metoda se pokliče, ko se sproži predhodno registriran časovnik. Naslednji delček prikazuje okostje MonitorWorkTime in vse, kar je prijavljeno zunaj načinov obdelave.

javni statični razred MonitorWorkTime

razširi KeyedProcessFunction {

// časovne konstante v milisekundah

zasebni statični končni dolgi ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 ur

zasebni statični končni dolg REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 ur

zasebni statični končni dolg CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 ur

zasebni prehodni oblikovalnik DateTimeFormatter;

// državni ročaj za shranjevanje začetnega časa izmene

ValueState shiftStart;

@Override

javna praznina odprta (konfiguracija konfiguracije) {

// register stanja registra

shiftStart = getRuntimeContext (). getState (

novi ValueStateDescriptor (“shiftStart”, Types.LONG));

// inicializiramo oblikovalnik časa

this.formatter = DateTimeFormat.forPattern (“llll-MM-dd HH: mm: ss”);

  }

// processElement () in onTimer () so podrobno obravnavani spodaj.

}

Funkcija razglasi nekaj konstant za časovne intervale v milisekundah, oblikovalnik časa in ročaj stanja za stanje ključa, ki ga upravlja Flink. Upravljano stanje se občasno preverja in samodejno obnavlja v primeru okvare. Stanje ključa je organizirano na ključ, kar pomeni, da bo funkcija ohranila eno vrednost na ročaj in ključ. V našem primeru je MonitorWorkTime funkcija vzdržuje a dolga vrednost za vsak ključ, torej za vsako licenceId. The shiftStart država shrani čas začetka voznikove izmene. Ročaj stanja je inicializiran v odprto() metoda, ki se pokliče enkrat pred obdelavo prvega dogodka.

Zdaj pa si oglejmo processElement () metoda.

@Override

public void processElement (

Vožnja s taksijem,

Kontekst ctx,

Zbiralec out) vrže izjemo {

// poiščemo čas začetka zadnje izmene

Dolgi zagoni = shiftStart.value ();

if (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// to je prva vožnja v novi izmeni.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

long endTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

"Sprejemate lahko nove potnike do" + formatter.print (endTs)));

// registriraj časovnik za čiščenje stanja v 24 urah

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} sicer če (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// ta vožnja se je začela po koncu dovoljenega delovnega časa.

// gre za kršitev predpisov!

out.collect (Tuple2.of (ride.licenseId,

"Ta vožnja je kršila predpise o delovnem času."));

  }

}

The processElement () metoda se zahteva za vsakega TaxiRide dogodek. Najprej metoda izvleče začetni čas voznikovega premika iz držala stanja. Če država ne vsebuje začetnega časa (startTs == null) ali če se je zadnja izmena začela več kot 20 ur (ALLOWED_WORK_TIME + REQ_BREAK_TIME) prej kot trenutna vožnja, je trenutna vožnja prva vožnja v novi izmeni. V obeh primerih funkcija začne novo prestavo s posodobitvijo začetnega časa izmene na čas začetka trenutne vožnje, vozniku odda sporočilo s časom konca nove izmene in registrira časovnik za čiščenje stanje v 24 urah.

Če trenutna vožnja ni prva vožnja v novi izmeni, funkcija preveri, ali krši predpis o delovnem času, tj. Ali se je začela več kot 12 ur pozneje od začetka trenutne voznikove izmene. V tem primeru funkcija odda sporočilo, s katerim voznika obvesti o kršitvi.

The processElement () metoda MonitorWorkTime funkcija registrira časovnik za čiščenje stanja 24 ur po začetku izmene. Odstranjevanje stanja, ki ni več potrebno, je pomembno za preprečevanje naraščanja velikosti držav zaradi stanja puščanja. Časovnik se sproži, ko čas aplikacije preide časovni žig časovnika. Takrat je onTimer () metoda se imenuje. Podobno kot stanje se časovniki vzdržujejo na ključ in funkcija se postavi v kontekst povezanega ključa pred onTimer () metoda se imenuje. Zato je ves državni dostop usmerjen na ključ, ki je bil aktiven, ko je bil časovnik registriran.

Oglejmo si onTimer () metoda MonitorWorkTime.

@Override

javna void onTimer (

dolgi merilniki časa,

OnTimerContext ctx,

Zbiralec out) vrže izjemo {

// odstranimo stanje premika, če se nov premik že ni začel.

Dolgi startTs = shiftStart.value ();

če (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

The processElement () metoda registrira časovnike 24 ur po začetku izmene, da se počisti stanje, ki ni več potrebno. Čiščenje države je edina logika, ki jo onTimer () metoda izvaja. Ko se časovnik sproži, preverimo, ali je voznik v tem času začel novo izmeno, torej, ali se je čas začetka izmene spremenil. V nasprotnem primeru za voznika počistimo stanje prestav.

$config[zx-auto] not found$config[zx-overlay] not found