Zookeeper 는 apache 에서 진행하는 분산 코디네이터 입니다. 이를 이용해서 분산 락이라든 지, 현재 사용가능 한 Worker or Server 정보를 얻어오는 등의 여러가지 작업을 쉽게 할 수 있습니다. zookeeper 의 구조에 대해서는 다음 기회에 좀 더 자세히 설명해 드리기로 하고 일단 간단한 zookeeper 를 이용한 example을 보여주려고 합니다.
Zookeeper 에는 임시노드를 생성할 수 있는 기능이 있습니다. 임시노드는 세션이 끊어지면, 지정된 시간뒤에 사라지는 노드입니다. 즉, 특정 서버가 죽거나 하면 연결된 정보가 자동적으로 사라집니다. 다음 예제는 Echo 서버가 시작될 때 Zookeeper 에 자신의 Information( ip, port ) 을 등록하고, 장애로 서버가 동작하지 못하는 경우에 대한 샘플입니다.
실제 동작은 세션이 끊어진 뒤에 일정 시간 동안은 노드가 남아있으므로, 잠시 동안 오류가 발생하다가, 시간이 지나면 해당 노드 정보가 사라지므로 클라이언트들은 정상인 서버로만 접속을 하게 됩니다. 이런 부분은 분산 컴퓨팅에서 장애의 허용, 복구 같은 내용과 연관이 되게 되는데, 잠시 동안만 실패하므로, 사용자는 거의 정상적인 서비스를 할 수 있게 됩니다.
다음 그림과 같이 서버는 최초에 실행되면서 zookeeper 에 자신의 정보를 저장합니다.
클라이언트는 시작시 zookeeper 에서 위의 정보를 읽어서 서버에 접속하게 됩니다.
장애시에는 다음과 같이 자동으로 연결 정보가 사라지므로 클라이언트는 정상 서버로 계속 접속하게 됩니다.
다음은 서버 Sample 입니다.
#/usr/bin/env python
import zookeeper, time, threading
from gevent.server import StreamServer
from gevent import monkey; monkey.patch_socket()
import socket
monkey.patch_socket()
END_MARK='rn'
def handler(socket, address):
print 'New connection from %s:%s' % address
# using a makefile because we want to use readline()
packet = ''
socket.settimeout(2)
while True:
data = socket.recv(1024)
if not data:
print 'Connection Close from %s:%s' % address
return
packet += data
if END_MARK in packet:
socket.sendall(packet)
print packet
break
PORT_NUM = 12345
if __name__ == '__main__':
# to make the server use SSL, pass certfile and keyfile arguments to the constructor
server = StreamServer(('0.0.0.0', PORT_NUM),handler)
# to start the server asynchronously, use its start() method;
# we use blocking serve_forever() here because we have no other jobs
connected = False
conn_cv = threading.Condition()
def my_connection_watcher(handle,type,state,path):
global connected, conn_cv
print "Connected, handle is ", handle
conn_cv.acquire()
connected = True
conn_cv.notifyAll()
conn_cv.release()
conn_cv.acquire()
print "Connecting to Zookeeper -- "
handle = zookeeper.init("172.27.0.2:2181,172.27.0.3:2181,172.27.0.4:2181", my_connection_watcher)
while not connected:
conn_cv.wait()
conn_cv.release()
ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"};
rootnode_name = "/zk-servers"
ret = zookeeper.exists(handle, rootnode_name, None )
if None == ret:
zookeeper.create(handle, rootnode_name, "data", [ZOO_OPEN_ACL_UNSAFE], 0 )
host = socket.gethostbyname(socket.gethostname())
data = host + ":" + str(PORT_NUM)
zookeeper.create(handle, "/zk-servers/echoserverlist", data, [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERA
L|zookeeper.SEQUENCE)
print 'Starting echo server on port %s' % PORT_NUM
server.serve_forever()
다음은 클라이언트 입니다. 모두 최초 접속시에 목록 리스트를 가져와서 랜덤하게 하나를 선택해서
접속하게 됩니다.
import zookeeper, time, threading
from gevent.server import StreamServer
from gevent import monkey; monkey.patch_socket()
import random
import socket
connected = False
conn_cv = threading.Condition()
def my_connection_watcher(handle,type,state,path):
global connected, conn_cv
print "Connected, handle is ", handle
conn_cv.acquire()
connected = True
conn_cv.notifyAll()
conn_cv.release()
conn_cv.acquire()
print "Connecting to localhost:2181 -- "
handle = zookeeper.init("172.27.0.2:2181,172.27.0.3:2181,172.27.0.4:2181", my_connection_watcher)
while not connected:
conn_cv.wait()
conn_cv.release()
ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"};
rootnode_name = "/zk-servers"
ret = zookeeper.exists(handle, rootnode_name, None )
if None == ret:
print "There is no zk-servers keys"
exit()
children = zookeeper.get_children(handle, rootnode_name, None)
size = len(children)
if 0 == size:
print "There is no zk-servers echo server lists."
exit()
idx = random.randint(0,size-1)
child = children[idx]
nodename = rootnode_name + "/" + child
(data,stat) = zookeeper.get(handle, nodename, None)
print idx, data, stat
ip, port = data.split(':')
print ip, port
clientsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsock.connect( (ip, int(port)) )
clientsock.sendall("Hi, this is test!rn")
recvdata = clientsock.recv(1024)
print "Data: ", recvdata
clientsock.close()