Zookeeper Example with Python

Zookeeper 는 apache 에서 진행하는 분산 코디네이터 입니다. 이를 이용해서 분산 락이라든 지, 현재 사용가능 한 Worker or Server 정보를 얻어오는 등의 여러가지 작업을 쉽게 할 수 있습니다. zookeeper 의 구조에 대해서는 다음 기회에 좀 더 자세히 설명해 드리기로 하고 일단 간단한 zookeeper 를 이용한 example을 보여주려고 합니다.

Zookeeper 에는 임시노드를 생성할 수 있는 기능이 있습니다. 임시노드는 세션이 끊어지면, 지정된 시간뒤에 사라지는 노드입니다. 즉, 특정 서버가 죽거나 하면 연결된 정보가 자동적으로 사라집니다. 다음 예제는 Echo 서버가 시작될 때 Zookeeper 에 자신의 Information( ip, port ) 을 등록하고, 장애로 서버가 동작하지 못하는 경우에 대한 샘플입니다.

실제 동작은 세션이 끊어진 뒤에 일정 시간 동안은 노드가 남아있으므로, 잠시 동안 오류가 발생하다가, 시간이 지나면 해당 노드 정보가 사라지므로 클라이언트들은 정상인 서버로만 접속을 하게 됩니다. 이런 부분은 분산 컴퓨팅에서 장애의 허용, 복구 같은 내용과 연관이 되게 되는데, 잠시 동안만 실패하므로, 사용자는 거의 정상적인 서비스를 할 수 있게 됩니다.

다음 그림과 같이 서버는 최초에 실행되면서 zookeeper 에 자신의 정보를 저장합니다.

클라이언트는 시작시 zookeeper 에서 위의 정보를 읽어서 서버에 접속하게 됩니다.

장애시에는 다음과 같이 자동으로 연결 정보가 사라지므로 클라이언트는 정상 서버로 계속 접속하게 됩니다.

다음은 서버 Sample 입니다.

#/usr/bin/env python

import zookeeper, time, threading
from gevent.server import StreamServer
from gevent import monkey; monkey.patch_socket()
import socket
monkey.patch_socket()

END_MARK='rn'

def handler(socket, address):
    print 'New connection from %s:%s' % address
    # using a makefile because we want to use readline()
    packet = ''
    socket.settimeout(2)

    while True:
        data = socket.recv(1024)
        if not data:
            print 'Connection Close from %s:%s' % address
            return

        packet += data
        if END_MARK in packet:
            socket.sendall(packet)
            print packet
            break

PORT_NUM = 12345

if __name__ == '__main__':
    # to make the server use SSL, pass certfile and keyfile arguments to the constructor
    server = StreamServer(('0.0.0.0', PORT_NUM),handler)
    # to start the server asynchronously, use its start() method;
    # we use blocking serve_forever() here because we have no other jobs

    connected = False
    conn_cv = threading.Condition()

    def my_connection_watcher(handle,type,state,path):
        global connected, conn_cv
        print "Connected, handle is ", handle
        conn_cv.acquire()
        connected = True
        conn_cv.notifyAll()
        conn_cv.release()

    conn_cv.acquire()
    print "Connecting to Zookeeper -- "
    handle = zookeeper.init("172.27.0.2:2181,172.27.0.3:2181,172.27.0.4:2181", my_connection_watcher)
    while not connected:
        conn_cv.wait()

    conn_cv.release()

    ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"};

    rootnode_name = "/zk-servers"
    ret = zookeeper.exists(handle, rootnode_name, None )
    if None == ret:
        zookeeper.create(handle, rootnode_name, "data", [ZOO_OPEN_ACL_UNSAFE], 0 )

    host = socket.gethostbyname(socket.gethostname())
    data = host + ":" + str(PORT_NUM)
    zookeeper.create(handle, "/zk-servers/echoserverlist", data, [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERA
L|zookeeper.SEQUENCE)

    print 'Starting echo server on port %s' % PORT_NUM
    server.serve_forever()

다음은 클라이언트 입니다. 모두 최초 접속시에 목록 리스트를 가져와서 랜덤하게 하나를 선택해서
접속하게 됩니다.

import zookeeper, time, threading
from gevent.server import StreamServer
from gevent import monkey; monkey.patch_socket()
import random
import socket

connected = False
conn_cv = threading.Condition()

def my_connection_watcher(handle,type,state,path):
    global connected, conn_cv
    print "Connected, handle is ", handle
    conn_cv.acquire()
    connected = True
    conn_cv.notifyAll()
    conn_cv.release()

conn_cv.acquire()
print "Connecting to localhost:2181 -- "
handle = zookeeper.init("172.27.0.2:2181,172.27.0.3:2181,172.27.0.4:2181", my_connection_watcher)
while not connected:
    conn_cv.wait()
conn_cv.release()

ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"};

rootnode_name = "/zk-servers"
ret = zookeeper.exists(handle, rootnode_name, None )
if None == ret:
    print "There is no zk-servers keys"
    exit()

children = zookeeper.get_children(handle, rootnode_name, None)
size = len(children)
if 0 == size:
    print "There is no zk-servers echo server lists."
    exit()

idx = random.randint(0,size-1)


child = children[idx]
nodename = rootnode_name + "/" +  child
(data,stat) = zookeeper.get(handle, nodename, None)
print idx, data, stat

ip, port = data.split(':')
print ip, port

clientsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsock.connect( (ip, int(port)) )
clientsock.sendall("Hi, this is test!rn")
recvdata = clientsock.recv(1024)
print "Data: ", recvdata
clientsock.close()