11.1 병행성

병행성을 하는 여러가지 좋은 이유

  • 성능

    느린요소를 기다리지 않고, 빠른 요소를 바쁘게 유지한다.

  • 견고함

    하드웨어 및 소프트웨어의 장애를 피하기 위해 작업을 복제하여 여러 가지 안정적인 방식으로 운영한다,

  • 간소화

    복잡한 작업을 좀 더 이해하기 쉽고, 해결하기 쉬운 여러 작은 작업으로 분해한다.

  • 커뮤니케이션

    데이터(바이트)를 보내고 싶은 곳에 원격으로 전송하고, 다시 데이터를 수신 받는다.

콜백, 그린스레드, 코루틴 같은 접근 방법이 있다.

컴퓨터가 기다리는 이유는 보통 두가지

  • I/O 바운드

    대부분의 경우에 해당. 컴퓨터의 CPU는 엄청나게 빠르다. 메모리보다 몇백배, 디스크나 네트워크보다 몇 천배 빠르다.

  • CPU 바운드.

    과학이나 그래픽 작업과 같이 엄청난 계산이 필요할 때 발생한다.

병행성과 관련이 있는 것

  • 동기

    한 줄의 장례 핼렬처럼. 한 작업은 다른 작업을 따른다.

  • 비동기

    사람들이 각기 다른 차를 타고 파티에 가는 것처럼, 작업들이 독립적이다.

복잡한 병행성을 잘 관리하게 해주는 큐 로 시작해보자

11.1.1 큐

  • 일반적으로 큐는 메시지를 전달 한다.
  • 분산작업 관리를 위한 큐의 경우 작업 큐 라고 알려져 있다.

11.1.2 프로세스

import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print('Washing', dish, 'dish')
        output.put(dish)
        
def dryer(input):
    while True:
        dish = input.get()
        print('Drying', dish, 'dish')
        input.task_done()
        
dish_queue = mp.JoinableQueue()
dryer_proc = mp.Process(target=dryer, args=(dish_queue, ))
dryer_proc.daemon = True
dryer_proc.start()

dishes = ['salad', 'bread', 'entree', 'desert']
washer(dishes, dish_queue)
dish_queue.join()

11.1.3 스레드

import threading

def do_this(what):
    whoami(what)
def whoami(what):
    print("Thread %s says: %s"  % (threading.current_thread(), what))
    
if __name__ == '__main__':
    whoami("I'm the main program")
    for n in range(4):
        p = threading.Thread(target=do_this, args=("I'm fuction %s" % n,))
        p.start()
import threading, queue
import time

def washer(dishes, dish_queue):
    for dish in dishes:
        print("Washing", dish)
        time.sleep(5)
        dish_queue.put(dish)
    
def dryer(dish_queue):
    while True:
        dish = dish_queue.get()
        print("Drying", dish)
        time.sleep(10)
        dish_queue.task_done()

dish_queue = queue.Queue()
for n in range(2):
    dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()
    
dishes = ['salad', 'bread', 'entree', 'desert']
washer(dishes, dish_queue)
dish_queue.join()
  • I/O 바운드 문제 - 스레드 사용
  • CPU 바운드 문제 - 프로세스, 네트워킹, 이벤트사용

파이썬의 멀티 스레드 프로그램은 싱글 스레드 혹은 멀티 프로세스 버전의 프로그램보다 느릴 수 있다.

11.1.4 그린 스레드와 gevent

import gevent
from gevent import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertxidermy.com', 'www.antique-taxidermy.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)
from gevent import monkey
monkey.patch_socket()
import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertxidermy.com', 'www.antique-taxidermy.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

11.1.5 twisted

from twisted.internet import protocol, reactor
class Knock(protocol.Protocol):
    def dataReceived(self, data):
        print('Client', data)
        if data.startswith("Knock Knock"):
            response = "Who's there?"
        else:
            response = data + " who?"
        print('Server:', response)
        self.transport.write(response)
        
class KnockFactory(protocol.Factory):
    print('KnockFactory')
    def buildProtocol(self, addr):
        return Knock()
    
reactor.listenTCP(8000, KnockFactory())
reactor.run()
class KnockClient(protocol.Protocol):
    def connectionMade(self):
        print('connectionMade')
        self.transport.write("Knock Knock")
    
    def dataReceived(self, data):
        print('dataReceived')
        if data.startswith("Who's there?"):
            response = "Disappearing client"
            self.transport.write(response)
        else:
            self.transport.loseConnection()
            reactor.stop()
class KnockFactory(protocol.ClientFactory):
    print('KnockFactory')
    protocol = KnockClient

f = KnockFactory()
reactor.connectTCP("localhost", 8000, f)
reactor.run()

11.1.6 asyncio

  • 비동기 입출력 지원 재정리
  • twisted와 gevent 그리고 다른 비동기 메서드와 호환될 수 있는 일반적인 이벤트 루프를 제공한다.

11.1.7 Redis

  • 이 장에서 Redis는 병행성에 대해 다룬다.
  • 큐를 만들 수 있는 빠른 방법은 Redis의 리스트다.
  • Redis서버는 하나의 머신에서 실행한다.
  • 클라이언트는 같은 머신에서 실행하거나 네트워크를 통해서 접근할 수 있다.
  • 두 경우 모두 클라이언트는 TCP를 통해 서버와 통신을 하여 네트워킹을 한다.
  • 하나 이상의 공급자 클라이언트는 리스트의 한쪽 끝에 메시지를 푸시 한다.
  • 하나 이상의 클라이언트 워커는 리스트를 감시하며, 블로킹 팝 연산을 수행한다.
  • 리스트가 비어 있는 경우에는 메시지를 기다린다.
  • 메시지가 도착하자마자 첫 번째 워커가 메시지를 처리한다.
import redis
conn = redis.Redis()
print('Washer is starting')
dishes = ['salad', 'bread', 'entree','dessert']
for dish in dishes:
    msg = dish.encode('utf-8')
    conn.rpush('dishes', msg)
    print('Washed', dish)
conn.rpush('dishes', quit)
print('Washer is done')
import redis
conn = redis.Redis()
print('Dryer is stating')
while True:
    msg = conn.blpop('dishes')
    if not msg:
        break
    val = msg[1].decode('utf-8')
    if val == 'quit':
        break
    print('Dried', val)
print('Dished are dried')
def dryer():
    import redis
    import os
    import time
    conn = redis.Redis()
    pid = os.getpid()
    timeout = 20
    print('Dryer process {pid} is starting'.format(pid=pid))
    while True:
        msg = conn.blpop('dishes', timeout)
        if not msg:
            break
        val = msg[1].decode('utf-8')
        if val == 'quit':
            break
        print('{pid}: dried {val}'.format(pid=pid, val=val))
        time.sleep(0.1)
    print('Dryer process {pid} is done'.format(pid=pid))

import multiprocessing
DRYERS = 3
for num in range(DRYERS):
    p = multiprocessing.Process(target=dryer)
    p.start()
Dryer process 25468 is starting
Dryer process 25466 is starting
Dryer process 25465 is starting
25468: dried salad
25466: dried bread
25465: dried entree
Dryer process 25466 is done
Dryer process 25468 is done
Dryer process 25465 is done

11.1.8 큐를 넘어서

  • 실행후 잊어버리기

    접시를 전달할 곳에 아무것도 없더라도, 전달한 후 그 결과에 대해 걱정하지 않는다. 접시를 바닥에 놓는 방법이다.

  • 요청-응답

    식기세척기는 건조기로부터, 건조기는 접시를 정리하는 기계로부터 파이프라인의 각 접시에 대한 신호를 받는다.

  • 역압 또는 압력 조절

    느린 워커의 속도가 빠른 워커의 속도를 따라갈수 없을 때, 빠른 워커의 속도를 조절한다.

Comments