Looping with Fibers
An overview of how Fibers work in Ruby
Fibers are code blocks that can be paused and resumed. They are unlike threads because they never run concurrently. The programmer is in complete control of when a fiber is run. Because of this we can create two fibers and pass control between them.
Control is passed to a fiber when you call Fiber#resume, the Fiber returns control by calling Fiber.yield
fiber = Fiber.new do
Fiber.yield 'one'
Fiber.yield 'two'
end
puts fiber.resume
#=> one
puts fiber.resume
#=> two
The above example shows the most common use case where Fiber.yield is passed an argument which is returned through Fiber#resume. What’s interesting is that you can pass an argument into the fiber via Fiber#resume as well. The first call to Fiber#resume starts the fiber and that argument goes to the block that creates the fiber, all subsequent calls to Fiber#resume have their arguments passed to Fiber.yield.
fiber = Fiber.new do |arg|
puts arg # prints 'one'
puts Fiber.yield('two') # prints 'three'
puts Fiber.yield('four') # prints 'five'
end
puts fiber.resume('one') # prints 'two'
#=> one
#=> two
puts fiber.resume('three') # prints 'four'
#=> three
#=> four
puts fiber.resume('five') # prints nil because there's no corresponding yield and the fiber exits
#=> nil
Armed with this information, we can setup two fibers and get them to communicate between each other.
require 'fiber'
fiber2 = nil
fiber1 = Fiber.new do
puts fiber2.resume # start fiber2 and print first result (1)
puts fiber2.resume 2 # send second number and print second result (3)
fiber2.resume 4 # send forth number, print nothing and exit
end
fiber2 = Fiber.new do
puts Fiber.yield 1 # send first number and print returned result (2)
puts Fiber.yield 3 # send third number, print returned result (4) and exit
end
fiber1.resume # start fiber1
#=> 1
#=> 2
#=> 3
#=> 4
puts "fiber1 done" unless fiber1.alive?
#=> fiber1 done
puts "fiber2 done" unless fiber2.alive?
#=> fiber2 done
EachGroup module
Knowing we can send information between two fibers with alternating calls of Fiber#resume and Fiber.yield, we have the building blocks to tackle a streaming #each_group method. Tip: The fiber you first call #resume on should always call #resume on the fiber it is communicating with. The other thread then always calls Fiber.yield. This goes against the natural inclination to pass information with Fiber.yield as in the first example above. Because of how the two fibers are setup below, you’ll see that no information is passed with Fiber.yield, information is only passed using Fiber#resume —confusing, I know.
# -*- coding: utf-8 -*-
require 'fiber'
module EachGroup
def each_group(*fields, &block)
grouper = Grouper.new(*fields, &block)
loop_fiber = Fiber.new do
each do |result|
grouper.process_result(result)
end
end
loop_fiber.resume
end
class Grouper
def initialize(*fields, &block)
@current_group = nil
@fields = fields
@block = block
end
attr_reader :fields, :block
attr_accessor :current_group
def process_result(result)
group_fiber = get_group_fiber(result)
group_fiber.resume(result) if group_fiber.alive?
end
private
def get_group_fiber(result)
group_value = fields.map{|f| result.public_send(f) }
unless current_group == group_value
self.current_group = group_value
create_group_fiber(result, group_value)
end
@group_fiber
end
def create_group_fiber(result, group_value)
@group_fiber = Fiber.new do |first_result|
group = Group.new(group_value)
block.call(group)
end
@group_fiber.resume(nil) # Start the fiber and wait for its first yield
end
end
class Group
def initialize(value)
@value = value
end
attr_reader :value
def each(&block)
while result = Fiber.yield
block.call(result)
end
end
end
end
Example Usage
#each_group requires input sorted for grouping.
require 'each_group'
require 'ostruct'
Array.send(:include, EachGroup)
array = [
OpenStruct.new(year: 2014, month: 1, date: 1),
OpenStruct.new(year: 2014, month: 1, date: 3),
OpenStruct.new(year: 2014, month: 2, date: 5),
OpenStruct.new(year: 2014, month: 2, date: 7),
]
array.each_group(:year, :month) do |group|
puts group.value.inspect
group.each do |obj|
puts " #{obj.date}"
end
end
#=> [2014, 1]
#=> 1
#=> 3
#=> [2014, 2]
#=> 5
#=> 7
This code can be used with ActiveRecord as follows:
ActiveRecord::Relation.send(:include, EachGroup)
Model.order('year, month').each_group do |group|
group.each do
# ...
end
end
I have uploaded a Gist that shows a previous iteration of the EachGroup module using a nested loop which you may find easier to use to understand how the fibers are used to control the flow of the loop.
- The above code with a RSpec spec - https://gist.github.com/andrewtimberlake/9462561
- The original code with nested loops - https://gist.github.com/andrewtimberlake/9462561/f0e88cd310614a34693d57c3fc759f5c78e3a264
Thanks for taking the time to read through this. Explaining complicated concepts like Fibers is a challenge, please leave a comment and let me know if this was helpful or if you still have any questions.