One of These Days

Ruby Day 8 Multi Threading Synchronisation And Buffers

02 Jul 2010

Now that you’ve seen the HTTP protocol in action, we can move on to a more advanced version. We will turning the server into a multi-threaded one, allowing it to handle multiple requests simultaneously. Previously, once we had accepted a socket we had to finish processing it until we could accept another one. This meant that if another person tried to connect while it was processing an earlier request, it would have to wait. By sending each request to a separate thread, more than one can be processed at a time resulting in a more responsive server for everyone.

The approach we will take to implement this is to use so-called circular buffer. It is essentially a ring with a start and an end. Items go into the end, and come out the front. As we add items, we move the back along to an empty slot. When we take items out, we also move the front along to an empty slot. When the front is the same as the back, we know the buffer is either completely full or completely empty. Lastly, the buffer will have a fixed capacity so that when the front is moved to a position greater than the capacity, it wraps back around and starts from 0.

When we accept a connection the returned socket will be placed into the buffer. A group of threads will continually be checking the buffer for new sockets, and when one is found a thread will take it and process it. This process is useful because it allows us to completely separate our request handling and response generation. We will call these threads ‘Worker’ threads, and the collection of them will be called a thread pool. Thus, a new request will be placed into the buffer and processed by a worker from the thread pool.

There is a slight caveat to working with threads that use any form of shared data structure (each thread has access to the same circular buffer). A thread can be interrupted at any time by the scheduler and so a number of problems could ensue. For example, let’s say we have a method called ‘put’, which takes a given object and stores it, and then updates the count of the number of stored objects. If the thread is interrupted in between the storing of the object and the updating of the count, the data structure is no longer consistent with itself. If there were no elements added before ‘put’ was called, the buffer will still say it is empty even though one object has been added. What’s worse is that these problems are wholly unpredictable, so we must find a way to ensure these problems can’t occur. We need a way of guaranteeing that once ‘put’ is called, no one else will access the buffer until the method has finished.

Ruby provides a class called ‘Monitor’ that will allow us to achieve this by letting us create ‘synchronised’ blocks of code. A synchronised block is one that can only be accessed by one thread at a time. A thread can get a lock on the method, and no other thread will have access to it until the lock is released. Let’s go over the logic we will need to implement for our buffer.

We need to put items in, and get items out. The buffer will have a fixed size, and when it is full we should not be able to put anything more in. When it is empty, we should not be able to get anything out. If we want to get something while it is empty, it should pause until something has been put in, and then return it. If we want to put in something while it is full, it should pause until something has been removed.

Let’s look at the implementation of this, then go over it.

require 'monitor'

class SynchronisedBuffer < Monitor

We are extending the Monitor class, which gives us access to ‘synchronize’ and the wait/signal methods. Next, the buffer is initialised

   def initialize(capacity)
      @capacity = capacity
      @front = 0
      @back = 0
      @elements = Array.new(capacity)  
      @empty_cond = new_cond
      @full_cond = new_cond
      super()
   end

After setting capacity, front position, back position, and creating a new array to hold our elements, we come to our first bit of the Monitor class. ‘new_cond’ is a method that returns a ConditionVariable object. This condition variable is an important construct that ensures the code will only execute when the condition is satisfied. We have a condition that elements can’t be removed when the buffer is empty (empty_cond) and a condition that we can’t add new elements when the buffer is full (full_cond). To have a section of code that is synchronised, we call the ‘synchronize’ method and have the code inside a do-end block. Once inside, we call the ‘wait_while’ method on the empty condition. We are passing in the method ‘empty?’ as our condition, and so the line can be read as ‘have any thread that calls this method wait as long as the buffer is empty’. Alternatively, we could write ‘wait_unless { !empty? }’ which would read ‘let any thread call this method unless the buffer is empty, in which case, make them wait’.

 def get
       synchronize do
           @empty_cond.wait_while {empty?}   
           element = @elements[@front]
           @elements[@front] = nil
           @front = (@front + 1) % @capacity
           @full_cond.signal 
           return element 
        end               
   end

When the buffer is empty, any thread that calls get will be put to sleep. If the buffer isn’t empty, then the element at the front is removed, and the front position is moved along by 1. The line ‘@full_cond.signal’ will wake up a thread that was put to sleep waiting for the full buffer to have some space. We have just removed an item, therefore there is now space to put in a new item, so we can wake up, or ‘signal’ a sleeping thread. ‘put’ is the opposite of get. Threads must wait if the buffer is full, and when there is space, an element is placed at the back of the buffer. Once this is done, ‘@empty_cond.signal’ is called and a thread who was put to sleep waiting for the buffer to have some items placed in it, is woken up. As only one thread is woken up at a time, it functions on a first come first served basis.

def put(element)             
       synchronize do
           @full_cond.wait_while {full?} 
           @elements[@back] = element
           @back = (@back + 1) % @capacity           
           @empty_cond.signal    
        end        
   end

The final two methods we have already seen used above If the front is the same position as the back we know that the buffer is either empty or full. We know it will be empty if the element at this position is nil, and full if the element at this position is not nil. These methods must also be synchronised as otherwise they could be interrupted in-between the two conditions, and a corrupted buffer state could result.

   def full?
     synchronize do
       (@front == @back and @elements[@front] != nil)
     end
   end

   def empty?
       synchronize do
           (@front == @back and @elements[@front] == nil)
        end
   end

end

Sincere thanks must go to Robert Klemme from the Ruby Forum who helped me work out the bugs in this implementation, as well as Craig Taverner from his blog for introducing the monitor class.

So now we have a synchronised buffer which we can fill with incoming requests. What we need next is a thread pool full of workers who are ready to process the contents of the buffer. The class below should look fairly familiar to what was in the basic web server, with just a few additions.

require 'thread'

class Worker < Thread

    def initialize(buffer)
        super(buffer) { |buffer|
            begin
                loop do
                    socket = buffer.get

                    request = socket.readline

                    validGET = request.match(/GET .* HTTP\/1\.1/)

                    unless (validGET) 
                        socket.puts "HTTP/1.1 400 Bad Request"
                        socket.close
                        next
                    end

                    file = request.split(' ')[1]
                    file = '.' + file

                    unless ( File.exists?(file) ) 
                        socket.puts "HTTP/1.1 404 File Not Found"
                        socket.close
                        next
                    end

                    socket.puts "HTTP/1.1 200 OK\n"
                    socket.puts "Connection: close\n"
                    socket.puts "Content-Type: text/html\n"
                    File.open(file, 'r') { |f| 
                        while (line = f.gets) 
                            socket.puts line
                        end
                    }

                    socket.close
                end
            rescue Exception => e
                $stderr.puts $!.inspect
            end
        }          
    end    
end

The main difference is at the top, and the bottom

class Worker < Thread

    def initialize(buffer)
        super(buffer) { |buffer|
            begin
                loop do
                    socket = buffer.get
          ...
        end
      rescue Exception => e
   				$stderr.puts $!.inspect
      end
    }          
  end    
end

Extending the Thread class means we can treat Worker objects exactly as if they were threads. To do this however, we also need to implemenet the initialize method. When creating a regular thread, the instructions to execute are passed to it as a block

Thread.new { # do something }

Therefore, all we need to do is call ‘super’, as this will call the initialize method on the parent class, Thread. Placing the block after this will mean that it is executed by the Thread class. We want to pass in the buffer for it to use, so this is passed in as an argument and then into the block.

As each thread will be running outside of the main execution thread, we will not be informed of any errors that occur which can make bug finding difficult. If we wrap the code to be executed in Ruby’s equivalent of a try/catch block then we can grab any exceptions thrown and send them to standard error. ‘$stderr.puts $!.inspect’ is a neat shortcut that uses the $! global variable. This contains the most recently thrown exception, and so we are calling inspect on this and sending it to stderr.

Lastly, instead of executing the processing code once, we are looping infinitely. We can to continually check the buffer for new sockets with ‘buffer.get’. When the buffer is empty, ‘buffer.get’ will put the worker to sleep, and it will be woken up eventually when a new socket is placed into the buffer.

Last but not least, we need to create a number of workers, and set up our buffer.

server = TCPServer.new('127.0.0.1', '8080')

We are creating our buffer with a capacity of 100. This means that we can have 100 requests queued up until we have to start waiting for the buffer to be emptied. We are also creating 40 workers, and naming them so we can tell them apart. Lastly, we have our familiar ‘while socket = server.accept’ loop but instead of doing any processing, we simply put it into the buffer.

buffer = SynchronisedBuffer.new(100)

workers = []

for i in (1..40)
   workers[i] = Worker.new(buffer)
   workers[i][:name] = 'worker' + i.to_s  
end

while socket = server.accept
    buffer.put(socket)
end

All things going well, your web server should now be capable of handling many simultaneous requests. Leave a comment, email me, send me a tweet or a message on Facebook with any questions, criticisms or comments.