#!/usr/bin/env ruby

# TODO (temporary here, we'll move this into the GitHub issues once
# the initial redis-trib implementation is completed).
#
# - Make sure that if the rehashing fails in the middle redis-trib will try
#   to recover.
# - When redis-trib performs a cluster check, if it detects a slot move in
#   progress it should prompt the user to continue the move from where it
#   stopped.
# - Gracefully handle Ctrl+C in move_slot: ask the user whether to really stop
#   while rehashing, and perform the best cleanup possible if the user
#   forces the quit.
# - When doing "fix" set a global Fix to true, and prompt the user to
#   fix the problem, if automatically fixable, every time there is something
#   to fix. For instance:
#   1) If there is a node that pretends to receive a slot, or to migrate a
#      slot, but has no entries in that slot, fix it.
#   2) If there is a node having keys in slots that are not owned by it,
#      fix this condition by moving the entries to the node that owns the slot.
#   3) Perform more possibly slow tests about the state of the cluster.
#   4) When an aborted slot migration is detected, fix it.

require 'rubygems'
require 'redis'

ClusterHashSlots = 16384        # Total number of hash slots in Redis Cluster.
MigrateDefaultTimeout = 60000   # Default MIGRATE timeout in milliseconds.
MigrateDefaultPipeline = 10     # Default number of keys moved per MIGRATE call.
RebalanceDefaultThreshold = 2   # Default rebalance threshold, in percent.

$verbose = false

# Print a message, colorized with ANSI escapes according to its prefix
# (only when the terminal looks like xterm).
def xputs(s)
    case s[0..2]
    when ">>>"
        color="29;1"    # bold
    when "[ER"
        color="31;1"    # bold red
    when "[WA"
        color="31;1"    # bold red
    when "[OK"
        color="32"      # green
    when "[FA","***"
        color="33"      # yellow
    else
        color=nil
    end

    color = nil if ENV['TERM'] != "xterm"
    print "\033[#{color}m" if color
    print s
    print "\033[0m" if color
    print "\n"
end
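
# For example (illustrative calls):
#
#   xputs ">>> Performing Cluster Check"    # bold
#   xputs "[ERR] Something went wrong"      # bold red
#   xputs "[OK] All 16384 slots covered."   # green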
56
57class ClusterNode
58 def initialize(addr)
59 s = addr.split("@")[0].split(":")
60 if s.length < 2
61 puts "Invalid IP or Port (given as #{addr}) - use IP:Port format"
62 exit 1
63 end
64 port = s.pop # removes port from split array
65 ip = s.join(":") # if s.length > 1 here, it's IPv6, so restore address
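        # For example (illustrative values):
        #   "127.0.0.1:7000"    -> host "127.0.0.1",   port "7000"
        #   "2001:db8::1:7000"  -> host "2001:db8::1", port "7000"
        # (the last ":"-separated field is always treated as the port)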
66 @r = nil
67 @info = {}
68 @info[:host] = ip
69 @info[:port] = port
70 @info[:slots] = {}
71 @info[:migrating] = {}
72 @info[:importing] = {}
73 @info[:replicate] = false
74 @dirty = false # True if we need to flush slots info into node.
75 @friends = []
76 end
77
78 def friends
79 @friends
80 end
81
82 def slots
83 @info[:slots]
84 end
85
86 def has_flag?(flag)
87 @info[:flags].index(flag)
88 end
89
90 def to_s
91 "#{@info[:host]}:#{@info[:port]}"
92 end
93
94 def connect(o={})
95 return if @r
96 print "Connecting to node #{self}: " if $verbose
97 STDOUT.flush
98 begin
99 @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
100 @r.ping
101 rescue
102 xputs "[ERR] Sorry, can't connect to node #{self}"
103 exit 1 if o[:abort]
104 @r = nil
105 end
106 xputs "OK" if $verbose
107 end
108
109 def assert_cluster
110 info = @r.info
111 if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
112 xputs "[ERR] Node #{self} is not configured as a cluster node."
113 exit 1
114 end
115 end
116
117 def assert_empty
118 if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
119 (@r.info['db0'])
120 xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
121 exit 1
122 end
123 end
124
125 def load_info(o={})
126 self.connect
127 nodes = @r.cluster("nodes").split("\n")
128 nodes.each{|n|
            # name addr flags master_id ping_sent ping_recv config_epoch link_status slots...
            split = n.split
            name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..7]
132 slots = split[8..-1]
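            # An input line looks roughly like this (hypothetical IDs, shortened):
            #
            #   07c3... 127.0.0.1:7000 myself,master - 0 1426238317239 2 connected 0-5460 [93->-a1b2...]
            #
            # where the fields after the link status are the served slot ranges,
            # plus bracketed entries for slots being migrated (->-) or imported (-<-).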
133 info = {
134 :name => name,
135 :addr => addr,
136 :flags => flags.split(","),
137 :replicate => master_id,
138 :ping_sent => ping_sent.to_i,
139 :ping_recv => ping_recv.to_i,
140 :link_status => link_status
141 }
142 info[:replicate] = false if master_id == "-"
143
144 if info[:flags].index("myself")
145 @info = @info.merge(info)
146 @info[:slots] = {}
147 slots.each{|s|
148 if s[0..0] == '['
149 if s.index("->-") # Migrating
150 slot,dst = s[1..-1].split("->-")
151 @info[:migrating][slot.to_i] = dst
152 elsif s.index("-<-") # Importing
153 slot,src = s[1..-1].split("-<-")
154 @info[:importing][slot.to_i] = src
155 end
156 elsif s.index("-")
157 start,stop = s.split("-")
158 self.add_slots((start.to_i)..(stop.to_i))
159 else
160 self.add_slots((s.to_i)..(s.to_i))
161 end
162 } if slots
163 @dirty = false
164 @r.cluster("info").split("\n").each{|e|
165 k,v=e.split(":")
166 k = k.to_sym
167 v.chop!
168 if k != :cluster_state
169 @info[k] = v.to_i
170 else
171 @info[k] = v
172 end
173 }
174 elsif o[:getfriends]
175 @friends << info
176 end
177 }
178 end
179
180 def add_slots(slots)
181 slots.each{|s|
182 @info[:slots][s] = :new
183 }
184 @dirty = true
185 end
186
187 def set_as_replica(node_id)
188 @info[:replicate] = node_id
189 @dirty = true
190 end
191
192 def flush_node_config
193 return if !@dirty
194 if @info[:replicate]
195 begin
196 @r.cluster("replicate",@info[:replicate])
197 rescue
                # If the cluster has not yet joined it is possible that
                # the slave does not know the master node yet. So on errors
                # we return ASAP, leaving the dirty flag set, so that the
                # config can be flushed later.
202 return
203 end
204 else
205 new = []
206 @info[:slots].each{|s,val|
207 if val == :new
208 new << s
209 @info[:slots][s] = true
210 end
211 }
212 @r.cluster("addslots",*new)
213 end
214 @dirty = false
215 end
216
217 def info_string
218 # We want to display the hash slots assigned to this node
219 # as ranges, like in: "1-5,8-9,20-25,30"
220 #
221 # Note: this could be easily written without side effects,
222 # we use 'slots' just to split the computation into steps.
223
224 # First step: we want an increasing array of integers
225 # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
226 slots = @info[:slots].keys.sort
227
228 # As we want to aggregate adjacent slots we convert all the
229 # slot integers into ranges (with just one element)
230 # So we have something like [1..1,2..2, ... and so forth.
231 slots.map!{|x| x..x}
232
233 # Finally we group ranges with adjacent elements.
234 slots = slots.reduce([]) {|a,b|
235 if !a.empty? && b.first == (a[-1].last)+1
236 a[0..-2] + [(a[-1].first)..(b.last)]
237 else
238 a + [b]
239 end
240 }
241
242 # Now our task is easy, we just convert ranges with just one
243 # element into a number, and a real range into a start-end format.
244 # Finally we join the array using the comma as separator.
245 slots = slots.map{|x|
246 x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}"
247 }.join(",")
248
249 role = self.has_flag?("master") ? "M" : "S"
250
251 if self.info[:replicate] and @dirty
252 is = "S: #{self.info[:name]} #{self.to_s}"
253 else
254 is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+
255 " slots:#{slots} (#{self.slots.length} slots) "+
256 "#{(self.info[:flags]-["myself"]).join(",")}"
257 end
258 if self.info[:replicate]
259 is += "\n replicates #{info[:replicate]}"
260 elsif self.has_flag?("master") && self.info[:replicas]
261 is += "\n #{info[:replicas].length} additional replica(s)"
262 end
263 is
264 end
265
    # Return a single string representing nodes and associated slots.
    # TODO: remove slaves from config once slaves are handled
    # by Redis Cluster.
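    #
    # The signature is built as one "id:slots" entry per master, sorted and
    # joined by "|"; for a healthy three-master cluster it might look like
    # (hypothetical IDs):
    #
    #   "3fa9...:0-5460|7ed2...:5461-10922|b441...:10923-16383"
    #
    # Two nodes agree on the layout exactly when their signatures are equal.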
269 def get_config_signature
270 config = []
271 @r.cluster("nodes").each_line{|l|
272 s = l.split
273 slots = s[8..-1].select {|x| x[0..0] != "["}
274 next if slots.length == 0
275 config << s[0]+":"+(slots.sort.join(","))
276 }
277 config.sort.join("|")
278 end
279
280 def info
281 @info
282 end
283
284 def is_dirty?
285 @dirty
286 end
287
288 def r
289 @r
290 end
291end
292
293class RedisTrib
294 def initialize
295 @nodes = []
296 @fix = false
297 @errors = []
298 @timeout = MigrateDefaultTimeout
299 end
300
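    # check_arity below follows the convention also used by COMMANDS at the
    # bottom of this file: a positive req_args means "exactly that many
    # arguments", a negative one means "at least req_args.abs arguments".
    # For example (illustrative):
    #
    #   check_arity(2, 2)     # ok: exactly two arguments
    #   check_arity(-2, 5)    # ok: at least two arguments
    #   check_arity(-2, 1)    # error: fewer than two arguments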
301 def check_arity(req_args, num_args)
302 if ((req_args > 0 and num_args != req_args) ||
303 (req_args < 0 and num_args < req_args.abs))
304 xputs "[ERR] Wrong number of arguments for specified sub command"
305 exit 1
306 end
307 end
308
309 def add_node(node)
310 @nodes << node
311 end
312
313 def reset_nodes
314 @nodes = []
315 end
316
317 def cluster_error(msg)
318 @errors << msg
319 xputs msg
320 end
321
322 # Return the node with the specified ID or Nil.
323 def get_node_by_name(name)
324 @nodes.each{|n|
325 return n if n.info[:name] == name.downcase
326 }
327 return nil
328 end
329
    # Like get_node_by_name but the specified name can be just the first
    # part of the node ID, as long as the prefix is unique across the
    # cluster.
333 def get_node_by_abbreviated_name(name)
334 l = name.length
335 candidates = []
336 @nodes.each{|n|
337 if n.info[:name][0...l] == name.downcase
338 candidates << n
339 end
340 }
341 return nil if candidates.length != 1
342 candidates[0]
343 end
344
    # This function returns the master that has the least number of replicas
    # in the cluster. If there are multiple masters with the same minimal
    # number of replicas, one of them is returned (ties are broken arbitrarily).
348 def get_master_with_least_replicas
349 masters = @nodes.select{|n| n.has_flag? "master"}
350 sorted = masters.sort{|a,b|
351 a.info[:replicas].length <=> b.info[:replicas].length
352 }
353 sorted[0]
354 end
355
356 def check_cluster(opt={})
357 xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
358 show_nodes if !opt[:quiet]
359 check_config_consistency
360 check_open_slots
361 check_slots_coverage
362 end
363
364 def show_cluster_info
365 masters = 0
366 keys = 0
367 @nodes.each{|n|
368 if n.has_flag?("master")
369 puts "#{n} (#{n.info[:name][0...8]}...) -> #{n.r.dbsize} keys | #{n.slots.length} slots | "+
370 "#{n.info[:replicas].length} slaves."
371 masters += 1
372 keys += n.r.dbsize
373 end
374 }
375 xputs "[OK] #{keys} keys in #{masters} masters."
376 keys_per_slot = sprintf("%.2f",keys/16384.0)
377 puts "#{keys_per_slot} keys per slot on average."
378 end
379
380 # Merge slots of every known node. If the resulting slots are equal
381 # to ClusterHashSlots, then all slots are served.
382 def covered_slots
383 slots = {}
384 @nodes.each{|n|
385 slots = slots.merge(n.slots)
386 }
387 slots
388 end
389
390 def check_slots_coverage
391 xputs ">>> Check slots coverage..."
392 slots = covered_slots
393 if slots.length == ClusterHashSlots
394 xputs "[OK] All #{ClusterHashSlots} slots covered."
395 else
396 cluster_error \
397 "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
398 fix_slots_coverage if @fix
399 end
400 end
401
402 def check_open_slots
403 xputs ">>> Check for open slots..."
404 open_slots = []
405 @nodes.each{|n|
406 if n.info[:migrating].size > 0
407 cluster_error \
408 "[WARNING] Node #{n} has slots in migrating state (#{n.info[:migrating].keys.join(",")})."
409 open_slots += n.info[:migrating].keys
410 end
411 if n.info[:importing].size > 0
412 cluster_error \
413 "[WARNING] Node #{n} has slots in importing state (#{n.info[:importing].keys.join(",")})."
414 open_slots += n.info[:importing].keys
415 end
416 }
417 open_slots.uniq!
418 if open_slots.length > 0
419 xputs "[WARNING] The following slots are open: #{open_slots.join(",")}"
420 end
421 if @fix
422 open_slots.each{|slot| fix_open_slot slot}
423 end
424 end
425
426 def nodes_with_keys_in_slot(slot)
427 nodes = []
428 @nodes.each{|n|
429 next if n.has_flag?("slave")
430 nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0
431 }
432 nodes
433 end
434
435 def fix_slots_coverage
436 not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
437 xputs ">>> Fixing slots coverage..."
438 xputs "List of not covered slots: " + not_covered.join(",")
439
440 # For every slot, take action depending on the actual condition:
441 # 1) No node has keys for this slot.
442 # 2) A single node has keys for this slot.
443 # 3) Multiple nodes have keys for this slot.
444 slots = {}
445 not_covered.each{|slot|
446 nodes = nodes_with_keys_in_slot(slot)
447 slots[slot] = nodes
448 xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join(", ")}"
449 }
450
451 none = slots.select {|k,v| v.length == 0}
452 single = slots.select {|k,v| v.length == 1}
453 multi = slots.select {|k,v| v.length > 1}
454
455 # Handle case "1": keys in no node.
456 if none.length > 0
            xputs "The following uncovered slots have no keys across the cluster:"
458 xputs none.keys.join(",")
459 yes_or_die "Fix these slots by covering with a random node?"
460 none.each{|slot,nodes|
461 node = @nodes.sample
462 xputs ">>> Covering slot #{slot} with #{node}"
463 node.r.cluster("addslots",slot)
464 }
465 end
466
467 # Handle case "2": keys only in one node.
468 if single.length > 0
            xputs "The following uncovered slots have keys in just one node:"
470 puts single.keys.join(",")
471 yes_or_die "Fix these slots by covering with those nodes?"
472 single.each{|slot,nodes|
473 xputs ">>> Covering slot #{slot} with #{nodes[0]}"
474 nodes[0].r.cluster("addslots",slot)
475 }
476 end
477
478 # Handle case "3": keys in multiple nodes.
479 if multi.length > 0
            xputs "The following uncovered slots have keys in multiple nodes:"
481 xputs multi.keys.join(",")
482 yes_or_die "Fix these slots by moving keys into a single node?"
483 multi.each{|slot,nodes|
484 target = get_node_with_most_keys_in_slot(nodes,slot)
485 xputs ">>> Covering slot #{slot} moving keys to #{target}"
486
487 target.r.cluster('addslots',slot)
488 target.r.cluster('setslot',slot,'stable')
489 nodes.each{|src|
490 next if src == target
491 # Set the source node in 'importing' state (even if we will
492 # actually migrate keys away) in order to avoid receiving
493 # redirections for MIGRATE.
494 src.r.cluster('setslot',slot,'importing',target.info[:name])
495 move_slot(src,target,slot,:dots=>true,:fix=>true,:cold=>true)
496 src.r.cluster('setslot',slot,'stable')
497 }
498 }
499 end
500 end
501
    # Return the nodes (normally just one) that currently serve the specified slot.
503 def get_slot_owners(slot)
504 owners = []
505 @nodes.each{|n|
506 next if n.has_flag?("slave")
507 n.slots.each{|s,_|
508 owners << n if s == slot
509 }
510 }
511 owners
512 end
513
514 # Return the node, among 'nodes' with the greatest number of keys
515 # in the specified slot.
516 def get_node_with_most_keys_in_slot(nodes,slot)
517 best = nil
518 best_numkeys = 0
519 @nodes.each{|n|
520 next if n.has_flag?("slave")
521 numkeys = n.r.cluster("countkeysinslot",slot)
522 if numkeys > best_numkeys || best == nil
523 best = n
524 best_numkeys = numkeys
525 end
526 }
527 return best
528 end
529
530 # Slot 'slot' was found to be in importing or migrating state in one or
531 # more nodes. This function fixes this condition by migrating keys where
532 # it seems more sensible.
533 def fix_open_slot(slot)
534 puts ">>> Fixing open slot #{slot}"
535
536 # Try to obtain the current slot owner, according to the current
537 # nodes configuration.
538 owners = get_slot_owners(slot)
539 owner = owners[0] if owners.length == 1
540
541 migrating = []
542 importing = []
543 @nodes.each{|n|
544 next if n.has_flag? "slave"
545 if n.info[:migrating][slot]
546 migrating << n
547 elsif n.info[:importing][slot]
548 importing << n
549 elsif n.r.cluster("countkeysinslot",slot) > 0 && n != owner
550 xputs "*** Found keys about slot #{slot} in node #{n}!"
551 importing << n
552 end
553 }
554 puts "Set as migrating in: #{migrating.join(",")}"
555 puts "Set as importing in: #{importing.join(",")}"
556
        # If there is no slot owner, set as owner the node with the biggest
        # number of keys, among the set of migrating / importing nodes.
559 if !owner
560 xputs ">>> Nobody claims ownership, selecting an owner..."
561 owner = get_node_with_most_keys_in_slot(@nodes,slot)
562
563 # If we still don't have an owner, we can't fix it.
564 if !owner
565 xputs "[ERR] Can't select a slot owner. Impossible to fix."
566 exit 1
567 end
568
569 # Use ADDSLOTS to assign the slot.
570 puts "*** Configuring #{owner} as the slot owner"
571 owner.r.cluster("setslot",slot,"stable")
572 owner.r.cluster("addslots",slot)
573 # Make sure this information will propagate. Not strictly needed
574 # since there is no past owner, so all the other nodes will accept
575 # whatever epoch this node will claim the slot with.
576 owner.r.cluster("bumpepoch")
577
578 # Remove the owner from the list of migrating/importing
579 # nodes.
580 migrating.delete(owner)
581 importing.delete(owner)
582 end
583
584 # If there are multiple owners of the slot, we need to fix it
585 # so that a single node is the owner and all the other nodes
586 # are in importing state. Later the fix can be handled by one
587 # of the base cases above.
588 #
589 # Note that this case also covers multiple nodes having the slot
590 # in migrating state, since migrating is a valid state only for
591 # slot owners.
592 if owners.length > 1
593 owner = get_node_with_most_keys_in_slot(owners,slot)
594 owners.each{|n|
595 next if n == owner
596 n.r.cluster('delslots',slot)
597 n.r.cluster('setslot',slot,'importing',owner.info[:name])
                importing.delete(n) # Avoid duplicates
599 importing << n
600 }
601 owner.r.cluster('bumpepoch')
602 end
603
        # Case 1: The slot is in migrating state in one node, and in
        # importing state in one node. That's trivial to address.
606 if migrating.length == 1 && importing.length == 1
607 move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true)
        # Case 2: There are multiple nodes that claim the slot as importing.
        # They probably got keys for the slot after a restart, and so opened
        # the slot. In this case we just move all the keys to the owner
        # according to the configuration.
612 elsif migrating.length == 0 && importing.length > 0
613 xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}"
614 importing.each {|node|
615 next if node == owner
616 move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true)
617 xputs ">>> Setting #{slot} as STABLE in #{node}"
618 node.r.cluster("setslot",slot,"stable")
619 }
        # Case 3: There are no nodes claiming the slot in importing state, but
        # there is a migrating node that actually does not have any keys. We
        # can just close the slot: probably a reshard was interrupted in the middle.
623 elsif importing.length == 0 && migrating.length == 1 &&
624 migrating[0].r.cluster("getkeysinslot",slot,10).length == 0
625 migrating[0].r.cluster("setslot",slot,"stable")
626 else
627 xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress). Slot is set as migrating in #{migrating.join(",")}, as importing in #{importing.join(",")}, owner is #{owner}"
628 end
629 end
630
631 # Check if all the nodes agree about the cluster configuration
632 def check_config_consistency
633 if !is_config_consistent?
634 cluster_error "[ERR] Nodes don't agree about configuration!"
635 else
636 xputs "[OK] All nodes agree about slots configuration."
637 end
638 end
639
640 def is_config_consistent?
641 signatures=[]
642 @nodes.each{|n|
643 signatures << n.get_config_signature
644 }
645 return signatures.uniq.length == 1
646 end
647
648 def wait_cluster_join
649 print "Waiting for the cluster to join"
650 while !is_config_consistent?
651 print "."
652 STDOUT.flush
653 sleep 1
654 end
655 print "\n"
656 end
657
658 def alloc_slots
659 nodes_count = @nodes.length
660 masters_count = @nodes.length / (@replicas+1)
661 masters = []
662
663 # The first step is to split instances by IP. This is useful as
664 # we'll try to allocate master nodes in different physical machines
665 # (as much as possible) and to allocate slaves of a given master in
666 # different physical machines as well.
667 #
        # This code simply assumes that if the IP is different, then it is more
        # likely that the instance is running on a different physical host
        # or at least a different virtual machine.
671 ips = {}
672 @nodes.each{|n|
673 ips[n.info[:host]] = [] if !ips[n.info[:host]]
674 ips[n.info[:host]] << n
675 }
676
677 # Select master instances
678 puts "Using #{masters_count} masters:"
679 interleaved = []
680 stop = false
681 while not stop do
682 # Take one node from each IP until we run out of nodes
683 # across every IP.
684 ips.each do |ip,nodes|
685 if nodes.empty?
686 # if this IP has no remaining nodes, check for termination
687 if interleaved.length == nodes_count
688 # stop when 'interleaved' has accumulated all nodes
689 stop = true
690 next
691 end
692 else
693 # else, move one node from this IP to 'interleaved'
694 interleaved.push nodes.shift
695 end
696 end
697 end
698
699 masters = interleaved.slice!(0, masters_count)
700 nodes_count -= masters.length
701
702 masters.each{|m| puts m}
703
704 # Rotating the list sometimes helps to get better initial
705 # anti-affinity before the optimizer runs.
706 interleaved.push interleaved.shift
707
        # Alloc slots on masters. After interleaving, taking just the first N
        # should be optimal. With slaves it is more complex, see later...
710 slots_per_node = ClusterHashSlots.to_f / masters_count
711 first = 0
712 cursor = 0.0
713 masters.each_with_index{|n,masternum|
714 last = (cursor+slots_per_node-1).round
715 if last > ClusterHashSlots || masternum == masters.length-1
716 last = ClusterHashSlots-1
717 end
718 last = first if last < first # Min step is 1.
719 n.add_slots first..last
720 first = last+1
721 cursor += slots_per_node
722 }
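
        # For example, with the default 16384 slots and three masters the
        # loop above produces the familiar allocation:
        #
        #   master 0 -> slots 0..5460
        #   master 1 -> slots 5461..10922
        #   master 2 -> slots 10923..16383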
723
724 # Select N replicas for every master.
725 # We try to split the replicas among all the IPs with spare nodes
726 # trying to avoid the host where the master is running, if possible.
727 #
728 # Note we loop two times. The first loop assigns the requested
729 # number of replicas to each master. The second loop assigns any
730 # remaining instances as extra replicas to masters. Some masters
731 # may end up with more than their requested number of replicas, but
732 # all nodes will be used.
733 assignment_verbose = false
734
735 [:requested,:unused].each do |assign|
736 masters.each do |m|
737 assigned_replicas = 0
738 while assigned_replicas < @replicas
739 break if nodes_count == 0
740 if assignment_verbose
741 if assign == :requested
742 puts "Requesting total of #{@replicas} replicas " \
743 "(#{assigned_replicas} replicas assigned " \
744 "so far with #{nodes_count} total remaining)."
745 elsif assign == :unused
746 puts "Assigning extra instance to replication " \
747 "role too (#{nodes_count} remaining)."
748 end
749 end
750
751 # Return the first node not matching our current master
752 node = interleaved.find{|n| n.info[:host] != m.info[:host]}
753
754 # If we found a node, use it as a best-first match.
755 # Otherwise, we didn't find a node on a different IP, so we
756 # go ahead and use a same-IP replica.
757 if node
758 slave = node
759 interleaved.delete node
760 else
761 slave = interleaved.shift
762 end
763 slave.set_as_replica(m.info[:name])
764 nodes_count -= 1
765 assigned_replicas += 1
766 puts "Adding replica #{slave} to #{m}"
767
768 # If we are in the "assign extra nodes" loop,
769 # we want to assign one extra replica to each
770 # master before repeating masters.
771 # This break lets us assign extra replicas to masters
772 # in a round-robin way.
773 break if assign == :unused
774 end
775 end
776 end
777
778 optimize_anti_affinity
779 end
780
781 def optimize_anti_affinity
782 score,aux = get_anti_affinity_score
783 return if score == 0
784
785 xputs ">>> Trying to optimize slaves allocation for anti-affinity"
786
787 maxiter = 500*@nodes.length # Effort is proportional to cluster size...
788 while maxiter > 0
789 score,offenders = get_anti_affinity_score
790 break if score == 0 # Optimal anti affinity reached
791
            # We'll try to swap the assigned master of a slave that is causing
            # an affinity problem with the master of another random slave, to
            # see if we can improve the affinity.
795 first = offenders.shuffle.first
796 nodes = @nodes.select{|n| n != first && n.info[:replicate]}
797 break if nodes.length == 0
798 second = nodes.shuffle.first
799
800 first_master = first.info[:replicate]
801 second_master = second.info[:replicate]
802 first.set_as_replica(second_master)
803 second.set_as_replica(first_master)
804
805 new_score,aux = get_anti_affinity_score
            # If the change actually makes things worse, revert. Otherwise
            # keep it, because the best solution may require a few combined
            # swaps.
809 if new_score > score
810 first.set_as_replica(first_master)
811 second.set_as_replica(second_master)
812 end
813
814 maxiter -= 1
815 end
816
817 score,aux = get_anti_affinity_score
818 if score == 0
819 xputs "[OK] Perfect anti-affinity obtained!"
820 elsif score >= 10000
821 puts "[WARNING] Some slaves are in the same host as their master"
822 else
823 puts "[WARNING] Some slaves of the same master are in the same host"
824 end
825 end
826
    # Return the anti-affinity score, which is a measure of the number of
    # anti-affinity violations in the current cluster layout: ideally,
    # masters and slaves should be distributed across IP addresses so that
    # slaves of the same master are neither on their master's host nor on
    # the same host as each other.
    #
    # The score is calculated as follows:
    #
    # SAME_AS_MASTER = 10000 * each slave in the same IP of its master.
    # SAME_AS_SLAVE  = 1 * each slave having the same IP as another slave
    #                  of the same master.
    # FINAL_SCORE = SAME_AS_MASTER + SAME_AS_SLAVE
    #
    # So a greater score means a worse anti-affinity level, while zero
    # means perfect anti-affinity.
    #
    # The anti-affinity optimizer will try to get a score as low as
    # possible. Since we do not want to sacrifice the fact that slaves should
    # not be in the same host as their master, we weight that violation
    # 10000 times more, so that we optimize for the second factor only
    # if it does not impact the first one.
    #
    # The function returns two things: the above score, and the list of
    # offending slaves, so that the optimizer can try changing the
    # configuration of the slaves violating the anti-affinity goals.
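    #
    # A small worked example (hypothetical layout): if host A runs master M1
    # together with one slave of M1, and host B runs two slaves of M2, then
    # the slave sharing a host with its master contributes 10000, the pair
    # of same-master slaves on host B contributes 2, and the total score is
    # 10002 with all three slaves returned as offenders.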
852 def get_anti_affinity_score
853 score = 0
854 offending = [] # List of offending slaves to return to the caller
855
856 # First, split nodes by host
857 host_to_node = {}
858 @nodes.each{|n|
859 host = n.info[:host]
860 host_to_node[host] = [] if host_to_node[host] == nil
861 host_to_node[host] << n
862 }
863
864 # Then, for each set of nodes in the same host, split by
865 # related nodes (masters and slaves which are involved in
866 # replication of each other)
867 host_to_node.each{|host,nodes|
868 related = {}
869 nodes.each{|n|
870 if !n.info[:replicate]
871 name = n.info[:name]
872 related[name] = [] if related[name] == nil
873 related[name] << :m
874 else
875 name = n.info[:replicate]
876 related[name] = [] if related[name] == nil
877 related[name] << :s
878 end
879 }
880
881 # Now it's trivial to check, for each related group having the
882 # same host, what is their local score.
883 related.each{|id,types|
884 next if types.length < 2
                types.sort! # Make sure :m is the first, if any
886 if types[0] == :m
887 score += 10000 * (types.length-1)
888 else
889 score += 1 * types.length
890 end
891
892 # Populate the list of offending nodes
893 @nodes.each{|n|
894 if n.info[:replicate] == id &&
895 n.info[:host] == host
896 offending << n
897 end
898 }
899 }
900 }
901 return score,offending
902 end
903
904 def flush_nodes_config
905 @nodes.each{|n|
906 n.flush_node_config
907 }
908 end
909
910 def show_nodes
911 @nodes.each{|n|
912 xputs n.info_string
913 }
914 end
915
    # Redis Cluster config epoch collision resolution code is able to eventually
    # set a different epoch to each node after a new cluster is created, but
    # it is slow compared to assigning a progressive config epoch to each node
    # before joining the cluster. However we make just a best-effort attempt
    # here, since failing is not a problem.
921 def assign_config_epoch
922 config_epoch = 1
923 @nodes.each{|n|
924 begin
925 n.r.cluster("set-config-epoch",config_epoch)
926 rescue
927 end
928 config_epoch += 1
929 }
930 end
931
932 def join_cluster
        # We use a brute-force approach to make sure the nodes will meet
        # each other, that is, we send CLUSTER MEET messages to all the
        # nodes about the very same node.
        # Thanks to gossip this information should propagate across the
        # whole cluster in a matter of seconds.
938 first = false
939 @nodes.each{|n|
940 if !first then first = n.info; next; end # Skip the first node
941 n.r.cluster("meet",first[:host],first[:port])
942 }
943 end
944
945 def yes_or_die(msg)
946 print "#{msg} (type 'yes' to accept): "
947 STDOUT.flush
948 if !(STDIN.gets.chomp.downcase == "yes")
949 xputs "*** Aborting..."
950 exit 1
951 end
952 end
953
954 def load_cluster_info_from_node(nodeaddr)
955 node = ClusterNode.new(nodeaddr)
956 node.connect(:abort => true)
957 node.assert_cluster
958 node.load_info(:getfriends => true)
959 add_node(node)
960 node.friends.each{|f|
961 next if f[:flags].index("noaddr") ||
962 f[:flags].index("disconnected") ||
963 f[:flags].index("fail")
964 fnode = ClusterNode.new(f[:addr])
965 fnode.connect()
966 next if !fnode.r
967 begin
968 fnode.load_info()
969 add_node(fnode)
970 rescue => e
971 xputs "[ERR] Unable to load info for node #{fnode}"
972 end
973 }
974 populate_nodes_replicas_info
975 end
976
    # This function is called by load_cluster_info_from_node in order to
    # attach to every node the list of its replicas as additional information.
979 def populate_nodes_replicas_info
980 # Start adding the new field to every node.
981 @nodes.each{|n|
982 n.info[:replicas] = []
983 }
984
985 # Populate the replicas field using the replicate field of slave
986 # nodes.
987 @nodes.each{|n|
988 if n.info[:replicate]
989 master = get_node_by_name(n.info[:replicate])
990 if !master
991 xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}."
992 else
993 master.info[:replicas] << n
994 end
995 end
996 }
997 end
998
999 # Given a list of source nodes return a "resharding plan"
1000 # with what slots to move in order to move "numslots" slots to another
1001 # instance.
1002 def compute_reshard_table(sources,numslots)
1003 moved = []
        # Sort from bigger to smaller instance, for two reasons:
        # 1) If we take fewer slots than there are instances, it is better to
        #    start getting them from the biggest instances.
        # 2) We take one slot more from the first instance in the case of not
        #    perfect divisibility. Say we have 3 nodes and need to get 10
        #    slots: we take 4 from the first, and 3 from the rest. So the
        #    biggest is always the first.
1011 sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
1012 source_tot_slots = sources.inject(0) {|sum,source|
1013 sum+source.slots.length
1014 }
1015 sources.each_with_index{|s,i|
1016 # Every node will provide a number of slots proportional to the
1017 # slots it has assigned.
1018 n = (numslots.to_f/source_tot_slots*s.slots.length)
1019 if i == 0
1020 n = n.ceil
1021 else
1022 n = n.floor
1023 end
1024 s.slots.keys.sort[(0...n)].each{|slot|
1025 if moved.length < numslots
1026 moved << {:source => s, :slot => slot}
1027 end
1028 }
1029 }
1030 return moved
1031 end
1032
1033 def show_reshard_table(table)
1034 table.each{|e|
1035 puts " Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
1036 }
1037 end
1038
    # Move slots between source and target nodes using MIGRATE.
    #
    # Options:
    # :dots    -- Print a dot for every moved key.
    # :fix     -- We are moving in the context of a fix. Use REPLACE.
    # :cold    -- Move keys without opening slots / reconfiguring the nodes.
    # :update  -- Update nodes.info[:slots] for source/target nodes.
    # :quiet   -- Don't print info messages.
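    #
    # In the normal (non-:cold) case the sequence of commands issued below
    # is, roughly:
    #
    #   target: CLUSTER SETSLOT <slot> IMPORTING <source-id>
    #   source: CLUSTER SETSLOT <slot> MIGRATING <target-id>
    #   repeat until the slot is empty:
    #     source: CLUSTER GETKEYSINSLOT <slot> <pipeline>
    #     source: MIGRATE <target-host> <target-port> "" 0 <timeout> KEYS <keys...>
    #   every known master: CLUSTER SETSLOT <slot> NODE <target-id>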
1047 def move_slot(source,target,slot,o={})
1048 o = {:pipeline => MigrateDefaultPipeline}.merge(o)
1049
        # We start marking the slot as importing in the destination node,
        # and as migrating in the source node. Note that the order of
        # the operations is important, as otherwise a client may be redirected
        # to the target node, which does not yet know it is importing this slot.
1054 if !o[:quiet]
1055 print "Moving slot #{slot} from #{source} to #{target}: "
1056 STDOUT.flush
1057 end
1058
1059 if !o[:cold]
1060 target.r.cluster("setslot",slot,"importing",source.info[:name])
1061 source.r.cluster("setslot",slot,"migrating",target.info[:name])
1062 end
1063 # Migrate all the keys from source to target using the MIGRATE command
1064 while true
1065 keys = source.r.cluster("getkeysinslot",slot,o[:pipeline])
1066 break if keys.length == 0
1067 begin
1068 source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys])
1069 rescue => e
1070 if o[:fix] && e.to_s =~ /BUSYKEY/
1071 xputs "*** Target key exists. Replacing it for FIX."
1072 source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:replace,:keys,*keys])
1073 else
1074 puts ""
1075 xputs "[ERR] Calling MIGRATE: #{e}"
1076 exit 1
1077 end
1078 end
1079 print "."*keys.length if o[:dots]
1080 STDOUT.flush
1081 end
1082
1083 puts if !o[:quiet]
1084 # Set the new node as the owner of the slot in all the known nodes.
1085 if !o[:cold]
1086 @nodes.each{|n|
1087 next if n.has_flag?("slave")
1088 n.r.cluster("setslot",slot,"node",target.info[:name])
1089 }
1090 end
1091
1092 # Update the node logical config
1093 if o[:update] then
1094 source.info[:slots].delete(slot)
1095 target.info[:slots][slot] = true
1096 end
1097 end
1098
    # redis-trib subcommand implementations.
1100
1101 def check_cluster_cmd(argv,opt)
1102 load_cluster_info_from_node(argv[0])
1103 check_cluster
1104 end
1105
1106 def info_cluster_cmd(argv,opt)
1107 load_cluster_info_from_node(argv[0])
1108 show_cluster_info
1109 end
1110
1111 def rebalance_cluster_cmd(argv,opt)
1112 opt = {
1113 'pipeline' => MigrateDefaultPipeline,
1114 'threshold' => RebalanceDefaultThreshold
1115 }.merge(opt)
1116
1117 # Load nodes info before parsing options, otherwise we can't
1118 # handle --weight.
1119 load_cluster_info_from_node(argv[0])
1120
1121 # Options parsing
1122 threshold = opt['threshold'].to_i
1123 autoweights = opt['auto-weights']
1124 weights = {}
1125 opt['weight'].each{|w|
1126 fields = w.split("=")
1127 node = get_node_by_abbreviated_name(fields[0])
1128 if !node || !node.has_flag?("master")
1129 puts "*** No such master node #{fields[0]}"
1130 exit 1
1131 end
1132 weights[node.info[:name]] = fields[1].to_f
1133 } if opt['weight']
1134 useempty = opt['use-empty-masters']
1135
1136 # Assign a weight to each node, and compute the total cluster weight.
1137 total_weight = 0
1138 nodes_involved = 0
1139 @nodes.each{|n|
1140 if n.has_flag?("master")
1141 next if !useempty && n.slots.length == 0
1142 n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1
1143 total_weight += n.info[:w]
1144 nodes_involved += 1
1145 end
1146 }
1147
1148 # Check cluster, only proceed if it looks sane.
1149 check_cluster(:quiet => true)
1150 if @errors.length != 0
1151 puts "*** Please fix your cluster problems before rebalancing"
1152 exit 1
1153 end
1154
        # Calculate the slots balance for each node. It's the number of
        # slots the node should lose (if positive) or gain (if negative)
        # in order to be balanced.
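        #
        # For example (hypothetical weights): with three masters weighted
        # 1, 1 and 2, total_weight is 4 and the expected slot counts are
        # 4096, 4096 and 8192; a node serving 5000 slots with weight 1 gets
        # a balance of +904, i.e. it has 904 slots to give away.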
1158 threshold = opt['threshold'].to_f
1159 threshold_reached = false
1160 @nodes.each{|n|
1161 if n.has_flag?("master")
1162 next if !n.info[:w]
1163 expected = ((ClusterHashSlots.to_f / total_weight) *
1164 n.info[:w]).to_i
1165 n.info[:balance] = n.slots.length - expected
1166 # Compute the percentage of difference between the
1167 # expected number of slots and the real one, to see
1168 # if it's over the threshold specified by the user.
1169 over_threshold = false
1170 if threshold > 0
1171 if n.slots.length > 0
1172 err_perc = (100-(100.0*expected/n.slots.length)).abs
1173 over_threshold = true if err_perc > threshold
1174 elsif expected > 0
1175 over_threshold = true
1176 end
1177 end
1178 threshold_reached = true if over_threshold
1179 end
1180 }
1181 if !threshold_reached
1182 xputs "*** No rebalancing needed! All nodes are within the #{threshold}% threshold."
1183 return
1184 end
1185
1186 # Only consider nodes we want to change
1187 sn = @nodes.select{|n|
1188 n.has_flag?("master") && n.info[:w]
1189 }
1190
1191 # Because of rounding, it is possible that the balance of all nodes
1192 # summed does not give 0. Make sure that nodes that have to provide
1193 # slots are always matched by nodes receiving slots.
1194 total_balance = sn.map{|x| x.info[:balance]}.reduce{|a,b| a+b}
1195 while total_balance > 0
1196 sn.each{|n|
1197 if n.info[:balance] < 0 && total_balance > 0
1198 n.info[:balance] -= 1
1199 total_balance -= 1
1200 end
1201 }
1202 end
1203
1204 # Sort nodes by their slots balance.
1205 sn = sn.sort{|a,b|
1206 a.info[:balance] <=> b.info[:balance]
1207 }
1208
1209 xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}"
1210
1211 if $verbose
1212 sn.each{|n|
1213 puts "#{n} balance is #{n.info[:balance]} slots"
1214 }
1215 end
1216
1217 # Now we have at the start of the 'sn' array nodes that should get
1218 # slots, at the end nodes that must give slots.
1219 # We take two indexes, one at the start, and one at the end,
        # incrementing or decrementing the indexes accordingly until we
        # find nodes that need to get/provide slots.
1222 dst_idx = 0
1223 src_idx = sn.length - 1
1224
1225 while dst_idx < src_idx
1226 dst = sn[dst_idx]
1227 src = sn[src_idx]
1228 numslots = [dst.info[:balance],src.info[:balance]].map{|n|
1229 n.abs
1230 }.min
1231
1232 if numslots > 0
1233 puts "Moving #{numslots} slots from #{src} to #{dst}"
1234
                # Actually move the slots.
1236 reshard_table = compute_reshard_table([src],numslots)
1237 if reshard_table.length != numslots
                    xputs "*** Assertion failed: Reshard table != number of slots"
1239 exit 1
1240 end
1241 if opt['simulate']
1242 print "#"*reshard_table.length
1243 else
1244 reshard_table.each{|e|
1245 move_slot(e[:source],dst,e[:slot],
1246 :quiet=>true,
1247 :dots=>false,
1248 :update=>true,
1249 :pipeline=>opt['pipeline'])
1250 print "#"
1251 STDOUT.flush
1252 }
1253 end
1254 puts
1255 end
1256
1257 # Update nodes balance.
1258 dst.info[:balance] += numslots
1259 src.info[:balance] -= numslots
1260 dst_idx += 1 if dst.info[:balance] == 0
1261 src_idx -= 1 if src.info[:balance] == 0
1262 end
1263 end
1264
1265 def fix_cluster_cmd(argv,opt)
1266 @fix = true
1267 @timeout = opt['timeout'].to_i if opt['timeout']
1268
1269 load_cluster_info_from_node(argv[0])
1270 check_cluster
1271 end
1272
1273 def reshard_cluster_cmd(argv,opt)
1274 opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)
1275
1276 load_cluster_info_from_node(argv[0])
1277 check_cluster
1278 if @errors.length != 0
1279 puts "*** Please fix your cluster problems before resharding"
1280 exit 1
1281 end
1282
        @timeout = opt['timeout'].to_i if opt['timeout']
1284
1285 # Get number of slots
1286 if opt['slots']
1287 numslots = opt['slots'].to_i
1288 else
1289 numslots = 0
1290 while numslots <= 0 or numslots > ClusterHashSlots
1291 print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
1292 numslots = STDIN.gets.to_i
1293 end
1294 end
1295
1296 # Get the target instance
1297 if opt['to']
1298 target = get_node_by_name(opt['to'])
1299 if !target || target.has_flag?("slave")
1300 xputs "*** The specified node is not known or not a master, please retry."
1301 exit 1
1302 end
1303 else
1304 target = nil
1305 while not target
1306 print "What is the receiving node ID? "
1307 target = get_node_by_name(STDIN.gets.chop)
1308 if !target || target.has_flag?("slave")
1309 xputs "*** The specified node is not known or not a master, please retry."
1310 target = nil
1311 end
1312 end
1313 end
1314
1315 # Get the source instances
1316 sources = []
1317 if opt['from']
1318 opt['from'].split(',').each{|node_id|
1319 if node_id == "all"
1320 sources = "all"
1321 break
1322 end
1323 src = get_node_by_name(node_id)
1324 if !src || src.has_flag?("slave")
1325 xputs "*** The specified node is not known or is not a master, please retry."
1326 exit 1
1327 end
1328 sources << src
1329 }
1330 else
1331 xputs "Please enter all the source node IDs."
1332 xputs " Type 'all' to use all the nodes as source nodes for the hash slots."
            xputs "  Type 'done' once you entered all the source node IDs."
1334 while true
1335 print "Source node ##{sources.length+1}:"
1336 line = STDIN.gets.chop
1337 src = get_node_by_name(line)
1338 if line == "done"
1339 break
1340 elsif line == "all"
1341 sources = "all"
1342 break
1343 elsif !src || src.has_flag?("slave")
1344 xputs "*** The specified node is not known or is not a master, please retry."
1345 elsif src.info[:name] == target.info[:name]
1346 xputs "*** It is not possible to use the target node as source node."
1347 else
1348 sources << src
1349 end
1350 end
1351 end
1352
1353 if sources.length == 0
1354 puts "*** No source nodes given, operation aborted"
1355 exit 1
1356 end
1357
        # Handle sources == "all".
1359 if sources == "all"
1360 sources = []
1361 @nodes.each{|n|
1362 next if n.info[:name] == target.info[:name]
1363 next if n.has_flag?("slave")
1364 sources << n
1365 }
1366 end
1367
        # Check if the destination node is the same as any of the source nodes.
1369 if sources.index(target)
1370 xputs "*** Target node is also listed among the source nodes!"
1371 exit 1
1372 end
1373
1374 puts "\nReady to move #{numslots} slots."
1375 puts " Source nodes:"
1376 sources.each{|s| puts " "+s.info_string}
1377 puts " Destination node:"
1378 puts " #{target.info_string}"
1379 reshard_table = compute_reshard_table(sources,numslots)
1380 puts " Resharding plan:"
1381 show_reshard_table(reshard_table)
1382 if !opt['yes']
1383 print "Do you want to proceed with the proposed reshard plan (yes/no)? "
1384 yesno = STDIN.gets.chop
1385 exit(1) if (yesno != "yes")
1386 end
1387 reshard_table.each{|e|
1388 move_slot(e[:source],target,e[:slot],
1389 :dots=>true,
1390 :pipeline=>opt['pipeline'])
1391 }
1392 end
1393
    # This is a helper function for create_cluster_cmd that verifies that the
    # number of nodes and the requested replicas form a valid configuration,
    # that is, one with at least three master nodes and enough replicas per master.
1397 def check_create_parameters
1398 masters = @nodes.length/(@replicas+1)
1399 if masters < 3
1400 puts "*** ERROR: Invalid configuration for cluster creation."
1401 puts "*** Redis Cluster requires at least 3 master nodes."
1402 puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node."
1403 puts "*** At least #{3*(@replicas+1)} nodes are required."
1404 exit 1
1405 end
1406 end
1407
1408 def create_cluster_cmd(argv,opt)
1409 opt = {'replicas' => 0}.merge(opt)
1410 @replicas = opt['replicas'].to_i
1411
1412 xputs ">>> Creating cluster"
1413 argv[0..-1].each{|n|
1414 node = ClusterNode.new(n)
1415 node.connect(:abort => true)
1416 node.assert_cluster
1417 node.load_info
1418 node.assert_empty
1419 add_node(node)
1420 }
1421 check_create_parameters
1422 xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
1423 alloc_slots
1424 show_nodes
1425 yes_or_die "Can I set the above configuration?"
1426 flush_nodes_config
1427 xputs ">>> Nodes configuration updated"
1428 xputs ">>> Assign a different config epoch to each node"
1429 assign_config_epoch
1430 xputs ">>> Sending CLUSTER MEET messages to join the cluster"
1431 join_cluster
        # Give the join one second to start, in order to avoid that
        # wait_cluster_join finds all the nodes already in agreement about
        # the config simply because they are still empty, with no assigned slots.
1435 sleep 1
1436 wait_cluster_join
1437 flush_nodes_config # Useful for the replicas
        # Reset the node information, so that when the final summary about
        # the newly created cluster is listed by check_cluster, all the
        # nodes get properly listed as slaves or masters.
1441 reset_nodes
1442 load_cluster_info_from_node(argv[0])
1443 check_cluster
1444 end
1445
1446 def addnode_cluster_cmd(argv,opt)
1447 xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}"
1448
1449 # Check the existing cluster
1450 load_cluster_info_from_node(argv[1])
1451 check_cluster
1452
1453 # If --master-id was specified, try to resolve it now so that we
1454 # abort before starting with the node configuration.
1455 if opt['slave']
1456 if opt['master-id']
1457 master = get_node_by_name(opt['master-id'])
                if !master
                    xputs "[ERR] No such master ID #{opt['master-id']}"
                    exit 1
                end
1461 else
1462 master = get_master_with_least_replicas
1463 xputs "Automatically selected master #{master}"
1464 end
1465 end
1466
1467 # Add the new node
1468 new = ClusterNode.new(argv[0])
1469 new.connect(:abort => true)
1470 new.assert_cluster
1471 new.load_info
1472 new.assert_empty
1473 first = @nodes.first.info
1474 add_node(new)
1475
1476 # Send CLUSTER MEET command to the new node
1477 xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster."
1478 new.r.cluster("meet",first[:host],first[:port])
1479
1480 # Additional configuration is needed if the node is added as
1481 # a slave.
1482 if opt['slave']
1483 wait_cluster_join
1484 xputs ">>> Configure node as replica of #{master}."
1485 new.r.cluster("replicate",master.info[:name])
1486 end
1487 xputs "[OK] New node added correctly."
1488 end
1489
1490 def delnode_cluster_cmd(argv,opt)
1491 id = argv[1].downcase
1492 xputs ">>> Removing node #{id} from cluster #{argv[0]}"
1493
1494 # Load cluster information
1495 load_cluster_info_from_node(argv[0])
1496
1497 # Check if the node exists and is not empty
1498 node = get_node_by_name(id)
1499
1500 if !node
1501 xputs "[ERR] No such node ID #{id}"
1502 exit 1
1503 end
1504
1505 if node.slots.length != 0
1506 xputs "[ERR] Node #{node} is not empty! Reshard data away and try again."
1507 exit 1
1508 end
1509
1510 # Send CLUSTER FORGET to all the nodes but the node to remove
1511 xputs ">>> Sending CLUSTER FORGET messages to the cluster..."
1512 @nodes.each{|n|
1513 next if n == node
1514 if n.info[:replicate] && n.info[:replicate].downcase == id
1515 # Reconfigure the slave to replicate with some other node
1516 master = get_master_with_least_replicas
1517 xputs ">>> #{n} as replica of #{master}"
1518 n.r.cluster("replicate",master.info[:name])
1519 end
1520 n.r.cluster("forget",argv[1])
1521 }
1522
1523 # Finally shutdown the node
1524 xputs ">>> SHUTDOWN the node."
1525 node.r.shutdown
1526 end
1527
1528 def set_timeout_cluster_cmd(argv,opt)
1529 timeout = argv[1].to_i
1530 if timeout < 100
1531 puts "Setting a node timeout of less than 100 milliseconds is a bad idea."
1532 exit 1
1533 end
1534
1535 # Load cluster information
1536 load_cluster_info_from_node(argv[0])
1537 ok_count = 0
1538 err_count = 0
1539
        # Send CONFIG SET cluster-node-timeout and CONFIG REWRITE to every node.
1541 xputs ">>> Reconfiguring node timeout in every cluster node..."
1542 @nodes.each{|n|
1543 begin
1544 n.r.config("set","cluster-node-timeout",timeout)
1545 n.r.config("rewrite")
1546 ok_count += 1
1547 xputs "*** New timeout set for #{n}"
1548 rescue => e
                puts "ERR setting node-timeout for #{n}: #{e}"
1550 err_count += 1
1551 end
1552 }
1553 xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR."
1554 end
1555
1556 def call_cluster_cmd(argv,opt)
1557 cmd = argv[1..-1]
1558 cmd[0] = cmd[0].upcase
1559
1560 # Load cluster information
1561 load_cluster_info_from_node(argv[0])
1562 xputs ">>> Calling #{cmd.join(" ")}"
1563 @nodes.each{|n|
1564 begin
1565 res = n.r.send(*cmd)
1566 puts "#{n}: #{res}"
1567 rescue => e
1568 puts "#{n}: #{e}"
1569 end
1570 }
1571 end
1572
1573 def import_cluster_cmd(argv,opt)
1574 source_addr = opt['from']
        xputs ">>> Importing data from #{source_addr} to cluster #{argv[0]}"
1576 use_copy = opt['copy']
1577 use_replace = opt['replace']
1578
1579 # Check the existing cluster.
1580 load_cluster_info_from_node(argv[0])
1581 check_cluster
1582
1583 # Connect to the source node.
1584 xputs ">>> Connecting to the source Redis instance"
1585 src_host,src_port = source_addr.split(":")
1586 source = Redis.new(:host =>src_host, :port =>src_port)
        if source.info['cluster_enabled'].to_i == 1
            xputs "[ERR] The source node should not be a cluster node."
            exit 1
        end
1590 xputs "*** Importing #{source.dbsize} keys from DB 0"
1591
1592 # Build a slot -> node map
1593 slots = {}
1594 @nodes.each{|n|
1595 n.slots.each{|s,_|
1596 slots[s] = n
1597 }
1598 }
1599
1600 # Use SCAN to iterate over the keys, migrating to the
1601 # right node as needed.
1602 cursor = nil
1603 while cursor != 0
1604 cursor,keys = source.scan(cursor, :count => 1000)
1605 cursor = cursor.to_i
1606 keys.each{|k|
1607 # Migrate keys using the MIGRATE command.
1608 slot = key_to_slot(k)
1609 target = slots[slot]
1610 print "Migrating #{k} to #{target}: "
1611 STDOUT.flush
1612 begin
1613 cmd = ["migrate",target.info[:host],target.info[:port],k,0,@timeout]
1614 cmd << :copy if use_copy
1615 cmd << :replace if use_replace
1616 source.client.call(cmd)
1617 rescue => e
1618 puts e
1619 else
1620 puts "OK"
1621 end
1622 }
1623 end
1624 end
1625
1626 def help_cluster_cmd(argv,opt)
1627 show_help
1628 exit 0
1629 end
1630
1631 # Parse the options for the specific command "cmd".
    # Returns a hash populated with option => value pairs, and the index of
1633 # the first non-option argument in ARGV.
1634 def parse_options(cmd)
1635 idx = 1 ; # Current index into ARGV
1636 options={}
1637 while idx < ARGV.length && ARGV[idx][0..1] == '--'
1638 if ARGV[idx][0..1] == "--"
1639 option = ARGV[idx][2..-1]
1640 idx += 1
1641
1642 # --verbose is a global option
1643 if option == "verbose"
1644 $verbose = true
1645 next
1646 end
1647
1648 if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
1649 puts "Unknown option '#{option}' for command '#{cmd}'"
1650 exit 1
1651 end
1652 if ALLOWED_OPTIONS[cmd][option] != false
1653 value = ARGV[idx]
1654 idx += 1
1655 else
1656 value = true
1657 end
1658
1659 # If the option is set to [], it's a multiple arguments
1660 # option. We just queue every new value into an array.
1661 if ALLOWED_OPTIONS[cmd][option] == []
1662 options[option] = [] if !options[option]
1663 options[option] << value
1664 else
1665 options[option] = value
1666 end
1667 else
1668 # Remaining arguments are not options.
1669 break
1670 end
1671 end
1672
1673 # Enforce mandatory options
1674 if ALLOWED_OPTIONS[cmd]
1675 ALLOWED_OPTIONS[cmd].each {|option,val|
1676 if !options[option] && val == :required
1677 puts "Option '--#{option}' is required "+ \
1678 "for subcommand '#{cmd}'"
1679 exit 1
1680 end
1681 }
1682 end
1683 return options,idx
1684 end
1685end
1686
1687#################################################################################
1688# Libraries
1689#
# We try not to depend on external libs since this is a critical part
1691# of Redis Cluster.
1692#################################################################################
1693
1694# This is the CRC16 algorithm used by Redis Cluster to hash keys.
1695# Implementation according to CCITT standards.
1696#
1697# This is actually the XMODEM CRC 16 algorithm, using the
1698# following parameters:
1699#
1700# Name : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN"
1701# Width : 16 bit
1702# Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1)
1703# Initialization : 0000
1704# Reflect Input byte : False
1705# Reflect Output CRC : False
1706# Xor constant to output CRC : 0000
1707# Output for "123456789" : 31C3
1708
1709module RedisClusterCRC16
1710 def RedisClusterCRC16.crc16(bytes)
1711 crc = 0
1712 bytes.each_byte{|b|
1713 crc = ((crc<<8) & 0xffff) ^ XMODEMCRC16Lookup[((crc>>8)^b) & 0xff]
1714 }
1715 crc
1716 end
1717
1718private
1719 XMODEMCRC16Lookup = [
1720 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
1721 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
1722 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
1723 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
1724 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
1725 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
1726 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
1727 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
1728 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
1729 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
1730 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
1731 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
1732 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
1733 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
1734 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
1735 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
1736 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
1737 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
1738 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
1739 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
1740 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
1741 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
1742 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
1743 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
1744 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
1745 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
1746 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
1747 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
1748 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
1749 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
1750 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
1751 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
1752 ]
1753end
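
# As a quick sanity check against the parameters listed above:
#
#   RedisClusterCRC16.crc16("123456789")   # => 0x31c3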
1754
# Turn a key name into the corresponding Redis Cluster slot.
def key_to_slot(key)
    # Only hash what is inside {...} if there is such a pattern in the key.
    # Note that the specification requires hashing only the content between
    # the first { and the first } following it. If we find {} with nothing
    # in the middle, the whole key is hashed as usual.
1761 s = key.index "{"
1762 if s
1763 e = key.index "}",s+1
1764 if e && e != s+1
1765 key = key[s+1..e-1]
1766 end
1767 end
1768 RedisClusterCRC16.crc16(key) % 16384
1769end
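
# A few illustrative examples of the hash tag rule implemented above:
#
#   key_to_slot("user1000")             # the whole key is hashed
#   key_to_slot("{user1000}.following") # only "user1000" is hashed
#   key_to_slot("foo{}{bar}")           # "{}" is empty, the whole key is hashed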
1770
1771#################################################################################
1772# Definition of commands
1773#################################################################################
1774
1775COMMANDS={
1776 "create" => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"],
1777 "check" => ["check_cluster_cmd", 2, "host:port"],
1778 "info" => ["info_cluster_cmd", 2, "host:port"],
1779 "fix" => ["fix_cluster_cmd", 2, "host:port"],
1780 "reshard" => ["reshard_cluster_cmd", 2, "host:port"],
1781 "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"],
1782 "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
1783 "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
1784 "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
1785 "call" => ["call_cluster_cmd", -3, "host:port command arg arg .. arg"],
1786 "import" => ["import_cluster_cmd", 2, "host:port"],
1787 "help" => ["help_cluster_cmd", 1, "(show this help)"]
1788}
1789
1790ALLOWED_OPTIONS={
1791 "create" => {"replicas" => true},
1792 "add-node" => {"slave" => false, "master-id" => true},
1793 "import" => {"from" => :required, "copy" => false, "replace" => false},
1794 "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
1795 "rebalance" => {"weight" => [], "auto-weights" => false, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true, "threshold" => true},
1796 "fix" => {"timeout" => MigrateDefaultTimeout},
1797}
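
# A few typical invocations, for illustration (addresses and node IDs are
# placeholders):
#
#   ./redis-trib.rb create --replicas 1 127.0.0.1:7000 127.0.0.1:7001 \
#       127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
#   ./redis-trib.rb check 127.0.0.1:7000
#   ./redis-trib.rb reshard --from all --to <node-id> --slots 100 --yes 127.0.0.1:7000
#   ./redis-trib.rb add-node --slave --master-id <node-id> 127.0.0.1:7006 127.0.0.1:7000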
1798
1799def show_help
1800 puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
1801 COMMANDS.each{|k,v|
1802 puts " #{k.ljust(15)} #{v[2]}"
1803 if ALLOWED_OPTIONS[k]
1804 ALLOWED_OPTIONS[k].each{|optname,has_arg|
1805 puts " --#{optname}" + (has_arg ? " <arg>" : "")
1806 }
1807 end
1808 }
1809 puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n"
1810end
1811
1812# Sanity check
1813if ARGV.length == 0
1814 show_help
1815 exit 1
1816end
1817
1818rt = RedisTrib.new
1819cmd_spec = COMMANDS[ARGV[0].downcase]
1820if !cmd_spec
1821 puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
1822 exit 1
1823end
1824
1825# Parse options
1826cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase)
1827rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1))
1828
1829# Dispatch
1830rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options)