Idiot's guide to building a compute grid
We have all heard about grid computing. If you have a computer science degree, you probably took a course on the subject and may even have built or worked on a grid in college. Grid computing can be simply defined as distributing a large task over a large number of computers so that they all execute pieces of it in parallel and we get the result quickly. And we all know that grids are complicated - you have to create a task, submit it to a master server that sends it to one of many workers, collect the results, check for failures, automatically re-execute failed jobs, put all required resources in shared locations, provide a monitoring console, and so on. In short, it's not simple. It is built by gurus, and the rest of us just use it.
From the point of view of someone without a computer science background, this picture seems pretty reasonable. But is it really the case?
The only way to prove this to myself was to try it - and the following long blog post describes exactly what I did. By the end of this I will show you a simple grid that works and does a task in parallel. Read on to find out more.
Basic components of the grid
Before we begin, let's outline the key properties of a grid, the ones you will find in any of the many books on this topic:
- Each worker on the grid executes a small, well-defined task that does not depend on other tasks and can be completed on its own.
- The overall job is large enough, and each individual task takes long enough, that executing them in parallel gives the best results.
- All the workers have access to all the libraries that are required to complete the task.
- Any input required for a task can be read without contention with any of the other workers.
- There is a master process that sends tasks to each worker and collects the results.
There are a few more, but we will limit ourselves to these key ones.
Choice of technologies to build the grid
We will build the grid in Python. Python ships with everything we need: an XML RPC server and client for communication, and the marshal module for serializing code. As the task to distribute, we will use the classic naive Fibonacci calculation:

```python
def fibonacci(n):
    if n <= 1:
        return 1
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)
```
This code runs fast up to about n=30; beyond that it takes a very long time to execute, and calculating the value for anything above 30 is something we would like to do in parallel. You can try running it in your Python shell to see how long it takes. If you do two calculations for n=40 one after the other, you will wait twice as long.
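To see the slowdown for yourself, here is a small timing sketch. It repeats the function so the snippet stands alone, and uses n=25 just to keep the example quick; push n higher to watch the runtime explode:

```python
import time

def fibonacci(n):
    # Naive recursion: the number of calls grows exponentially with n
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)

start = time.time()
result = fibonacci(25)
elapsed = time.time() - start
print(result)    # 121393
print(elapsed)   # still fast at n=25; try 35 or 40 to feel the pain
```

Each increment of n roughly multiplies the number of recursive calls by the golden ratio, which is why the jump from 30 to 40 is so dramatic.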
We will build a grid that can take multiple executions of this code and run them in parallel.
The solution architecture
The server will:
- Start on a given port and register a method which can be invoked remotely
- Accept the task and the value of n through this method, and execute the code
- Return the value once the task is completed

The client will:
- Connect to a server using a proxy
- Create the task and send it to the server using the proxy
- Get the results back from the server
- Submit each task in a thread, so that it need not wait for one task to complete before submitting the next
How do we share the task from client to server?
We said we will create a task and send it to the server - but how can we do this? The immediate problems that come to mind are:
- The server should be able to identify and execute the code
- The server should have all the libraries that are required in the task
In Python, functions are instances of FunctionType, and they can be passed around like variables - much like function pointers in C. So it is possible to define a function and hand it to another piece of code, which can then execute it. That is very easy to do when all the code is running in the same Python process.
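A quick illustration of that idea - the function names here are made up for the example, but the mechanics are standard Python:

```python
import types

def square(n):
    return n * n

# A function is just an object of FunctionType...
print(isinstance(square, types.FunctionType))   # True

# ...so it can be stored in a variable and passed as an argument
op = square

def apply_twice(f, x):
    return f(f(x))

print(apply_twice(op, 3))   # 81
```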
In our case we want to send the code over the network to a server and have it execute there, so the function needs special handling before we can send it. For this we will use Python's marshal module, which serializes the function's code object into a byte stream. That stream can be sent over the network and reconstructed into a function on the other side. As long as the function is self-contained - it does not reference code that is not imported on both the client and the server, and does not make recursive calls to itself - it can be executed without issues.
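Here is the idea in its smallest form, skipping the network entirely: serialize a code object with marshal, then rebuild a callable from the bytes. The snippet uses the `__code__` attribute, which is the modern spelling of the `func_code` attribute used elsewhere in this post:

```python
import marshal
import types

def double(n):
    return n * 2

# Serialize only the code object - these are the bytes that would travel over the wire
payload = marshal.dumps(double.__code__)

# On the "other side": rebuild a function from the byte stream
code = marshal.loads(payload)
rebuilt = types.FunctionType(code, globals(), 'double')
print(rebuilt(21))   # 42
```

Note that only the code travels - not the function's globals or closure, which is exactly why the self-containment caveat above matters.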
If you wanted to write a function to a file, read it back and then execute it - that’s possible too. The following example shows how this can be done.
```python
import marshal, types, base64

def fibonacci(n):
    if n <= 1:
        return 1
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

def persist():
    code_string = marshal.dumps(fibonacci.func_code)
    f = open('/tmp/fibonacci', 'w')
    f.write(base64.b64encode(code_string))
    f.close()

def extract_and_execute(n):
    f = open('/tmp/fibonacci', 'r')
    code_string = base64.b64decode(f.read())
    f.close()
    code = marshal.loads(code_string)
    func = types.FunctionType(code, globals(), 'fibonacci')
    return func(n)

if __name__ == '__main__':
    persist()
    print extract_and_execute(40)
```
One additional thing you might notice in the code above is that we encode the result of marshal.dumps using base64. This is required so that we can write the data to the file and read it back without running into encoding issues. We will use the same encoding when sending the XML RPC request, so that the code travels safely inside the XML payload.
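The marshal output is raw binary, while base64 turns it into plain ASCII that survives text files and XML unchanged. A minimal round trip, with a made-up sample function, shows the encoding is lossless:

```python
import base64
import marshal

def triple(n):
    return n * 3

raw = marshal.dumps(triple.__code__)   # arbitrary binary bytes
encoded = base64.b64encode(raw)        # safe ASCII representation
decoded = base64.b64decode(encoded)    # back to the original bytes
print(decoded == raw)   # True
```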
Building the server
Let's build the server now. As mentioned, we will build it using Python's XML RPC modules. The code is listed below, followed by an explanation of its key parts.
```python
import sys, xmlrpclib, base64, types, marshal
from SimpleXMLRPCServer import SimpleXMLRPCServer
from fibonacci import *

def execute(code_string, args):
    code = marshal.loads(base64.b64decode(code_string))
    func = types.FunctionType(code, globals(), 'fibonacci')
    return func(args)

server = SimpleXMLRPCServer(("localhost", int(sys.argv[1])))
print "Started server"
server.register_function(execute, "execute")
server.serve_forever()
```
What I have done here is put the Fibonacci code in a separate module so that it can be imported by both the server and the client. The server has one function which takes a string and a parameter: it unmarshals the string into a function, calls that function with the parameter, and returns the result.

We will use this by sending a serialized version of the fibonacci method and the value of n as parameters. Because the code is also available in the module imported on the server, recursive calls resolve correctly and do not fail.
In order to expose the method, we will register it in the XML RPC server and start the server on a user specified port.
If you start this code in a terminal, passing a port number as the argument, it will print "Started server" and then wait for requests.
Building the client
The client will be a simple XML RPC client. A very simple client can be built as follows - the code serializes the fibonacci method, and invokes the method over the XML RPC proxy.
```python
import xmlrpclib, base64, types, marshal
from fibonacci import *

proxy = xmlrpclib.ServerProxy("http://localhost:8000/")
code_string = base64.b64encode(marshal.dumps(fibonacci.func_code))
print proxy.execute(code_string, 20)
```
The code doesn’t do much - if you start the server in one terminal and run this in another terminal, you will see a log in the server window when the request is received and processed, and you will see the result printed in the client window.
Now, this was no grid - one server received a task from one client. To make this more interesting, let's start another server and modify the client to send multiple requests to the servers in threads. This is how the client code looks now.
```python
import xmlrpclib, base64, types, marshal
from fibonacci import *
from threading import Thread

class Runner(Thread):
    def __init__(self, code, proxy, param):
        Thread.__init__(self)
        self.code = code
        self.proxy = proxy
        self.param = param

    def run(self):
        print self.proxy.execute(self.code, self.param)

proxy = xmlrpclib.ServerProxy("http://localhost:8000/")
proxy1 = xmlrpclib.ServerProxy("http://localhost:8001/")
code_string = base64.b64encode(marshal.dumps(fibonacci.func_code))
r = Runner(code_string, proxy, 30)
r1 = Runner(code_string, proxy1, 40)
r.start()
r1.start()
```
If you start two servers and run this client, you will see a log in one of the server windows as soon as the calculation for 30 completes. The calculation for 40 will take a little longer, and then its log will appear in the other window.
You can try starting many servers and modifying the client code to send the request to all of them one by one to see how long it takes to execute.
Making the client code more effective
The usual case in grid computing is that you have a set of grid servers and you submit tasks to them. Each server is busy while it executes a job and then becomes available for more work. The following code snippet shows how the client can keep a shared pool of servers: each job claims a free server, runs, and then returns the server to the pool for the next job.
```python
import xmlrpclib, base64, types, marshal
import time
from fibonacci import *
from threading import Thread
from collections import deque

proxy = xmlrpclib.ServerProxy("http://localhost:8000/")
proxy1 = xmlrpclib.ServerProxy("http://localhost:8001/")
code_string = base64.b64encode(marshal.dumps(fibonacci.func_code))

# Pool of available servers - a server is out of the pool while it is busy
servers = deque([proxy, proxy1])

class Runner(Thread):
    def __init__(self, code, param):
        Thread.__init__(self)
        self.code = code
        self.param = param

    def run(self):
        # Wait until a server becomes free, then claim it
        proxy = None
        while proxy is None:
            try:
                proxy = servers.popleft()
            except IndexError:
                time.sleep(0.1)
        print proxy.execute(self.code, self.param)
        # Return the server to the pool for the next job
        servers.append(proxy)

r = Runner(code_string, 35)
r1 = Runner(code_string, 35)
r.start()
r1.start()
r.join()
r1.join()
```
What I have shown here is a very simple example. In a larger use case you will have to ensure that all the libraries are deployed and available on all the grid servers. You will want to have an optimum number of servers, and some way to monitor the status of each job. Also, the client code will have to deal with taking a large input set and splitting it across all the tasks.
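As a starting point for that last part, splitting an input set into per-worker chunks can be as simple as round-robin assignment. The helper name below is made up for illustration:

```python
def split_into_tasks(items, num_workers):
    """Divide the input list into roughly equal chunks, one per worker."""
    chunks = [[] for _ in range(num_workers)]
    for i, item in enumerate(items):
        # Round-robin: item i goes to worker i % num_workers
        chunks[i % num_workers].append(item)
    return chunks

print(split_into_tasks(list(range(7)), 3))   # [[0, 3, 6], [1, 4], [2, 5]]
```

Each chunk would then become the parameter for one Runner thread in the client above.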
All that can be built on top of this simple framework and you can have a working grid ready in a matter of days!