home..
Raft
David Alade /
raft
python
The following is a python implementation of the Raft consensus protocol for a distributed key-value store. I wrote this implementation as a part of my Networks class in the fall of 2022. The implementation represents the protocol for a replica in the cluster to follow to keep the whole cluster updated as a comprehensive database. The code was tested through a simulator that would send various put, and get requests to replicas while also randomly killing replicas, and leaders.
Code
#!/usr/bin/env python3
import argparse, socket, time, json, select, struct, sys, math, os, random
import copy
BROADCAST = "FFFF"
class Replica:
def __init__(self, port, id, others):
self.port = port
self.id = id
self.others = others
self.state = "follower"
self.leader_id = BROADCAST
self.state_machine = {}
self.current_term = 0
self.voted = False
self.log = []
self.log_confirmations = {}
self.last_heartbeat = 0
self.election_timeout = random.uniform(.150, .300)
self.election_start = 0
self.commit_index = 0
self.last_applied = 0
self.votes = 0
self.worklist = []
self.sent = {}
# leader only
self.prev_log_index = -1
self.prev_log_term = 0
self.next_index = {}
self.match_index = {}
self.store_time = {}
self.timeout = False
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(('localhost', 0))
print("Replica %s starting up" % self.id, flush=True)
hello = {"src": self.id, "dst": BROADCAST, "leader": BROADCAST, "type": "hello"}
self.send(hello)
print("Sent hello message: %s" % hello, flush=True)
def send(self, message):
print("Sending message: %s" % message, flush=True)
self.socket.sendto(json.dumps(message).encode('utf-8'), ('localhost', self.port))
def run(self):
while True:
socks = select.select([self.socket], [], [], 0.1)[0]
if self.state == 'leader':
self.check_follower()
if self.socket in socks:
data, addr = self.socket.recvfrom(65535)
msg = data.decode('utf-8')
print("Received message '%s'" % msg, flush=True)
msg_dict = json.loads(msg)
self.handle_message(msg_dict)
if self.state == 'follower' and time.time() - self.last_heartbeat > .5 and not self.voted:
self.state = 'candidate'
self.start_election()
if self.state == 'candidate':
if time.time() - self.election_start > self.election_timeout:
self.start_election()
if self.state == 'leader' and time.time() - self.last_heartbeat >= .1:
self.heartbeat()
def start_election(self):
print("STARTING VOTE")
self.current_term += 1
self.votes = 1
self.request_vote()
self.election_start = time.time()
self.election_timeout = random.uniform(.15, .4)
def handle_message(self, msg):
print("HANDLING" + str(msg), flush=True)
if self.leader_id != BROADCAST:
while len(self.worklist) != 0:
handle = self.worklist.pop(0)
# print("HANDLING FROM WORKLIST " + str(handle), flush=True)
self.handle_message(handle)
if msg['type'] == 'get':
self.handle_get(msg)
elif msg['type'] == 'put':
self.handle_put(msg)
elif msg['type'] == 'request_vote':
self.handle_request_vote(msg)
elif msg['type'] == 'append_entry':
self.handle_append_entry(msg)
elif msg['type'] == 'vote':
if self.state == 'candidate':
self.votes += 1
majority = int(len(self.others) / 2) + 1
print(str(self.votes) + " votes so far.", flush=True)
if self.votes >= majority:
print('I AM THE LEADER ' + self.id, flush=True)
self.state = 'leader'
self.leader_id = self.id
self.heartbeat()
self.votes = 0
if len(self.log) == 0:
for follower in self.others:
self.next_index[follower] = 0
else:
for follower in self.others:
self.next_index[follower] = self.log[-1]['index'] + 1
else:
return
elif msg['type'] == 'stored':
if self.state == 'leader':
self.store_time[msg['src']] = time.time()
if self.log_confirmations[msg['index']] != -1:
self.log_confirmations[msg['index']] += 1
if msg['index'] > self.match_index.get(msg['src'], -1):
self.match_index[msg['src']] = msg['index']
majority = int(len(self.others) / 2) + 1
if self.log_confirmations[msg['index']] >= majority:
self.update_log(msg['index'])
self.log_confirmations[msg['index']] = -1
elif msg['type'] == 'rejected':
self.next_index[msg['src']] -= 1
index = self.next_index[msg['src']]
if index < 0:
entry = self.log[0]
else:
entry = self.log[index]
message = {
'src': self.id,
'dst': msg['src'],
'type': "new_log",
'leader': self.leader_id,
'log': self.log,
'highest_log': self.commit_index,
'term': self.current_term
}
self.send(message)
elif msg['type'] == 'new_log':
self.log = msg['log']
def handle_get(self, msg):
value = self.state_machine.get(msg['key'], '')
message = {
"src": self.id,
"dst": msg['src'],
"leader": self.leader_id,
"MID": msg['MID'],
'term': self.current_term
}
if self.state != 'leader':
message['type'] = 'redirect'
self.send(message)
if self.state == 'follower' or self.state == 'leader':
message['type'] = 'ok'
message['value'] = value
if value is not None:
self.send(message)
else:
# print("ADDING TO WORK LIST IN GET" + str(msg))
self.worklist.append(msg)
def heartbeat(self):
self.last_heartbeat = time.time()
for follower in self.others:
message = {
"src": self.id,
"dst": follower,
'type': 'append_entry',
'leader': self.id,
'log': [],
'term': self.current_term,
'highest_log': len(self.state_machine)
}
self.send(message)
def handle_request_vote(self, msg):
if len(self.log) > 0:
latest_log = self.log[-1]
if msg['lastLogTerm'] != latest_log['term']:
up_to_date = latest_log['term'] > msg['lastLogTerm']
else:
up_to_date = latest_log['index'] > msg['lastLogIndex']
else:
up_to_date = False
if self.state == 'follower' and not self.voted and not up_to_date:
message = {
'src': self.id,
'dst': msg['src'],
'type': 'vote',
'leader': BROADCAST,
'term': self.current_term
}
self.voted = True
self.current_term += 1
self.send(message)
else:
return
def request_vote(self):
for peer in self.others:
if len(self.log) > 0:
lastLogIndex = self.log[-1]['index']
lastLogTerm = self.log[-1]['term']
else:
lastLogIndex = 0
lastLogTerm = self.current_term
message = {
'src': self.id,
'dst': peer,
'type': 'request_vote',
'leader': BROADCAST,
'term': self.current_term,
'lastLogIndex': lastLogIndex,
'lastLogTerm': lastLogTerm
}
self.send(message)
def handle_append_entry(self, msg):
print("IN APPEND ENTRY")
if self.state == 'follower':
self.election_timeout = random.uniform(.15, .3)
self.last_heartbeat = time.time()
if len(msg['log']) == 0: # a heartbeat
print("append 1", flush=True)
self.votes = 0
self.leader_id = msg['src']
self.state = 'follower'
self.voted = False
else: # an actual entry
message = {
'src': self.id,
'dst': msg['src'],
'type': 'stored',
'index': msg['log']['index'],
'leader': self.leader_id
}
if len(self.log) > 0:
for ent in reversed(self.log):
if ent['term'] == msg['log']['prevLogTerm'] and ent['index'] == msg['log']['prevLogIndex']:
self.log.append(msg['log'])
self.commit_index = msg['highest_log']
self.update_log(self.commit_index)
self.send(message)
return
print(self.log)
reject_message = {
'src': self.id,
'dst': self.leader_id,
'leader': self.leader_id,
'type': 'rejected',
'term': self.current_term
}
self.send(reject_message)
else:
self.log.append(msg['log'])
self.commit_index = msg['highest_log']
self.update_log(self.commit_index)
self.send(message)
elif self.state == 'candidate' or self.state == 'leader':
if msg['term'] >= self.current_term:
self.state = 'follower'
if len(msg['log']) != 0:
self.handle_append_entry(msg)
def handle_put(self, msg):
message = {
"src": msg['dst'],
"dst": msg['src'],
"leader": self.leader_id,
"MID": msg['MID']
}
if self.state == 'follower':
if self.leader_id != BROADCAST:
message['type'] = 'redirect'
self.send(message)
else:
# print("ADDING TO WORKLIST IN PUT" + str(msg), flush=True)
self.worklist.append(msg)
elif self.state == 'leader':
log_entry = {
'term': self.current_term,
'key': msg['key'],
'value': msg['value'],
'type': 'ok',
'index': len(self.log)
}
new_entry = {**log_entry, **message, 'prevLogIndex': self.prev_log_index,
'prevLogTerm': self.current_term}
self.log.append(new_entry)
self.prev_log_index += 1
self.log_confirmations[log_entry['index']] = 0
self.append_entry(new_entry)
elif self.state == 'candidate':
# print("ADDING TO WORKLIST IN PUT AS CANDIDATE" + str(msg), flush=True)
self.state = 'follower'
self.worklist.append(msg)
def append_entry(self, entry):
for follower in self.others:
message = {
'src': self.id,
'dst': follower,
'type': "append_entry",
'leader': self.leader_id,
'log': entry,
'highest_log': self.commit_index,
'term': self.current_term
}
self.send(message)
def update_log(self, index):
if self.state == 'leader':
log = self.log[index]
self.state_machine[log['key']] = log['value']
print("KEY " + log['key'] + " ADDED AS VALUE: " + log['value'], flush=True)
message = {
'src': self.id,
'dst': log['dst'],
'leader': self.leader_id,
'type': 'ok',
'MID': log['MID']
}
self.commit_index = index
self.send(message)
if self.state == 'follower':
while self.last_applied < self.commit_index and self.last_applied < len(self.log) and len(self.log) > 0:
print("CATCHING UP", flush=True)
commit = self.log[self.last_applied]
self.state_machine[commit['key']] = commit['value']
self.last_applied += 1
print("DONE CATCHING UP", flush=True)
def check_follower(self):
for follower in self.others:
response_time = self.store_time.get(follower)
if response_time is not None:
if time.time() - response_time > 1.5:
print("REMOVING " + follower + "BECAUSE WE HAVEN'T HEARD FROM IT")
self.others.remove(follower)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='run a key-value store')
parser.add_argument('port', type=int, help="Port number to communicate")
parser.add_argument('id', type=str, help="ID of this replica")
parser.add_argument('others', metavar='others', type=str, nargs='+', help="IDs of other replicas")
args = parser.parse_args()
replica = Replica(args.port, args.id, args.others)
replica.run()
Theme Moonwalk