The following is a python implementation of the reliable transport protocol. I wrote this implementation as a part of my Networks class in the fall of 2022. The protocol ensures that data is delivered in order, without duplicates, missing data, or errors. Below, the “send” client handles the sending of data across the network, and the “receive” client handles the receiving of data and printing it out in order. This program runs through a simulator that feeds the sender data to send to be outputted by the receiver.
Code
Send client
importargparse,socket,time,json,select,struct,sys,math,hashlibimportastfrom_socketimporttimeoutDATA_SIZE=1375classSender:done=Falsepackets=[]actual=[]acks=[]# This function initializes the values we use for our Sender.
def__init__(self,host,port):self.host=hostself.remote_port=int(port)self.log("Sender starting up using port %s"%self.remote_port)self.socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)self.socket.bind(('0.0.0.0',0))self.waiting=False# This function takes in a message and displays it in the simulator.
deflog(self,message):sys.stderr.write(message+"\n")sys.stderr.flush()# This function takes in a message and sends it to the proper location.
defsend(self,message):self.socket.sendto(json.dumps(message).encode('utf-8'),(self.host,self.remote_port))# This function runs our Sender.
defrun(self):whileTrue:# This if-statement checks to ensure that we have packets left and if our sender is done.
iflen(self.packets)>0andself.done:self.retransmit()sockets=[self.socket,sys.stdin]ifnotself.waitingelse[self.socket]socks=select.select(sockets,[],[],0.1)[0]forconninsocks:ifconn==self.socket:# This will loop through the packets that are being sent in the program.
forxinrange(len(self.packets)):try:conn.settimeout(.1)k,addr=conn.recvfrom(65535)# This try-except checks the hash of our message and handles the error when our JSON gets
# corrupted accordingly.
try:msg=json.loads(k.decode('utf-8'))exceptjson.decoder.JSONDecodeError:continueif'number'notinmsg:sys.exit(0)if"hash"notinmsgor'type'notinmsg:continueforpacketinself.packets:ifpacket[2]==msg['hash']andpacket[0][12:19]==msg['number']:self.packets.remove(packet)excepttimeout:self.retransmit()self.waiting=False# This else-if handles the case when we have data that we still need to be gathering from the simulator.
elifconn==sys.stdin:whilelen(self.packets)<4andnotself.done:data=sys.stdin.read(DATA_SIZE)self.send_message(data)self.waiting=Truereturn# This function takes in the data from a packet, sends it to the receiver and adds the sent packet to our list of
# packets.
defsend_message(self,data):msg_hash=hashlib.sha256(data.encode()).hexdigest()iflen(data)==0:msg={"type":"msg","data":"finished"}self.send(msg)self.packets.append(("finished",time.time(),msg_hash))self.done=Truereturnmsg={"type":"msg","data":data,'hash':msg_hash}self.send(msg)self.packets.append((data,time.time(),msg_hash))# This function checks to see if our data has been transmitted within a certain time frame and handles the
# retransmission of the packet accordingly.
defretransmit(self):packet=self.packets[0]iftime.time()-packet[1]>1.0:# msg = {"type": "msg", "data": packet[0]}
# self.log("Retransmitting message '%s'" % msg)
self.packets.remove(packet)self.send_message(packet[0])if__name__=="__main__":parser=argparse.ArgumentParser(description='send data')parser.add_argument('host',type=str,help="Remote host to connect to")parser.add_argument('port',type=int,help="UDP port number to connect to")args=parser.parse_args()sender=Sender(args.host,args.port)sender.run()
Recieve client
importargparse,socket,time,json,select,struct,sys,math,hashlibclassReceiver:sequence_numbers={}work_list=[]next_up=0finished=False# This function initializes the values we use for our Receiver.
def__init__(self):self.socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)self.socket.bind(('0.0.0.0',0))self.port=self.socket.getsockname()[1]self.log("Bound to port %d"%self.port)self.remote_host=Noneself.remote_port=None# This function takes in a message and sends it to the proper location.
defsend(self,message):self.socket.sendto(json.dumps(message).encode('utf-8'),(self.remote_host,self.remote_port))# This function takes in a message and displays it in the simulator.
deflog(self,message):sys.stderr.write(message+"\n")sys.stderr.flush()# This function runs our Receiver.
defrun(self):whileTrue:socks=select.select([self.socket],[],[])[0]forconninsocks:data,addr=conn.recvfrom(65535)# Grab the remote host/port if we don't already have it
ifself.remote_hostisNone:self.remote_host=addr[0]self.remote_port=addr[1]msg=json.loads(data.decode('utf-8'))# self.log("Received message %s" % msg)
# This initial if-statement checks to see if the program has finished sending data from the sender.
ifmsg['data']!="finished":number=msg["data"][12:19]hash_msg=hashlib.sha256(msg['data'].encode()).hexdigest()# This if-statement is where we check if the message we receive has the correct hash value assigned
# it so that we can determine if a packet has become corrupted or not.
ifhash_msg==msg['hash']:self.send({"type":"ack","number":number,'hash':hash_msg})# In this if-statement we check to see if a sequence number has already been delievered and if
# it hasn't, we add it to a list of the other sequence numbers.
ifnumbernotinself.sequence_numbers:self.sequence_numbers[number]=msg["data"]self.work_list.append(int(number))self.work_list.sort()iflen(self.work_list)>0:whileself.work_list[0]==self.next_up:self.next_up+=1self.work_through()iflen(self.work_list)==0:breakelse:pass# self.log("Received data duplicate message %s" % msg)
else:pass# self.log("Received corrupted message %s" % msg)
else:iflen(self.work_list)==0:self.send({"type":"ack"})elifself.work_list[0]==self.next_up:key=("%07d"%self.work_list[0])self.work_list.pop(0)print(self.sequence_numbers[key],end='',flush=True)return# This function goes through our work list of sequence numbers and takes them out from the list starting from the
# first element.
defwork_through(self):key=("%07d"%self.work_list[0])self.work_list.pop(0)print(self.sequence_numbers[key],end='',flush=True)if__name__=="__main__":parser=argparse.ArgumentParser(description='receive data')args=parser.parse_args()sender=Receiver()sender.run()