Distributed Task Management in R/S with CORBA

Duncan Temple Lang

Task dispatching and harvesting need not be done elsewhere: it is possible to do both in R or S. Here are a few simple examples that we can build on to provide more automated dispatching.

We start with the simplest case, which involves k identical tasks. We assume that all the available servers are accessible by name in the nested naming context "LME" within the top-level naming context. To make this easier, we can change the default naming service to the LME naming context

  namingService("LME")
and then refer to the objects directly by name.

The first thing we do is get a list of all the real objects in the naming context, using the function namingServiceContents. This returns a named logical vector in which each element indicates whether the object is another naming context (TRUE) or a regular CORBA object (FALSE). Hence, the names of the elements that are FALSE identify the real servers.

  
  # switch to the new default naming service context.
  # store the current one so we can switch back.
 old <- namingService("LME")

 tmp <- namingServiceContents()
   # get only the regular objects, not the nested naming contexts.
 servers <- names(tmp[!tmp]) 
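
The filtering step can be tried without a running naming service. The sketch below uses a made-up named logical vector in place of the value returned by namingServiceContents; the entry names are purely illustrative.

```r
# Hypothetical contents of a naming context: TRUE marks a nested naming
# context, FALSE marks a regular CORBA object.
tmp <- c(sleeper1 = FALSE, archive = TRUE, sleeper2 = FALSE)

# Keep only the names of the regular objects.
servers <- names(tmp[!tmp])
print(servers)
```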

In a more advanced setup we may want to use the elements in the nested naming contexts. We can process these recursively.
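
A minimal sketch of such a recursive walk follows. It does not talk to a real naming service: the lookup function contentsOf, the fakeContents table and the "/" separator used to build paths are all assumptions made for illustration. In a real session, contentsOf would be replaced by whatever call returns the contents of a given naming context.

```r
# Hypothetical stand-in for the naming service: each entry is the named
# logical vector that would be returned for that naming context.
fakeContents <- list(
  "LME"       = c(sleeper1 = FALSE, batch = TRUE),
  "LME/batch" = c(sleeper2 = FALSE, sleeper3 = FALSE)
)

# Collect the regular objects in a context and, recursively, in every
# nested context below it.
listServers <- function(context, contentsOf) {
  tmp <- contentsOf(context)
  here <- names(tmp)[!tmp]      # regular CORBA objects
  nested <- names(tmp)[tmp]     # nested naming contexts
  found <- if (length(here)) paste(context, here, sep = "/") else character(0)
  for (ctx in nested) {
    found <- c(found, listServers(paste(context, ctx, sep = "/"), contentsOf))
  }
  found
}

servers <- listServers("LME", function(ctx) fakeContents[[ctx]])
print(servers)
```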

The Number of Tasks

First, we define the total number of tasks we want performed.
  # the number of tasks, assumed > num servers
 k <- 1000

The Initial Task Loops

Next, we want to dispatch a task to each of the available servers. Then we will wait until one of them completes that task and becomes idle. We can take care of ensuring that we dispatch at most k tasks, and at this point only as many as we have servers.

The key aspect of each dispatch is that we do not block. The .deferred argument implies this and arranges to store a reference to the CORBA request object in a table of deferred requests. Since we may have different groups of remote commands executing at the same time, we use a unique name for each group. Here that name is "mb", supplied as the name for the list of deferred (background) requests.

 numPending <- min(length(servers),k)
 for(i in 1:numPending) {
   .Corba(servers[i], "multiBootstrap", 21, "foo", .deferred="mb")
 }

Waiting for Idle Servers

At this point, each of the servers is executing the task sent to it. We do not want to dispatch another task until one of them becomes idle, which happens when it has completed its task. To determine whether a server has finished, we busily cycle through the deferred request references, querying each for its status. When we find one that has completed, we return the value of that task, its index, and the server to which the task had been dispatched.

Unfortunately, this function involves not only identifying the next idle server but also returning the task's value and related details. Hence we call it getNextCompletedTask.

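 getNextCompletedTask <- function(listName) {
    # assumption: deferredRequestList() can be indexed by group name and
    # the length of that entry is the number of pending requests
  numPending <- length(deferredRequestList()[[listName]])
    # poll until one of the deferred requests has completed
  while(TRUE) {
   for(i in seq_len(numPending)) {
     if(getRequestStatus(listName, i) == 1) {
        val <- getRequestValue(listName, i)
        server <- getRequestServer(listName, i)
        removeRequest(listName, i)
        return(list(value=val, index=i, server=server))
     }
   }
  }
 }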
By calling this, we can now dispatch a new task to this server and return to waiting for an idle server.

values <- vector("list", 0)
numTasksDispatched <- numPending
numTasksCompleted <- 0

while(numTasksDispatched < k) {
  task <- getNextCompletedTask("mb")
  numTasksCompleted <- numTasksCompleted + 1
  values[[length(values)+1]] <- task$value

    # hand the now idle server its next task
  .Corba(task$server, "multiBootstrap", 21, "foo", .deferred="mb")
  numTasksDispatched <- numTasksDispatched + 1
}
And then finally, one must wait for all the pending tasks to be completed.
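
The three phases (initial dispatch, harvest and redispatch, final drain) can be exercised without any CORBA servers. The sketch below is a simulation under stated assumptions: runSimulatedManager, its pending table and its clock are hypothetical stand-ins for the deferred request list, and each task simply takes a fixed number of time units on whichever server receives it.

```r
# Simulated manager: 'durations' gives the length of each task, and a task is
# "completed" when the simulated clock reaches its finish time.
runSimulatedManager <- function(servers, durations) {
  k <- length(durations)
  pending <- data.frame(server = character(0), task = numeric(0),
                        finish = numeric(0), stringsAsFactors = FALSE)
  now <- 0
  dispatched <- 0
  completed <- 0
  values <- vector("list", k)

  # Stand-in for a deferred dispatch: record the request and return at once.
  dispatch <- function(server) {
    dispatched <<- dispatched + 1
    pending <<- rbind(pending,
                      data.frame(server = server, task = dispatched,
                                 finish = now + durations[dispatched],
                                 stringsAsFactors = FALSE))
  }

  # Stand-in for getNextCompletedTask: take the request that finishes first.
  nextCompleted <- function() {
    i <- which.min(pending$finish)
    done <- pending[i, ]
    pending <<- pending[-i, ]
    now <<- done$finish
    done
  }

  # Phase 1: one task per server, and never more than k tasks.
  for (s in servers[seq_len(min(length(servers), k))]) dispatch(s)

  # Phase 2: each time a server becomes idle, give it the next task.
  while (dispatched < k) {
    done <- nextCompleted()
    completed <- completed + 1
    values[[done$task]] <- done$server
    dispatch(done$server)
  }

  # Phase 3: nothing is left to dispatch, so wait for the pending tasks.
  while (nrow(pending) > 0) {
    done <- nextCompleted()
    completed <- completed + 1
    values[[done$task]] <- done$server
  }

  list(completed = completed, assignedTo = unlist(values), elapsed = now)
}

res <- runSimulatedManager(c("s1", "s2", "s3"), c(3, 1, 2, 1, 1))
print(res)
```

The final loop is the part that the code above leaves out: it harvests results in the same way as the main loop but dispatches nothing further.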

General Task Management

The sequence of steps in the previous section can be performed in a function, if we assume some simple, but reasonably general, specification of the sub-tasks within the top-level task. The user provides a list of tasks where each task is itself a list containing the name of the CORBA method, and the individual arguments to that operation call. So, in our example, we would have a list of length k in which each element was
  list("multiBootstrap", 21, "foo")

This is the input task list and constitutes the set of all tasks that are to be done by a task manager. The other input the manager needs is the set of servers.

As the manager discovers an available, idle server, it dispatches a call based on the next element of the sub-task list.
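
One way to turn an element of the task list into a call is do.call. In the sketch below, fakeCorba is a hypothetical stand-in that only records its arguments so the example runs without an ORB; dispatchTask and its dispatcher parameter are likewise illustrative names, and in a real session the dispatcher would be .Corba.

```r
k <- 5

# The input task list: k identical tasks, each naming the CORBA method
# followed by its arguments.
tasks <- replicate(k, list("multiBootstrap", 21, "foo"), simplify = FALSE)

# Stand-in for the deferred CORBA call; it just records what it was given.
fakeCorba <- function(server, method, ..., .deferred) {
  list(server = server, method = method, args = list(...), group = .deferred)
}

# Dispatch one task to one server by splicing the task into the call.
dispatchTask <- function(server, task, dispatcher, group = "mb") {
  do.call(dispatcher, c(list(server), task, list(.deferred = group)))
}

call1 <- dispatchTask("sleeper1", tasks[[1]], fakeCorba)
str(call1)
```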

Example

A simple example of this distribution is to create, say, 3 CORBA servers that implement a simple interface, Sleep. Each server is told to sleep for a duration specified by the caller. This allows us to simulate different task lengths from the manager.

In S, we create the server method as

setCorbaServer("RSExamples/Sleep",
  named(sleep=function(this, howLong) {
        sleep(howLong)   # base R spells this Sys.sleep(howLong)
        return(rnorm(1, mean = howLong))
   }
 ))
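
Before registering the method with CORBA, its body can be tried as an ordinary local function. The sketch below assumes an R session, so it uses Sys.sleep; the name sleepMethod and the NULL passed for the this argument are illustrative only.

```r
# Local version of the server method: pause, then return a random value
# centred on the requested duration.
sleepMethod <- function(this, howLong) {
  Sys.sleep(howLong)
  rnorm(1, mean = howLong)
}

# A short duration keeps the trial quick.
val <- sleepMethod(NULL, 0.01)
print(val)
```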

We create a server instance and register it with the name, sleeper1, say, in the default naming context.

 .MutableCorbaServer("IDL:RSExamples/Sleep:1.0", NULL,name="sleeper1")
This blocks awaiting calls. We repeat this in 2 other S sessions to create the different servers named sleeper2 and sleeper3.

From the manager's session,

Limitations

Task Argument Types

One of the most severe limitations in this setup is that the arguments to the background CORBA tasks must be simple primitive CORBA objects or CORBA objects residing in another process (be it S or not). The reason is simple. A slave will invoke methods on these non-primitive CORBA objects, and those methods must be evaluated in the S session. However, we are never reliably in the CORBA event loop (we may be, but we cannot guarantee it, nor are we at the appropriate place in the S evaluator), so the request will go unanswered by the host of the target CORBA object (the original argument to the background task). Accordingly, the slave will block waiting for the result.

We may be able to provide a mechanism to get around this limitation. The simplest approach (for the developer of the CORBA chapter) is to have threads in the system (R or S). This may happen within the next 6 months. An alternative is to use a single internal thread (i.e. not accessible to the interactive user) in which we implement the task monitoring. This thread would signal the S/R evaluator of pending action, and the evaluator would then perform the dispatch-next-task step of the distributed setup above.

Note that this problem is easiest to solve in an environment that already provides threads. Also, we could use the CORBA event channels to provide a significantly better framework, in which the manager does not consume CPU cycles polling the status of pending tasks and can also process intermediate results sent from the slaves.

Fault Tolerance

One of the benefits of CORBA is that we inherit fault tolerance. By this we mean that we can detect whether a task has terminated normally or abnormally. The server may have crashed, the machine on which it resides may have been disconnected from the network, or the computation may have generated an error (such as an invalid function call). However, there are some problems in the setup.

This could be fixed if we had real exceptions in the user-level language. At present we can get around it in .