· 5 years ago · Nov 05, 2020, 04:02 PM
1#!/usr/bin/env python3
2
3import argparse
4import logging
5import random
6
7from dslib import Message, Process, Runtime
8
# Timer intervals are passed straight to ctx.set_timer(); the in-code comments
# treat them as seconds.

# How long a joining node waits for ACKJOIN before resending its JOINMAIN.
interval_ack = 1
# Period of the ping round; Node also uses it as the per-ping ack timeout.
interval_ping = 1
# NOTE(review): `k` is not referenced anywhere in the visible part of this
# file -- presumably a leftover fan-out parameter; confirm before removing.
k = 3
12
13
class Node(Process):
    """Group-membership process: join/leave a group and detect failed peers.

    Local (client) commands handled in ``receive``:
      * ``JOIN``        - body is the address of an existing member (or this
                          node's own address to start a new group)
      * ``LEAVE``       - announce departure and forget all group state
      * ``GET_MEMBERS`` - reply locally with ``MEMBERS`` (list of node names)

    Node-to-node messages:
      * ``JOINMAIN`` / ``ACKJOIN`` - join request and its acknowledgement
      * ``BCASTJOIN``              - membership table pushed after a join
      * ``BCASTLEAVE``             - a member left or was declared failed
      * ``PING`` / ``PINGACK``     - periodic liveness probe and its reply

    Every pending timer is recorded in ``msg_by_timer`` under an integer id;
    the stored message's type tells ``on_timer`` what the timer is for
    (``JOINMAIN`` = resend join, ``PING`` = start a ping round, ``NOACK`` =
    a ping went unanswered).

    NOTE(review): membership received via PING/BCASTJOIN/ACKJOIN is merged
    additively, so a peer that has not yet processed a BCASTLEAVE can
    re-introduce a departed address; it is then removed again once a ping to
    it goes unanswered.
    """

    def __init__(self, name):
        super().__init__(name)
        self.nodes = dict()  # key - address, value - name
        self.msg_by_timer = dict()  # key - timer id (int), value - Message
        self.timer_count = 0  # next unused timer id
        self.left = True  # True until JOIN and again after LEAVE

    # Helpers *************************************************************************************

    def _next_timer_id(self):
        """Return a timer id that has not been used by this node before."""
        timer_id = self.timer_count
        self.timer_count += 1
        return timer_id

    def _arm_timer(self, ctx, timer_id, msg, interval):
        """Remember what `timer_id` is for and (re)start the timer."""
        self.msg_by_timer[timer_id] = msg
        ctx.set_timer(str(timer_id), interval)

    def _schedule_ping_round(self, ctx):
        """Arm the timer that triggers the next ping round."""
        marker = Message('PING', headers={"nodes": dict(self.nodes),
                                          "from_addr": ctx.addr()})
        self._arm_timer(ctx, self._next_timer_id(), marker, interval_ping)

    def _peers(self, ctx):
        """Return a snapshot list of known member addresses except our own."""
        own_addr = ctx.addr()
        return [addr for addr in self.nodes if addr != own_addr]

    # Message handling ****************************************************************************

    def receive(self, ctx, msg):
        """Dispatch an incoming message.

        Local messages are client commands. Node-to-node messages are ignored
        while this node is not a group member, so that a node which has left
        neither answers pings nor rebuilds its member table.
        """
        if msg.is_local():
            self._receive_local(ctx, msg)
        elif not self.left:
            self._receive_remote(ctx, msg)

    def _receive_local(self, ctx, msg):
        """Handle client commands (API)."""
        # Join the group
        # - message body contains the address of some existing group member
        if msg.type == 'JOIN':
            self.left = False
            seed = msg.body
            self.nodes[ctx.addr()] = self._name
            if seed == ctx.addr():
                # new group consisting of the local node only: just start pinging
                self._schedule_ping_round(ctx)
            else:
                # the timer id travels in the request so that ACKJOIN can name
                # the retry timer it has to cancel
                timer_id = self._next_timer_id()
                join_msg = Message("JOINMAIN", headers={"from_addr": ctx.addr(),
                                                        "from_name": self._name,
                                                        "timer": timer_id,
                                                        "to": seed,
                                                        "nodes": dict(self.nodes)})
                self._arm_timer(ctx, timer_id, join_msg, interval_ack)
                ctx.send(join_msg, seed)

        # Leave the group
        elif msg.type == 'LEAVE':
            self.left = True
            leave_msg = Message("BCASTLEAVE", headers={"from_addr": ctx.addr()})
            for node_addr in self._peers(ctx):
                ctx.send(leave_msg, node_addr)

            # stop everything that is still scheduled, then forget group state
            for timer_id in self.msg_by_timer:
                ctx.cancel_timer(str(timer_id))
            self.nodes.clear()
            self.msg_by_timer.clear()
            # timer_count is deliberately NOT reset: ids stay unique across
            # sessions, so a late timer or ack from before the leave can never
            # be mistaken for one that belongs to a later join.

        # Get a list of group members
        # - return the list of all known alive nodes in MEMBERS message
        elif msg.type == 'GET_MEMBERS':
            ctx.send_local(Message('MEMBERS', list(self.nodes.values())))

        else:
            err = Message('ERROR', 'unknown command: %s' % msg.type)
            ctx.send_local(err)

    def _receive_remote(self, ctx, msg):
        """Handle node-to-node messages (only called while joined)."""
        if msg.type == 'JOINMAIN':
            # merge the newcomer's view (it contains at least the newcomer itself)
            self.nodes.update(msg.headers['nodes'])
            from_addr = msg.headers['from_addr']
            snapshot = dict(self.nodes)

            # tell everybody, including the newcomer, about the merged membership
            bcast_msg = Message("BCASTJOIN", headers={"nodes": snapshot})
            for node_addr in self._peers(ctx):
                ctx.send(bcast_msg, node_addr)

            # ACK to join, echoing the requester's timer id
            ack_msg = Message("ACKJOIN", headers={"timer": msg.headers['timer'],
                                                  "nodes": snapshot})
            ctx.send(ack_msg, from_addr)

        elif msg.type == 'ACKJOIN':
            timer_id = int(msg.headers['timer'])
            pending = self.msg_by_timer.get(timer_id)
            if pending is None or pending.type != 'JOINMAIN':
                # duplicate or stale ACK: the join already completed, and
                # starting a second ping loop would double the ping traffic
                return
            del self.msg_by_timer[timer_id]
            ctx.cancel_timer(str(timer_id))

            # add all group members to self.nodes
            self.nodes.update(msg.headers['nodes'])
            # start pinging every interval_ping seconds
            self._schedule_ping_round(ctx)

        elif msg.type == 'BCASTJOIN':
            self.nodes.update(msg.headers['nodes'])

        elif msg.type == 'BCASTLEAVE':
            self.nodes.pop(msg.headers['from_addr'], None)

        elif msg.type == 'PING':
            self.nodes.update(msg.headers['nodes'])
            pingack_msg = Message('PINGACK', headers={'timer': msg.headers['timer'],
                                                      "from_addr": ctx.addr()})
            ctx.send(pingack_msg, msg.headers['from_addr'])

        elif msg.type == 'PINGACK':
            timer_id = int(msg.headers['timer'])
            pending = self.msg_by_timer.get(timer_id)
            # only an outstanding ping timeout may be cancelled by an ack
            if pending is not None and pending.type == 'NOACK':
                del self.msg_by_timer[timer_id]
                ctx.cancel_timer(str(timer_id))

        else:
            err = Message('ERROR', 'unknown message: %s' % msg.type)
            ctx.send(err, msg.sender)

    def on_timer(self, ctx, timer):
        """Handle an expired timer.

        `timer` is the timer name given to ``ctx.set_timer`` (the decimal
        string of an id from ``msg_by_timer``). Timers that fire while the
        node is not a member, or whose id is no longer registered, are ignored.
        """
        if self.left:
            return
        timer_id = int(timer)
        # the timer has fired, so its bookkeeping entry is consumed here
        msg = self.msg_by_timer.pop(timer_id, None)
        if msg is None:
            return

        if msg.type == 'JOINMAIN':
            # No ACKJOIN yet: resend the request. The SAME timer id is re-armed
            # because the request carries that id and ACKJOIN cancels by it.
            self._arm_timer(ctx, timer_id, msg, interval_ack)
            ctx.send(msg, msg.headers['to'])

        elif msg.type == 'PING':
            # schedule the next round first, then probe every peer
            self._schedule_ping_round(ctx)
            snapshot = dict(self.nodes)
            for node_addr in self._peers(ctx):
                ping_timer = self._next_timer_id()
                noack_msg = Message("NOACK", headers={'addr': node_addr,
                                                      'timer': ping_timer})
                # this timer rings if the ack wasn't received in time
                self._arm_timer(ctx, ping_timer, noack_msg, interval_ping)
                ping_msg = Message('PING', headers={"nodes": snapshot,
                                                    "from_addr": ctx.addr(),
                                                    "timer": ping_timer})
                ctx.send(ping_msg, node_addr)

        elif msg.type == 'NOACK':
            # the peer did not answer the ping: drop it and tell the others
            addr = msg.headers['addr']
            self.nodes.pop(addr, None)
            leave_msg = Message("BCASTLEAVE", headers={"from_addr": addr})
            for node_addr in self._peers(ctx):
                ctx.send(leave_msg, node_addr)
174
175
def main():
    """Read the command-line options, set up logging and run one Node."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '-n',
        dest='name',
        default='1',
        help='node name (should be unique)',
    )
    cli.add_argument(
        '-l',
        dest='addr',
        metavar='host:port',
        default='127.0.0.1:9701',
        help='listen on specified address',
    )
    # -d switches the log level from the WARNING default to DEBUG
    cli.add_argument(
        '-d',
        dest='log_level',
        action='store_const',
        const=logging.DEBUG,
        default=logging.WARNING,
        help='print debugging info',
    )
    options = cli.parse_args()

    logging.basicConfig(level=options.log_level,
                        format="%(asctime)s - %(message)s")

    runtime = Runtime(Node(options.name), options.addr)
    runtime.start()


# Start the node only when the file is executed as a script.
if __name__ == "__main__":
    main()
193