AMQP

Versiunea actuală a paginii nu a fost încă examinată de colaboratori experimentați și poate diferi semnificativ de versiunea revizuită pe 17 iunie 2020; verificările necesită 107 editări .

AMQP (Advanced Message Queuing Protocol) este un protocol  deschis la nivel de aplicație pentru transmiterea mesajelor între componentele sistemului. Ideea principală este că subsistemele individuale (sau aplicațiile independente) pot face schimb de mesaje în mod arbitrar printr-un broker AMQP care realizează rutarea , eventual garantează livrarea, distribuirea fluxurilor de date, abonamentul la tipurile de mesaje dorite.

Arhitectura protocolului a fost dezvoltată de John O'Hara de la JP Morgan Chase & Co [1] .

Protocol

AMQP se bazează pe trei concepte:

  1. Mesaj (mesaj) - o unitate de date transmise, partea sa principală (conținutul) nu este interpretată de server în niciun fel, antete structurate pot fi atașate mesajului.
  2. Punct de schimb - i se trimit mesaje. Schimbul distribuie mesaje către una sau mai multe cozi . În același timp, mesajele nu sunt stocate la punctul de schimb. Punctele de schimb sunt de trei tipuri:
    • fanout - mesajul este transferat la toate cozile atașate acestuia;
    • direct - mesajul este trimis la coadă cu un nume care se potrivește cu cheia de rutare (cheia de rutare) (cheia de rutare este specificată la trimiterea mesajului);
    • subiect - ceva între fanout și direct, mesajul este trimis în cozi pentru care se potrivește masca pentru cheia de rutare, de exemplu, app.notification.sms. # - vor fi livrate toate mesajele trimise cu chei care încep cu app.notification.sms la coada .
  3. Coadă - mesajele sunt stocate aici până când sunt preluate de client. Clientul trage întotdeauna mesaje dintr-una sau mai multe cozi.


Protocolul poate fi împărțit în două straturi:

  1. Strat funcțional - definește un set de comenzi care efectuează lucrări în numele aplicației.
  2. Transport Layer - gestionează cererile de la aplicație către server și de la server către aplicație, gestionează multiplexarea canalelor, încadrarea, codificarea, bataia inimii, prezentarea datelor, tratarea erorilor.


Exemple de coadă:

Protocolul nu se limitează la aceste trei tipuri. Sunt date ca exemplu de implementare.

Terminologie

Schimb

Primește mesaje de la furnizor și le direcționează către coada de mesaje conform unor criterii predefinite. Astfel de criterii se numesc legături. Schimbul este un mecanism de negociere și rutare a mesajelor. Pe baza mesajelor și a parametrilor acestora (legături), aceștia iau o decizie cu privire la redirecționarea către o coadă sau un alt schimb. Nu stocați mesaje.

Termenul schimb înseamnă algoritm și instanță de algoritm. Ei spun, de asemenea, tip de schimb și instanță de schimb.

AMQP definește un set de tipuri de schimb standard. Aplicațiile își pot crea propria instanță de schimb.


Fiecare schimb implementează propriul algoritm de rutare. Există mai multe tipuri de schimb standard descrise în specificația funcțională a standardului. Dintre acestea, două sunt importante:

Serverul va crea mai multe schimburi, inclusiv directe și tematice. Vor avea nume binecunoscute și aplicațiile client vor putea lucra cu ei.

Ciclul de viață al schimbului

Fiecare server AMQP pre-creează mai multe instanțe de schimb. Aceste instanțe există atunci când serverul rulează și nu pot fi distruse. Aplicațiile AMQP își pot crea propriile schimburi. AMQP nu folosește metoda create pentru aceasta, în schimb se declară instanța, care urmează logica: „creați dacă nu este creat, continuați altfel”. Se poate spune că crearea schimbului este idempotent . Probabil că aplicațiile vor crea schimburi după cum este necesar și apoi le vor distruge ca fiind inutile. AMQP oferă o metodă de distrugere a schimbului.

Cheie de rutare

În general, schimbul examinează proprietățile mesajului, câmpurile de antet și conținutul corpului său și, folosind acestea și, eventual, date din alte surse, decide cum să direcționeze mesajul. În cele mai multe cazuri simple, Exchange ia în considerare un câmp cheie, pe care îl numim Cheia de rutare . Cheia de rutare este o adresă virtuală pe care serverul de schimb o poate folosi pentru a decide dacă trimite un mesaj. Pentru rutarea punct la punct , cheia de rutare este de obicei numele cozii de mesaje. Pentru rutarea pub-sub , cheia de rutare este de obicei valoarea ierarhiei subiectelor (subiect - vezi publicație/subscruber). În cazuri mai complexe, cheia de rutare poate fi combinată cu rutarea după câmpurile antetului mesajului și/sau conținutul mesajului.

Coada de mesaje

Când o aplicație client creează o coadă de mesaje, poate specifica următoarele proprietăți:

Ciclul de viață al unui mesaj

Un mesaj AMQP constă dintr-un set de proprietăți și conținut non-public. Un nou mesaj este creat de producător folosind API-ul client AMQP. Producătorul adaugă conținut mesajului și eventual setează unele proprietăți ale mesajului. Producătorul marchează mesajul cu informații de rutare care arată ca o adresă, dar poate fi orice. Producătorul trimite apoi mesajul la schimb . Când un mesaj ajunge la server, schimbul (de obicei) îl direcționează către un set de cozi care există și pe server. Dacă mesajul nu este rutabil, Exchange îl poate renunța sau îl poate returna în aplicație. Producătorul decide cum să trateze mesajele care nu sunt rutabile.

Un mesaj poate exista în multe cozi de mesaje. Serverul poate gestiona acest lucru în moduri diferite, cum ar fi copierea mesajului utilizând contorizarea referințelor etc. Acest lucru nu afectează interoperabilitatea. Cu toate acestea, atunci când un mesaj este direcționat către mai multe cozi de mesaje, este identic în fiecare coadă de mesaje. Nu există un identificator unic aici care să facă distincția între diferitele copii.

Când un mesaj ajunge în coada de mesaje, acesta încearcă imediat să-l livreze consumatorului prin AMQP. Dacă acest lucru nu este posibil, atunci mesajul este stocat în coada de mesaje (în memorie sau pe disc la cererea producătorului ) și așteaptă ca consumatorul să fie gata. Dacă nu există niciun consumator , atunci coada poate returna un mesaj producătorului prin AMQP (din nou, dacă producătorul a cerut-o).

Când coada de mesaje poate livra un mesaj către consumator , acesta îl elimină din stocarea sa internă. Acest lucru se poate întâmpla imediat sau după ce consumatorul recunoaște că și-a încheiat cu succes munca, a procesat mesajul. Consumatorul alege cum și când mesajele sunt „recunoscute”. consumatorul poate şi respinge mesajul (confirmare negativă).

Mesajele producătorilor și confirmările consumatorilor sunt grupate în tranzacții. Atunci când o aplicație joacă ambele roluri, ceea ce este adesea cazul, ea face o treabă mixtă de trimitere de mesaje și de confirmare, apoi comiterea sau anularea tranzacției.

Livrarea mesajelor de pe server către consumator nu este tranzacțională.

Producător

Producer este o aplicație client care publică mesaje pentru schimb .

Prin analogie cu dispozitivul de e-mail, puteți vedea că producătorul nu trimite mesaje direct la coadă (coada de mesaje). Orice alt comportament ar rupe abstracția din modelul AMQ. Acest lucru ar fi similar cu ciclul de viață al unui mesaj de e-mail: rezolvarea e-mailului, ocolirea tabelelor de rutare ale MTA și lovirea directă a cutiei poștale. Acest lucru ar face imposibilă introducerea de filtrare și procesare intermediară, cum ar fi detectarea spam-ului.

Modelul AMQ folosește același principiu ca și sistemul de e-mail: toate mesajele sunt trimise către un singur schimb sau MTA , care inspectează mesajele pe baza unor reguli și informații care sunt ascunse de la expeditor și le direcționează către punctele de distribuție care sunt, de asemenea, ascunse de către expeditor. expeditor. (și le direcționează către punctele de livrare care sunt, de asemenea, ascunse de expeditor - aici punctele de distribuție sunt puncte de livrare din documentație).

consumator

Consumer este aplicația client care primește mesaje din coada de mesaje.

Analogia noastră cu e-mailul începe să se defecteze când ne uităm la consumator (destinatari). Clienții de e-mail sunt pasivi - pot citi cutiile poștale, dar nu au niciun efect asupra modului în care sunt populate aceste cutii poștale. Cu AMQP , consumatorul poate fi și pasiv, la fel ca clienții de e-mail. Adică, putem scrie o aplicație care ascultă o anumită coadă de mesaje și pur și simplu procesează informațiile primite. În acest caz, coada de mesaje trebuie să fie gata înainte de a începe aplicația și trebuie să fie „atașată” acesteia.

Consumatorul are, de asemenea, următoarele caracteristici:

Este ca și cum ai avea un sistem de e-mail care, la nivel de protocol, poate:

Modul automat

Majoritatea arhitecturilor de integrare nu au nevoie de acest nivel de complexitate. Majoritatea utilizatorilor AMQP au nevoie de funcționalități de bază din cutie. AMQP oferă acest lucru în următorul mod:

Drept urmare, legarea de bază permite producătorului să trimită mesaje direct în coada de mesaje, emulând astfel cea mai simplă schemă de trimitere a unui mesaj către un receptor la care oamenii s-ar aștepta de la un middleware tradițional.

Legarea de bază nu împiedică utilizarea cozii de mesaje în proiecte mai complexe. Vă permite să utilizați AMQP fără o înțelegere specifică a mecanismelor de legare și schimb.

Arhitectura de comandă AMQP

Secțiunea descrie procesul de interacțiune dintre aplicație și server

Comenzi de protocol (clase și metode)

Middleware-ul este complex, iar atunci când au proiectat structura protocolului, creatorii săi au încercat să îmblânzească această complexitate. Abordarea lor a fost să modeleze un API tradițional bazat pe clase care conțin metode, fiecare metodă făcând exact un lucru și făcând-o bine. Acest lucru are ca rezultat un set mare de comenzi, dar unul relativ ușor de înțeles.

Comenzile AMQP sunt grupate în clase. Fiecare clasă acoperă o zonă funcțională specifică. Unele clase sunt opționale - fiecare peer implementează clasele pe care trebuie să le suporte.

Există două metode diferite de dialog:

Pentru a simplifica procesarea metodei, definim răspunsuri separate pentru fiecare cerere sincronă. Adică, o metodă nu este utilizată pentru a răspunde la două solicitări diferite. Aceasta înseamnă că un peer, atunci când trimite o solicitare sincronă, poate accepta și procesa metodele primite până când este primit unul dintre răspunsurile sincrone valide. Acest lucru distinge AMQP de protocoalele RPC mai tradiționale.

O metodă este definită formal ca o solicitare sincronă, un răspuns sincron (la o cerere specifică) sau asincron. În cele din urmă, fiecare metodă este definită formal ca pe partea client (adică, server-client) sau pe partea server (client-server).

Maparea AMQP la API-ul Middleware

AMQP este proiectat să fie comparabil cu API-ul middleware. Procesul de potrivire este oarecum intelectual, adică. înțelege că nu toate metodele și nu toate argumentele au sens pentru aplicare, dar este și mecanică, adică. prin stabilirea anumitor reguli, toate metodele pot fi corelate fără intervenție manuală.

Beneficiile acestui lucru sunt că, învățând semantica AMQP, dezvoltatorii vor găsi aceeași semantică furnizată în orice cadru pe care îl folosesc.

Un exemplu de metodă Queue.Declare:

coadă . declara coada = my . coadă automat - ștergere = TRUE exclusiv = FALS

Poate fi convertit într-un cadru de rețea:

+--------+---------+----------+-----------+------- ----+ | coada | declara | al meu . coada | 1 | 0 | +--------+---------+----------+-----------+------- ----+ numele metodei clasei automat - șterge exclusiv

Sau într-o metodă API de nivel înalt

coadă . Declara ( "coada.mea" , ​​ADEVARAT , FALS );

Metoda asincronă de potrivire logică în pseudocod:

trimite metoda la server

Metoda sincronă de potrivire logică în pseudocod:

trimite metoda de solicitare la server repeta asteptati raspuns de la server dacă răspunsul este o metodă asincronă metoda procesului ( de obicei , conținut livrat sau returnat ) altfel afirmă metoda este un răspuns valid la cerere ieșire repetă sfârşitul dacă sfârşit - repetă

Este de remarcat faptul că pentru majoritatea aplicațiilor, middleware-ul poate fi complet ascuns în straturile tehnice ale sistemului și că API-ul efectiv utilizat este mai puțin important decât faptul că middleware-ul este robust și funcțional.

Notificări lipsă

Protocolul vorbăreț este lent. Folosim activ asincronia în cazurile în care există o problemă de performanță. Acesta este, de obicei, locul în care trimitem conținut de la un peer la altul. Expediem metodele cât mai repede posibil, fără să așteptăm confirmări. Acolo unde este necesar, implementăm windowing și throttling la un nivel superior, cum ar fi nivelul consumatorului.

Protocolul dispensează de la sesizări, pentru că implementează un model de afirmare pentru toate evenimentele. Ori reușește, ori se aruncă o excepție? care închide un canal sau o conexiune.

Nu există notificări în AMQP. Un eveniment reușit - în tăcere, un eșec - se declară. Când o aplicație are nevoie de urmărire explicită a succeselor și eșecurilor, ar trebui să utilizeze tranzacții.

Clasa de conexiune

Conexiunea este concepută pentru a fi durabilă și a gestiona multe canale.

Ciclul de viață al conexiunii
  • Clientul deschide o conexiune TCP/IP la server și trimite un antet de protocol. Aceasta este singura informație disponibilă pentru a fi trimisă de client care nu este formatată ca metodă.
  • Serverul răspunde cu o versiune de protocol și alte proprietăți, inclusiv lista de mecanisme de securitate pe care le acceptă (metoda Start)
  • Clientul alege un mecanism de securitate (Start-Ok).
  • Serverul inițiază un proces de autentificare care utilizează modelul SASL , trimite o provocare (Securizată) către client.
  • Clientul trimite un răspuns de autentificare (Secure-Ok). De exemplu, folosind mecanismul de autentificare „plat”, răspunsul conține un nume de utilizator și o parolă.
  • Serverul repetă provocarea (Secure) sau trece la negocieri prin trimiterea unui set de parametri, inclusiv dimensiunea maximă a cadrului (Tune).
  • Clientul acceptă sau reduce acești parametri (Tune-Ok).
  • Clientul deschide oficial o conexiune și alege o gazdă virtuală (Open).
  • Serverul confirmă selecția gazdei virtuale (Open-Ok).
  • Acum clientul folosește conexiunea așa cum crede de cuviință.
  • Un nod (client sau server) închide conexiunea (Închidere).
  • Celălalt nod trimite date despre închiderea conexiunii (Închidere-Ok).
  • Serverul și clientul închid socket-urile corespunzătoare conexiunii.

Informațiile nu sunt schimbate pentru erori ale conexiunilor incomplet deschise. Gazda care a întâmpinat eroarea ar trebui să închidă socket-ul fără o notificare ulterioară.

Clasa canal

AMQP este un protocol multicanal. Canalele oferă posibilitatea de a multiplexa o conexiune TCP/IP grea în mai multe conexiuni ușoare. Acest lucru face ca protocolul să fie mai „prietenos pentru firewall”, deoarece utilizarea portului este previzibilă. De asemenea, înseamnă că modelarea traficului și alte caracteristici QoS ale rețelei pot fi utilizate cu ușurință.

Canalele sunt independente unele de altele și pot îndeplini diferite funcții simultan cu alte canale, în timp ce lățimea de bandă disponibilă este împărțită între sarcini concurente.

Este de așteptat și încurajat ca aplicațiile client multi-thread să utilizeze adesea modelul „canal-pe-thread” pentru ușurința dezvoltării. Cu toate acestea, deschiderea mai multor conexiuni la unul sau mai multe servere AMQP de la un singur client este, de asemenea, perfect acceptabilă. Ciclul de viață al unui canal este următorul:

  • Clientul deschide un nou canal (Deschide)
  • Serverul confirmă deschiderea canalului (Open-Ok)
  • Clientul și serverul folosesc canalul așa cum consideră de cuviință.
  • Unul dintre noduri (client sau server) închide canalul (Închidere)
  • Celălalt nod confirmă închiderea canalului (Închidere-Ok)

Clasa Exchange

Permite unei aplicații să gestioneze instanțe de schimb pe server. Această clasă permite unei aplicații să scrie propriul script de gestionare a mesajelor fără a se baza pe nicio configurație.

Notă: Majoritatea aplicațiilor nu au nevoie de acest nivel de complexitate, iar middleware-ul vechi este puțin probabil să accepte această semantică.

Ciclul de viață al schimbului
  • Clientul cere serverului să se asigure că există schimbul (Declare). Clientul poate specifica acest lucru astfel: „creează schimbul dacă nu există” sau „avertizează-mă, dar nu-l crea dacă nu există”.
  • Clientul publică mesaje pentru a le schimba
  • Clientul poate decide să șterge schimbul (Ștergere)

Clasa de coadă

Clasa de coadă permite unei aplicații să gestioneze cozile de mesaje pe un server. Acesta este un pas de bază în aproape toate aplicațiile care primesc mesaje, cel puțin pentru a verifica dacă coada de mesaje așteptată există cu adevărat.


Ciclul de viață al cozii

Protocolul prevede două cicluri de viață de coadă:

  • Cozi de mesaje durabile - utilizate de mai mulți consumatori și există indiferent de prezența consumatorilor care ar putea primi mesaje
  • Cozi temporare de mesaje - cozi private pentru un anumit consumator. Coada este ștearsă atunci când nu există consumatori.


Ciclu de viață durabil al cozii de mesaje
  • Clientul declară o coadă de mesaje (Declarare cu argument „pasiv”)
  • Serverul recunoaște existența cozii (Declare-Ok)
  • Clientul citește mesajele din coadă
Ciclul de viață pentru cozile de mesaje temporare
  • Clientul creează o coadă de mesaje (Declarați adesea fără un nume de coadă, astfel încât serverul îi va da un nume). Serverul confirmă crearea (Declarare-Ok)
  • Clientul inițializează consumatorul pentru coada creată.
  • Clientul oprește consumatorul fie în mod explicit, fie prin închiderea canalului și/sau a conexiunii
  • Când ultimul consumator dispare din coada de mesaje și după un timeout politicos, serverul elimină coada de mesaje

AMQP implementează mecanismul de abonare la subiecte sub formă de cozi de mesaje. Acest lucru permite structuri interesante în care un abonament poate fi echilibrat de încărcare într-un grup de aplicații de abonați cooperanți.

Ciclul de viață a abonamentului
  • Clientul creează o coadă de mesaje (Declare), serverul confirmă (Declare-Ok)
  • Clientul potrivește coada de mesaje cu subiectul de schimb (Bind) și serverul confirmă potrivirea (Bind-Ok)
  • Clientul folosește coada de mesaje așa cum este descris mai sus

Clasa de bază

Clasa de bază implementează capabilitățile de mesagerie descrise în această specificație. Suportă următoarea semantică:

  • Trimiterea de mesaje de la client la server care are loc asincron (Publicare)
  • Porniți și opriți consumatorii (Consumați, Anulați)
  • Trimiterea de mesaje de la server la client care are loc asincron (livrare, returnare)
  • Confirmare mesaj (Confirmare, Respingere)
  • Obținerea mesajelor din coadă într-un mod sincron (Get)

Clasa de tranzacție

AMQP acceptă două tipuri de tranzacții:

  1. Tranzacții automate, în care fiecare mesaj și confirmare publicat este procesat ca o tranzacție autonomă.
  2. Tranzacții cu serverul local în care serverul tamponează mesajele și confirmările publicate și le comite la cererea clientului.

Clasa Tranzacție („tx”) oferă aplicațiilor acces la al doilea tip de tranzacție, tranzacțiile cu serverul local. Semantica clasei este următoarea:

  1. Aplicația solicită tranzacții pe server în fiecare canal în care dorește să primească astfel de tranzacții (Selectați)
  2. Aplicația lucrează (Publicare, Ack)
  3. Aplicația efectuează lucrări de confirmare sau rollback (Commit, Roll-back)
  4. Aplicația continuă să funcționeze

Tranzacțiile se referă la publicarea și confirmările de conținut, nu livrarea. Prin urmare, rollback-ul nu renunță la coadă și nu declanșează relivrarea. Clientul poate confirma aceste mesaje la următoarea tranzacție.

Arhitectura de transport AMQP

Această secțiune explică modul în care comenzile se mapează la protocolul la nivel de fir .

Descriere

AMQP este un protocol binar. Informațiile sunt organizate în cadre de diferite tipuri. Cadrele conțin metode de protocol și alte informații. Toate cadrele au același format general: antetul cadrului, sarcina utilă și sfârșitul cadrului. Formatul de încărcare a cadrului depinde de tipul de cadru.

La nivel de transport, se presupune utilizarea stivei TCP/IP sau a analogilor.

Într-o singură conexiune priză, pot exista mai multe fluxuri independente de control, numite canale. Fiecare cadru este numerotat cu un număr de canal. Prin intercalarea cadrelor lor, diferitele canale împărtășesc această conexiune. Pentru orice canal dat, cadrele sunt executate într-o secvență strictă care poate fi folosită pentru a conduce un analizor de protocol (de obicei o mașină de stări).

Construim cadre folosind un set mic de tipuri de date, cum ar fi biți, numere întregi, șiruri de caractere și tabele de câmpuri. Câmpurile cadru sunt strâns împachetate, fără a le face să fie lente sau greu de analizat. Este relativ ușor să creați un strat de încadrare mecanic din specificațiile protocolului.

Formatarea la nivel de fir este concepută pentru a fi scalabilă și suficient de versatilă pentru a fi utilizată în protocoale arbitrare de nivel înalt (nu doar AMQP). Anticipăm că AMQP se va extinde, se va îmbunătăți și, în alt mod, se va schimba în timp, iar formatul la nivel de fir va accepta acest lucru.

Tipuri de date

Tipuri de date AMQP utilizate în cadre:

  • Numerele întregi (de la 1 la 8 octeți) sunt folosite pentru a reprezenta dimensiuni, mărimi, limite etc. Numerele întregi sunt întotdeauna nesemnate și pot fi aliniate greșit într-un cadru.
  • biți
  • Șiruri scurte folosite pentru a stoca proprietăți scurte de text. Șirurile scurte sunt limitate la 255 de octeți și pot fi analizate fără riscul de depășire a memoriei tampon. (Bănuiesc că vorbim de un octet în 255 de state și nu despre 255 de octeți)
  • Șiruri lungi utilizate pentru a stoca părți de date binare
  • Câmpuri de tabel care conțin perechi nume-valoare. Valorile câmpurilor sunt introduse ca șiruri de caractere, numere întregi etc.

Negociere protocol

Clientul și serverul negociază un protocol. Aceasta înseamnă că atunci când un client se conectează, serverul oferă anumite opțiuni pe care clientul le poate accepta sau modifica. Când ambii sunt de acord asupra rezultatului, conexiunea este considerată stabilită. Negocierea este utilă deoarece vă permite să setați presetări de conexiune.

Coordonarea are loc pe mai multe aspecte:

  • Protocolul actual și versiunea acestuia. Un server POATE gestiona mai multe protocoale pe un singur port.
  • Argumente de criptare și autentificare a ambelor părți. Face parte din stratul funcțional al protocolului.
  • Dimensiunea maximă a cadrului, numărul de canale și alte limitări operaționale

Limitele convenite pot permite ambelor părți să prealoceze tampon de cheie, evitând blocajul. Fiecare cadru primit fie respectă limitele negociate și, prin urmare, este sigur, fie le depășește, caz în care cealaltă parte a eșuat și trebuie dezactivată. Acest lucru se aliniază foarte bine cu filozofia AMQP de „fie funcționează așa cum ar trebui, fie nu funcționează deloc”.

Ambele noduri negociază limite la cea mai mică valoare convenită, după cum urmează:

  • Serverul TREBUIE să spună clientului ce limite oferă.
  • Clientul răspunde și POATE reduce limitele de conectare

Delimitarea cadrului

Stiva TCP/IP - funcționează cu fluxuri, nu are un mecanism de demarcare a cadrelor încorporat. Protocoalele existente rezolvă această problemă în mai multe moduri diferite:

  • Se trimite un cadru per conexiune. Este simplu, dar lent
  • Adăugarea unui delimitator de cadru la un flux. Este simplu, dar face analizarea lentă
  • Numărați dimensiunea cadrului și trimiteți dimensiunea înainte de fiecare cadru. Este simplu și rapid, iar această abordare este implementată în AMQP.


Filmat în detaliu

Toate cadrele constau dintr-un antet (7 octeți), o sarcină utilă de dimensiune arbitrară și un octet „sfârșit de cadru” care detectează cadrele malformate:

0 1 3 7 mărime + 7 mărime + 8 +------+---------+-------------+ +------------+ +--- --------+ | tip | canal | dimensiune | | sarcină utilă | | cadru - capăt | +------+---------+-------------+ +------------+ +--- --------+ octet scurtă dimensiune lungă octeți octet

Cadrul se citește după cum urmează:

  1. Citiți antetul și verificați tipul de cadru și canalul
  2. În funcție de tipul de cadru, datele sunt citite din sarcina utilă și procesate
  3. Sfârșitul cadrului de citire.

În implementările realiste în ceea ce privește performanța, vom folosi „read-ahead buffering” sau „gathering reads” pentru a evita efectuarea a trei apeluri de sistem separate pentru a citi un cadru.

Cadre de metodă

Cadrele de metodă poartă comenzi de protocol de nivel înalt (pe care le numim „metode”). Un cadru de metodă poartă o comandă. Sarcina utilă a cadrului de metodă are următorul format:

0 2 4 +----------+-----------+-------------- - - | clasa - id | metoda - id | argumente ... +----------+-----------+-------------- - - scurt scurt ...

Cadrul metodei este tratat astfel:

1. Citirea cadrului metodei de încărcare utilă.

2. Desfacerea sa într-o structură. Această metodă are întotdeauna aceeași structură, așa că o puteți despacheta rapid

3. Verificați dacă această metodă este permisă în contextul actual.

4. Verificarea ca argumentele metodei sunt valide.

5. Executarea acestei metode.

Corpul cadrului metodei este construit ca o listă de câmpuri de date AMQP (biți, numere întregi, șiruri și tabele de șiruri). Codul de marshaling este generat trivial direct din specificațiile protocolului și poate fi foarte rapid.

Cadre de conținut

Conținutul este datele aplicației pe care le transferăm de la client la client prin serverul AMQP. Conținutul este, aproximativ vorbind, un set de proprietăți plus o parte binară a datelor. Setul de proprietăți permise este definit de clasa de bază și formează „cadru antet de conținut”. Datele pot fi de orice dimensiune și împărțite în mai multe (sau mai multe) blocuri, fiecare dintre acestea formând un „schelet al corpului de conținut”.

Privind cadrele pentru un anumit canal pe măsură ce acesta este transmis prin fir, putem vedea ceva de genul acesta:

[ metoda ] [ metoda ] [ antet ] [ corp ] [ corp [ metoda ] ...

Unele metode (cum ar fi Basic.Publish , Basic.Deliver , etc.) sunt definite oficial ca purtătoare de conținut. Când un peer trimite un astfel de cadru de metodă, îl urmează întotdeauna cu un antet de conținut și cu sau fără câteva cadre de corp de conținut. Antetul cadrului de conținut are următorul format:

0 2 4 12 14 +----------+--------+-----------+----------------+ ------------- - - | clasa - id | greutate | dimensiunea corpului | steaguri de proprietate | lista de proprietati ... +----------+--------+-----------+----------------+ ------------- - - scurt scurt lung lung scurt rest ...

Am pus corpul conținutului în cadre separate (în loc să îl includem într-o metodă), astfel încât AMQP să poată accepta metode „zero copie” în care conținutul nu este niciodată clasificat sau codificat. Punem proprietățile conținutului în propriul lor cadru, astfel încât destinatarii să poată renunța selectiv conținutul pe care nu doresc să îl proceseze.

Cadre Heartbeat

Heartbeat este o tehnică concepută pentru a suprascrie una dintre caracteristicile TCP/IP, și anume capacitatea sa de a se recupera de la o conexiune fizică întreruptă, închizându-se doar după un timeout destul de lung. În unele scenarii, trebuie să știm foarte repede dacă peer-ul este în jos sau nu răspunde din alte motive (de exemplu, se blochează într-o buclă). Deoarece bătăile inimii se pot face la un nivel scăzut, vom implementa acest lucru ca un tip special de cadru care este schimbat între nodurile la stratul de transport, mai degrabă decât ca o metodă de clasă.

Gestionarea erorilor

AMQP folosește excepții pentru tratarea erorilor. Orice eroare operațională (coada de mesaje nu a fost găsită, drepturi de acces insuficiente etc.) va declanșa o excepție de canal. Orice eroare structurală (argument prost, secvență proastă a metodei etc.) are ca rezultat o excepție de conexiune. Excepția închide canalul sau conexiunea și returnează un cod de răspuns și un corp de răspuns la aplicația client. Folosim un cod de răspuns din 3 cifre plus schema de text de răspuns care este utilizată în HTTP și multe alte protocoale.


Închiderea canalelor și conexiunilor

Se spune că conexiunea sau canalul este „deschis” pentru client când trimite un Open și pentru server când trimite un Open-Ok. De acum înainte, un peer care dorește să închidă un canal sau o conexiune trebuie să facă acest lucru folosind protocolul de strângere de mână, care este descris aici.

Închiderea unui canal sau a unei conexiuni din orice motiv – normal sau excepțional – trebuie făcută cu atenție. Închiderile bruște nu sunt întotdeauna detectate rapid și, după o excepție, este posibil să pierdem codurile de răspuns de eroare. Designul corect este de a negocia manual închiderea, astfel încât canalul/conexiunea să fie închisă numai după ce suntem siguri că cealaltă parte este conștientă de situație.

Când un peer decide să închidă un canal sau o conexiune, trimite metoda Close. Nodul receptor trebuie să răspundă la închidere cu un Close-Ok, iar apoi ambele părți își pot închide canalul sau conexiunea. Rețineți că dacă colegii ignoră Close, poate apărea un impas dacă ambii colegi trimit Close în același timp.


Arhitectura client AMQP

Este posibil să citiți și să scrieți cadre AMQP direct din aplicație, dar ar fi un design prost. Chiar și cea mai simplă conversație AMQP este mult mai complexă decât, să zicem, HTTP, iar dezvoltatorii de aplicații nu trebuie să înțeleagă lucruri precum formatele binare de încadrare pentru a trimite un mesaj la o coadă de mesaje. Arhitectura client AMQP recomandată constă din mai multe niveluri de abstractizare:

  1. Fraiming Layer - preia metodele de protocol AMQP într-un anumit format de limbă (structuri, clase etc.) și le serializează ca cadre la nivel de fir. Stratul de încadrare poate fi generat mecanic din specificațiile AMQP (care sunt definite într-un limbaj de modelare a protocolului, implementat în XML și proiectat special pentru AMQP).
  2. Stratul manager de conexiune - citește și scrie cadre AMQP și gestionează conexiunea generală și logica sesiunii. În acest strat, putem încapsula logica completă pentru deschiderea unei conexiuni și a unei sesiuni, tratarea erorilor, trimiterea și primirea conținutului și așa mai departe. Porțiuni mari din acest strat pot fi realizate automat din specificațiile AMQP. De exemplu, specificațiile definesc ce metode transportă conținut, astfel încât logica „trimite o metodă și apoi trimite opțional conținut” poate fi creată mecanic.
  3. API Layer - oferă un API specific pentru lucrul cu aplicații. Stratul API poate reflecta unele standarde existente sau poate oferi metode AMQP de nivel superior prin crearea unei mapări așa cum este descris mai devreme. Metodele AMQP sunt concepute pentru a face această mapare simplă și utilă. Stratul API în sine poate fi compus din mai multe straturi, cum ar fi un API de nivel superior construit pe API-ul AMQP Method.

De asemenea, există de obicei un anumit nivel de I/O, care poate fi foarte simplu (socket sincron de citire și scriere) sau complex (I/O multi-threaded complet asincron). Această diagramă arată arhitectura generală recomandată:

+------------------------+ | aplicare | +-----------+------------+ | +------------------------+ +---| Strat API |---- Strat API client -----+ | +-----------+------------+ | | | | | +-----------------------+ +----------------+ | | | Manager conexiune +----+ Strat de cadru | | | +-----------+------------+ +-------+ | | | | | +-----------------------+ | +---| Strat I / O asincron |--------------------------+ +-----------+------------+ | ------- - - - - Rețea - - - - -------

În acest document, când vorbim despre „client API” ne referim la toate straturile de sub aplicație (i/o, încadrare, manager de conexiune și straturi API. De obicei vorbim despre „client API” și „aplicație” ca două lucruri separate în care aplicația folosește API-ul client pentru a vorbi cu serverul middleware.

Specificație funcțională

Specificația funcțională a serverului

Mesaje și conținut

Un mesaj este unitatea de procesare atomică a unui sistem de rutare și middleware. Mesajele conțin conținut care constă dintr-un antet de conținut care conține un set de proprietăți și un corp de conținut care conține un bloc opac de date binare.

Un mesaj poate corespunde mai multor entități de aplicație diferite:

  • Mesajul stratului de aplicație
  • Se trimite fișier
  • cadru flux de date

Mesajele pot fi permanente. Un mesaj persistent este stocat în siguranță pe disc și este garantat că va fi livrat chiar și în cazul unei întreruperi majore a rețelei, a unei defecțiuni a serverului, a depășirii etc.

Mesajele pot avea prioritate. Un mesaj cu prioritate mare este trimis înaintea mesajelor cu prioritate inferioară care așteaptă în aceeași coadă de mesaje. Când mesajele trebuie să fie abandonate pentru a menține un anumit nivel de calitate a serviciului, serverul va renunța mai întâi la mesajele cu prioritate scăzută.

Serverul NU TREBUIE să modifice conținutul mesajelor pe care le primește și le transmite aplicațiilor de consum. Serverul POATE adăuga informații la antetele conținutului, dar NU TREBUIE să elimine sau să modifice informațiile existente.

Gazde virtuale

O gazdă virtuală este o secțiune de date dintr-un server, o comoditate administrativă care se va dovedi utilă celor care doresc să ofere AMQP ca serviciu pe o infrastructură partajată.

Gazda virtuală conține propriul spațiu de nume, set de schimburi, cozi de mesaje și toate obiectele asociate. Fiecare conexiune trebuie să fie asociată cu o gazdă virtuală.

Clientul selectează gazda virtuală în metoda Connection.Open după autentificare. Aceasta înseamnă că schema de autentificare a serverului este comună tuturor nodurilor virtuale de pe acel server. Cu toate acestea, schema de autorizare utilizată poate fi unică pentru fiecare gazdă virtuală. Acest lucru ar trebui să fie util pentru o infrastructură de găzduire partajată. Administratorii care necesită scheme de autentificare diferite pentru fiecare gazdă virtuală trebuie să utilizeze servere separate

Toate canalele dintr-o conexiune funcționează cu aceeași gazdă virtuală. Nu există nicio modalitate de a contacta o altă gazdă virtuală pe aceeași conexiune și nu există nicio modalitate de a comuta la o altă gazdă virtuală fără a întrerupe conexiunea și a începe de la capăt.

Protocolul nu oferă niciun mecanism pentru crearea sau configurarea gazdelor virtuale - acest lucru se face într-un mod nespecificat în cadrul serverului și depinde în întregime de implementare.

Schimburi

Exchange este un agent de rutare a mesajelor în interiorul unei gazde virtuale. Instanța de schimb (la care ne referim în mod obișnuit ca „schimb”) primește mesaje și informații de rutare - în principal cheia de rutare - și fie transmite mesajele către cozile de mesaje, fie către serviciile interne. Schimburile sunt denumite pe bază de gazdă virtuală.

Aplicațiile sunt libere să creeze, să partajeze și să distrugă instanțe de schimb aflate sub autoritatea lor.

Schimburile pot fi permanente, temporare sau șterse automat. Schimburile permanente există până când sunt eliminate. Schimburile temporare există până când serverul este oprit. Schimburile șterse automat există până când nu mai sunt folosite.

Serverul oferă un set specific de tipuri de schimb. Fiecare tip de schimb implementează o mapare și un algoritm specific, așa cum este definit în secțiunea următoare. AMQP prescrie un număr mic de tipuri de schimb și recomandă încă câteva. În plus, fiecare implementare de server poate adăuga propriile tipuri de schimb.

Un schimb poate direcționa un singur mesaj către mai multe cozi de mesaje în paralel. Acest lucru creează mai multe instanțe de mesaj care sunt consumate independent unele de altele.

Tipul de schimb direct

Tipul de schimb direct funcționează astfel:

  1. Coada de mesaje este mapată la serverul de schimb folosind cheia de rutare K.
  2. Editorul trimite un mesaj de schimb cu cheia de rutare R.
  3. Mesajul este trimis în coada de mesaje dacă K = R.

Serverul TREBUIE să implementeze o schimbare directă și TREBUIE să predefinite în fiecare gazdă virtuală cel puțin două schimburi directe: unul numit amqp.direct și unul fără nume public care servește ca schimb implicit pentru gestionarea metodelor publice.

Rețineți că cozile de mesaje pot fi contactate folosind orice valoare validă a cheii de rutare, dar cel mai adesea cozile de mesaje vor fi contactate folosind propriul nume ca cheie de rutare.

În special, toate cozile de mesaje TREBUIE să fie legate automat la un schimb fără nume public, folosind numele cozii de mesaje ca cheie de rutare.

Tipul de schimb Fanout

Fanout Exchange funcționează astfel:

  1. Coada de mesaje este legată de serverul de schimb fără niciun argument.
  2. editorul trimite un mesaj la schimb.
  3. Mesajul este transmis în coada de mesaje necondiționat.

Fanout Exchange este banal de proiectat și implementat. Acest tip de schimb și numele predeclarat amq.fanout sunt necesare.

Tipul de schimb de subiecte

Schimbul de subiecte funcționează astfel:

  1. Coada de mesaje este legată de serverul de schimb folosind modelul de rutare P.
  2. Editorul trimite un mesaj de schimb cu cheia de rutare R.
  3. Mesajul este trimis în coada de mesaje dacă R se potrivește cu P.

Cheia de rutare folosită pentru Topic Exchange trebuie să fie compusă din cuvinte separate prin puncte. Dimensiunea minimă a cuvântului este de 0 caractere. Fiecare cuvânt poate conține literele AZ și az, precum și numerele 0-9.

Modelul de rutare urmează aceleași reguli ca și cheia de rutare, cu adăugarea că * se potrivește cu un cuvânt și # se potrivește cu zero sau mai multe cuvinte. Astfel, schema de rutare *.stock.# se potrivește cu cheile de rutare usd.stock și eur.stock.db, dar nu stock.nasdaq.

O schemă sugerată pentru Topic Exchange este de a păstra un set de toate cheile de rutare cunoscute și de a-l actualiza atunci când editorii folosesc noi chei de rutare. Puteți defini toate legăturile pentru o anumită cheie de rutare și, astfel, puteți găsi rapid cozile de mesaje pentru un mesaj. Acest tip de schimb este opțional.

Serverul trebuie să implementeze tipul de schimb de subiecte, caz în care serverul trebuie să declare mai întâi cel puțin un schimb de subiecte denumit amq.topic în fiecare gazdă virtuală.

Tipul de schimb de anteturi

tipul de schimb de anteturi funcționează astfel:

  1. Coada de mesaje este obligată să facă schimb cu un tabel de argumente care conține antetele care ar trebui să fie potrivite pentru această legare și, opțional, valorile pe care ar trebui să le conțină. Cheia de rutare nu este utilizată.
  2. Editorul trimite un mesaj către schimb, unde proprietatea antete conține un tabel cu nume și valori.
  3. Mesajul este transmis în coadă dacă proprietatea antete se potrivește cu argumentele cu care a fost asociată coada.

Algoritmul de potrivire este controlat de un argument de legare special transmis ca o pereche nume-valoare în tabelul de argumente. Acest argument se numește „X-match”. Poate lua una dintre cele două valori, dictând modul în care alte perechi de valori de nume din tabel sunt gestionate în timpul potrivirii:

  • „toate” implică faptul că toate celelalte perechi trebuie să corespundă proprietății antete a mesajului pentru ca acest mesaj să fie redirecționat (ȘI)
  • „oricare” implică faptul că mesajul trebuie redirecționat dacă oricare dintre câmpurile din proprietatea antete se potrivește cu unul dintre câmpurile din tabelul de argumente (SAU)

Un câmp din argumentele de legare se potrivește cu un câmp din mesaj dacă următoarea condiție este adevărată: fie câmpul din argumentele de legare nu are valoare și câmpul cu același nume este prezent în antetele mesajului, fie dacă câmpul din legarea arguments are o valoare și câmpul cu același nume există în mesajele antete și are același sens.

Orice câmp care începe cu „x -”, altul decât „X-match” este rezervat pentru utilizare ulterioară și va fi ignorat

Serverul TREBUIE să implementeze tipul de schimb de anteturi, iar serverul TREBUIE să predeclare cel puțin un tip de schimb de anteturi numit amq.match în fiecare gazdă virtuală.

Tipul de schimb de sistem

Tipul de schimb de sistem funcționează astfel:

  1. Editorul trimite un mesaj de schimbat cu cheia de rutare S.
  2. Schimbul de sistem îl trimite către serviciul de sistem S.

Servicii de sistem care încep cu „amq”. rezervat AMQP. Toate celelalte nume pot fi folosite. Acest tip de schimb este opțional.

Tipuri de schimb personalizate

Toate numele de tip de schimb personalizat trebuie să înceapă cu „x -”. Tipurile de schimb care nu încep cu „x -” sunt rezervate pentru utilizare ulterioară în standardul AMQP.

Cozi de mesaje

O coadă de mesaje este un FIFO numit care conține mesaje de la aplicații. Aplicațiile sunt libere să creeze, să partajeze, să utilizeze și să distrugă cozile de mesaje aflate sub autoritatea lor.

Rețineți că atunci când există mai mulți cititori dintr-o coadă, sau tranzacții cu clientul, sau utilizând câmpuri prioritare, sau utilizând selectoare de mesaje sau optimizarea livrării specifice implementării, coada poate să nu aibă caracteristici FIFO adevărate. Singura modalitate de a garanta FIFO este să aveți un singur consumator conectat la coadă. În aceste cazuri, coada poate fi descrisă ca „fifo slab”.

Cozile de mesaje pot fi permanente, temporare sau șterse automat. Cozile de mesaje persistente există până când sunt eliminate. Cozile temporare de mesaje există până când serverul este oprit. Cozile de ștergere automată durează până când nu mai sunt utilizate.

Cozile de mesaje își stochează mesajele în memorie, pe disc sau o combinație a celor două. Cozile de mesaje sunt denumite pe baza gazdei virtuale.

Cozile de mesaje conțin mesaje și le distribuie unuia sau mai multor clienți consumatori. Un mesaj trimis la o coadă de mesaje nu este niciodată trimis la mai mult de un client decât dacă a fost respins

O singură coadă de mesaje poate conține diferite tipuri de conținut simultan și independent. Adică, dacă conținutul principal și conținutul fișierului sunt trimise la aceeași coadă de mesaje, acestea vor fi livrate aplicațiilor consumatoare în mod independent, la cerere.

Legături

Legarea este conexiunea dintre coada de mesaje și schimbul de date. Legarea definește argumentele de rutare care spun schimbului ce mesaje ar trebui să primească coada. Aplicațiile creează și distrug legături după cum este necesar pentru a direcționa fluxul de mesaje către cozile lor de mesaje. Durata de viață a unei legături depinde de cozile de mesaje pentru care sunt definite - atunci când o coadă de mesaje este distrusă, legarea ei este, de asemenea, distrusă. Semantica specifică a metodei Queue.Bind depinde de tipul de schimb.

Consumatori - consumatori

Folosim termenul consumator pentru a ne referi atât la aplicația client, cât și la entitatea care controlează modul în care o anumită aplicație client primește mesaje din coada de mesaje. Când un client „pornește un consumator”, creează o entitate consumator pe server. Când un client „anulează un consumator”, acesta distruge entitatea consumatorului de pe server. Consumatorii aparțin aceluiași canal de client și forțează coada de mesaje să trimită mesaje clientului în mod asincron.

Calitatea serviciului

Calitatea serviciului determină viteza cu care sunt trimise mesajele. Calitatea serviciului depinde de tipul de conținut distribuit. În general, QoS folosește conceptul de „prefetch” pentru a indica câte mesaje sau câți octeți de date vor fi trimiși înainte ca clientul să confirme mesajul. Scopul este de a trimite datele mesajului în avans pentru a reduce latența.

Mulțumiri

O confirmare este un semnal formal de la o aplicație client către coada de mesaje că a procesat cu succes un mesaj. Există două modele posibile de validare:

  1. Automată - în care serverul elimină conținutul din coada de mesaje imediat ce acesta este livrat în aplicație (folosind metodele Deliver sau Get-Ok).
  2. Explicit - în care aplicația client trebuie să trimită metoda Ack pentru fiecare mesaj sau lot de mesaje pe care le-a procesat

Straturile de clienți pot implementa ei înșiși confirmări explicite în diferite moduri, cum ar fi imediat după primirea unui mesaj sau când o aplicație indică că l-a procesat. Aceste diferențe nu afectează AMQP sau interoperabilitatea.

controlul fluxului

Controlul fluxului este o procedură de urgență utilizată pentru a opri fluxul de mesaje de la un egal. Funcționează în același mod între client și server și este implementat de comanda Channel.Flow. Controlul fluxului este singurul mecanism care poate opri un editor supraproductiv. Un Consumator poate folosi un mecanism de preluare a ferestrei mai elegant dacă folosește Confirmări (ceea ce înseamnă de obicei utilizarea tranzacțiilor).

Convenția de denumire

Aceste convenții guvernează denumirea entităților AMQP. Serverul și clientul trebuie să respecte aceste convenții:

  • Schimburile personalizate trebuie să înceapă cu prefixul „x-”.
  • Instanțele de schimb standard trebuie să înceapă cu prefixul „amq”.
  • Serviciile standard de sistem trebuie să înceapă cu prefixul „amq”.
  • Cozile de mesaje standard trebuie să înceapă cu prefixul „amq”.

Specificația comenzilor AMQP (clase și metode)

Note

Metodele AMQP pot defini valori minime (cum ar fi numărul de consumatori dintr-o coadă de mesaje) din motive de compatibilitate. Aceste minime sunt definite în descrierea fiecărei clase.

Implementările AMQP conforme ar trebui să implementeze valori destul de generoase pentru astfel de câmpuri, minimele sunt menite să fie folosite doar pe platformele cele mai puțin capabile.

Gramaticile folosesc această notație:

  • „S:” indică datele sau metoda trimise de la server către client;
  • „C:” indică datele sau metoda trimise de la client către server;
  • +condiție sau +(...) expresie înseamnă „1 sau mai multe instanțe”;
  • *condiția sau expresia *(...) înseamnă „zero sau mai multe instanțe”.

Definim metodele ca:

  • cerere sincronă („cerere de sincronizare”). Gazda care trimite trebuie să aștepte o anumită metodă de răspuns, dar poate implementa aceasta în mod asincron;
  • răspuns sincron („răspuns sincronizat pentru XYZ”);
  • cerere sau răspuns asincron ("async")

Specificație tehnică

Numerele de port definite de IANA

Numărul standard de port AMQP a fost atribuit de IANA ca 5672 atât pentru TCP, cât și pentru UDP. Port UDP rezervat pentru utilizare în viitoare implementări multicast

Format la nivel de fir pentru AMQP

Protocolul oficial de gramatică

Oferim o gramatică completă pentru AMQP (aceasta este furnizată pentru referință și s-ar putea să fiți mai interesat să urmăriți secțiunile care detaliază diferitele tipuri de cadre și formatele acestora):

amqp = protocol - antet * amqp - unitate protocol - antet = literal - protocol AMQP - protocol id - versiune literal - AMQP = % d65 .77.81.80 ; „AMQP” protocol - id = % d0 ; Trebuie să fie 0 protocol - versiune = % d0.9.1 ; _ 0-9-1 metoda = metoda - cadru [ continut ] metoda - cadru = % d1 cadru - metoda proprietati - cadru de sarcina utila - final cadru - proprietăți = sarcina utilă a canalului - dimensiune canal = short - uint ; non - zero sarcină utilă - dimensiune = lung - uint metoda - sarcina utilă = clasă - metoda id - id * amqp - câmp clasa - id = % x00 .01 - % xFF . FF metoda - id = % x00 .01 - % xFF . FF amqp - câmp = BIT / OCTET / scurt - uint / lung - uint / lung - lung - uint / scurt - șir / lung - șir / marca temporală / câmp - tabel scurt - uint = 2 * OCTET lung - uint = 4 * OCTET long - long - uint = 8 * OCTET short - string = OCTET * string - char ; lungime + continut șir - char = % x01 .. % xFF long - string = long - uint * OCTET ; lungime + continut timestamp = long - long - uint ; 64 - biți POSIX câmp - tabel = lung - uint * câmp - valoare - pereche câmp - valoare - pereche = câmp - nume câmp - valoare câmp - nume = scurt - șir câmp - valoare = 't' boolean / 'b' scurt - scurt - int / 'B' scurt - scurt - uint / 'U' scurt - int / 'u' scurt - uint / 'I' lung - int / 'i' lung - uint / 'L' lung - lung - int / 'l' long - long - uint / 'f' plutesc / 'd' dublu / „D” zecimală - valoare / 's' scurt - șir / „S” lung - șir / Câmp „A” - matrice / Marca temporală „T”. / Câmp „F” - tabel / 'V' ; nici un câmp boolean = OCTET ; 0 = FALS , altfel ADEVĂRAT scurt - scurt - int = OCTET scurt - scurt - uint = OCTET scurt - int = 2 * OCTET lung - int = 4 * OCTET lung - lung - int = 8 * OCTET float = 4 * OCTET ; IEEE -754 dublu = 8 * OCTET ; rfc1832 XDR dublu zecimală - valoare = scară lungă - uint scara = OCTET ; numărul de cifre zecimale field - array = long - int * field - value ; matrice de valori cadru - sfârşit = % xCE continut = % d2 continut - antet * continut - corp conținut - antet = cadru - antet proprietăți - cadru de sarcină utilă - final antet - sarcină utilă = conținut - conținut de clasă - conținut de greutate - corp - dimensiune proprietate - steaguri proprietate - listă continut - clasa = octet continut - greutate = % x00 continut - corp - marime = lung - lung - uint proprietate - steaguri = 15 * BIT % b0 / 15 * BIT % b1 proprietate - steaguri proprietate - listă = * amqp - câmp conținut - corp = % d3 cadru - proprietăți corp - cadru de sarcină utilă - sfârșit corp - sarcină utilă = * OCTET bataia inimii = % d8 % d0 % d0 frame - final


Folosim sintaxa BNF extinsă definită în IETF RFC 2234. În concluzie:

  • Numele regulii este doar numele în sine
  • Terminalele sunt specificate prin unul sau mai multe caractere numerice, cu interpretarea de bază a acelor caractere notate ca „d” sau „x”
  • O regulă poate defini un șir simplu ordonat de valori prin enumerarea unei secvențe de nume de reguli
  • O gamă de valori numerice alternative poate fi specificată compact, folosind o liniuță ( " - " ) pentru a indica intervalul
  • Elementele cuprinse între paranteze sunt tratate ca un singur element al cărui conținut este strict ordonat.
  • Elementele separate printr-o bară oblică ( " / " ) sunt alternative.
  • Operatorul * care precede un element indică repetarea. Forma lungă: „<a>*< b>element”, unde <a> și <b> sunt valori zecimale opționale, cel puțin <a> și cel mult <b> apariții ale elementelor.
Antet protocol

Clientul trebuie să înceapă o nouă conexiune prin trimiterea antetului protocolului. Aceasta este o secvență de 8 octeți:

+---+---+---+---+---+---+---+---+ | „A” | „M” | „Q” | „P” | 0 | 0 | 9 | 1 | +---+---+---+---+---+---+---+---+ 8 octeți

Antetul protocolului este format din literele mari „AMQP” urmate de constanta %d0 urmată de:

  1. Versiunea principală a protocolului utilizată în conformitate cu secțiunea 1.4.2. (versiunea de documentație dezactivată 0-9-1)
  2. Versiune minoră a protocolului utilizată în conformitate cu secțiunea 1.4.2. (versiunea de documentație dezactivată 0-9-1)
  3. Revizuirea protocolului utilizată în conformitate cu secțiunea 1.4.2. (versiunea de documentație dezactivată 0-9-1)

Modelul de negociere a protocolului este compatibil cu protocoalele existente precum HTTP, care inițiază o conexiune cu un șir de text constant și cu firewall-uri, care urmăresc începutul unui protocol pentru a decide ce reguli să i se aplice.

Clientul și serverul negociază protocolul și versiunea după cum urmează:

  • Clientul deschide o nouă conexiune socket la serverul AMQP și trimite un antet de protocol.
  • Serverul fie acceptă, fie respinge antetul protocolului. Dacă respinge antetul de protocol, scrie un antet de protocol valid în socket și apoi închide socket-ul.
  • În caz contrar, lasă socket-ul deschis și implementează protocolul în mod corespunzător.

Exemplu:

Clientul trimite : Serverul răspunde : AMQP % d0 .0.9.1 Conexiune . metoda de pornire AMQP % d0 .1.0.0 AMQP % d0 .0.9.1 < Închidere conexiune > HTTP AMQP % d0.0.9.1 < Închidere conexiune > _

Principii de implementare a protocolului:

  • Serverul poate accepta protocoale non-AMQP, cum ar fi HTTP
  • Dacă serverul nu recunoaște primii 5 octeți de date de pe socket sau nu acceptă versiunea particulară de protocol pe care o solicită clientul, trebuie să scrie un antet de protocol valid pe socket, apoi să șterge socket-ul (pentru a se asigura că clientul aplicația primește datele) și apoi închideți conexiunea cu socket. Serverul poate imprima un mesaj de diagnosticare în scopuri de depanare.
  • Un client poate determina versiunea de protocol a serverului încercând să se conecteze la cea mai înaltă versiune acceptată și să se reconecteze la o versiune inferioară dacă primește astfel de informații înapoi de la server.
  • Clienții și serverele care implementează mai multe versiuni de AMQP TREBUIE să folosească toți cei opt octeți ai antetului protocolului pentru a identifica protocolul.


Format de bază de cadru

Toate cadrele încep cu un antet de 7 octeți constând dintr-un câmp de tip (octet), un câmp de canal (întreg scurt) și un câmp de lungime (întreg lung):

0 1 3 7 mărime + 7 mărime + 8 +------+---------+---------+ +-------------+ +------ -----+ | tip | canal | dimensiune | | sarcină utilă | | cadru - capăt | +------+---------+---------+ +-------------+ +------ -----+ octet scurt lung dimensiune octet octet

AMQP definește următoarele tipuri de cadre:

  • Tip = 1, „METODA”: cadru de metodă.
  • Tip = 2, „HEADER”: cadru antet conținut
  • Tip = 3, „BODY”: cadru de corp de conținut.
  • Tip = 4, „HEARTBEAT”: cadru ritm cardiac.

Numărul canalului este 0 pentru toate cadrele care sunt globale pentru conexiune și 1-65535 pentru cadrele care se referă la anumite canale.

Câmpul de dimensiune este dimensiunea încărcăturii utile, excluzând octetul de sfârșit de cadru. În timp ce AMQP presupune un protocol conectat de încredere, folosim sfârșitul cadrului pentru a detecta erorile de încadrare cauzate de implementări incorecte de client sau server.

Principii de implementare a protocolului:

  • Octetul de sfârșit de cadru trebuie să aibă întotdeauna valoarea hexazecimală %xCE.
  • Dacă un peer primește un cadru cu un tip care nu este unul dintre aceste tipuri definite, ar trebui să trateze aceasta ca o eroare fatală de protocol și să închidă conexiunea fără a trimite alte date despre aceasta.
  • Când un peer citește un cadru, trebuie să verifice dacă sfârșitul cadrului este valid înainte de a încerca să decodeze cadrul. Dacă sfârșitul cadrului nu este valid, ar trebui să trateze acest lucru ca pe o eroare fatală de protocol și să închidă conexiunea fără a trimite alte date despre aceasta. Ar trebui să înregistreze informații despre problemă, deoarece aceasta indică o eroare în implementarea codului de încadrare al serverului sau al clientului.
  • Un peer NU TREBUIE să trimită cadre mai mari decât dimensiunea negociată. Un peer care primește un cadru care este prea mare TREBUIE să semnaleze o excepție de conexiune cu un cod de răspuns 501 (eroare de cadru).
  • Numărul canalului trebuie să fie zero pentru toate cadrele de bătăi inimii și pentru cadrele de metodă, antet și corp care se referă la clasa Connection. Un peer care primește un număr de canal diferit de zero pentru unul dintre aceste cadre TREBUIE să semnaleze o excepție de conexiune cu un cod de răspuns 503 (Comandă Invalidă).
Încărcături utile metodei

Corpurile de cadre ale metodei constau dintr-o listă invariantă de câmpuri de date numite „argumente”. Toate corpurile metodei încep cu identificatori de clasă și metodă:

0 2 4 +----------+-----------+-------------- - - | clasa - id | metoda - id | argumente ... +----------+-----------+-------------- - - scurt scurt ...

Principii de implementare a protocolului:

  • Id-ul clasei și id-ul metodei sunt constante definite în specificațiile clasei și metodei AMQP.
  • Argumentele sunt un set de câmpuri AMQP specifice fiecărei metode
  • Identificatorul de clasă de valori %x00.01 până la %xEF.FF sunt rezervate pentru clasele standard AMQP.
  • ID-urile de clasă de la %xF0.00 la %xFF.FF (%d61440-%d65535) pot fi utilizate atunci când sunt implementate pentru clase de extensie non-standard.
Câmpurile de date AMQP

AMQP are două niveluri de specificare a câmpurilor de date: câmpuri de date native utilizate pentru argumentele metodei și câmpuri de date transmise între aplicații în tabelele de câmp. Tabelele de câmpuri conțin un set superscript de câmpuri de date native.

Întregi

AMQP definește următoarele tipuri de numere întregi native:

  • Octet nesemnat (8 biți).
  • Numerele întregi scurte fără semn (16 biți).
  • Numerele întregi lungi fără semn (32 de biți).
  • Numerele întregi lungi lungi fără semn (64 de biți).

Numerele întregi și lungimile șirurilor sunt întotdeauna nesemnate și stocate în ordinea octeților de rețea. Nu încercăm să optimizăm cazul în care două sisteme low-high (de exemplu două procesoare Intel) vorbesc între ele.

Principii de implementare a protocolului:

  • Designerii nu ar trebui să presupună că numerele întregi codificate într-un cadru sunt aliniate pe limitele cuvintelor de memorie.
Biți

AMQP își definește propriul tip de câmp de biți. Biții se acumulează în octeți întregi. Când doi sau mai mulți biți se ating într-un cadru, aceștia vor fi împachetati în unul sau mai mulți octeți, începând cu bitul cel mai mic din fiecare octet. Nu există nicio cerință ca toate valorile biților dintr-un cadru să fie învecinate, dar acest lucru se face de obicei pentru a minimiza dimensiunile cadrului.

Șiruri

Șirurile AMQP au lungime variabilă și sunt reprezentate de o lungime întreagă urmată de zero sau mai mulți octeți de date. AMQP definește două tipuri de rânduri native:

  • Șiruri scurte stocate ca o lungime întreagă fără semn de 8 biți, urmată de zero sau mai mulți octeți de date. Șirurile scurte pot conține până la 255 de octeți de date UTF-8, dar nu pot conține octeți binari nuli.
  • Șiruri lungi stocate ca o lungime întreagă fără semn de 32 de biți, urmată de zero sau mai mulți octeți de date. Șirurile lungi pot conține orice date
Marcaje temporale

Marcajele temporale sunt stocate în formatul POSIX time_t pe 64 de biți cu o precizie de o secundă. Folosind 64 de biți, evităm problemele viitoare de împachetare asociate cu valorile time_t de 31 de biți și 32 de biți.

Marginile tabelului

Câmpurile de tabel sunt șiruri lungi care conțin perechi nume-valoare împachetate. Perechile de valori de nume sunt codificate ca un șir scurt care specifică numele și un octet care specifică tipul valorii, urmat de valoarea în sine. Tipurile de câmpuri valide pentru tabele sunt extensii ale tipurilor native întreg, bit, șir și timestamp și sunt afișate în gramatică. Câmpurile întregi cu octeți multipli sunt întotdeauna stocate în ordinea octeților de rețea.

Principii de implementare a protocolului:

  • Numele câmpurilor trebuie să înceapă cu litera „$” sau „#” și pot continua cu literele „$” sau „#”, cifre sau caractere de subliniere, până la o lungime maximă de 128 de caractere.
  • Serverul TREBUIE să valideze numele câmpurilor, iar când primește un nume de câmp invalid, TREBUIE să semnaleze o excepție de conexiune cu un cod de răspuns 503 (eroare de sintaxă).
  • Valorile zecimale nu sunt concepute pentru a suporta valori în virgulă mobilă, ci pentru a suporta valori comerciale în virgulă fixă, cum ar fi ratele valutare și sumele. Ele sunt codificate ca un octet reprezentând numărul de locuri urmat de un întreg lung semnat. Octet "zecimale" - nesemnat.
  • Câmpurile duplicat sunt ilegale. Comportamentul unui peer în raport cu un tabel care conține câmpuri duplicat este nedefinit.
Decuparea conținutului

Anumite metode specifice (Publish, Deliver etc.) procesează conținutul. Vă rugăm să consultați capitolul „Specificații funcționale” pentru specificațiile fiecărei metode. Metodele care procesează conținutul fac acest lucru necondiționat.

Conținutul constă dintr-o listă de 1 sau mai multe cadre, după cum urmează:

  1. Exact un cadru de antet de conținut care oferă proprietăți pentru conținut.
  2. Opțional, unul sau mai multe cadre de corp de conținut

Cadrele de conținut de pe un anumit canal sunt strict secvențiale. Adică pot fi amestecate cu cadre pentru alte canale, dar nu se pot amesteca două cadre de conținut de pe același canal și nu se pot „suprapune” unul pe altul, iar cadrele de conținut pentru același conținut nu pot fi amestecate cu cadre de metodă pe același canal . . (orig. Cadrele de conținut de pe un anumit canal sunt strict secvențiale. Adică pot fi amestecate cu cadre pentru alte canale, dar nu pot fi amestecate sau suprapuse două cadre de conținut de pe același canal și nici cadrele de conținut pentru un singur conținut nu pot fi amestecat cu cadre de metodă pe același canal.)

Rețineți că orice cadru non-conținut marchează în mod explicit sfârșitul conținutului. În timp ce dimensiunea conținutului este bine cunoscută din antetul conținutului (și, prin urmare, numărul de cadre de conținut), acest lucru permite expeditorului să abandoneze conținutul fără a fi nevoit să închidă canalul.

Principii de implementare a protocolului:

  • Un peer care primește conținut incomplet sau prost formatat ar trebui să arunce o excepție de conexiune cu un cod de răspuns 505 (cadru neașteptat). Acestea includ antete de conținut lipsă, ID-uri de clasă incorecte în anteturile de conținut, cadre de corp de conținut lipsă etc.
Titlul conținutului

Antetul încărcăturii utile de conținut are următorul format:

0 2 4 12 14 +----------+--------+-----------+----------------+ ------------- - - | clasa - id | greutate | dimensiunea corpului | steaguri de proprietate | lista de proprietati ... +----------+--------+-----------+----------------+ ------------- - - scurt scurt lung lung scurt rest ...

Principii de implementare a protocolului:

  • ID-ul clasei trebuie să se potrivească cu ID-ul clasei cadrului metodei. Peer-ul trebuie să răspundă la un identificator de clasă nevalid, lansând o excepție de conexiune cu un cod de răspuns 501 (eroare de cadru).
  • Câmpul de greutate nu este utilizat și trebuie să fie zero.
  • Dimensiunea corpului este o valoare de 64 de biți care specifică dimensiunea totală a corpului conținutului, care este suma dimensiunilor corpului următoarelor cadre ale corpului conținutului. Zero indică că nu există cadre de corp de conținut.
  • Indicatorii de proprietate sunt o matrice de biți care indică prezența sau absența fiecărei valori de proprietate din secvență. Biții sunt ordonați de la cel mai mare la cel mai mic. Bit 15 indică prima proprietate.
  • Indicatoarele de proprietate pot specifica mai mult de 16 proprietăți. Dacă ultimul bit (0) este setat, înseamnă că este urmat de un alt câmp de indicatori de proprietate. Există multe câmpuri de semnalizare a proprietății.
  • Valorile proprietăților sunt câmpuri de date AMQP specifice clasei.
  • Proprietățile biților sunt indicate doar de indicatorul de proprietate corespunzător (1 sau 0) și nu sunt niciodată prezente în lista de proprietăți.
  • Numărul canalului din cadrele de conținut nu trebuie să fie zero. Un peer care primește un număr de canal zero într-un cadru de conținut TREBUIE să semnaleze o excepție de conexiune cu un cod de răspuns 504 (eroare de canal).
Corpul conținutului

Sarcina utilă a corpului de conținut este un bloc binar „opac” care se termină cu un octet de sfârșit de cadru:

+----------------------+ +-----------+ | Sarcină utilă binară opac | | cadru - capăt | +----------------------+ +-----------+

Corpul de conținut poate fi împărțit în câte cadre este necesar. Dimensiunea maximă a sarcinii utile a cadrului este negociată de ambii colegi în timpul negocierii conexiunii.

Principii de implementare a protocolului:

  • Peer-ul trebuie să proceseze corpul de conținut care este împărțit în mai multe cadre, stocând acele cadre ca un singur set și fie retransmițându-le ca atare, împărțindu-le în cadre mai mici, fie îmbinându-le într-un singur bloc pentru livrare către aplicație.
cadre de bătăi ale inimii

Cadrele Heartbeat îi spun destinatarului că expeditorul este încă în viață. Frecvența și sincronizarea cadrelor Heartbeat sunt negociate în timpul configurării conexiunii.

Principii de implementare a protocolului:

  • Cadrele Heartbeat trebuie să aibă un număr de canal zero. Un peer care primește un cadru Heartbeat invalid TREBUIE să arunce o excepție de conexiune cu un cod de răspuns 501 (Eroare de cadru).
  • Dacă peer-ul nu acceptă Heartbeat, TREBUIE să renunțe la cadrul Heartbeat fără a semnala vreo eroare sau defecțiune.
  • Clientul ar trebui să înceapă să trimită Heartbeat după ce a primit metoda Connection.Tune și să înceapă să monitorizeze Heartbeat după ce a primit Connection.Open. Serverul ar trebui să înceapă să trimită și să monitorizeze Heartbeat după ce a primit Connection.Tune-Ok
  • Nodul trebuie să depună toate eforturile pentru a trimite Heartbeat la anumite intervale. Bătăile inimii pot fi trimise în orice moment. Orice octet trimis este un înlocuitor valid de Heartbeat, așa că Heartbeats trebuie trimis numai dacă traficul AMQP fără Heartbeat nu este trimis pentru mai mult de un interval Heartbeat. Dacă peer-ul nu detectează trafic de intrare (adică, octeți primiți) pentru două sau mai multe intervale Heartbeat, TREBUIE să închidă conexiunea fără să apeleze Connection.Close/Close-Ok handshaking și să înregistreze eroarea
  • Bătăile inimii ar trebui să continue până când priza este închisă, inclusiv în timpul și după conectare. Închidere/Închidere-Ok acordarea de mână

Multiplexarea canalelor

AMQP permite colegilor să creeze mai multe fluxuri independente de control. Fiecare canal acționează ca o conexiune virtuală care împarte un socket:

rame rame rame rame +-----------+-----------+-----------+-----------+ | canal | canal | canal | canal | +-----------+-----------+-----------+-----------+ | priză | +------------------------------------------------- ----+

Principii de implementare a protocolului:

  • Un peer AMQP POATE suporta mai multe canale. Numărul maxim de canale este determinat la negocierea unei conexiuni, iar un peer poate negocia acest număr până la 1.
  • Fiecare peer TREBUIE să echilibreze traficul pe toate canalele deschise într-un mod corect. Această echilibrare se poate face pe cadru sau pe baza volumului de trafic pe canal. Un peer NU TREBUIE să permită unui canal foarte ocupat să limiteze progresul unui canal mai puțin ocupat.

Vizibilitate garantată

Serverul trebuie să se asigure că observațiile clientului privind starea serverului sunt consecvente.

Următorul exemplu arată ce înseamnă observarea clientului în acest context:

  • Clientul 1 și clientul 2 sunt conectați la aceeași gazdă virtuală
  • Clientul 1 declară o coadă
  • Clientul 1 primește Declare.Ok
  • Clientul 1 îi spune clientului 2 despre asta
  • Clientul 2 face o declarație pasivă pentru aceeași coadă

Garanția de vizibilitate asigură că clientul 2 vede coada

Închiderea canalului

Serverul va considera canalul închis dacă se întâmplă oricare dintre următoarele:

  • Fie peer-ul închide canalul, fie conexiunea părinte folosind strângerea de mână Închidere/Închidere-Ok
  • Fie peer-ul lansează o excepție pe canal, fie conexiunea părinte.
  • Fie nodul închide conexiunea părinte fără strângere de mână Închidere/Închidere-Ok

Când serverul închide un canal, toate mesajele neconfirmate de pe canal sunt marcate pentru relivrare. Când serverul închide o conexiune, elimină toate entitățile șterse automat care aparțin acelei conexiuni.

Sincronizare conținut

În unele cazuri, metodele sincrone cerere-răspuns afectează livrarea asincronă a conținutului pe același canal, inclusiv:

  • Metode Basic.Consume și Basic.Anulează care pornesc și opresc fluxul de mesaje din coada de mesaje
  • metoda Basic.Recover care solicită relivrarea mesajelor către canal
  • Metode Queue.Bind, Queue.Unbind și Queue.Purge care afectează fluxul de mesaje direcționate către coada de mesaje

Principii de implementare a protocolului:

  • Efectele cererii-răspuns nu ar trebui să fie vizibile pe canal înainte de metoda de răspuns și ar trebui să fie vizibile după aceasta.

Garanția comenzii conținutului

Ordinea în care metodele trec printr-un canal este stabilă: metodele sunt primite în aceeași ordine în care sunt trimise. La nivelul de transport, acest lucru este asigurat de protocolul TCP/IP. În plus, conținutul este procesat stabil de server. În special, conținutul care urmează aceeași cale în cadrul serverului va rămâne ordonat. Pentru conținutul cu o anumită prioritate care trece printr-o singură cale, definim calea de procesare a conținutului ca fiind formată dintr-un canal de intrare, un schimb, o coadă și un canal de ieșire.

Principii de implementare a protocolului:

  • Serverul TREBUIE să păstreze ordinea conținutului care trece printr-o singură cale de procesare a conținutului, cu excepția cazului în care câmpul de relivrare a fost modificat în metodele Basic.Deliver sau Basic.Get-Ok și în conformitate cu regulile care definesc condițiile în care câmpul poate fi setat.

Gestionarea erorilor

Excepții

Folosind modelul standard de programare „excepții”, AMQP nu semnalează succes, ci doar eșec. AMQP definește două niveluri de excluderi:

  1. Excluderi de canale. Ei închid canalul care a cauzat eroarea. Excepțiile de canal se datorează de obicei erorilor „soft” care nu afectează restul aplicației.
  2. Excepții de conectare . Acestea închid conexiunea la soclu și se datorează de obicei unor erori „gre”, care indică o eroare de programare, o configurație proastă sau un alt eveniment care necesită atenție.

Documentăm în mod oficial afirmațiile în definiția fiecărei clase și metode.

Format cod de răspuns

Codurile de răspuns AMQP urmează definiția „Severități și teorie a codului de răspuns” din IETF RFC 2821.

Implementări

Caracteristicile protocolului AMQP

  • Șirurile din AMQP sunt sensibile la majuscule
  • Convenție de versiune - numărul versiunii este format din două sau trei cifre: major.minor.revision În acest caz, revizuirea este opțională. Numerele pot lua valori de la 0 la 99. Numerele de la 100 și mai sus sunt rezervate pentru uz intern. Versiunea 1.1 este echivalentă cu versiunea 1.1.0

Note

  1. Către un middleware pentru întreprinderi de mărfuri . Consultat la 14 iunie 2010. Arhivat din original pe 5 martie 2010.

Literatură

  • Emrah Ayanoglu; Yusuf Aytas; Dotan Nahum. Stăpânirea RabbitMQ. - Editura Packt, 2016. - 286 p. — ISBN 978-1-78398-153-3 .

Link -uri