병행성을 하는 여러가지 좋은 이유
- 성능
느린요소를 기다리지 않고, 빠른 요소를 바쁘게 유지한다.
- 견고함
하드웨어 및 소프트웨어의 장애를 피하기 위해 작업을 복제하여 여러 가지 안정적인 방식으로 운영한다,
- 간소화
복잡한 작업을 좀 더 이해하기 쉽고, 해결하기 쉬운 여러 작은 작업으로 분해한다.
- 커뮤니케이션
데이터(바이트)를 보내고 싶은 곳에 원격으로 전송하고, 다시 데이터를 수신 받는다.
콜백, 그린스레드, 코루틴 같은 접근 방법이 있다.
컴퓨터가 기다리는 이유는 보통 두가지
- I/O 바운드
대부분의 경우에 해당. 컴퓨터의 CPU는 엄청나게 빠르다. 메모리보다 몇백배, 디스크나 네트워크보다 몇 천배 빠르다.
- CPU 바운드.
과학이나 그래픽 작업과 같이 엄청난 계산이 필요할 때 발생한다.
병행성과 관련이 있는 것
- 동기
한 줄의 장례 핼렬처럼. 한 작업은 다른 작업을 따른다.
- 비동기
사람들이 각기 다른 차를 타고 파티에 가는 것처럼, 작업들이 독립적이다.
복잡한 병행성을 잘 관리하게 해주는 큐 로 시작해보자
11.1.1 큐
- 일반적으로 큐는 메시지를 전달 한다.
- 분산작업 관리를 위한 큐의 경우 작업 큐 라고 알려져 있다.
11.1.2 프로세스
import multiprocessing as mp
def washer(dishes, output):
for dish in dishes:
print('Washing', dish, 'dish')
output.put(dish)
def dryer(input):
while True:
dish = input.get()
print('Drying', dish, 'dish')
input.task_done()
dish_queue = mp.JoinableQueue()
dryer_proc = mp.Process(target=dryer, args=(dish_queue, ))
dryer_proc.daemon = True
dryer_proc.start()
dishes = ['salad', 'bread', 'entree', 'desert']
washer(dishes, dish_queue)
dish_queue.join()
11.1.3 스레드
import threading
def do_this(what):
whoami(what)
def whoami(what):
print("Thread %s says: %s" % (threading.current_thread(), what))
if __name__ == '__main__':
whoami("I'm the main program")
for n in range(4):
p = threading.Thread(target=do_this, args=("I'm fuction %s" % n,))
p.start()
import threading, queue
import time
def washer(dishes, dish_queue):
for dish in dishes:
print("Washing", dish)
time.sleep(5)
dish_queue.put(dish)
def dryer(dish_queue):
while True:
dish = dish_queue.get()
print("Drying", dish)
time.sleep(10)
dish_queue.task_done()
dish_queue = queue.Queue()
for n in range(2):
dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
dryer_thread.start()
dishes = ['salad', 'bread', 'entree', 'desert']
washer(dishes, dish_queue)
dish_queue.join()
- I/O 바운드 문제 - 스레드 사용
- CPU 바운드 문제 - 프로세스, 네트워킹, 이벤트사용
파이썬의 멀티 스레드 프로그램은 싱글 스레드 혹은 멀티 프로세스 버전의 프로그램보다 느릴 수 있다.
11.1.4 그린 스레드와 gevent
import gevent
from gevent import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertxidermy.com', 'www.antique-taxidermy.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)
from gevent import monkey
monkey.patch_socket()
import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertxidermy.com', 'www.antique-taxidermy.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)
11.1.5 twisted
from twisted.internet import protocol, reactor
class Knock(protocol.Protocol):
def dataReceived(self, data):
print('Client', data)
if data.startswith("Knock Knock"):
response = "Who's there?"
else:
response = data + " who?"
print('Server:', response)
self.transport.write(response)
class KnockFactory(protocol.Factory):
print('KnockFactory')
def buildProtocol(self, addr):
return Knock()
reactor.listenTCP(8000, KnockFactory())
reactor.run()
class KnockClient(protocol.Protocol):
def connectionMade(self):
print('connectionMade')
self.transport.write("Knock Knock")
def dataReceived(self, data):
print('dataReceived')
if data.startswith("Who's there?"):
response = "Disappearing client"
self.transport.write(response)
else:
self.transport.loseConnection()
reactor.stop()
class KnockFactory(protocol.ClientFactory):
print('KnockFactory')
protocol = KnockClient
f = KnockFactory()
reactor.connectTCP("localhost", 8000, f)
reactor.run()
11.1.6 asyncio
- 비동기 입출력 지원 재정리
- twisted와 gevent 그리고 다른 비동기 메서드와 호환될 수 있는 일반적인 이벤트 루프를 제공한다.
11.1.7 Redis
- 이 장에서 Redis는 병행성에 대해 다룬다.
- 큐를 만들 수 있는 빠른 방법은 Redis의 리스트다.
- Redis서버는 하나의 머신에서 실행한다.
- 클라이언트는 같은 머신에서 실행하거나 네트워크를 통해서 접근할 수 있다.
- 두 경우 모두 클라이언트는 TCP를 통해 서버와 통신을 하여 네트워킹을 한다.
- 하나 이상의 공급자 클라이언트는 리스트의 한쪽 끝에 메시지를 푸시 한다.
- 하나 이상의 클라이언트 워커는 리스트를 감시하며, 블로킹 팝 연산을 수행한다.
- 리스트가 비어 있는 경우에는 메시지를 기다린다.
- 메시지가 도착하자마자 첫 번째 워커가 메시지를 처리한다.
import redis
conn = redis.Redis()
print('Washer is starting')
dishes = ['salad', 'bread', 'entree','dessert']
for dish in dishes:
msg = dish.encode('utf-8')
conn.rpush('dishes', msg)
print('Washed', dish)
conn.rpush('dishes', quit)
print('Washer is done')
import redis
conn = redis.Redis()
print('Dryer is stating')
while True:
msg = conn.blpop('dishes')
if not msg:
break
val = msg[1].decode('utf-8')
if val == 'quit':
break
print('Dried', val)
print('Dished are dried')
def dryer():
import redis
import os
import time
conn = redis.Redis()
pid = os.getpid()
timeout = 20
print('Dryer process {pid} is starting'.format(pid=pid))
while True:
msg = conn.blpop('dishes', timeout)
if not msg:
break
val = msg[1].decode('utf-8')
if val == 'quit':
break
print('{pid}: dried {val}'.format(pid=pid, val=val))
time.sleep(0.1)
print('Dryer process {pid} is done'.format(pid=pid))
import multiprocessing
DRYERS = 3
for num in range(DRYERS):
p = multiprocessing.Process(target=dryer)
p.start()
Dryer process 25468 is starting
Dryer process 25466 is starting
Dryer process 25465 is starting
25468: dried salad
25466: dried bread
25465: dried entree
Dryer process 25466 is done
Dryer process 25468 is done
Dryer process 25465 is done
11.1.8 큐를 넘어서
- 실행후 잊어버리기
접시를 전달할 곳에 아무것도 없더라도, 전달한 후 그 결과에 대해 걱정하지 않는다. 접시를 바닥에 놓는 방법이다.
- 요청-응답
식기세척기는 건조기로부터, 건조기는 접시를 정리하는 기계로부터 파이프라인의 각 접시에 대한 신호를 받는다.
- 역압 또는 압력 조절
느린 워커의 속도가 빠른 워커의 속도를 따라갈수 없을 때, 빠른 워커의 속도를 조절한다.
Comments