模块 TSort

TSort 使用 Tarjan 算法实现强连通分量的拓扑排序。

TSort 旨在能够与任何可以解释为有向图的对象一起使用。

TSort 需要两个方法来将对象解释为图:tsort_each_node 和 tsort_each_child。

节点的相等性由 eql? 和 hash 定义,因为 TSort 内部使用 Hash。

一个简单的例子

以下示例演示如何将 TSort 模块混合到现有类中(在本例中为 Hash)。在这里,我们将哈希中的每个键视为图中的一个节点,因此我们简单地将所需的 tsort_each_node 方法别名为 Hash 的 each_key 方法。对于哈希中的每个键,关联的值是节点子节点的数组。反过来,这种选择导致我们实现了所需的 tsort_each_child 方法,该方法获取子节点数组,然后使用用户提供的块迭代该数组。

require 'tsort'

class Hash
  include TSort
  alias tsort_each_node each_key
  def tsort_each_child(node, &block)
    fetch(node).each(&block)
  end
end

{1=>[2, 3], 2=>[3], 3=>[], 4=>[]}.tsort
#=> [3, 2, 1, 4]

{1=>[2], 2=>[3, 4], 3=>[2], 4=>[]}.strongly_connected_components
#=> [[4], [2, 3], [1]]

一个更实际的例子

一个非常简单的类似“make”的工具可以按如下方式实现

require 'tsort'

class Make
  def initialize
    @dep = {}
    @dep.default = []
  end

  def rule(outputs, inputs=[], &block)
    triple = [outputs, inputs, block]
    outputs.each {|f| @dep[f] = [triple]}
    @dep[triple] = inputs
  end

  def build(target)
    each_strongly_connected_component_from(target) {|ns|
      if ns.length != 1
        fs = ns.delete_if {|n| Array === n}
        raise TSort::Cyclic.new("cyclic dependencies: #{fs.join ', '}")
      end
      n = ns.first
      if Array === n
        outputs, inputs, block = n
        inputs_time = inputs.map {|f| File.mtime f}.max
        begin
          outputs_time = outputs.map {|f| File.mtime f}.min
        rescue Errno::ENOENT
          outputs_time = nil
        end
        if outputs_time == nil ||
           inputs_time != nil && outputs_time <= inputs_time
          sleep 1 if inputs_time != nil && inputs_time.to_i == Time.now.to_i
          block.call
        end
      end
    }
  end

  def tsort_each_child(node, &block)
    @dep[node].each(&block)
  end
  include TSort
end

def command(arg)
  print arg, "\n"
  system arg
end

m = Make.new
m.rule(%w[t1]) { command 'date > t1' }
m.rule(%w[t2]) { command 'date > t2' }
m.rule(%w[t3]) { command 'date > t3' }
m.rule(%w[t4], %w[t1 t3]) { command 'cat t1 t3 > t4' }
m.rule(%w[t5], %w[t4 t2]) { command 'cat t4 t2 > t5' }
m.build('t5')

Bug

  • “tsort.rb”的名称是错误的,因为这个库使用了 Tarjan 算法来处理强连通分量。虽然“strongly_connected_components.rb”是正确的,但太长了。

参考

    1. Tarjan,“深度优先搜索和线性图算法”,

SIAM 计算杂志,第 1 卷,第 2 期,第 146-160 页,1972 年 6 月。

常量

VERSION

公共类方法

each_strongly_connected_component(each_node, each_child) { |nodes| ... } 点击切换源代码

TSort.strongly_connected_components 方法的迭代器版本。

图由 *each_node* 和 *each_child* 表示。*each_node* 应该具有 call 方法,该方法为图中的每个节点生成结果。*each_child* 应该具有 call 方法,该方法接受一个节点参数,并为每个子节点生成结果。

g = {1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
TSort.each_strongly_connected_component(each_node, each_child) {|scc| p scc }
#=> [4]
#   [2]
#   [3]
#   [1]

g = {1=>[2], 2=>[3, 4], 3=>[2], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
TSort.each_strongly_connected_component(each_node, each_child) {|scc| p scc }
#=> [4]
#   [2, 3]
#   [1]
# File tsort.rb, line 345
def self.each_strongly_connected_component(each_node, each_child) # :yields: nodes
  return to_enum(__method__, each_node, each_child) unless block_given?

  id_map = {}
  stack = []
  each_node.call {|node|
    unless id_map.include? node
      each_strongly_connected_component_from(node, each_child, id_map, stack) {|c|
        yield c
      }
    end
  }
  nil
end
each_strongly_connected_component_from(node, each_child, id_map={}, stack=[]) { |nodes| ... } 点击切换源代码

迭代图中的强连通分量。图由 *node* 和 *each_child* 表示。

*node* 是第一个节点。*each_child* 应该具有 call 方法,该方法接受一个节点参数,并为每个子节点生成结果。

返回值未指定。

TSort.each_strongly_connected_component_from 是一个类方法,它不需要一个包含 TSort 的类来表示一个图。

graph = {1=>[2], 2=>[3, 4], 3=>[2], 4=>[]}
each_child = lambda {|n, &b| graph[n].each(&b) }
TSort.each_strongly_connected_component_from(1, each_child) {|scc|
  p scc
}
#=> [4]
#   [2, 3]
#   [1]
# File tsort.rb, line 411
def self.each_strongly_connected_component_from(node, each_child, id_map={}, stack=[]) # :yields: nodes
  return to_enum(__method__, node, each_child, id_map, stack) unless block_given?

  minimum_id = node_id = id_map[node] = id_map.size
  stack_length = stack.length
  stack << node

  each_child.call(node) {|child|
    if id_map.include? child
      child_id = id_map[child]
      minimum_id = child_id if child_id && child_id < minimum_id
    else
      sub_minimum_id =
        each_strongly_connected_component_from(child, each_child, id_map, stack) {|c|
          yield c
        }
      minimum_id = sub_minimum_id if sub_minimum_id < minimum_id
    end
  }

  if node_id == minimum_id
    component = stack.slice!(stack_length .. -1)
    component.each {|n| id_map[n] = nil}
    yield component
  end

  minimum_id
end
strongly_connected_components(each_node, each_child) 点击切换源代码

返回强连通分量,形式为节点数组的数组。该数组从子节点到父节点排序。该数组的每个元素表示一个强连通分量。

图由 *each_node* 和 *each_child* 表示。*each_node* 应该具有 call 方法,该方法为图中的每个节点生成结果。*each_child* 应该具有 call 方法,该方法接受一个节点参数,并为每个子节点生成结果。

g = {1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
p TSort.strongly_connected_components(each_node, each_child)
#=> [[4], [2], [3], [1]]

g = {1=>[2], 2=>[3, 4], 3=>[2], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
p TSort.strongly_connected_components(each_node, each_child)
#=> [[4], [2, 3], [1]]
# File tsort.rb, line 283
def self.strongly_connected_components(each_node, each_child)
  each_strongly_connected_component(each_node, each_child).to_a
end
tsort(each_node, each_child) 点击切换源代码

返回一个拓扑排序的节点数组。该数组从子节点到父节点排序,即第一个元素没有子节点,最后一个节点没有父节点。

图由 *each_node* 和 *each_child* 表示。*each_node* 应该具有 call 方法,该方法为图中的每个节点生成结果。*each_child* 应该具有 call 方法,该方法接受一个节点参数,并为每个子节点生成结果。

如果存在循环,则会引发 TSort::Cyclic 异常。

g = {1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
p TSort.tsort(each_node, each_child) #=> [4, 2, 3, 1]

g = {1=>[2], 2=>[3, 4], 3=>[2], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
p TSort.tsort(each_node, each_child) # raises TSort::Cyclic
# File tsort.rb, line 178
def self.tsort(each_node, each_child)
  tsort_each(each_node, each_child).to_a
end
tsort_each(each_node, each_child) { |node| ... } 点击切换源代码

TSort.tsort 方法的迭代器版本。

图由 *each_node* 和 *each_child* 表示。*each_node* 应该具有 call 方法,该方法为图中的每个节点生成结果。*each_child* 应该具有 call 方法,该方法接受一个节点参数,并为每个子节点生成结果。

g = {1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]}
each_node = lambda {|&b| g.each_key(&b) }
each_child = lambda {|n, &b| g[n].each(&b) }
TSort.tsort_each(each_node, each_child) {|n| p n }
#=> 4
#   2
#   3
#   1
# File tsort.rb, line 226
def self.tsort_each(each_node, each_child) # :yields: node
  return to_enum(__method__, each_node, each_child) unless block_given?

  each_strongly_connected_component(each_node, each_child) {|component|
    if component.size == 1
      yield component.first
    else
      raise Cyclic.new("topological sort failed: #{component.inspect}")
    end
  }
end

公共实例方法

each_strongly_connected_component() { |nodes| ... } 点击切换源代码

strongly_connected_components 方法的迭代器版本。obj.each_strongly_connected_component 类似于 obj.strongly_connected_components.each,但在迭代期间修改 *obj* 可能会导致意外结果。

each_strongly_connected_component 返回 nil

class G
  include TSort
  def initialize(g)
    @g = g
  end
  def tsort_each_child(n, &b) @g[n].each(&b) end
  def tsort_each_node(&b) @g.each_key(&b) end
end

graph = G.new({1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]})
graph.each_strongly_connected_component {|scc| p scc }
#=> [4]
#   [2]
#   [3]
#   [1]

graph = G.new({1=>[2], 2=>[3, 4], 3=>[2], 4=>[]})
graph.each_strongly_connected_component {|scc| p scc }
#=> [4]
#   [2, 3]
#   [1]
# File tsort.rb, line 316
def each_strongly_connected_component(&block) # :yields: nodes
  each_node = method(:tsort_each_node)
  each_child = method(:tsort_each_child)
  TSort.each_strongly_connected_component(each_node, each_child, &block)
end
each_strongly_connected_component_from(node, id_map={}, stack=[]) { |nodes| ... } 点击切换源代码

迭代从 *node* 可到达的子图中的强连通分量。

返回值未指定。

each_strongly_connected_component_from 不调用 tsort_each_node

class G
  include TSort
  def initialize(g)
    @g = g
  end
  def tsort_each_child(n, &b) @g[n].each(&b) end
  def tsort_each_node(&b) @g.each_key(&b) end
end

graph = G.new({1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]})
graph.each_strongly_connected_component_from(2) {|scc| p scc }
#=> [4]
#   [2]

graph = G.new({1=>[2], 2=>[3, 4], 3=>[2], 4=>[]})
graph.each_strongly_connected_component_from(2) {|scc| p scc }
#=> [4]
#   [2, 3]
# File tsort.rb, line 386
def each_strongly_connected_component_from(node, id_map={}, stack=[], &block) # :yields: nodes
  TSort.each_strongly_connected_component_from(node, method(:tsort_each_child), id_map, stack, &block)
end
strongly_connected_components() 点击切换源代码

返回强连通分量,形式为节点数组的数组。该数组从子节点到父节点排序。该数组的每个元素表示一个强连通分量。

class G
  include TSort
  def initialize(g)
    @g = g
  end
  def tsort_each_child(n, &b) @g[n].each(&b) end
  def tsort_each_node(&b) @g.each_key(&b) end
end

graph = G.new({1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]})
p graph.strongly_connected_components #=> [[4], [2], [3], [1]]

graph = G.new({1=>[2], 2=>[3, 4], 3=>[2], 4=>[]})
p graph.strongly_connected_components #=> [[4], [2, 3], [1]]
# File tsort.rb, line 257
def strongly_connected_components
  each_node = method(:tsort_each_node)
  each_child = method(:tsort_each_child)
  TSort.strongly_connected_components(each_node, each_child)
end
tsort() 点击切换源代码

返回一个拓扑排序的节点数组。该数组从子节点到父节点排序,即第一个元素没有子节点,最后一个节点没有父节点。

如果存在循环,则会引发 TSort::Cyclic 异常。

class G
  include TSort
  def initialize(g)
    @g = g
  end
  def tsort_each_child(n, &b) @g[n].each(&b) end
  def tsort_each_node(&b) @g.each_key(&b) end
end

graph = G.new({1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]})
p graph.tsort #=> [4, 2, 3, 1]

graph = G.new({1=>[2], 2=>[3, 4], 3=>[2], 4=>[]})
p graph.tsort # raises TSort::Cyclic
# File tsort.rb, line 152
def tsort
  each_node = method(:tsort_each_node)
  each_child = method(:tsort_each_child)
  TSort.tsort(each_node, each_child)
end
tsort_each() { |node| ... } 点击切换源代码

tsort 方法的迭代器版本。obj.tsort_each 类似于 obj.tsort.each,但在迭代期间修改 *obj* 可能会导致意外结果。

tsort_each 返回 nil。如果存在循环,则会引发 TSort::Cyclic 异常。

class G
  include TSort
  def initialize(g)
    @g = g
  end
  def tsort_each_child(n, &b) @g[n].each(&b) end
  def tsort_each_node(&b) @g.each_key(&b) end
end

graph = G.new({1=>[2, 3], 2=>[4], 3=>[2, 4], 4=>[]})
graph.tsort_each {|n| p n }
#=> 4
#   2
#   3
#   1
# File tsort.rb, line 205
def tsort_each(&block) # :yields: node
  each_node = method(:tsort_each_node)
  each_child = method(:tsort_each_child)
  TSort.tsort_each(each_node, each_child, &block)
end
tsort_each_child(node) { |child| ... } 点击切换源代码

应该由扩展类实现。

tsort_each_child 用于迭代 *node* 的子节点。

# File tsort.rb, line 452
def tsort_each_child(node) # :yields: child
  raise NotImplementedError.new
end
tsort_each_node() { |node| ... } 点击切换源代码

应该由扩展类实现。

tsort_each_node 用于迭代图中的所有节点。

# File tsort.rb, line 444
def tsort_each_node # :yields: node
  raise NotImplementedError.new
end