Presentations
multiprocessing
packagefrom multiprocessing import Process, Pipe
import time
def worker(name, conn):
while True:
cmd = conn.recv()
print("{} received {}".format(name, cmd))
if cmd == "quit":
return
else:
conn.send("{} accepted {}".format(name, cmd))
time.sleep(1)
def main():
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=("Worker", child_conn))
p.start()
for i in range(10):
parent_conn.send("command {}".format(i))
print(parent_conn.recv())
parent_conn.send("quit")
p.join()
if __name__ == '__main__':
main()
threading
packageimport threading
import time
def worker(threadName, delay, n):
for counter in range(1, n+1):
time.sleep(delay)
print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
threading.Thread(target=worker, args=("Thread-1", 0.5, 10)).start()
threading.Thread(target=worker, args=("Thread-2", 1.0, 10)).start()
threading.Thread(target=worker, args=("Thread-3", 1.5, 10)).start()
“Don’t communicate by sharing memory, share memory by communicating”
import time
import threading
import queue
q = queue.Queue()
def producer():
name = threading.current_thread().name
for job in range(10):
print(f'{name} thread: Starting producing {job}')
q.put(job)
time.sleep(0.3)
print(f'{name} thread: Produced {job}')
def consumer():
name = threading.current_thread().name
while True:
job = q.get()
print(f'{name} thread: Starting consuming {job}')
time.sleep(0.4)
print(f'{name} thread: Consumed {job}')
q.task_done()
threading.Thread(target=consumer, daemon=True, name="1st").start()
threading.Thread(target=consumer, daemon=True, name="2nd").start()
threading.Thread(target=consumer, daemon=True, name="3rd").start()
threading.Thread(target=producer, daemon=True, name="1st").start()
threading.Thread(target=producer, daemon=True, name="2nd").start()
threading.Thread(target=producer, daemon=True, name="3rd").start()
threading.Thread(target=producer, daemon=True, name="3rd").start()
q.join()
print('Done')
ThreadPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
import time
def worker(threadName, delay, n):
for counter in range(1, n + 1):
time.sleep(delay)
print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(worker, "Thread-1", 0.5, 10)
executor.submit(worker, "Thread-2", 1.0, 10)
executor.submit(worker, "Thread-3", 1.5, 10)
print("Done!")
ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import time
def worker(processName, delay, n):
for counter in range(1, n + 1):
time.sleep(delay)
print("{}: {}/{} - {}".format(processName, counter, n, time.ctime(time.time())))
with ProcessPoolExecutor(max_workers=3) as executor:
executor.submit(worker, "Process-1", 0.5, 10)
executor.submit(worker, "Process-2", 1.0, 10)
executor.submit(worker, "Process-3", 1.5, 10)
print("Done!")
asyncio
import asyncio
import time
async def task(name):
print(f"{name} task started")
await asyncio.sleep(5)
print(f"{name} task finished")
return name[::-1]
async def main():
task1 = asyncio.create_task(task("first"))
print("first task created")
task2 = asyncio.create_task(task("second"))
print("second task created")
task3 = asyncio.create_task(task("third"))
print("third task created")
print("result of task #1:", await task1)
print("result of task #2:", await task2)
print("result of task #3:", await task3)
print("done")
asyncio.run(main())
asyncio
and queuesimport asyncio
import time
async def task(name, queue):
while not queue.empty():
param = await queue.get()
print(f"Task named {name} started with parameter {param}")
await asyncio.sleep(5)
print(f"{name} task finished")
async def main():
queue = asyncio.Queue()
for i in range(20):
await queue.put(i)
for n in range(1, 2):
asyncio.create_task(task(f"{n}", queue))
asyncio.run(main())
aiohttp
import asyncio
import aiohttp
import time
async def download(name, queue):
async with aiohttp.ClientSession() as session:
while not queue.empty():
url = await queue.get()
print(f"Task named {name} getting URL: {url}")
async with session.get(url) as response:
t = await response.text()
print(f"Task named {name} downloaded {len(t)} characters")
print(f"Task named {name} finished")
async def main():
queue = asyncio.Queue()
for url in (
"http://www.root.cz",
"http://duckduckgo.com",
"http://seznam.com",
"https://www.root.cz/programovaci-jazyky/",
"https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/",
"https://github.com/"
):
await queue.put(url)
await asyncio.gather(
asyncio.create_task(download(1, queue)),
asyncio.create_task(download(2, queue)))
asyncio.run(main())
aiohttp
import aiohttp
import time
async def download(name, queue, results):
async with aiohttp.ClientSession() as session:
while not queue.empty():
url = await queue.get()
t1 = time.time()
print(f"Task named {name} getting URL: {url}")
async with session.get(url) as response:
t = await response.text()
t2 = time.time()
print(f"Task named {name} downloaded {len(t)} characters in {t2-t1} seconds")
await results.put(t2-t1)
print(f"Task named {name} finished")
async def main():
queue = asyncio.Queue()
results = asyncio.Queue()
t1 = time.time()
for url in (
"http://www.root.cz",
"http://duckduckgo.com",
"http://seznam.com",
"https://www.root.cz/programovaci-jazyky/",
"https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/",
"https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/",
"https://streamlit.io/",
"https://pglet.io/",
"https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/",
"https://github.com/"
):
await queue.put(url)
await asyncio.gather(
asyncio.create_task(download(1, queue, results)),
asyncio.create_task(download(2, queue, results)),
asyncio.create_task(download(3, queue, results)))
process_time = 0
while not results.empty():
process_time += await results.get()
print(f"Process time: {process_time} seconds")
t2 = time.time()
print(f"Total time: {t2-t1} seconds")
asyncio.run(main())
Task named 1 getting URL: http://www.root.cz
Task named 2 getting URL: http://duckduckgo.com
Task named 3 getting URL: http://seznam.com
Task named 2 downloaded 5775 characters in 0.31725001335144043 seconds
Task named 2 finished
Task named 2 getting URL: https://www.root.cz/programovaci-jazyky/
Task named 3 downloaded 1555 characters in 0.43852806091308594 seconds
Task named 3 finished
Task named 3 getting URL: https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/
Task named 1 downloaded 249707 characters in 0.535081148147583 seconds
Task named 1 finished
Task named 1 getting URL: https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/
Task named 2 downloaded 251515 characters in 0.3788483142852783 seconds
Task named 2 finished
Task named 2 getting URL: https://streamlit.io/
Task named 1 downloaded 235679 characters in 0.2868804931640625 seconds
Task named 1 finished
Task named 1 getting URL: https://pglet.io/
Task named 3 downloaded 236045 characters in 0.41786885261535645 seconds
Task named 3 finished
Task named 3 getting URL: https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/
Task named 2 downloaded 444341 characters in 0.3120858669281006 seconds
Task named 2 finished
Task named 2 getting URL: https://github.com/
Task named 1 downloaded 10145 characters in 0.21546196937561035 seconds
Task named 1 finished
Task named 3 downloaded 263683 characters in 0.29593372344970703 seconds
Task named 3 finished
Task named 2 downloaded 209173 characters in 0.28455424308776855 seconds
Task named 2 finished
Process time: 3.482492685317993 seconds
Total time: 1.2949903011322021 seconds
Task named 1 getting URL: http://www.root.cz
Task named 2 getting URL: http://duckduckgo.com
Task named 3 getting URL: http://seznam.com
Task named 4 getting URL: https://www.root.cz/programovaci-jazyky/
Task named 5 getting URL: https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/
Task named 6 getting URL: https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/
Task named 7 getting URL: https://streamlit.io/
Task named 8 getting URL: https://pglet.io/
Task named 9 getting URL: https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/
Task named 10 getting URL: https://github.com/
Task named 8 downloaded 10145 characters in 0.2201550006866455 seconds
Task named 8 finished
Task named 10 downloaded 209167 characters in 0.26105284690856934 seconds
Task named 10 finished
Task named 7 downloaded 444341 characters in 0.31197690963745117 seconds
Task named 7 finished
Task named 2 downloaded 5775 characters in 0.3665473461151123 seconds
Task named 2 finished
Task named 4 downloaded 252883 characters in 0.41240453720092773 seconds
Task named 4 finished
Task named 9 downloaded 265455 characters in 0.41875481605529785 seconds
Task named 9 finished
Task named 3 downloaded 1555 characters in 0.46982383728027344 seconds
Task named 3 finished
Task named 6 downloaded 235728 characters in 0.4692232608795166 seconds
Task named 6 finished
Task named 1 downloaded 250629 characters in 0.4968068599700928 seconds
Task named 1 finished
Task named 5 downloaded 236190 characters in 0.5269002914428711 seconds
Task named 5 finished
Process time: 3.953645706176758 seconds
Total time: 0.529677152633667 seconds
trio
libraryimport trio
async def task():
print("task started")
await trio.sleep(5)
print("task finished")
def main():
print("main started")
trio.run(task)
print("done")
main()
import trio
async def task(name, n, s):
print(f"{name} task started")
for i in range(n):
print(f"{name} {i+1}/{n}")
await trio.sleep(s)
print(f"{name} task finished")
async def main():
print("main started")
async with trio.open_nursery() as nursery:
nursery.start_soon(task, "1st", 10, 1)
nursery.start_soon(task, "2nd", 10, 1)
nursery.start_soon(task, "3rd", 10, 1)
print("done")
trio.run(main)
import trio
async def task1():
dct = {}
return dct["foo"]
async def task2():
l = []
return l[10]
async def task3():
x = 0
return 1/x
async def main():
print("main started")
async with trio.open_nursery() as nursery:
nursery.start_soon(task1)
nursery.start_soon(task2)
nursery.start_soon(task3)
print("done")
trio.run(main)
import trio
async def task(name, n, s):
print(f"{name} task started")
for i in range(n):
print(f"{name} {i+1}/{n}")
await trio.sleep(s)
print(f"{name} task finished")
async def main():
print("main started")
async with trio.open_nursery() as nursery:
for i in range(10000):
nursery.start_soon(task, f"Task {i}", 1, 10000)
print("done")
trio.run(main)
$ ps ax |grep python
614 ? Ss 0:00 /usr/bin/python3 /usr/bin/networkd-dispatcher --run-startup-triggers
4410 ? S 0:59 /usr/bin/python3 /usr/share/system-config-printer/applet.py
4428 ? S 0:00 python3 /usr/lib/blueberry/safechild /usr/sbin/rfkill event
3049130 pts/2 Ss+ 0:00 python3 trio_11.py
3049132 pts/3 S+ 0:00 grep --color=auto python
$ pmap 3049130
3049128: python3 trio_11.py
00007fa95aed5000 28K r--s- gconv-modules.cache
00007fa95aedc000 4K r---- ld-2.31.so
00007fa95aedd000 140K r-x-- ld-2.31.so
00007fa95af00000 32K r---- ld-2.31.so
00007fa95af09000 4K r---- ld-2.31.so
00007fa95af0a000 4K rw--- ld-2.31.so
00007fa95af0b000 4K rw--- [ anon ]
00007fff488fe000 132K rw--- [ stack ]
00007fff489f8000 12K r---- [ anon ]
00007fff489fb000 4K r-x-- [ anon ]
ffffffffff600000 4K --x-- [ anon ]
total 87168K
import trio
async def producer(send_channel):
for i in range(1, 10):
message = f"message {i}"
print(f"Producer: {message}")
await send_channel.send(message)
async def consumer(receive_channel):
async for value in receive_channel:
print(f"Consumer: received{value!r}")
await trio.sleep(1)
async def main():
async with trio.open_nursery() as nursery:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, send_channel)
nursery.start_soon(consumer, receive_channel)
trio.run(main)