# Paste-site header (not part of the program): 7 years ago, Dec 09, 2018, 12:18 AM
require 'socket'
require 'thread'

# An unhandled exception in any thread aborts the whole process.
Thread.abort_on_exception = true

# Identity of this node; both are assigned by setup.
$port = nil
$hostname = nil

# Timing state: the tick length, the clock value itself, and the mutex the
# clock thread holds while advancing it.
$sleep_interval = nil
$clock_semaphore = Mutex.new
$clock = nil

# Pending [time, callable] events and the mutex guarding that list.
$event_q = []
$q_semaphore = Mutex.new

# Values read from the config file by setup.
$updateInterval = nil
$maxPayload = nil
$pingTimeout = nil

# Per-node properties, keyed by node name.
$nodes = {}

# Per-node received message fragments, keyed by node name.
$messages = {}

# Listening socket plus every open peer connection (incoming and outgoing).
$server = nil
$connections = []

$pings = []
33# --------------------- Part 0 --------------------- #
34
# Removes a peer socket from $connections, forgets it as the link to whichever
# node was using it (SOCKET cleared, neighbor entry of this node removed), and
# closes it.
def tcp_drop_connection(sock)
  $connections.delete(sock)
  $nodes.each do |name, props|
    next unless props["SOCKET"].equal?(sock)
    props["SOCKET"] = nil
    own = $nodes[$hostname]
    own["NEIGHBORS"].delete(name) if own
  end
  sock.close unless sock.closed?
end

# Writes one newline-terminated line toward dst through the socket of dst's
# NEXTHOP. Returns false (and sends nothing) when there is no usable route.
def tcp_send_toward(dst, line)
  target = $nodes[dst]
  hop = target && target["NEXTHOP"]
  sock = hop && $nodes[hop] && $nodes[hop]["SOCKET"]
  return false if sock.nil? || sock.closed?
  sock.write("#{line}\n")
  true
end

# Accepts one incoming connection and reads its "EDGEB,<name>,<ip>" greeting.
# A peer that sends anything else, or names an unknown node, is closed.
# Note: the read blocks the server loop until the peer has sent its first line.
def tcp_accept_neighbor
  client = $server.accept
  first_line = client.gets("\n")
  greeting = first_line && first_line.chomp
  _, name, ip = greeting.to_s.split(",")
  unless greeting && greeting.start_with?("EDGEB") && $nodes.key?(name)
    client.close
    return
  end
  $nodes[name]["IP"] = ip
  $nodes[name]["COST"] = 1
  $nodes[name]["SOCKET"] = client
  $connections.push(client)
  $nodes[$hostname]["NEIGHBORS"][name] = 1
end

# Handles "EDGEU,<origin>,[n>c;n>c],<seq>,<ttl>" (a link-state packet).
# A packet is applied only if its TTL is positive, it did not originate here,
# and its sequence number is newer than the stored one. It is then re-sent with
# TTL-1 on every connection except the one it arrived on.
def tcp_handle_edgeu(socket, msg)
  info = msg.split(",")
  origin = info[1]
  neighbors = info[2]
  seq_num = info[3].to_i
  ttl = info[4].to_i
  entry = $nodes[origin]
  return if entry.nil? || neighbors.nil?
  return unless ttl > 0 && origin != $hostname && seq_num > entry["SEQNUM"]

  # The packet lists the origin's complete neighbor set, so replace the stored
  # table instead of merging; merging would keep links the origin has removed.
  links = {}
  neighbors[1..-2].to_s.split(";").each do |pair|
    name, cost = pair.split(">")
    links[name] = cost.to_i if name && cost
  end
  entry["NEIGHBORS"] = links
  entry["SEQNUM"] = seq_num

  forwarded = (info.first(4) << (ttl - 1)).join(",") + "\n"
  $connections.each do |peer|
    next if peer == socket || peer.closed?
    peer.write(forwarded)
    peer.flush
  end
end

# Handles "SNDMSG,<src>,<dst>,<i>,<total>,<payload>": forwards it when it is
# for another node, otherwise stores and prints the fragment.
def tcp_handle_sndmsg(msg)
  # Limit of 6 keeps commas inside the payload intact.
  _, src, dst, _index, _total, frag = msg.split(",", 6)
  if dst != $hostname
    tcp_send_toward(dst, msg)
  else
    # Fragments are printed as they arrive; reassembly is not implemented.
    ($messages[src] ||= { "MSG" => [] })["MSG"].push(frag)
    STDOUT.puts("SNDMSG: #{src} --> #{frag}")
  end
end

# Handles "ACKP,<seq>,<responder>,<dst>": prints the reply at its destination,
# forwards it otherwise. The round-trip time is not measured; 0 is printed.
def tcp_handle_ackp(msg)
  info = msg.split(",")
  seq = info[1].to_i
  src = info[2]
  dst = info[3].to_s.strip
  if dst == $hostname
    STDOUT.puts("#{seq} #{src} 0")
  else
    tcp_send_toward(dst, msg)
  end
end

# Handles "PING,<seq>,<src>,<dst>": forwards it, or answers with an ACKP routed
# back toward src when this node is the destination.
def tcp_handle_ping(msg)
  info = msg.split(",")
  seq = info[1].to_i
  src = info[2]
  dst = info[3].to_s.strip
  if dst != $hostname
    tcp_send_toward(dst, msg)
  else
    tcp_send_toward(src, "ACKP,#{seq},#{$hostname},#{src}")
  end
end

# Handles "TRACE,<hop>,<src>,<dst>": every node on the path increments the hop
# count and reports itself to src with "ACKT,<hop>,<self>,<src>"; nodes other
# than the destination also pass the TRACE on with the new hop count.
def tcp_handle_trace(msg)
  info = msg.split(",")
  hop = info[1].to_i + 1
  src = info[2]
  dst = info[3].to_s.strip
  report = "ACKT,#{hop},#{$hostname},#{src}"
  if dst != $hostname
    onward = "TRACE,#{hop},#{src},#{dst}"
    tcp_send_toward(dst, onward)
    # NOTE(review): printing the packet here looks like leftover debug output;
    # confirm before removing.
    STDOUT.puts onward
    tcp_send_toward(src, report)
  else
    tcp_send_toward(src, report)
    STDOUT.puts msg
  end
end

# Handles "ACKT,<hop>,<responder>,<dst>": prints one traceroute line at the
# originator, forwards it otherwise. The hop time is not measured; 0 is printed.
def tcp_handle_ackt(msg)
  info = msg.split(",")
  hop = info[1].to_i
  src = info[2]
  dst = info[3].to_s.strip
  if dst != $hostname
    tcp_send_toward(dst, msg)
  else
    STDOUT.puts "#{hop} #{src} 0"
  end
end

# Routes one received line (already stripped of its newline) to its handler.
# Lines with an unknown prefix, including blank lines, are ignored.
def tcp_dispatch(socket, msg)
  if msg.start_with?("EDGEU")
    tcp_handle_edgeu(socket, msg)
  elsif msg.start_with?("SNDMSG")
    tcp_handle_sndmsg(msg)
  elsif msg.start_with?("ACKP")
    tcp_handle_ackp(msg)
  elsif msg.start_with?("PING")
    tcp_handle_ping(msg)
  elsif msg.start_with?("TRACE")
    tcp_handle_trace(msg)
  elsif msg.start_with?("ACKT")
    tcp_handle_ackt(msg)
  end
end

# Server loop of this node. Listens on $port, accepts neighbor connections and
# processes one line per readable peer socket on every pass. Never returns.
def tcpServer()
  $server = TCPServer.open($port)
  loop do
    # A socket closed elsewhere must not reach IO.select, which raises on it.
    $connections.reject!(&:closed?)

    # The 1-second timeout restarts the loop so that connections added to
    # $connections in the meantime are included in the next select.
    ready = IO.select([$server] + $connections, nil, $connections, 1)
    next if ready.nil?

    # disconnect clients with errors
    ready[2].each { |sock| tcp_drop_connection(sock) }

    ready[0].each do |socket|
      next if socket.closed? # already dropped through the error set
      if socket == $server
        tcp_accept_neighbor
      else
        line = socket.gets("\n")
        if line.nil?
          # EOF: the peer closed the connection.
          tcp_drop_connection(socket)
        else
          tcp_dispatch(socket, line.chomp)
        end
      end
    end
  end
end
212
# FORMAT: EDGEB [SRCIP] [DSTIP] [DST]
# Creates a symmetric edge between this node and DST: connects to DST's port
# (taken from the table filled by setup), announces itself with
# "EDGEB,<hostname>,<SRCIP>" so DST can set up the reverse edge, and records
# the link locally with cost 1.
# Fails silently (returns nil, changes nothing) when arguments are missing,
# DST is not a known node, or the connection cannot be made.
def edgeb(cmd)
  return if cmd.length < 3
  src_ip_addr, dst_ip_addr, dst = cmd
  peer = $nodes[dst]
  return if peer.nil?

  # connect to DST and add connection to connection list
  begin
    sock = TCPSocket.new(dst_ip_addr, peer["PORT"])
  rescue SystemCallError, SocketError
    return
  end
  sock.write("EDGEB,#{$hostname},#{src_ip_addr}\n")
  $connections.push(sock)

  # fill in neighbor properties
  peer["SOCKET"] = sock
  peer["IP"] = dst_ip_addr
  $nodes[$hostname]["NEIGHBORS"][dst] = 1
  peer["COST"] = 1
end
236
# Dumps the routing table to the file named by the first argument in cmd.
# One line per reachable node other than this one, formatted as
# "src,dst,nexthop,cost" and sorted by dst, nexthop, cost. Nodes without a
# next hop are left out. Does nothing when no file name is given.
def dumptable(cmd)
  path = cmd[0]
  return if path.nil? || path.empty?
  # remove a leading "./" from the file name if it exists
  path = path.sub(%r{\A\./}, "")

  rows = $nodes.reject { |name, _| name == $hostname } # ignore implicit self-edge
               .map { |name, props| [$hostname, name, props["NEXTHOP"], props["COST"]] }
               .reject { |_, _, nexthop, _| nexthop.nil? }
               .sort_by { |_, dst, nexthop, cost| [dst, nexthop, cost] }

  # Block form closes the file even if a write fails.
  File.open(path, "w") do |file|
    rows.each { |row| file.puts(row.join(",")) }
  end
end
263
# Flushes both standard streams, closes every peer connection and the
# listening socket, then terminates the process with status 0.
# cmd is accepted for a uniform command signature and is not used.
def shutdown(cmd)
  [STDOUT, STDERR].each(&:flush)
  $connections.each(&:close)
  $server.close
  exit(0)
end
276
277
278# --------------------- Part 1 --------------------- #
279
# Schedules a new event in the priority queue.
# The [time, event] pair is inserted in front of the first queued entry whose
# time is later than `time` (or appended when there is none), so $event_q stays
# ordered by time and entries with equal times keep their insertion order.
# Callers are expected to hold $q_semaphore.
def schedule(time, event)
  slot = $event_q.index { |due, _| due > time } || $event_q.length
  $event_q.insert(slot, [time, event])
end
289
# Thread body that maintains $clock. The tick length is half of
# $updateInterval; $clock starts at the current wall-clock time and is then
# advanced by exactly one tick after each sleep, under $clock_semaphore.
# Never returns.
def updateTime
  $sleep_interval = $updateInterval / 2.0
  $clock = Time.now
  loop do
    sleep($sleep_interval)
    $clock_semaphore.synchronize { $clock += $sleep_interval }
  end
end
301
# Thread body that queues the periodic router work. Once per $updateInterval
# it schedules updateNetwork half an interval after the current $clock and
# updateRouting a full interval after it, holding $q_semaphore while it
# touches the queue. Never returns.
def handleUpdates
  loop do
    $q_semaphore.synchronize do
      half_interval = $updateInterval / 2.0
      schedule($clock + half_interval, method(:updateNetwork))
      schedule($clock + $updateInterval, method(:updateRouting))
    end
    sleep($updateInterval)
  end
end
312
# Thread body that executes scheduled events. On each pass it removes, under
# $q_semaphore, the first queued entry whose time is earlier than $clock and
# runs its callable in a new thread. When nothing is due it sleeps briefly
# instead of spinning on the mutex. Never returns.
def handleEvents
  loop do
    due = nil
    $q_semaphore.synchronize do
      slot = $event_q.index { |time, _| time < $clock }
      # delete_at removes exactly this entry; delete(e) would also remove any
      # other entry that compares equal to it.
      due = $event_q.delete_at(slot) if slot
    end

    if due
      Thread.new(due[1]) { |callback| callback.call }
    else
      # Idle wait; short compared with the clock tick set by updateTime.
      sleep(0.01)
    end
  end
end
327
# Runs Dijkstra's algorithm over $nodes with $hostname as the source node.
#
# Returns two hashes keyed by node name:
#   dist      - cost of the cheapest known path (Float::INFINITY if unreachable)
#   first_hop - the neighbor of the source that the cheapest path starts with
#               (nil for the source itself and for unreachable nodes)
#
# Edge costs come from each node's "NEIGHBORS" hash; neighbors that are not
# keys of $nodes are ignored.
def dijkstra
  source = $hostname
  dist = {}
  first_hop = {}
  $nodes.each_key do |name|
    dist[name] = Float::INFINITY
    first_hop[name] = nil
  end
  unvisited = $nodes.keys

  dist[source] = 0
  until unvisited.empty?
    # node with least distance will be selected first
    u = unvisited.min_by { |name| dist[name] }
    unvisited.delete(u)
    # Everything still unvisited is unreachable once the minimum is infinite.
    break if dist[u] == Float::INFINITY

    $nodes[u]["NEIGHBORS"].each do |v, cost|
      next unless unvisited.include?(v)
      alt = dist[u] + cost
      next unless alt < dist[v]
      dist[v] = alt
      # The path to v starts with v only when it is reached straight from the
      # source; otherwise it inherits the first hop of the path to u. Deciding
      # by "v is a direct neighbor" is wrong when a detour is cheaper than the
      # direct link.
      first_hop[v] = (u == source) ? v : first_hop[u]
    end
  end
  return dist, first_hop
end
361
# Recomputes shortest paths with dijkstra and stores the result in $nodes:
# every node's "COST" becomes its path cost and its "NEXTHOP" becomes the hop
# reported for it. A reported hop equal to $hostname is stored as the node's
# own name instead.
def updateNetwork
  costs, hops = dijkstra()
  costs.each { |name, cost| $nodes[name]["COST"] = cost }
  hops.each do |name, hop|
    $nodes[name]["NEXTHOP"] = (hop == $hostname ? name : hop)
  end
end
380
# Sends this node's current neighbor costs to every open connection as
# "EDGEU,<hostname>,[name>cost;name>cost],<seqnum>,<ttl>", where ttl is the
# number of known nodes, then increments this node's sequence number.
# Does nothing when there are no connections.
def updateRouting
  return if $connections.empty?

  own = $nodes[$hostname]
  ttl = $nodes.size
  pairs = own["NEIGHBORS"].map { |name, cost| "#{name}>#{cost}" }
  routing_string = "EDGEU,#{$hostname},[#{pairs.join(";")}],#{own["SEQNUM"]},#{ttl}\n"

  $connections.each do |sock|
    sock.write(routing_string)
    sock.flush
  end
  own["SEQNUM"] += 1
end
397
# FORMAT: EDGED [DST]
# Destroys the edge from this node to DST: removes DST from this node's
# neighbor table, drops the socket to DST from $connections, clears the stored
# socket and closes it. Only this node's state is changed.
# Does nothing unless exactly one argument is given; an unknown DST or a DST
# without an open socket only has its neighbor entry removed.
def edged(cmd)
  # make sure number of commands is valid
  return unless cmd.length == 1
  dst = cmd[0]
  $nodes[$hostname]["NEIGHBORS"].delete(dst)

  peer = $nodes[dst]
  sock = peer && peer["SOCKET"]
  return if sock.nil?

  # Remove the socket from the shared list before closing it: a closed socket
  # left in $connections makes IO.select and later writes raise.
  $connections.delete(sock)
  peer["SOCKET"] = nil
  sock.close
end
411
# FORMAT: EDGEU [DST] [COST]
# Updates the cost of the link from this node to the neighbor DST. Only this
# node's side of the edge is changed.
# The command is ignored unless exactly two arguments are given, DST is already
# in this node's neighbor table, and COST is an integer that fits in 32 bits.
def edgeu(cmd)
  # make sure number of commands is valid
  return unless cmd.length == 2
  dst, raw_cost = cmd

  links = $nodes[$hostname]["NEIGHBORS"]
  return unless links.key?(dst)

  # to_i would silently turn a malformed cost into 0.
  return unless raw_cost =~ /\A[+-]?\d+\z/
  cost = raw_cost.to_i
  return unless cost.between?(-2**31, 2**31 - 1)

  # update cost of link
  links[dst] = cost
end
423
# Prints this node's name, port and neighbor names to STDOUT and flushes it.
# Neighbors whose link cost is infinity (unconnected) or 0 (implicit
# self-edge) are left out; the rest are listed sorted and comma-separated.
def status()
  STDOUT.puts("Name: #{$hostname}", "Port: #{$port}")
  links = $nodes[$hostname]["NEIGHBORS"]
  listed = links.select { |_, cost| cost != Float::INFINITY && cost != 0 }.keys.sort
  STDOUT.puts("Neighbors: #{listed.join(",")}")
  STDOUT.flush
end
433
434
435# --------------------- Part 2 --------------------- #
# FORMAT: SENDMSG [DST] [MSG...]
# Sends a text message to DST. The words after DST are joined with single
# spaces. A message longer than $maxPayload characters is split into
# fragments, each sent on the next-hop socket as
# "SNDMSG,<hostname>,<DST>,<index>,<total>,<fragment>" with a 1-based index.
#
# - A missing DST or an empty message is ignored.
# - A message addressed to this node is printed locally and not sent.
# - If DST is unknown or has no usable next hop, an error line is printed and
#   nothing is sent.
def sendmsg(cmd)
  dst = cmd[0]
  msg = cmd[1..-1].to_a.join(" ")

  # fail silently if msg does not exist
  return if dst.nil? || msg.empty?

  if dst == $hostname
    STDOUT.puts("SENDMSG: [#{$hostname}] --> [#{msg}]")
    return
  end

  target = $nodes[dst]
  next_node = target && target["NEXTHOP"]
  sock = next_node && $nodes[next_node] && $nodes[next_node]["SOCKET"]
  if sock.nil?
    STDOUT.puts("SENDMSG ERROR: HOST UNREACHABLE")
    return
  end

  # A missing or non-positive payload limit means "do not fragment"; slicing
  # by 0 would never make progress.
  size = $maxPayload.to_i
  size = msg.length if size <= 0
  fragments = (0...msg.length).step(size).map { |start| msg[start, size] }

  fragments.each_with_index do |fragment, index|
    sock.write("SNDMSG,#{$hostname},#{dst},#{index + 1},#{fragments.length},#{fragment}\n")
  end
end
479
# FORMAT: PING [DST] [NUMPINGS] [DELAY]
# Sends NUMPINGS ping packets toward DST, waiting DELAY seconds before each
# one. Packets are "PING,<seqID>,<hostname>,<DST>" with seqID counting up from
# 0, written to the next-hop socket and flushed.
# If DST is unknown or has no usable next hop when a ping is due, an error
# line is printed for that ping and nothing is sent.
# Replies are handled by the server loop; timeouts ($pingTimeout) and
# round-trip measurement are not implemented here.
def ping(cmd)
  dst = cmd[0]
  numpings = cmd[1].to_i
  delay = cmd[2].to_i

  numpings.times do |seqID|
    sleep(delay) if delay > 0

    # Looked up on every round because the route can change between pings.
    target = $nodes[dst]
    next_node = target && target["NEXTHOP"]
    sock = next_node && $nodes[next_node] && $nodes[next_node]["SOCKET"]
    if sock.nil?
      STDOUT.puts("PING ERROR: HOST UNREACHABLE")
      next
    end

    sock.write("PING,#{seqID},#{$hostname},#{dst}\n")
    sock.flush
  end
end
518
# FORMAT: TRACEROUTE [DST]
# Prints hop 0 (this node) and sends "TRACE,0,<hostname>,<DST>" to the next
# hop toward DST; the remaining hops are printed by the server loop as their
# replies arrive. Timing and timeouts are not implemented.
# An unknown or missing DST is ignored; when DST has no usable next hop only
# the hop-0 line is printed.
def traceroute(cmd)
  dst = cmd[0]
  target = $nodes[dst]
  return if target.nil?

  seqID = 0
  next_node = target["NEXTHOP"]
  sock = next_node && $nodes[next_node] && $nodes[next_node]["SOCKET"]

  STDOUT.puts "#{seqID} #{$hostname} 0"
  return if sock.nil?

  sock.write("TRACE,#{seqID},#{$hostname},#{dst}\n")
end
530
531# --------------------- Part 3 --------------------- #
532
533
# Placeholder for the FTP command: reports on STDOUT that it is not
# implemented. cmd is not used.
def ftp(cmd)
  notice = "FTP: not implemented"
  STDOUT.puts(notice)
end
537
# Placeholder for the CIRCUIT command: reports on STDOUT that it is not
# implemented. cmd is not used.
def circuit(cmd)
  notice = "CIRCUIT: not implemented"
  STDOUT.puts(notice)
end
541
542
543
544
# Command loop: reads lines from STDIN until end of input, splits each on
# whitespace into a command word and its arguments, and calls the matching
# handler. Unrecognised commands (including blank lines) are reported on
# STDERR.
def main()
  handlers = {
    "EDGEB"      => lambda { |args| edgeb(args) },
    "EDGED"      => lambda { |args| edged(args) },
    "EDGEU"      => lambda { |args| edgeu(args) },
    "DUMPTABLE"  => lambda { |args| dumptable(args) },
    "SHUTDOWN"   => lambda { |args| shutdown(args) },
    "STATUS"     => lambda { |_args| status() },
    "SENDMSG"    => lambda { |args| sendmsg(args) },
    "PING"       => lambda { |args| ping(args) },
    "TRACEROUTE" => lambda { |args| traceroute(args) },
    "FTP"        => lambda { |args| ftp(args) },
    "CIRCUIT"    => lambda { |args| circuit(args) }
  }

  while (line = STDIN.gets())
    cmd, *args = line.strip.split(' ')
    handler = handlers[cmd]
    if handler
      handler.call(args)
    else
      STDERR.puts "ERROR: INVALID COMMAND \"#{cmd}\""
    end
  end
end
570
# Reads one non-negative integer setting of the form "<key>=<digits>" out of
# the config text. Raises ArgumentError when the key is absent.
def setup_config_value(text, key)
  found = text.match(/#{Regexp.escape(key)}=(\d+)/)
  raise ArgumentError, "config file is missing #{key}" if found.nil?
  found[1].to_i
end

# Initialises the node and runs it.
#
# hostname - name of this node; must appear in the nodes file
# port     - port this node listens on (converted with to_i)
# nodes    - path of a file with one "name,port" line per node
# config   - path of a file containing updateInterval=, maxPayload= and
#            pingTimeout= settings
#
# Fills $nodes and $messages with one entry per listed node, reads the config,
# starts the clock, server, update and event threads, then runs the command
# loop (main) on the calling thread.
# Raises ArgumentError when hostname is not listed or a config key is missing.
def setup(hostname, port, nodes, config)
  $hostname = hostname
  $port = port.to_i

  # Nodes are kept in a hash of hashes keyed by node name. File.foreach closes
  # the file when it is done.
  File.foreach(nodes) do |line|
    name, node_port = line.chomp.split(',')
    next if name.nil? || name.empty? # skip blank lines

    $nodes[name] = {
      "IP" => nil,
      "SOCKET" => nil,
      "PORT" => node_port.to_i,    # port the other node is running on
      "COST" => Float::INFINITY,   # shortest cost to node
      "NEIGHBORS" => {},           # neighbor => cost of edge from this node to neighbor
      "NEXTHOP" => nil,
      "SEQNUM" => 1,
      "MESSAGE" => nil
    }
    $messages[name] = { "MSG" => [] }
  end

  unless $nodes.key?($hostname)
    raise ArgumentError, "node #{$hostname.inspect} is not listed in #{nodes}"
  end
  $nodes[$hostname]["COST"] = 0

  # handle config
  config_string = File.read(config)
  $updateInterval = setup_config_value(config_string, "updateInterval")
  $maxPayload = setup_config_value(config_string, "maxPayload")
  $pingTimeout = setup_config_value(config_string, "pingTimeout")

  Thread.new { updateTime() }
  Thread.new { tcpServer() }

  # ensure server and timing threads are setup before update threads begin
  sleep(1)

  Thread.new { handleUpdates() }
  Thread.new { handleEvents() }

  main()
end
626
627setup(ARGV[0], ARGV[1], ARGV[2], ARGV[3])