Presentations
cz.eshop-brno.pokladna1
us.eshop-nyc.pokladna42
CONNECT
SEND
SUBSCRIBE
UNSUBSCRIBE
BEGIN
COMMIT
ABORT
ACK
NACK
DISCONNECT
rq
dostupný na PyPI
rq
from redis import Redis
from rq import Queue
from worker import do_work
# Queue created without an explicit name, on the connection returned by Redis().
q = Queue(connection=Redis())

# Enqueue ten jobs, passing the loop counter as the job parameter, and print
# whatever enqueue() returns for each of them.
for i in range(10):
    result = q.enqueue(do_work, i)
    print(result)
from time import sleep
def do_work(param):
    """Simulate a two-second job and report its progress on stdout.

    Args:
        param: Value handed over when the job was enqueued; it is only echoed.
    """
    print("Working, received parameter {}".format(param))
    # simulated work :-)
    sleep(2)
    print("Done")
$ rq worker
16:59:02 RQ worker 'rq:worker:localhost.32100' started, version 0.12.0
16:59:02 *** Listening on default...
16:59:02 Cleaning registries for queue: default
Povšimněte si použití výchozí fronty nazvané “default”
from time import sleep
def do_work():
    """Simulate a job that fails after two seconds of work.

    Raises:
        AssertionError: raised by the deliberate ``assert False`` (while
            assertions are enabled); the function does not handle it.
    """
    print("Working")
    sleep(2)
    # the exception is caught only by the RQ system itself
    assert False
    # not reached while assertions are enabled
    print("Done")
16:59:03 default: worker.do_work() (c5468250-e2c5-494f-8bd8-f1f51b9a81f2)
Working
16:59:05 AssertionError
Traceback (most recent call last):
File "/home/tester/.local/lib/python3.6/site-packages/rq/worker.py", line 793, in perform_job
rv = job.perform()
File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 599, in perform
self._result = self._execute()
File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 605, in _execute
return self.func(*self.args, **self.kwargs)
File "./worker.py", line 7, in do_work
assert False
AssertionError
Traceback (most recent call last):
File "/home/tester/.local/lib/python3.6/site-packages/rq/worker.py", line 793, in perform_job
rv = job.perform()
File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 599, in perform
self._result = self._execute()
File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 605, in _execute
return self.func(*self.args, **self.kwargs)
File "./worker.py", line 7, in do_work
assert False
AssertionError
16:59:05 Moving job to 'failed' queue
from redis import Redis
from rq import Queue
from time import sleep
from worker import do_work
# special queue holding the jobs that were not completed
q_failed = Queue("failed", connection=Redis())

print("Reading failed jobs")
job_ids = q_failed.job_ids
print(job_ids)

for job_id in job_ids:
    print(job_id)
    job = q_failed.fetch_job(job_id)
    # more detailed information about each of these jobs
    print(job.origin)
    print(job.enqueued_at)
    print(job.started_at)
    print(job.ended_at)
    print(job.exc_info)
Reading failed jobs
['62d5d473-cc31-4738-8397-7dd18b09fe64']
62d5d473-cc31-4738-8397-7dd18b09fe64
default
2018-11-28 16:24:45.094810
2018-11-28 16:24:45.103332
2018-11-28 16:24:47.107423
Traceback (most recent call last):
File "/home/tester/.local/lib/python3.6/site-packages/rq/worker.py", line 793, in perform_job
rv = job.perform()
File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 599, in perform
self._result = self._execute()
File "/home/tester/.local/lib/python3.6/site-packages/rq/job.py", line 605, in _execute
return self.func(*self.args, **self.kwargs)
File "./worker.py", line 7, in do_work
assert False
AssertionError
$ rq requeue
Requeueing 1 jobs from failed queue
[####################################] 100%
$ rq worker
from redis import Redis
from rq import Queue
from worker import do_work
# Two separately named queues, each on its own Redis() connection.
q_low = Queue("low", connection=Redis())
q_high = Queue("high", connection=Redis())

# Enqueue the same ten jobs into both queues.
for i in range(10):
    result = q_low.enqueue(do_work, i)
    result = q_high.enqueue(do_work, i)
    # NOTE(review): only the value returned for the "high" queue is printed,
    # because `result` is overwritten before the print -- confirm this is intended
    print(result)
from time import sleep
def do_work(param):
    """Simulate a two-second job and report its progress on stdout.

    Args:
        param: Value handed over when the job was enqueued; it is only echoed.
    """
    print("Working, received parameter {}".format(param))
    # stand-in for the real work
    sleep(2)
    print("Done")
$ rq info
low |██████████ 10
failed |██ 2
default | 0
high |██████████ 10
4 queues, 22 jobs total
localhost.4312 idle: default
1 workers, 4 queues
Updated: 2018-11-26 13:22:06.236766
import pika
# connect to the RabbitMQ server and open a communication channel
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# ask for the queue "test" to be created, or reused
channel.queue_declare(queue='test')

# send the message using the direct strategy
channel.basic_publish(exchange='', routing_key='test', body='Hello World!')
print("Sent 'Hello World!'")

# close the communication
connection.close()
#!/usr/bin/env python
import pika
# connect to the RabbitMQ server and open a communication channel
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# declaring the queue again does not actually affect an already existing
# queue -- provided it exists, of course
channel.queue_declare(queue='test')
# callback function called when a message is received
def on_receive(ch, method, properties, body):
    """Print the body of a received message.

    Args:
        ch: Unused here.
        method: Unused here.
        properties: Unused here.
        body: Message payload; printed using its repr().
    """
    print("Received %r" % body)
# subscribe to messages from the queue "test"
channel.basic_consume(on_receive, queue='test', no_ack=True)

print('Waiting for messages. To exit press CTRL+C')
print("...")

# message-consuming loop
channel.start_consuming()
# slide fragment: request prefetch_count=1 on the channel
channel.basic_qos(prefetch_count=1)
# slide fragment: subscription to the queue "test" with no_ack=False
channel.basic_consume(on_receive,
queue='test',
no_ack=False)
# slide fragment: the same subscription with no_ack=True
channel.basic_consume(on_receive,
queue='test',
no_ack=True)
# slide fragment: the subscription with no_ack=False once more
channel.basic_consume(on_receive,
queue='test',
no_ack=False)
# slide fragment: acknowledge the delivery identified by method.delivery_tag;
# `method` is not defined at this level, so this line presumably belongs inside
# the on_receive callback -- TODO confirm
channel.basic_ack(delivery_tag = method.delivery_tag)
from time import sleep
from rabbitmq_connect import connect
connection, channel = connect()


def on_receive(ch, method, properties, body):
    """Handle one received message and acknowledge it once the work is done.

    Args:
        ch: Object used to send the acknowledgement via basic_ack().
        method: Object whose delivery_tag identifies the message to acknowledge.
        properties: Unused here.
        body: Message payload; printed using its repr().
    """
    print("Received %r" % body)
    # stand-in for five seconds of processing
    sleep(5)
    print("Done processing %r" % body)
    # acknowledge only after the processing has finished
    ch.basic_ack(delivery_tag=method.delivery_tag)


# subscribe with no_ack=False, so the callback acknowledges each message itself
channel.basic_consume(on_receive, queue='test', no_ack=False)

print('Waiting for messages. To exit press CTRL+C')
print("...")
channel.start_consuming()
rabbitmqctl
$ sudo rabbitmqctl node_health_check
Timeout: 70.0 seconds
Checking health of node rabbit@localhost
Health check passed
$ sudo rabbitmqctl list_users
Listing users
guest [administrator]
$ sudo rabbitmqctl list_queues
Listing queues
t3 0
testX 0
test 0
t1 2
t2 0
$ sudo rabbitmqctl list_exchanges
Listing exchanges
amq.topic topic
amq.rabbitmq.log topic
amq.headers headers
amq.fanout fanout
direct <- výchozí směrovač beze jména
amq.rabbitmq.trace topic
amq.match headers
amq.direct direct
def use_fanout(channel, exchange_name='fanout_exchange'):
    """Print the exchange name and declare it on the channel with type 'fanout'.

    Args:
        channel: Object providing exchange_declare().
        exchange_name: Name of the exchange to declare.
    """
    print(exchange_name)
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
# declare the fanout exchange, then bind three queues to it
use_fanout(channel)
for queue_name in ('fronta1', 'fronta2', 'fronta3'):
    bind_queue(channel, queue_name)
$ sudo rabbitmqctl list_queues
Listing queues
t3 0
testX 0
test 0
t1 2
t2 0
fronta2 1
fronta1 1
fronta3 1
* - libovolné slovo
# - libovolné množství slov
Výraz Fronta
europe.* europe_queue
asia.* asia_queue
americas.* americas_queue
*.org org_queue
*.*.rabbit rabbit_queue
#.other other_queue
def bind_queue(channel, queue_name, routing_pattern='', exchange_name='fanout_exchange'):
    """Declare a queue and bind it to an exchange.

    Args:
        channel: Object providing queue_declare() and queue_bind().
        queue_name: Name of the queue to declare and bind.
        routing_pattern: Value passed as the binding's routing_key.
        exchange_name: Name of the exchange the queue is bound to.
    """
    channel.queue_declare(queue=queue_name)
    channel.queue_bind(
        exchange=exchange_name,
        queue=queue_name,
        routing_key=routing_pattern,
    )
# bind one queue per routing pattern to the exchange 'topic_exchange'
for queue_name, pattern in (
    ('europe_queue', 'europe.*'),
    ('asia_queue', 'asia.*'),
    ('americas_queue', 'americas.*'),
):
    bind_queue(channel, queue_name,
               routing_pattern=pattern, exchange_name='topic_exchange')
# slide fragment: declare the queue, passing max_priority as "x-max-priority";
# queue_name and max_priority are not defined at this level
channel.queue_declare(queue=queue_name, arguments={"x-max-priority": max_priority})
def open_channel(connection, queue_name='test', max_priority=10):
    """Open a channel and declare a queue with the "x-max-priority" argument.

    Args:
        connection: Object providing channel().
        queue_name: Name of the queue to declare.
        max_priority: Value passed as the queue's "x-max-priority" argument.

    Returns:
        The channel obtained from ``connection.channel()``.
    """
    channel = connection.channel()
    # an attempt to create the queue again does not actually affect an
    # already existing queue
    channel.queue_declare(queue=queue_name, arguments={"x-max-priority": max_priority})
    return channel
# slide fragment: carry the priority in the message properties;
# `i` and `priority` are not defined at this level
prop = BasicProperties(priority=priority)
# publish to the routing key 'priority_queue_2' with those properties attached
channel.basic_publish(exchange='',
routing_key='priority_queue_2',
body='Hello World! #{i} msg with priority {p}'.format(i=i, p=priority),
properties=prop)
./activemq producer##
./activemq producer##
./activemq producer
./activemq consumer
stomp.py
stomp
stomp
$ stomp
CONNECTED
server: ActiveMQ/5.15.8
heart-beat: 0,0
session: ID:localhost.localdomain-38215-1549567803551-3:3
version: 1.1
stomp
send /queue/test hello world
MESSAGE
content-length: 11
expires: 0
destination: /queue/test
stomp
subscribe /queue/test
Subscribing to "/queue/test" with acknowledge set to "auto", id set to "1"
subscription: 1
priority: 4
message-id: ID:localhost.localdomain-38215-1549567803551-3:3:-1:1:1
timestamp: 1549631641256
hello world
#!/usr/bin/env python
import time
import stomp
destination1 = "/topic/event"
destination2 = "/topic/event2"

# number of messages sent to each destination
MESSAGES = 10

conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")

# send every message to both destinations
for i in range(MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')

conn.disconnect()
#!/usr/bin/env python
import time
import stomp
class SimpleListener:
    """Listener that prints every received message and error to stdout."""

    def __init__(self, conn):
        """Remember the connection this listener is registered with."""
        self.conn = conn

    def on_message(self, headers, message):
        """Print the received message; ``headers`` is not used."""
        print("Received message: {m}".format(m=message))

    def on_error(self, headers, message):
        """Print the received error; ``headers`` is not used."""
        print("Received an error {e}".format(e=message))
destination = "/topic/event"

conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
# register the listener before the connection is started
conn.set_listener('', SimpleListener(conn))
conn.start()
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')

print("Waiting for messages...")

# loop forever so that the script does not exit; presumably the listener is
# invoked by stomp.py in the background -- TODO confirm
while True:
    time.sleep(10)
Broker Současný stav Monitoring Vzdálené ovládání workerů
RabbitMQ stabilní ✓ ✓
Redis stabilní ✓ ✓
Amazon SQS stabilní × ×
Apache Zookeeper experimentální × ×