· 5 years ago · Nov 05, 2020, 11:04 AM
1#!/usr/bin/env python3
2
3import argparse
4import logging
5import random
6
7from dslib import Message, Process, Runtime
8
# Delay handed to ctx.set_timer for acknowledgement/retransmission timers.
# NOTE(review): the time unit is whatever dslib's set_timer expects
# (presumably seconds) - confirm against the dslib documentation.
interval_ack = 1
# Delay handed to ctx.set_timer for the periodic PING broadcast (same unit).
interval_ping = 1
# NOTE(review): not referenced anywhere in the visible code; kept because
# other parts of the project may read it.
k = 3
12
13
class Node(Process):
    """Group-membership node that joins/leaves a group and gossips its member table.

    Local (client) commands handled: JOIN, LEAVE, GET_MEMBERS.
    Node-to-node messages handled: JOINMAIN, ACKJOIN, BCASTJOIN, BCASTLEAVE,
    PING and PINGACK (PINGACK is accepted and ignored).

    NOTE(review): ``self._name`` is read throughout but never assigned in this
    class; it is presumably set by ``Process.__init__`` - confirm in dslib.
    """

    def __init__(self, name):
        """Create a node that is not yet a member of any group."""
        super().__init__(name)
        self.nodes = dict()  # key - address, value - name
        self.msg_by_timer = dict()  # key - timer id (int), value - message tied to it
        self.timer_count = 0  # next unused timer id
        self.left = True  # True before the first JOIN and again after LEAVE

    def receive(self, ctx, msg):
        """Dispatch an incoming message to the local-command or node-to-node handler."""
        if msg.is_local():
            self._receive_local(ctx, msg)
        else:
            self._receive_remote(ctx, msg)

    def on_timer(self, ctx, timer):
        """Handle a fired timer: resend a JOINMAIN or emit the next periodic PING.

        ``timer`` is the string form of the integer id under which the related
        message was stored in ``self.msg_by_timer``.
        """
        if self.left:
            return
        # LEAVE clears msg_by_timer without cancelling pending timers, so a
        # timer from a previous membership can fire after a re-JOIN. Popping
        # with a default ignores such stale timers instead of raising KeyError,
        # and also keeps the table from growing by one entry per tick.
        msg = self.msg_by_timer.pop(int(timer), None)
        if msg is None:
            return

        if msg.type == 'JOINMAIN':
            # The seed address was stored in the "to" header when the message
            # was built, so it has to be read back from the headers.
            to = msg.headers['to']
            self._arm_timer(ctx, msg, interval_ack)
            # send JOIN message to node in group
            ctx.send(msg, to)
        elif msg.type == 'PING':
            ping_msg = Message('PING', headers={'timer': self.timer_count,
                                                'nodes': dict(self.nodes)})
            # Periodic pings are paced by interval_ping, matching the first
            # ping scheduled on ACKJOIN.
            self._arm_timer(ctx, ping_msg, interval_ping)
            self._broadcast(ctx, ping_msg)

    # Client commands (API) ***************************************************************

    def _receive_local(self, ctx, msg):
        """Handle a command issued by the local client."""
        # Join the group
        # - message body contains the address of some existing group member
        if msg.type == 'JOIN':
            self.left = False
            seed = msg.body
            # The local node is always a member of its own view; when the seed
            # is this node itself, that alone forms a new single-node group.
            self.nodes[ctx.addr()] = self._name
            if seed != ctx.addr():
                join_msg = Message("JOINMAIN", headers={"from_addr": ctx.addr(),
                                                        "from_name": self._name,
                                                        "timer": self.timer_count,
                                                        "to": seed})
                # NOTE: no retransmission timer is armed for JOINMAIN here;
                # on_timer can resend one if a timer is ever registered for it.
                ctx.send(join_msg, seed)

        # Leave the group
        elif msg.type == 'LEAVE':
            self.left = True
            # broadcast leave message
            leave_msg = Message("BCASTLEAVE", headers={"from_addr": ctx.addr(),
                                                       "from_name": self._name})
            self._broadcast(ctx, leave_msg)

            # clear data
            self.nodes.clear()
            self.msg_by_timer.clear()
            self.timer_count = 0

        # Get a list of group members
        # - return the list of all known alive nodes in MEMBERS message
        elif msg.type == 'GET_MEMBERS':
            ctx.send_local(Message('MEMBERS', list(self.nodes.values())))

        else:
            err = Message('ERROR', 'unknown command: %s' % msg.type)
            ctx.send_local(err)

    # Node-to-Node messages ***************************************************************

    def _receive_remote(self, ctx, msg):
        """Handle a message received from another node."""
        if msg.type == 'JOINMAIN':
            # add new node to self.nodes
            from_addr = msg.headers['from_addr']
            from_name = msg.headers['from_name']
            self.nodes[from_addr] = from_name

            # broadcast the updated membership to every known node
            bcast_msg = Message("BCASTJOIN", headers={"nodes": dict(self.nodes)})
            self._broadcast(ctx, bcast_msg)

            # ACK to join, echoing the timer id chosen by the joining node
            ack_msg = Message("ACKJOIN", headers={"timer": msg.headers['timer'],
                                                  "nodes": dict(self.nodes)})
            ctx.send(ack_msg, from_addr)

        elif msg.type == 'ACKJOIN':
            # An ACK that arrives after this node has left must not bring
            # the cleared member table back.
            if self.left:
                return

            # add all group members to self.nodes
            self.nodes.update(msg.headers['nodes'])

            logging.debug(Message("DEBUG", "ACKJOIN in " + self._name))

            # start the periodic ping cycle; on_timer keeps it going
            ping_msg = Message('PING', headers={"nodes": dict(self.nodes)})
            self._arm_timer(ctx, ping_msg, interval_ping)
            self._broadcast(ctx, ping_msg)

        elif msg.type == 'BCASTJOIN' or msg.type == 'PING':
            # Both carry the sender's member table; merge it into the local
            # one, unless this node has left and should stay empty.
            if not self.left:
                self.nodes.update(msg.headers['nodes'])

        elif msg.type == 'BCASTLEAVE':
            from_addr = msg.headers['from_addr']
            self.nodes.pop(from_addr, None)

        elif msg.type == 'PINGACK':
            pass

        else:
            err = Message('ERROR', 'unknown message: %s' % msg.type)
            ctx.send(err, msg.sender)

    def _arm_timer(self, ctx, msg, interval):
        """Register ``msg`` under a fresh timer id and start that timer."""
        timer_id = self.timer_count
        self.msg_by_timer[timer_id] = msg
        ctx.set_timer(str(timer_id), interval)
        self.timer_count += 1

    def _broadcast(self, ctx, msg):
        """Send ``msg`` to every address currently in the member table."""
        # Iterate over a snapshot so that a change to self.nodes made while
        # sending cannot invalidate the iteration.
        for node_addr in tuple(self.nodes):
            ctx.send(msg, node_addr)
156
157
def main():
    """Parse command-line options, configure logging and start the node runtime."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '-n',
        dest='name',
        default='1',
        help='node name (should be unique)',
    )
    cli.add_argument(
        '-l',
        dest='addr',
        metavar='host:port',
        default='127.0.0.1:9701',
        help='listen on specified address',
    )
    # -d switches the log level from the WARNING default to DEBUG.
    cli.add_argument(
        '-d',
        dest='log_level',
        action='store_const',
        const=logging.DEBUG,
        default=logging.WARNING,
        help='print debugging info',
    )
    options = cli.parse_args()

    logging.basicConfig(level=options.log_level, format="%(asctime)s - %(message)s")

    Runtime(Node(options.name), options.addr).start()


if __name__ == "__main__":
    main()
175