Nástroje implementující fronty zpráv

Presentations

Nástroje implementující fronty zpráv

Obsah přednášky

Message broker a fronty zpráv

Topic (téma)

Další často používané termíny v oboru MQ

Proč vůbec používat tuto technologii?

Modifikace architektury aplikací

Základní komunikační strategie

Příklady použití komunikačních strategií

Protokoly používané pro komunikaci

Další protokoly používané pro komunikaci

STOMP

Vybrané implementace message brokerů

Redis Queue (RQ)

Redis Queue (RQ) - vytvoření úlohy

from redis import Redis
from rq import Queue
  
from worker import do_work
  
  
q = Queue(connection=Redis())
  
for i in range(10):
    result = q.enqueue(do_work, i)
    print(result)

Redis Queue (RQ) - implementace workera

from time import sleep
  
  
def do_work(param):
    print("Working, received parameter {}".format(param))
    # simulace práce :-)
    sleep(2)
    print("Done")

Spuštění workera

$ rq worker
16:59:02 RQ worker 'rq:worker:localhost.32100' started, version 0.12.0
16:59:02 *** Listening on default...
16:59:02 Cleaning registries for queue: default

Povšimněte si použití výchozí fronty nazvané “defaut”

Redis Queue (RQ) - “padající” worker

from time import sleep
  
  
def do_work():
    print("Working")
    sleep(2)
    # výjimka je zachycena až samotným systémem RQ
    assert False
    print("Done")

Pád workera

16:59:03 default: worker.do_work() (c5468250-e2c5-494f-8bd8-f1f51b9a81f2)
Working
16:59:05 AssertionError
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/rq/worker.py", line 793, in perform_job
    rv = job.perform()
  File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 599, in perform
    self._result = self._execute()
  File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 605, in _execute
    return self.func(*self.args, **self.kwargs)
  File "./worker.py", line 7, in do_work
    assert False
AssertionError
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/rq/worker.py", line 793, in perform_job
    rv = job.perform()
  File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 599, in perform
    self._result = self._execute()
  File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 605, in _execute
    return self.func(*self.args, **self.kwargs)
  File "./worker.py", line 7, in do_work
    assert False
AssertionError
16:59:05 Moving job to 'failed' queue

Redis Queue (RQ) - informace o nezpracovaných úlohách

from redis import Redis
from rq import Queue
from time import sleep
  
from worker import do_work
  
# speciální fronta s úlohami, které nebyly dokončeny
q_failed = Queue("failed", connection=Redis())
  
print("Reading failed jobs")
  
job_ids = q_failed.job_ids
  
print(job_ids)
  
for job_id in job_ids:
    print(job_id)
    job = q_failed.fetch_job(job_id)
    # podrobnější informace o těchto úlohách
    print(job.origin)
    print(job.enqueued_at)
    print(job.started_at)
    print(job.ended_at)
    print(job.exc_info)

Redis Queue (RQ) - informace o nezpracovaných úlohách

Reading failed jobs
['62d5d473-cc31-4738-8397-7dd18b09fe64']
62d5d473-cc31-4738-8397-7dd18b09fe64
default
2018-11-28 16:24:45.094810
2018-11-28 16:24:45.103332
2018-11-28 16:24:47.107423
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/rq/worker.py", line 793, in perform_job
    rv = job.perform()
  File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 599, in perform
    self._result = self._execute()
  File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 605, in _execute
    return self.func(*self.args, **self.kwargs)
  File "./worker.py", line 7, in do_work
    assert False
AssertionError

Opětovné spuštění zhavarovaných úloh

$ rq requeue 
  Requeueing 1 jobs from failed queue
  [####################################]  100%

Využití takzvaného burst režimu workerů

$ rq worker 

RQ - předávání parametrů workerům

from redis import Redis
from rq import Queue
  
from worker import do_work
  
  
q_low = Queue("low", connection=Redis())
q_high = Queue("high", connection=Redis())
  
  
for i in range(10):
    result = q_low.enqueue(do_work, i)
    result = q_high.enqueue(do_work, i)
    print(result)

RQ - Implementace workera akceptujícího parametr

from time import sleep
  
  
def do_work(param):
    print("Working, received parameter {}".format(param))
    sleep(2)
    print("Done")

Zjištění stavu front

$ rq info
low          |██████████ 10
failed       |██ 2
default      | 0
high         |██████████ 10
4 queues, 22 jobs total
  
localhost.4312 idle: default
1 workers, 4 queues
  
Updated: 2018-11-26 13:22:06.236766

Dashboard Redis Queue pro sledování stavu front a workerů

RabbitMQ

RabbitMQ

RabbitMQ

RabbitMQ

RabbitMQ a Python

Publisher pro RabbitMQ a Pika

import pika
  
# připojení k serveru RabbitMQ a vytvoření komunikačního kanálu
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
  
# žádost o vytvoření či o použití fronty „test“
channel.queue_declare(queue='test')
  
# poslání zprávy se strategií direct
channel.basic_publish(exchange='',
                      routing_key='test',
                      body='Hello World!')
  
print("Sent 'Hello World!'")
# uzavření komunikace
connection.close()

Subscriber pro RabbitMQ a Pika

#!/usr/bin/env python
import pika
  
# připojení k serveru RabbitMQ a vytvoření komunikačního kanálu
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
  
# pokus o nové vytvoření fronty ve skutečnosti neovlivní již existující frontu
# pokud samozřejmě existuje
channel.queue_declare(queue='test')
  
  
# callback funkce volaná při přijetí zprávy
def on_receive(ch, method, properties, body):
    print("Received %r" % body)
  
  
# přihlášení se k odebírání zpráv z fronty "test"
channel.basic_consume(on_receive,
                      queue='test',
                      no_ack=True)
  
print('Waiting for messages. To exit press CTRL+C')
print("...")
  
# smyčka s odebíráním zpráv
channel.start_consuming()

„Dělba práce“ mezi workery

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_receive,
                      queue='test',
                      no_ack=False)

Potvrzování zpráv

channel.basic_consume(on_receive,
                      queue='test',
                      no_ack=True)
channel.basic_consume(on_receive,
                      queue='test',
                      no_ack=False)
channel.basic_ack(delivery_tag = method.delivery_tag)

Manuální potvrzování zpráv v konzumentovi

from time import sleep
from rabbitmq_connect import connect
  
connection, channel = connect()
  
def on_receive(ch, method, properties, body):
    print("Received %r" % body)
    sleep(5)
    print("Done processing %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)
  
  
channel.basic_consume(on_receive,
                      queue='test',
                      no_ack=False)
  
print('Waiting for messages. To exit press CTRL+C')
print("...")
channel.start_consuming()

Příkaz rabbitmqctl

$ sudo rabbitmqctl node_health_check
Timeout: 70.0 seconds
Checking health of node rabbit@localhost
Health check passed
$ sudo rabbitmqctl list_users
Listing users
guest   [administrator]
$ sudo rabbitmqctl list_queues
Listing queues
t3      0
testX   0
test    0
t1      2
t2      0
$ sudo rabbitmqctl list_exchanges
Listing exchanges
amq.topic           topic
amq.rabbitmq.log    topic
amq.headers         headers
amq.fanout          fanout
                    direct <- výchozí směrovač beze jména
amq.rabbitmq.trace  topic
amq.match           headers
amq.direct          direct

Rozvětvení (fanout) zpráv do většího množství front

def use_fanout(channel, exchange_name='fanout_exchange'):
    print(exchange_name)
    channel.exchange_declare(exchange=exchange_name,
                             exchange_type='fanout')
  
use_fanout(channel)
bind_queue(channel, 'fronta1')
bind_queue(channel, 'fronta2')
bind_queue(channel, 'fronta3')
$ sudo rabbitmqctl list_queues
Listing queues
t3      0
testX   0
test    0
t1      2
t2      0
fronta2 1
fronta1 1
fronta3 1

Směrování zpráv do front na základě klíče a nakonfigurovaných výrazů

Výraz        Fronta
europe.*     europe_queue
asia.*       asia_queue
americas.*   americas_queue
*.org        org_queue
*.*.rabbit   rabbit_queue
#.other      other_queue
def bind_queue(channel, queue_name, routing_pattern='', exchange_name='fanout_exchange'):
    channel.queue_declare(queue=queue_name)
    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key=routing_pattern)
  
    bind_queue(channel, 'europe_queue',
               routing_pattern='europe.*', exchange_name='topic_exchange')
    bind_queue(channel, 'asia_queue',
               routing_pattern='asia.*', exchange_name='topic_exchange')
    bind_queue(channel, 'americas_queue',
               routing_pattern='americas.*', exchange_name='topic_exchange')

Prioritní fronty

channel.queue_declare(queue=queue_name, arguments={"x-max-priority": max_priority})
  
def open_channel(connection, queue_name='test', max_priority=10):
    # pokus o nové vytvoření fronty ve skutečnosti neovlivní již existující frontu
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, arguments={"x-max-priority": max_priority})
    return channel

Specifikace priority posílané zprávy

prop = BasicProperties(priority=priority)
  
channel.basic_publish(exchange='',
                      routing_key='priority_queue_2',
                      body='Hello World! #{i} msg with priority {p}'.format(i=i, p=priority),
                      properties=prop)

Apache Active MQ

Podporované protokoly

Apache Active MQ

Apache Active MQ + STOMP + Python

CLI klient stomp

$ stomp
CONNECTED
server: ActiveMQ/5.15.8
heart-beat: 0,0
session: ID:localhost.localdomain-38215-1549567803551-3:3
version: 1.1

CLI klient stomp

send /queue/test hello world
MESSAGE
content-length: 11
expires: 0
destination: /queue/test

CLI klient stomp

subscribe /queue/test
Subscribing to "/queue/test" with acknowledge set to "auto", id set to "1"
subscription: 1
priority: 4
message-id: ID:localhost.localdomain-38215-1549567803551-3:3:-1:1:1
timestamp: 1549631641256
   
hello world

Producent zpráv (stomp.py)

#!/usr/bin/env python
   
import time
import stomp
   
   
destination1 = "/topic/event"
destination2 = "/topic/event2"
  
MESSAGES = 10
 
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
  
  
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
 
 
conn.disconnect()

Konzument zpráv (stomp.py)

#!/usr/bin/env python
  
import time
import stomp
  
  
class SimpleListener:
  
    def __init__(self, conn):
        self.conn = conn
  
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
  
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
  
  
destination = "/topic/event"
  
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
  
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
  
print("Waiting for messages...")
   
while True:
    time.sleep(10)

Celery

Brokeři podporovaní nástrojem Celery

 Broker           Současný stav   Monitoring  Vzdálené ovládání workerů
 RabbitMQ         stabilní        ✓           ✓
 Redis            stabilní        ✓           ✓
 Amazon SQS       stabilní        ×           ×
 Apache Zookeeper experimentální  ×           ×

Message queuing service

Další užitečné technologie