Programiranje

Kako uporabljati Redis za obdelavo tokov v realnem času

Roshan Kumar je višji vodja izdelkov v laboratorijih Redis.

Zaužitje pretočnih podatkov v realnem času je pogosta zahteva za številne primere uporabe velikih podatkov. Na področjih, kot so IoT, e-trgovina, varnost, komunikacije, zabava, finance in trgovina na drobno, kjer je toliko odvisno od pravočasnega in natančnega sprejemanja odločitev na podlagi podatkov, sta zbiranje in analiza podatkov v realnem času pravzaprav jedro poslovanja.

Vendar zbiranje, shranjevanje in obdelava pretočnih podatkov v velikih količinah in z veliko hitrostjo predstavlja arhitekturne izzive. Pomemben prvi korak pri zagotavljanju analize podatkov v realnem času je zagotoviti, da so na voljo ustrezni omrežni, računski, pomnilniški in pomnilniški viri za zajemanje hitrih podatkovnih tokov. Toda sklad programske opreme podjetja se mora ujemati z zmogljivostjo njegove fizične infrastrukture. V nasprotnem primeru se bodo podjetja soočala z velikim zaostankom podatkov ali, kar je še huje, z manjkajočimi ali nepopolnimi podatki.

Redis je postal priljubljena izbira za takšne scenarije hitrega vnosa podatkov. Redis, lahka platforma baze podatkov v pomnilniku, doseže prepustnost v milijonih operacij na sekundo s podmilisekundnimi zakasnitvami, hkrati pa črpa minimalne vire. Ponuja tudi preproste izvedbe, ki jih omogočajo številne podatkovne strukture in funkcije.

V tem članku bom pokazal, kako lahko Redis Enterprise reši pogoste izzive, povezane z zaužitjem in obdelavo velike količine podatkov z visoko hitrostjo. Sprehodili se bomo med tremi različnimi pristopi (vključno s kodo) za sprotno obdelavo vira Twitterja z uporabo Redis Pub / Sub, Redis Lists in Redis Sorted Sets. Kot bomo videli, imajo vse tri metode pomembno vlogo pri hitrem zajemanju podatkov, odvisno od primera uporabe.

Izzivi pri oblikovanju hitrih rešitev za vnos podatkov

Hitro zajemanje podatkov pogosto vključuje več različnih vrst zapletenosti:

  • Velike količine podatkov včasih pritečejo rafalno. Razpršeni podatki zahtevajo rešitev, ki je sposobna obdelati velike količine podatkov z minimalno zakasnitvijo. V idealnem primeru bi moral biti sposoben izvesti milijone zapisov na sekundo s sub-milisekundno zakasnitvijo z minimalnimi sredstvi.
  • Podatki iz več virov. Rešitve za vnos podatkov morajo biti dovolj prilagodljive, da lahko obdelujejo podatke v številnih različnih oblikah, po potrebi ohranijo identiteto vira in preoblikujejo ali normalizirajo v realnem času.
  • Podatki, ki jih je treba filtrirati, analizirati ali posredovati. Večina rešitev za vnos podatkov ima enega ali več naročnikov, ki jih porabijo. Pogosto gre za različne aplikacije, ki delujejo na istih ali različnih lokacijah z različnimi predpostavkami. V takih primerih mora baza podatkov ne samo pretvoriti podatke, temveč tudi filtrirati ali združiti, odvisno od zahtev uporabniških aplikacij.
  • Podatki iz geografsko porazdeljenih virov. V tem primeru je pogosto priročno distribuirati vozlišča za zbiranje podatkov in jih postaviti blizu virov. Vozlišča sama postanejo del rešitve za hiter vnos podatkov za zbiranje, obdelavo, posredovanje ali preusmerjanje podatkov za vnos.

Ravnanje s hitrim vnosom podatkov v Redis

Številne rešitve, ki podpirajo hiter vnos podatkov, so danes zapletene, bogate s funkcijami in preveč zasnovane za preproste zahteve. Redis pa je izjemno lahek, hiter in enostaven za uporabo. Z odjemalci, ki so na voljo v več kot 60 jezikih, je Redis enostavno integrirati s priljubljenimi programskimi paketi.

Redis ponuja podatkovne strukture, kot so seznami, nabori, razvrščeni nabori in razpršilci, ki ponujajo preprosto in vsestransko obdelavo podatkov. Redis zagotavlja več kot milijon operacij branja / pisanja na sekundo z zakasnitvijo v milisekundah na zmerno velikem primerku oblaka blaga, zaradi česar je za velike količine podatkov izjemno učinkovit. Redis podpira tudi storitve sporočanja in odjemalske knjižnice v vseh priljubljenih programskih jezikih, zaradi česar je zelo primeren za kombiniranje hitrega vnosa podatkov in analitike v realnem času. Ukazi Redis Pub / Sub mu omogočajo, da igra vlogo posrednika sporočil med založniki in naročniki, funkcija, ki se pogosto uporablja za pošiljanje obvestil ali sporočil med distribucijskimi vozlišči za vnos podatkov.

Redis Enterprise izboljša Redis z brezšivnim skaliranjem, neprekinjeno razpoložljivostjo, avtomatizirano uvedbo in zmožnostjo uporabe stroškovno učinkovitega pomnilnika flash kot razširitvenega pomnilnika RAM-a, tako da lahko obdelavo velikih naborov podatkov dosežemo stroškovno učinkovito.

V spodnjih razdelkih bom opisal, kako uporabljati Redis Enterprise za reševanje pogostih izzivov pri zaužitju podatkov.

Redis s hitrostjo Twitterja

Za ponazoritev preprostosti Redisa bomo raziskali vzorec rešitve za hitro vnašanje podatkov, ki zbira sporočila iz Twitter vira. Cilj te rešitve je obdelati tvite v realnem času in jih med obdelavo potiskati po cevi.

Podatke Twitterja, ki jih vnese rešitev, nato več procesorjev po vrsti porabi. Kot je prikazano na sliki 1, ta primer obravnava dva procesorja - angleški procesor Tweet in procesor Influencer. Vsak procesor filtrira tvite in jih po svojih kanalih posreduje drugim potrošnikom. Ta veriga lahko seže tako daleč, kot zahteva rešitev. Vendar se v našem primeru ustavimo na tretji ravni, kjer združujemo priljubljene razprave med govorci angleščine in vrhunskimi vplivneži.

Redis Labs

Upoštevajte, da uporabljamo primer obdelave virov Twitter zaradi hitrosti prihoda in preprostosti podatkov. Upoštevajte tudi, da podatki Twitterja do našega hitrega vnosa podatkov pridejo po enem kanalu. V mnogih primerih, na primer IoT, lahko več virov podatkov pošilja podatke glavnemu sprejemniku.

Obstajajo trije možni načini za izvedbo te rešitve z uporabo Redisa: vnos z Redis Pub / Sub, vnos s strukturo podatkov seznama ali vnos s strukturo podatkov Sorted Set. Preučimo vsako od teh možnosti.

Zaužijte z Redis Pub / Sub

To je najpreprostejša izvedba hitrega vnosa podatkov. Ta rešitev uporablja funkcijo Redis's Pub / Sub, ki aplikacijam omogoča objavo in naročanje na sporočila. Kot je prikazano na sliki 2, vsaka stopnja obdela podatke in jih objavi v kanalu. Naslednja stopnja se naroči na kanal in prejema sporočila za nadaljnjo obdelavo ali filtriranje.

Redis Labs

Prednosti

  • Enostaven za izvedbo.
  • Dobro deluje, če so viri podatkov in procesorji geografsko porazdeljeni.

Slabosti

  • Rešitev zahteva, da so založniki in naročniki ves čas pripravljeni. Naročniki izgubijo podatke, ko se ustavijo ali ko se povezava izgubi.
  • Zahteva več povezav. Program ne more objaviti in se naročiti na isto povezavo, zato vsak vmesni procesor podatkov zahteva dve povezavi - eno za naročanje in eno za objavo. Če Redis izvajate na platformi DBaaS, je pomembno, da preverite, ali vaš paket ali raven storitve omejuje število povezav.

Opomba o povezavah

Če se na kanal naroči več odjemalcev, Redis podatke za vsakega odjemalca linearno potiska enega za drugim. Velik tovor podatkov in številne povezave lahko povzročijo zakasnitev med založnikom in njegovimi naročniki. Čeprav je privzeta trdna omejitev za največje število povezav 10.000, morate preizkusiti in primerjati, koliko povezav je primernih za vaš tovor.

Redis vzdržuje odjemalski vmesni pomnilnik za vsako stranko. Privzete omejitve za odjemalski medpomnilnik za Pub / Sub so nastavljene kot:

client-output-buffer-limit pubsub 32mb 8mb 60

S to nastavitvijo bo Redis odjemalce prisilil, da se odklopijo pod dvema pogojema: če izhodni vmesni pomnilnik zraste preko 32 MB ali če izhodni vmesni pomnilnik dosledno hrani 8 MB podatkov 60 sekund.

To kaže na to, da stranke porabljajo podatke počasneje, kot so objavljeni. V takem primeru poskusite najprej optimizirati potrošnike tako, da med porabo podatkov ne dodajajo zakasnitev. Če opazite, da vaše stranke še vedno prekinjajo povezavo, lahko omejitve za client-output-buffer-limit pubsub lastnina v redis.conf. Upoštevajte, da lahko spremembe nastavitev povečajo zakasnitev med založnikom in naročnikom. Vse spremembe je treba temeljito preizkusiti in preveriti.

Oblikovanje kode za rešitev Redis Pub / Sub

Redis Labs

To je najpreprostejša od treh rešitev, opisanih v tem članku. Tu so pomembni razredi Java, ki se izvajajo za to rešitev. Prenesite izvorno kodo s popolno izvedbo tukaj: //github.com/redislabsdemo/IngestPubSub.

The Naročnik class je osrednji razred tega oblikovanja. Vsak Naročnik object ohranja novo povezavo z Redisom.

class Subscriber razširja JedisPubSub izvaja Runnable {

ime zasebnega niza;

zasebno RedisConnection conn = null;

zasebno Jedis jedis = null;

private String subscriberChannel;

javni naročnik (niz naročnika, niz kanalaName) vrže izjemo {

name = subscriptionName;

subscriberChannel = channelName;

Tema t = nova nit (ta);

t.start ();

       }

@Override

javni void run () {

poskusite {

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

medtem ko (res) {

jedis.subscribe (this, this.subscriberChannel);

                      }

} ulov (izjema e) {

e.printStackTrace ();

              }

       }

@Override

public void onMessage (String channel, String message) {

super.onMessage (kanal, sporočilo);

       }

}

The Založnik class ohranja ločeno povezavo z Redisom za objavo sporočil na kanalu.

javni razred založnik {

RedisConnection conn = null;

Jedis jedis = null;

zasebni kanal String;

public Publisher (String channelName) vrže izjemo {

channel = channelName;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void objav (String msg) vrže izjemo {

jedis.publish (kanal, sporočilo);

       }

}

The EnglishTweetFilter, InfluencerTweetFilter, HashTagCollector, in InfluencerCollector filtri se razširijo Naročnik, ki jim omogoča poslušanje vhodnih kanalov. Ker za naročanje in objavljanje potrebujete ločene povezave Redis, ima vsak razred filtra svojega RedisConnection predmet. Filtri nova sporočila v svojih kanalih poslušajo v zanki. Tu je vzorčna koda EnglishTweetFilter razred:

javni razred EnglishTweetFilter razširja naročnika

{

zasebno RedisConnection conn = null;

zasebno Jedis jedis = null;

zasebni niz publisherChannel = null;

public EnglishTweetFilter (Ime niza, Niz naročnika, String izdajateljKanal) vrže izjemo {

super (ime, naročnikkanal);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@Override

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = nov JsonParser ();

JsonElement jsonElement = jsonParser.parse (sporočilo);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// filtriranje sporočil: objavite samo angleške tvite

if (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). equals (“en”)) {

jedis.publish (publisherChannel, sporočilo);

              }

       }

}

The Založnik class ima metodo objave, ki objavlja sporočila na zahtevanem kanalu.

javni razred založnik {

.

.     

public void objav (String msg) vrže izjemo {

jedis.publish (kanal, sporočilo);

       }

.

}

Glavni razred bere podatke iz prevzetega toka in jih objavlja v AllData kanal. Glavna metoda tega razreda zažene vse predmete filtra.

javni razred IngestPubSub

{

.

public void start () vrže izjemo {

       .

       .

publisher = nov založnik (“AllData”);

englishFilter = novi EnglishTweetFilter ("Angleški filter", "AllData",

"EnglishTweets");

influencerFilter = nov InfluencerTweetFilter (“Influencer filter”,

"AllData", "InfluencerTweets");

hashtagCollector = nov HashTagCollector (“Hashtag Collector”,

"EnglishTweets");

influencerCollector = nov InfluencerCollector (“Zbiralnik Influencer”,

»InfluencerTweets«);

       .

       .

}

Zaužijte s seznami Redis

Struktura podatkov seznama v Redisu omogoča enostavno in enostavno izvajanje rešitve za čakanje. V tej rešitvi proizvajalec potisne vsako sporočilo na zadnji del čakalne vrste, naročnik pa anketira vrsto in potegne nova sporočila z drugega konca.

Redis Labs

Prednosti

  • Ta metoda je zanesljiva v primeru izgube povezave. Ko so podatki potisnjeni na sezname, se tam hranijo, dokler jih naročniki ne preberejo. To velja tudi, če naročnike ustavijo ali izgubijo povezavo s strežnikom Redis.
  • Proizvajalci in potrošniki med seboj ne potrebujejo povezave.

Slabosti

  • Ko so podatki potegnjeni s seznama, so odstranjeni in jih ni več mogoče pridobiti. Če potrošniki ne vztrajajo pri podatkih, se izgubijo takoj, ko jih porabijo.
  • Vsak potrošnik potrebuje ločeno vrsto, ki zahteva shranjevanje več kopij podatkov.

Oblikovanje kode za rešitev Redis Lists

Redis Labs

Izvorno kodo za rešitev Redis Lists lahko prenesete tukaj: //github.com/redislabsdemo/IngestList. Glavni razredi te rešitve so razloženi spodaj.

MessageList vdela strukturo podatkov Redis List. The push () metoda potisne novo sporočilo levo od čakalne vrste in pop () počaka na novo sporočilo z desne, če je vrsta prazna.

javni razred MessageList {

zaščiteno ime niza = “MyList”; // Ime

.

.     

javni void push (String msg) vrže izjemo {

jedis.lpush (ime, sporočilo); // Levi potisk

       }

public String pop () vrže izjemo {

vrni jedis.brpop (0, ime) .toString ();

       }

.

.

}

MessageListener je abstraktni razred, ki izvaja logiko poslušalca in založnika. A MessageListener objekt posluša samo en seznam, lahko pa objavi na več kanalih (MessageFilter predmeti). Ta rešitev zahteva ločeno MessageFilter predmet za vsakega naročnika po cevi.

razred MessageListener izvaja Runnable {

ime zasebnega niza = null;

private MessageList inboundList = null;

Map outBoundMsgFilters = nov HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

if (msgFilter! = null) {

if (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@Override

javni void run () {

.

medtem ko (res) {

Niz msg = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

protected void pushMessage (String msg) vrže izjemo {

Set outBoundMsgNames = outBoundMsgFilters.keySet ();

za (Ime niza: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (ime);

msgList.filterAndPush (msg);

              }

       }

}

MessageFilter je razred staršev, ki omogoča filterAndPush () metoda. Ko se podatki pretakajo skozi sistem za vnos, se pogosto filtrirajo ali preoblikujejo, preden se pošljejo v naslednjo stopnjo. Razredi, ki razširjajo MessageFilter razred preglasi filterAndPush () in implementirajo lastno logiko, da filtrirano sporočilo potisnejo na naslednji seznam.

javni razred MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (String msg) vrže izjemo {

messageList.push (msg);

       }

.

.     

}

AllTweetsListener je vzorčna izvedba a MessageListener razred. Ta posluša vse tvite na AllData kanal in podatke objavi na EnglishTweetsFilter in InfluencerFilter.

javni razred AllTweetsListener razširja MessageListener {

.

.     

public static void main (String [] args) vrže izjemo {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (novo

EnglishTweetsFilter (“EnglishTweetsFilter”, “EnglishTweets”));

allTweetsProcessor.registerOutBoundMessageList (novo

InfluencerFilter ("InfluencerFilter", "Influencers"));

allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilter podaljša MessageFilter. Ta razred izvaja logiko za izbiro samo tistih tweetov, ki so označeni kot angleški tweets. Filter zavrže tweetove, ki niso v angleščini, in jih potisne na naslednji seznam.

javni razred EnglishTweetsFilter razširja MessageFilter {

public EnglishTweetsFilter (ime niza, niz nizov) vrže izjemo {

super (ime, seznamName);

       }

@Override

public void filterAndPush (String message) vrže izjemo {

JsonParser jsonParser = nov JsonParser ();

JsonElement jsonElement = jsonParser.parse (sporočilo);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

if (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). enako (“en”)) {

Jedis jedis = super.getJedisInstance ();

if (jedis! = null) {

jedis.lpush (super.ime, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found