-
Notifications
You must be signed in to change notification settings - Fork 3
/
pastrynode.py
executable file
·207 lines (179 loc) · 7.71 KB
/
pastrynode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python
# Author: John Dickinson
# pastry implementation
import hashlib
import socket
import threading
import cPickle
# Pastry configuration parameter; in this file it only feeds NEIGHBORS_TO_KEEP.
PASTRY_BASE = 1
# Evaluates to 2 while PASTRY_BASE is 1 (2 >> 1 == 1).
# NOTE(review): because this is a right shift, the value becomes 0 for any
# PASTRY_BASE >= 2. If the intent was 2 * 2**PASTRY_BASE, the expression
# should use (1 << PASTRY_BASE) instead -- confirm before changing.
NEIGHBORS_TO_KEEP = 2 * (2 >> PASTRY_BASE)
# TCP port each node listens on; also the default port for the known host in _testing.
PASTRY_PORT = 54321
# Maximum number of bytes read by a single recv() call.
BUFFER_SIZE = 4096
# Message tags: the first element of the (message, key) tuples exchanged between nodes.
JOIN_MESSAGE = 'join'
HEARTBEAT_MESSAGE = 'heartbeat ping'
class PastryError(Exception):
    '''Common base class for the Pastry-specific exceptions defined in this module.'''
class ConnectionError(PastryError):
    '''Raised when a socket cannot be connected to a known host or bound for listening.'''
class ProtocolError(PastryError):
    '''Pastry protocol failure (not raised by any code visible in this file).'''
def get_common_prefix(string1, string2):
    '''Return the longest leading run of characters shared by string1 and string2.

    The comparison stops at the first position where the two strings differ,
    or where the shorter one ends, so the result is always a true prefix of
    both arguments (possibly the empty string).
    '''
    shared = []
    for left, right in zip(string1, string2):
        if left != right:
            # later matches are not part of the prefix, so stop here
            break
        shared.append(left)
    return ''.join(shared)
class PastryNode(object):
    '''A single node of a Pastry overlay network.

    Constructing a node derives its id from the SHA-1 of the local IP address,
    starts a background thread that accepts TCP connections on PASTRY_PORT and,
    when a known host is supplied, sends that host a join message.

    Node ids are hexadecimal strings (hexdigest() output). route() assumes the
    keys it is given are hexadecimal strings as well and measures closeness on
    their integer values.
    '''

    def __init__(self, known_host, credentials=None, application=None):
        '''Create the node, start listening and optionally join an existing network.

        known_host  -- (host, port) tuple of a node already in the network, or
                       None to start a new network with this node as its first member
        credentials -- stored on the instance; nothing in this class reads it
        application -- optional object whose handle_message(message, key, from_addr)
                       is called for messages that are neither join nor heartbeat

        Raises ConnectionError if known_host cannot be reached.
        '''
        self.credentials = credentials
        self.application = application

        # the routing table is a list of lists, populated such that each row shares a common prefix
        # with this node's node_id and each column contains the node_id with the
        # (len(common prefix)+1)'th digit as one of the total possibilities
        # see the Pastry specification for more details
        self.routing_table = list()

        # neighborhood nodes are nodes that are known to be close in the underlying network
        self.neighborhood = list()

        # leaf nodes are nodes that are close in the node_id space (close in the pastry network)
        self.leafs = list()

        self._connection_dict = dict()  # contains all connections keyed on node_id

        # hash the current host's ip address to get the node id
        my_ip = socket.gethostbyname(socket.gethostname())
        self.node_id = hashlib.sha1(my_ip).hexdigest()

        # listen on the pastry port
        # NOTE(review): this thread is not a daemon, so it keeps the process alive
        # after the main thread ends -- confirm that is intended.
        listen_location = (my_ip, PASTRY_PORT)
        self.listen_thread = threading.Thread(target=self._listen_thread, args=(listen_location,))
        self.listen_thread.start()

        # using the known_host, connect to the existing pastry network
        # if known_host is None, we are the first node in a new network
        if known_host is not None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(known_host)
            except socket.error:
                # do not leak the half-created socket when the join fails
                sock.close()
                raise ConnectionError('Error connecting to the server at %s' % repr(known_host))
            # send the join message with my node_id as the key
            self._send(sock, (JOIN_MESSAGE, self.node_id))

    @staticmethod
    def _id_value(node_id):
        '''Return the integer value of a hexadecimal node id or key.'''
        return int(node_id, 16)

    @staticmethod
    def _shared_prefix_len(id1, id2):
        '''Return how many leading digits id1 and id2 have in common.'''
        count = 0
        for left, right in zip(id1, id2):
            if left != right:
                break
            count += 1
        return count

    def _closest_to(self, node_ids, key):
        '''Return the member of node_ids numerically closest to key (first one wins ties).'''
        key_value = self._id_value(key)
        return min(node_ids, key=lambda node_id: abs(self._id_value(node_id) - key_value))

    def _routing_table_entry(self, prefix_len, key):
        '''Return the routing table entry for key, or None when there is none.

        The row is selected by the length of the prefix shared with this node
        and the column by the value of the key's next digit, matching the
        layout described where routing_table is created.
        '''
        try:
            column = int(key[prefix_len], 16)
            return self.routing_table[prefix_len][column]
        except IndexError:
            # key has no further digit, or the table has no such row/column yet
            return None

    def route(self, message, key):
        '''Forward (message, key) to the best next hop this node knows for key.

        If key lies strictly between the smallest and largest leaf, the message
        goes to the numerically closest leaf. Otherwise the routing table is
        consulted, and as a last resort every known node that shares a longer
        prefix with key than this node does is considered. When no such node
        exists nothing is sent.

        Returns None. Raises KeyError if the chosen node has no entry in
        _connection_dict and ValueError if key or a node id is not hexadecimal.
        '''
        key_value = self._id_value(key)
        if self.leafs:
            leaf_values = [self._id_value(leaf) for leaf in self.leafs]
            if min(leaf_values) < key_value < max(leaf_values):
                # key is in range for the leafs, send it to the closest one
                closest = self._closest_to(self.leafs, key)
                self._send(self._connection_dict[closest], (message, key))
                return

        # key is not in range for the leafs, use the routing table
        prefix_len = self._shared_prefix_len(key, self.node_id)
        next_host = self._routing_table_entry(prefix_len, key)
        if next_host is not None:
            self._send(self._connection_dict[next_host], (message, key))
            return

        # rare case
        # find the node with the node_id closest to key in the set of all nodes I know
        # (built as a fresh list so self.leafs is not modified)
        known_ids = list(self.leafs)
        known_ids.extend(self.neighborhood)
        for row in self.routing_table:
            known_ids.extend(entry for entry in row if entry is not None)

        candidates = [node_id for node_id in known_ids
                      if self._shared_prefix_len(node_id, key) > prefix_len]
        if not candidates:
            # no known node is a better match for key than this one
            return
        closest = self._closest_to(candidates, key)
        self._send(self._connection_dict[closest], (message, key))

    def _send(self, sock, raw_data):
        '''Pickle raw_data and write all of it to sock; returns sendall()'s result.'''
        print('sending %s to %s' % (repr(raw_data), sock.getpeername()))
        data = cPickle.dumps(raw_data)
        return sock.sendall(data)

    def _recv(self, sock):
        '''Read at most BUFFER_SIZE bytes from sock and decode them.

        Returns the unpickled object, or the raw data when it is not a valid
        pickle (this includes the empty string read from a closed connection),
        or None when the read itself fails.
        '''
        try:
            raw = sock.recv(BUFFER_SIZE)
        except socket.error:
            # best effort: the caller treats a falsy result as a disconnect
            return None
        # SECURITY: unpickling data received from the network lets a remote peer
        # execute arbitrary code; only use this on a trusted network.
        # NOTE(review): a single recv() is assumed to hold exactly one message.
        try:
            return cPickle.loads(raw)
        except Exception:
            return raw

    def _listen_thread(self, listen_location):
        '''thread target that runs and listens for incoming connections. When new
        connections are received, spawn a thread and handle it

        Raises ConnectionError (inside this thread) if listen_location cannot be bound.
        '''
        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            try:
                listen_sock.bind(listen_location)
            except socket.error:
                raise ConnectionError('Error binding to %s' % repr(listen_location))
            print('Listening on %s' % repr(listen_location))
            listen_sock.listen(1)
            while True:
                conn, addr = listen_sock.accept()
                handler_thread = threading.Thread(target=self._incoming_connection_handler, args=(conn, addr))
                handler_thread.setDaemon(True)
                handler_thread.start()
        finally:
            # release the port however the loop ends
            listen_sock.close()

    def _incoming_connection_handler(self, conn, addr):
        '''Serve one accepted connection until the peer sends nothing more.

        Each received message is passed to _process_received_data and any
        non-None result is sent back on the same connection. The connection is
        closed when the loop ends, including when it ends with an exception.
        '''
        try:
            while True:
                data = self._recv(conn)
                if not data:
                    break
                response = self._process_received_data(data, addr)
                if response is not None:
                    self._send(conn, response)
        finally:
            conn.close()

    def _process_received_data(self, data, from_addr):
        '''handles all messages

        data is expected to be a (message, key) pair; anything that cannot be
        unpacked that way is returned unchanged. Returns the reply to send
        back, or None when there is nothing to reply.
        '''
        print('received %s from %s' % (repr(data), repr(from_addr)))
        try:
            message, key = data
        except (TypeError, ValueError):
            # not a two-element payload (wrong length or not iterable): echo it back
            return data

        # One eventual goal is to have each of these messages handled as a plugin.
        # The goal would be to simply add another message by writing a handler and defining
        # the message payload. In this way, the protocol can be changed or extended very simply.
        if message == JOIN_MESSAGE:
            print('join from %s' % key)
            return 'welcome'
        if message == HEARTBEAT_MESSAGE:
            # update last heard time for this key
            print('heartbeat from %s' % key)
            return None
        if self.application is not None:
            return self.application.handle_message(message, key, from_addr)
        print('unknown message')
        return None
def _testing(script_name, hostname=None, host_port=PASTRY_PORT, *unused_args):
    '''Start a PastryNode from command-line style arguments and idle until Ctrl-C.

    script_name -- accepted for symmetry with sys.argv; not used
    hostname    -- host of an existing node to join, or None to start a new network
    host_port   -- port of that node (converted with int())

    Prints the new node's id and returns 0 once a KeyboardInterrupt is received.
    '''
    import time

    # with no hostname there is nobody to join
    target = (hostname, int(host_port)) if hostname is not None else None
    pastry_node = PastryNode(target)
    print(pastry_node.node_id)

    # keep the main thread alive; the node does its work in background threads
    running = True
    while running:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            running = False
    return 0
if __name__ == '__main__':
    # testing: run a node, passing the command line through to _testing
    import sys
    script_name = sys.argv[0]
    extra_args = sys.argv[1:]
    sys.exit(_testing(script_name, *extra_args))