Chapter 5
The Resource Framework

This document is made available under the terms of the GNU Free Documentation License as published by the Free Software Foundation (http://www.gnu.org/copyleft/fdl.html)

The Resource framework defines abstractions for the management of various resources (threads, network connections, buffers), allowing the programmer of resource-constrained applications to introduce the most adequate resource management policies. For instance, the application programmer may choose the type of threads to be used, control their activity, use a scheduling policy based on task deadlines, etc. In the same way, it is possible to change the buffer or network connections management policies. In each case, a default policy is provided.

The following sections describe the components of the Jonathan resource framework. We have also included a section on Marshallers and Unmarshallers, which provide tools for message encoding and decoding. These tools are actually part of the Jeremie personality; however, they are presented here for two reasons: they provide a ready illustration of the use of the buffer management interface, and they are widely used in all communication protocols.

5.1  Scheduling: Jobs and Schedulers

The scheduling framework is concerned with activity management. It is composed of the following classes and interfaces.

   package org.objectweb.jonathan.apis.resources

public interface Job
public interface Scheduler

package org.objectweb.jonathan.libs.resources

public final class JScheduler extends Object implements Scheduler
public class JSchedulerFactory extends GenericFactory

A Job is an abstraction of an activity; a job is intended to execute a task, which is a description of the activity. In most practical cases, a task is a sequential program and the job may be thought of as a thread.

Jobs are created and managed by a Scheduler, which acts as a job factory and controls the execution of jobs. The motivation for the introduction of jobs and schedulers is flexibility: this framework allows an application designer to redefine the control of activities and the scheduling policy. A scheduler has the responsibility to associate a job with some processing resource (which may be a processor, or more simply a Java thread) when it is entitled to run.

A typical use of jobs is to execute runnable objects (instances of a class that implements the Runnable interface). Assuming scheduler designates a Scheduler, and Task is a runnable class, the following code sequence may be used:

    Task myTask = new Task(...);
    Job myJob = scheduler.newJob();
    myJob.run(myTask);

This sequence creates a new job and makes it execute the run() method of the new object myTask. One may also replace the last two instructions by:

    scheduler.newJob().run(myTask);

if one does not want to keep track of the identity of the new job (which the job may however retrieve by calling scheduler.getCurrent()). Likewise, the execution of a runnable object can be started within a method of that object, by the instruction:

    scheduler.newJob().run(this);

This allows a runnable object to be (re)started from the outside (if it is not already active) without explicitly creating a new job, by calling a method containing the above instruction.

A job is associated with a Context (a set of named, typed objects) whose elements may be set or retrieved by the job, e.g. by calling

    scheduler.getCurrent().getContext().addElement(name, type, value, separator);

The context is created by a ContextFactory associated with the scheduler at configuration time. The context allows specific attributes to be attached to a job.

A scheduler reimplements the activity control methods of the Object class (i.e. notify, notifyAll, wait) and the yield method of the Thread class.

The current default implementation of the Scheduler interface is the JScheduler class, which internally provides a simple (sequential) implementation of Job as JJob, an extension of Thread. This implementation has the same behaviour as the default Java thread scheduling. JScheduler manages a pool of threads, whose size is defined at configuration time, and reallocates these threads when a new job needs to be run (a new thread is created if the pool is exhausted).
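The pooling behaviour described above can be illustrated by the following minimal sketch. It is not the JScheduler source: the names SketchJob, SketchScheduler and SchedulerDemo are hypothetical, the pool starts empty instead of being sized at configuration time, and the job context and the wait/notify methods are omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified counterpart of the Job interface. */
interface SketchJob {
    void run(Runnable task);
}

/** Hypothetical pooled scheduler (an assumption, not the JScheduler code). */
final class SketchScheduler {
    private final Deque<Worker> idle = new ArrayDeque<>();
    private int threadsCreated = 0;

    /** Returns an idle pooled job, or creates a new thread if the pool is exhausted. */
    synchronized SketchJob newJob() {
        Worker worker = idle.poll();
        if (worker == null) {
            worker = new Worker();
            worker.setDaemon(true);
            worker.start();
            threadsCreated++;
        }
        return worker;
    }

    synchronized int threadsCreated() { return threadsCreated; }

    synchronized int idleCount() { return idle.size(); }

    private synchronized void recycle(Worker worker) { idle.push(worker); }

    /** A job backed by a thread that goes back to the pool after each task. */
    private final class Worker extends Thread implements SketchJob {
        private final BlockingQueue<Runnable> inbox = new LinkedBlockingQueue<>();

        @Override public void run(Runnable task) { inbox.add(task); }

        @Override public void run() {
            try {
                while (true) {
                    Runnable task = inbox.take();   // wait for the next task
                    try {
                        task.run();
                    } finally {
                        recycle(this);              // make the thread available again
                    }
                }
            } catch (InterruptedException e) {
                // interrupted: let the worker thread terminate
            }
        }
    }
}

final class SchedulerDemo {
    /** Runs two tasks one after the other; returns the number of threads created. */
    static int runTwoTasksSequentially() {
        SketchScheduler scheduler = new SketchScheduler();
        for (int i = 0; i < 2; i++) {
            CountDownLatch done = new CountDownLatch(1);
            scheduler.newJob().run(done::countDown);
            try {
                done.await();
                while (scheduler.idleCount() == 0) {
                    Thread.sleep(1);                // wait until the worker is recycled
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return scheduler.threadsCreated();
    }
}
```

In this sketch the second task reuses the thread that ran the first one, because that thread has returned to the pool before the second call to newJob().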

5.2  Memory Management: Chunks

A memory management service should be both flexible and efficient.

  • Flexible: the service should allow memory zones of arbitrary length to be allocated, released, split, merged, and duplicated; it should be possible to customize the allocation policy.
  • Efficient: the service should avoid unnecessary copying, and should minimize the overhead of garbage collection.

In order to achieve these goals, the Jonathan memory management framework is based on chunks. A chunk represents a part of an array of bytes. Chunks may be linked to form (non-contiguous) memory zones. Chunks are mainly used to build messages that may be sent from one address space to another. Using chunks avoids unnecessary copying of byte arrays, and helps recover these arrays without resorting to garbage collection. Chunk factories are the standard mechanism for building chunks; they may be modified to redefine the memory allocation policy.

The memory management framework is composed of the following classes and interfaces.

   package org.objectweb.jonathan.apis.resources

public interface ChunkFactory
public interface ChunkProvider
public class Chunk

package org.objectweb.jonathan.libs.resources

public final class JChunkFactory extends Object implements ChunkFactory
public class JChunkFactoryFactory extends GenericFactory
public final class SimpleChunkFactory extends Object implements ChunkFactory
public class SimpleChunkFactoryFactory extends Object implements Factory

5.2.1  Chunk APIs

The API of the memory management service is composed of the Chunk class and the ChunkFactory interface. Chunk provides the following fields:

  
   Type     Field    Description
   byte[]   data     The associated array of bytes
   Chunk    next     The next chunk in the chain
   int      offset   The index of the first valid (written) byte in this chunk
   int      top      The index of the last valid (written) byte in this chunk + 1

  

The duplicate() method creates a new chunk that duplicates (part of) the target chunk (for partial duplication, the offset and top of the duplicated region must be provided as parameters). The release() method releases the chunk, which will be recycled through the chunk factory mechanism (note that its contents may still be accessible in the buffer).

It should be noted that Chunk is a class, not an interface. Its default implementation is not optimized. Memory allocation is delegated to the chunk factories (Section 5.2.3): the default implementation of release() simply resets offset and top to zero, and the chunk factories override this method to reuse released chunks. Chunks should not be used concurrently since access to the fields is not synchronized.
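As an illustration, the following sketch mimics the four fields and the unoptimized duplicate() and release() behaviour described above. SketchChunk is a hypothetical stand-in, not the Jonathan Chunk class; the constructor and the chainLength() helper are assumptions added for the example.

```java
import java.util.Arrays;

/** Hypothetical stand-in for Chunk; only the four field names come from the text. */
class SketchChunk {
    byte[] data;        // the associated array of bytes
    SketchChunk next;   // the next chunk in the chain
    int offset;         // index of the first valid byte
    int top;            // index of the last valid byte + 1

    SketchChunk(byte[] data, int offset, int top) {
        this.data = data;
        this.offset = offset;
        this.top = top;
    }

    /** Unoptimized duplication of the whole valid region. */
    SketchChunk duplicate() {
        return duplicate(offset, top);
    }

    /** Unoptimized partial duplication: copies the bytes in [from, to). */
    SketchChunk duplicate(int from, int to) {
        byte[] copy = Arrays.copyOfRange(data, from, to);
        return new SketchChunk(copy, 0, to - from);
    }

    /** Default release: only the indices are reset; the bytes stay in the array. */
    void release() {
        offset = 0;
        top = 0;
    }

    /** Number of valid bytes in the chain that starts at this chunk. */
    int chainLength() {
        int length = 0;
        for (SketchChunk c = this; c != null; c = c.next) {
            length += c.top - c.offset;
        }
        return length;
    }
}
```

Like the class it imitates, this sketch is not synchronized and should not be shared between threads.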

Chunks may be linked to form messages, as shown in Figure 5.1.

Figures/chunks.gif
Figure 5.1: Chunks

The ChunkFactory interface provides the newChunk() method, which creates a new chunk. If the desired size is not specified, a default size is chosen.

5.2.2  Using Chunks

As an example of the use of chunks, consider the ChunkProvider interface, which encapsulates an input stream. It defines two methods: prepare(), which delivers a chunk containing data to read from, and close(), which should be called when the provider is no longer used. A specific implementation is the TcpIpChunkProvider class, which encapsulates a socket input stream. The role of TcpIpChunkProvider in the TCP-IP protocol stack is further described in the communication framework tutorial.

The prepare() method is implemented as follows.

final class TcpIpChunkProvider extends Chunk implements ChunkProvider  {
   ...
   TcpIpProtocol.Session session; // Reader associated with the chunk 
   IpConnection connection;       // The network connection 
   int max;
   Chunk cache;                   // Contains the data of this chunk provider
   ...
   public Chunk prepare() throws JonathanException {
      TcpIpProtocol protocol = session.getProtocol();
      if (top == offset) {        // there is nothing left to read from this message
         try {
            int to_read = connection.available();
                                  // number of bytes that can be read without blocking
            if (to_read > 1) {
               if (to_read > max - top) {         // not enough room in current cache
                  cache.release();                // release it and create a new one
                  cache = protocol.chunk_factory.newChunk(to_read);
                  data = cache.data;              // aliasing this chunk to the cache
                  offset = cache.offset;
                  top = cache.top;
                  max = data.length;
               }
               connection.receive(this,to_read);  // reads to_read bytes into cache
               return this;                       // returns contents of cache
            } else {                              // at most one byte to read
               if (max - top == 0) {              // no more room 
                  cache.release();                
                  cache = protocol.chunk_factory.newChunk(); // create new cache
                  data = cache.data;              // aliasing this chunk to the cache
                  offset = cache.offset;
                  top = cache.top;
                  max = data.length;              
               }
               connection.receive(this,1);        // try to read one byte into cache
               return this;                       // returns contents of cache
            }
         } catch (IOException e) {
            delete();
            connection.delete();
            connection = null;
            throw new JonathanException(e);
         } 
      } else {             // top != offset, there are still unread data in this chunk
         return this;      // return these data
      }
   }

The main data structure is the cache (a chunk), into which input data are read from the socket input stream. Note that the chunk returned by TcpIpChunkProvider is "aliased" to the cache (i.e. its data structures point to those of the cache), which avoids data copying (Figure 5.2). The field max points to the end of the current (cache) buffer. The variable to_read is the size of the available data in the input stream (i.e. the number of bytes that may be read without blocking). If there is not enough room in the cache to read these data, the current cache is released and a new cache is created. The data are then read from the input stream. If there is no data to read (or if there is a single byte), and no space is left in the cache, then again the cache is released and a new one is created for further use.

Figures/chunkprovider.gif
Figure 5.2: Implementation of TcpIpChunkProvider

The close method is implemented as follows.

   public void close() {
      if (cache != null) {
         session.closeNotify(this);
      }
   }

The closeNotify method restarts the reader session (i.e. reactivates its thread) to allow it to read the data that remain in the cache.

Other examples of the use of chunks may be found in the description of marshallers (Section 5.3).

5.2.3  Chunk Factories Implementation

Chunks are managed by a chunk factory that implements the ChunkFactory interface. Designers may provide their own chunk factories implementing specific allocation policies. The Jonathan distribution includes the JChunkFactory class, which provides two internal optimizations (invisible to the users):

  • chunk pooling, which reduces the overhead of memory allocation and collection;
  • efficient chunk duplication, which avoids byte copying.

JChunkFactory manages two pools of chunks of different sizes: SmallPoolChunk (size small_size) and BigPoolChunk (size big_size). SmallPoolChunk and BigPoolChunk both extend a class PoolChunk, which itself extends Chunk. When a chunk of size s is requested, JChunkFactory looks up the pool of the next higher size for an available chunk and returns it. If no available chunk is found, a new chunk of the adequate size (small or big) is created. If s > big_size, a new "non-pool" chunk, of exactly size s, is created. When a PoolChunk is freed, it is added to the pool corresponding to its size. The number of preallocated chunks in the two pools, and the two standard sizes, are fixed at configuration time.
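The size-based lookup just described may be sketched as follows. PoolSketchFactory and Buf are hypothetical names, the pools start empty rather than preallocated, and small_size is assumed to be smaller than big_size; this is an illustration of the policy, not the JChunkFactory code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical two-pool allocator illustrating the lookup policy. */
final class PoolSketchFactory {
    /** Simplified chunk: a byte array and a flag telling whether it is pooled. */
    static final class Buf {
        final byte[] data;
        final boolean pooled;
        Buf(int size, boolean pooled) {
            this.data = new byte[size];
            this.pooled = pooled;
        }
    }

    private final int smallSize;
    private final int bigSize;
    private final Deque<Buf> smallPool = new ArrayDeque<>();
    private final Deque<Buf> bigPool = new ArrayDeque<>();

    PoolSketchFactory(int smallSize, int bigSize) {
        this.smallSize = smallSize;   // assumed smaller than bigSize
        this.bigSize = bigSize;
    }

    /** Returns a chunk able to hold at least size bytes. */
    Buf newChunk(int size) {
        if (size > bigSize) {
            return new Buf(size, false);            // "non-pool" chunk of exactly size
        }
        boolean small = size <= smallSize;
        Buf free = (small ? smallPool : bigPool).poll();
        if (free != null) {
            return free;                            // reuse a released chunk
        }
        return new Buf(small ? smallSize : bigSize, true);
    }

    /** Puts a pooled chunk back into the pool that matches its size. */
    void release(Buf buf) {
        if (!buf.pooled) {
            return;                                 // left to the garbage collector
        }
        (buf.data.length == smallSize ? smallPool : bigPool).push(buf);
    }
}
```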

The Chunk class provides a duplicate() method whose standard implementation copies the data into a new byte array. PoolChunk provides an optimized duplicate() method which avoids copying, by aliasing the data field of the duplicate to the data field of the original chunk (the target). Thus several chunks may share the same data, each managing its own offset, top and next fields (Figure 5.3).

Figures/duplicate.gif
Figure 5.3: Efficient chunk duplication

A replication counter (initialized to 1 at creation) is maintained by the target PoolChunk. It is incremented when a new duplicate is created, and decremented by each release() operation. The chunk is freed when the counter returns to 0.
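The following sketch illustrates duplication by aliasing together with the replication counter. It is a simplification under stated assumptions: SharedChunkSketch is a hypothetical class, and the counter is kept in a small shared object rather than in the target PoolChunk as in Jonathan.

```java
/** Hypothetical chunk whose duplicates share one byte array. */
final class SharedChunkSketch {
    /** State shared by a chunk and all of its duplicates. */
    private static final class Shared {
        final byte[] data;
        int count = 1;          // replication counter, 1 at creation
        boolean freed = false;  // set when the counter returns to 0
        Shared(byte[] data) { this.data = data; }
    }

    private final Shared shared;
    int offset;                 // each duplicate manages its own indices
    int top;

    SharedChunkSketch(byte[] data, int offset, int top) {
        this(new Shared(data), offset, top);
    }

    private SharedChunkSketch(Shared shared, int offset, int top) {
        this.shared = shared;
        this.offset = offset;
        this.top = top;
    }

    byte[] data() { return shared.data; }

    /** Duplicates without copying: the new chunk aliases the same array. */
    SharedChunkSketch duplicate() {
        shared.count++;
        return new SharedChunkSketch(shared, offset, top);
    }

    /** Decrements the counter; the array is considered free when it reaches 0. */
    void release() {
        if (--shared.count == 0) {
            shared.freed = true;   // a real factory would return the chunk to its pool here
        }
    }

    boolean isFreed() { return shared.freed; }
}
```

As with chunks in general, the sketch is not synchronized.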

5.3  Message Encoding and Decoding: Marshallers and Unmarshallers

In order to be transmitted over a network, data need to be put in a standard form agreed upon by the partners of the communication. Marshalling is the process of converting data to a standard form for transmission; unmarshalling is the reverse process, i.e. converting data back from the standard form.

There are two aspects to marshalling: collecting the data, which are not necessarily stored in contiguous memory locations, and performing the actual conversion. Likewise, unmarshalling entails both a conversion and a possible reorganization in storage.

The marshalling framework is composed of the following classes and interfaces.

   package org.objectweb.jonathan.apis.presentation

public interface Marshaller
public interface MarshallerFactory
public interface UnMarshaller

package org.objectweb.jeremie.apis.presentation

public interface JRMIStream
public interface JRMIMarshaller extends Marshaller
public interface JRMIUnMarshaller extends UnMarshaller

package org.objectweb.jeremie.libs.presentation.std

public class StdMarshallerFactory
public class StdMarshallerFactoryFactory

5.3.1  Using Marshallers

Jonathan provides the following interfaces:

  • Marshaller, an abstraction for a message to be sent. The main methods are write<Type>, where <Type> may be Boolean, Int, Long, etc. The underlying structure of a marshaller is a (chain of) chunk(s).
  • UnMarshaller, an abstraction for a message being received. The main methods are read<Type>, where <Type> may again take many forms; these methods deliver data of the corresponding type (e.g. readInt() returns an int, etc.). The underlying data structure of an unmarshaller is again a (chain of) chunk(s); it may also be a ChunkProvider (i.e. the abstraction of an input stream, described in Section 5.2.2).
  • MarshallerFactory, which provides methods to create marshallers and unmarshallers.

In addition, marshallers and unmarshallers may be used to encapsulate, respectively, an ObjectOutputStream and an ObjectInputStream, in order to send and receive Serializable and Externalizable objects. The methods provided are writeValue, writeMethod, writeParameters, etc. This use is specific to stubs, and is examined in the chapters on the Jeremie and David personalities.

In order to use a marshaller, one needs to have a marshaller factory, which should be defined in the configuration phase (see configuration tutorial). A common use case is answering a message. Typically, an object representing a server contains a method reply(UnMarshaller request, Session_High sender), in which request is a request received from a client and sender is a session which represents that client (i.e. reply messages to the client should be sent back to sender). The method reply is organized as follows, assuming the request contains an integer followed by a string and the reply consists of a string.

public void reply(UnMarshaller request, Session_High sender) {
   try {
      int inputInt = request.readInt();
      String inputString = request.readString8();
      // now comes the specific function of the server
      String outputString = <some function of inputInt and inputString>
      request.close();
      // marshaller_factory has been created at configuration
      Marshaller reply = marshaller_factory.newMarshaller();
      sender.prepare(reply);  // inserts protocol-specific header
      reply.writeString8(outputString);
      // send the message down the protocol stack
      sender.send(reply); 
   } catch (JonathanException e) {
      System.out.println("Exception");
      e.printStackTrace();
   }
}

A slightly more elaborate use of marshallers is illustrated by the internal working of a protocol, in which a header is prefixed to an incoming message (this function is performed, for instance, by the prepare() method of Session_High). This use takes advantage of the internal structure of a marshaller as a chain of chunks. The method send takes an incoming message (in the form of a marshaller) and sends it to a reply destination after having added a header (a string).

public void send(Marshaller incoming_message, Session_High sender) {

      Marshaller new_message = marshaller_factory.newMarshaller();
      Chunk incomingState = incoming_message.getState(); 
                                              // gets first chunk of message
      new_message.writeString8(header);

      new_message.write(incomingState);       // chains incomingState to new_message

      sender.send(new_message);               // sends message down the protocol stack      
      incoming_message.reset();               // since original chunks are reused
}

The method m.getState() returns the first chunk of a marshaller m. The method m.write(Chunk c) appends the chunk c to m. Thus, in the example, the header is prefixed to the incoming message without any string copying (Figure 5.4).

Figures/addheader.gif
Figure 5.4: Adding a header to a message
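The idea of prefixing a header by chaining, rather than copying, can be shown with a small self-contained sketch. HeaderChainSketch and Piece are hypothetical names, and the single-byte ISO-8859-1 encoding is an arbitrary choice for the example; the real Marshaller API is not used here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of header prefixing by linking chunks. */
final class HeaderChainSketch {
    /** Simplified chunk: some bytes and a link to the next chunk. */
    static final class Piece {
        final byte[] data;
        Piece next;
        Piece(String text) {
            this.data = text.getBytes(StandardCharsets.ISO_8859_1);
        }
    }

    /** Builds header -> body by linking; the bytes of body are not copied. */
    static Piece prefix(String header, Piece body) {
        Piece head = new Piece(header);
        head.next = body;
        return head;
    }

    /** Reads the whole chain back as a string (used for checking only). */
    static String flatten(Piece first) {
        StringBuilder out = new StringBuilder();
        for (Piece p = first; p != null; p = p.next) {
            out.append(new String(p.data, StandardCharsets.ISO_8859_1));
        }
        return out.toString();
    }
}
```

Here prefix("HDR:", body) returns a chain whose second element is the very same object as body, which is the property exploited by the send method above.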

5.3.2  Marshaller Implementation

A marshaller is represented by a chain of chunks. As shown in Figure 5.5, there are four (protected) fields:

  
   Type     Field     Description
   Chunk    first     points to the first chunk of the chain
   Chunk    current   points to the chunk currently being filled
   int      offset    points to the first available space in the current chunk
   int      top       points to the last available space in the current chunk + 1

   In addition, a field called os of type JRMIOutputStream points to the encapsulated ObjectOutputStream (to be used in stubs, not presented here).

Marshalling a value consists in writing the value into the current chunk. If the available space is exhausted (i.e. offset >= top), a method prepare() is called. Its effect is to allocate a new chunk, which is chained to current and becomes the new current. After use, a marshaller should be closed, which releases all its chunks.

Figures/marshallerstruct.gif
Figure 5.5: Internal structure of a marshaller
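The role of the four fields and of prepare() may be illustrated by the following sketch. ChainMarshallerSketch and Node are hypothetical names; chunks are allocated directly with a fixed, positive size instead of going through a chunk factory, and only byte writes are shown.

```java
/** Hypothetical marshaller that fills a chain of fixed-size chunks. */
final class ChainMarshallerSketch {
    /** Simplified chunk: a byte array plus the bounds of its valid region. */
    static final class Node {
        final byte[] data;
        Node next;
        int offset;   // first valid byte
        int top;      // last valid byte + 1
        Node(int size) { data = new byte[size]; }
    }

    private final int chunkSize;
    final Node first;        // first chunk of the chain
    private Node current;    // chunk currently being filled
    private int offset;      // first available position in current
    private int top;         // last available position in current + 1

    ChainMarshallerSketch(int chunkSize) {
        this.chunkSize = chunkSize;   // assumed to be positive
        first = current = new Node(chunkSize);
        offset = 0;
        top = chunkSize;
    }

    /** Writes one byte, allocating a new chunk when the current one is full. */
    void writeByte(int b) {
        if (offset >= top) {
            prepare();
        }
        current.data[offset++] = (byte) b;
        current.top = offset;
    }

    void write(byte[] bytes) {
        for (byte b : bytes) {
            writeByte(b);
        }
    }

    /** Chains a fresh chunk to current and makes it the new current. */
    private void prepare() {
        Node fresh = new Node(chunkSize);
        current.next = fresh;
        current = fresh;
        offset = 0;
        top = chunkSize;
    }

    /** Number of chunks in the chain. */
    int chunkCount() {
        int n = 0;
        for (Node c = first; c != null; c = c.next) {
            n++;
        }
        return n;
    }
}
```

With a chunk size of 3, writing 8 bytes yields a chain of three chunks holding 3, 3 and 2 valid bytes.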

An unmarshaller has a similar structure, i.e. it encapsulates a chain of chunks and an ObjectInputStream. The interpretation of some data (e.g. integers, floats, etc.) depends on whether they have been written using the "little endian" or the "big endian" order. This is specified by a boolean little_endian which must be set according to the convention in use (default value is true). An unmarshaller may be associated with a chunk provider, in which case the implementation of the prepare() and close() methods is different (prepare calls the prepare() method of the chunk provider, and close must also close the chunk provider).
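To make the effect of the byte order concrete, the sketch below writes and reads a 32-bit integer in either order. EndianSketch is a hypothetical helper working on a plain byte array; it does not reproduce the Jonathan unmarshaller code.

```java
/** Hypothetical helper showing how the byte order changes the decoded value. */
final class EndianSketch {
    /** Writes v into buf[pos..pos+3], least significant byte first if littleEndian. */
    static void writeInt(byte[] buf, int pos, int v, boolean littleEndian) {
        for (int i = 0; i < 4; i++) {
            int shift = littleEndian ? 8 * i : 8 * (3 - i);
            buf[pos + i] = (byte) (v >>> shift);
        }
    }

    /** Reads four bytes from buf[pos..pos+3] using the given byte order. */
    static int readInt(byte[] buf, int pos, boolean littleEndian) {
        int v = 0;
        for (int i = 0; i < 4; i++) {
            int shift = littleEndian ? 8 * i : 8 * (3 - i);
            v |= (buf[pos + i] & 0xFF) << shift;
        }
        return v;
    }
}
```

Writing 0x01020304 in little endian order stores the bytes 04 03 02 01; reading them back with the wrong convention yields 0x04030201, which is why both sides must agree on the setting.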

5.4  Connections and Connection Managers

Recall (see the communication framework tutorial) that the main abstraction for communication is a session, which represents a communication channel. Sessions are created by protocols and may be organized in a stack or an acyclic graph reflecting the protocol hierarchy. At the lowest level, a session relies on a basic communication mechanism called a connection. A connection encapsulates the low-level communication mechanism provided by the underlying layers. In the current implementation, a connection encapsulates a socket.

The connection framework is organized in two levels. At the upper level, a connection manager provides connections (i.e. client sockets) and server connection factories (i.e. server sockets) to client and server sessions, respectively. To do this, it manages a pool of server connection factories and a pool of connections. If the requested resource is not available in the pool, it creates a new one, by calling a connection factory at the lower level. This organization separates the functions of connection management and connection creation and implementation, allowing the latter to be easily changed.

The connection framework relies on the following interfaces and classes.

   package org.objectweb.jonathan.apis.protocols.ip

public interface IpConnection
public interface TcpIpSrvConnectionFactory
public interface TcpIpConnectionMgr
public interface UdpConnectionMgr

package org.objectweb.jonathan.libs.resources.tcpip

public class IPv4ConnectionFactory extends Object implements TcpIpConnectionMgr
public class IPv4ConnectionFactoryFactory extends GenericFactory
public class JConnectionMgr extends Object implements TcpIpConnectionMgr
public class JConnectionMgrFactory extends GenericFactory

5.4.1  Connection Management

The JConnectionMgr class provides the following methods.

  • newSrvConnectionFactory(int port) returns a new server connection factory encapsulating a server socket on the provided port. If port = 0, an anonymous server socket is opened. JConnectionMgr looks up a list of server connection factories, searching for one built on port. If none is found, a new one is created and chained in the list, using the newSrvConnectionFactory method of the underlying factory, i.e. IPv4ConnectionFactory.
  • newCltConnection(String host, int port, IpSession session) returns a client connection encapsulating a socket. JConnectionMgr looks up a table for a connection having the (host, port) destination. If found, it acquires it, i.e. increments its user count to prevent it from being removed, and assigns it to the provided session if needed; if not found, it creates a new connection by calling the newCltConnection method of the underlying factory. When a connection is released by its last user, it is marked as idle, i.e. it becomes eligible to be removed from the pool if the maximum number of idle connections is exceeded (a parameter set up at configuration time).
  • getCanonicalHostName(String hostname) returns the canonical host name of the provided host. This function is again delegated to the underlying factory, which returns the string corresponding to the IP address of the host.
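The client-side pooling policy (lookup by destination, user counts, idle connections) can be sketched as follows. ConnectionPoolSketch and Conn are hypothetical names, no socket is opened, and the choice of which idle connection to remove first (the oldest entry in the table) is an assumption of this example, not a documented property of JConnectionMgr.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical connection pool keyed by (host, port). */
final class ConnectionPoolSketch {
    /** Stands in for a connection; no real socket is involved. */
    static final class Conn {
        final String key;
        int users = 0;          // number of sessions using the connection
        boolean closed = false;
        Conn(String key) { this.key = key; }
    }

    private final Map<String, Conn> table = new LinkedHashMap<>();
    private final int maxIdle;  // maximum number of idle connections kept
    private int idle = 0;
    int created = 0;

    ConnectionPoolSketch(int maxIdle) { this.maxIdle = maxIdle; }

    /** Returns the pooled connection for (host, port), creating it if needed. */
    Conn acquire(String host, int port) {
        String key = host + ":" + port;
        Conn c = table.get(key);
        if (c == null) {
            c = new Conn(key);  // stands in for the call to the lower-level factory
            table.put(key, c);
            created++;
        } else if (c.users == 0) {
            idle--;             // an idle connection is put back into use
        }
        c.users++;
        return c;
    }

    /** Releases one use; the last release marks the connection as idle. */
    void release(Conn c) {
        if (--c.users > 0) {
            return;
        }
        idle++;
        if (idle > maxIdle) {
            evictOneIdle();
        }
    }

    /** Removes one idle connection (oldest table entry first, by assumption). */
    private void evictOneIdle() {
        Iterator<Conn> it = table.values().iterator();
        while (it.hasNext()) {
            Conn c = it.next();
            if (c.users == 0) {
                c.closed = true;
                it.remove();
                idle--;
                return;
            }
        }
    }
}
```

The sketch is not synchronized; a manager shared between sessions running in different threads would need to protect the table and the counters.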

JConnectionMgr includes a class JConnectionMgr.Connection which implements a TCP/IP connection. The implementation of this class is delegated to the underlying factory.

5.4.2  Connection Implementation

The IPv4ConnectionFactory class provides an implementation of connections, using sockets.

The (internal) class Connection implements the interface IpConnection and encapsulates a java.net.Socket. An instance of Connection is associated with a session. The main methods provided are emit(Chunk c) and receive(Chunk c, int sz), which are implemented by calling write and read, respectively, on the java.io.OutputStream and java.io.InputStream associated with the socket, as shown below.

   ...
   is = socket.getInputStream();
   os = socket.getOutputStream();
   ...
   public void emit(Chunk c) throws IOException {
      synchronized (os) {
         os.write(c.data,c.offset,c.top - c.offset);  // third argument is a length
      }
   }
      
   public void receive(Chunk c,int sz) throws IOException {
      byte[] data = c.data;
      int top = c.top;
      while (sz > 0) {
         int a = is.read(data,top,sz); // actual number of bytes read
         if (a > 0) {
            top += a; sz -= a;
         } else {
            throw new EOFException();
         }
      }
      c.top = top;
   }   

These methods illustrate the use of chunks. In the case of receive, data are read as available, until either the specified amount has been read or end of file is encountered.
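For experimentation without a network, the same two loops can be exercised against in-memory streams. In this sketch StreamChunkSketch and Region are hypothetical names, and the input stream is deliberately limited to two bytes per read to show that receive copes with partial reads. Since the third argument of OutputStream.write(byte[], int, int) is a byte count, emit passes top - offset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical in-memory version of the emit/receive pair. */
final class StreamChunkSketch {
    /** Simplified chunk: valid bytes are data[offset..top-1]. */
    static final class Region {
        final byte[] data;
        int offset;
        int top;
        Region(int size) { data = new byte[size]; }
    }

    /** Writes the valid region of c to the stream. */
    static void emit(OutputStream os, Region c) throws IOException {
        os.write(c.data, c.offset, c.top - c.offset);
    }

    /** Appends exactly sz bytes to c, looping over partial reads. */
    static void receive(InputStream is, Region c, int sz) throws IOException {
        int top = c.top;
        while (sz > 0) {
            int a = is.read(c.data, top, sz);   // actual number of bytes read
            if (a <= 0) {
                throw new EOFException();
            }
            top += a;
            sz -= a;
        }
        c.top = top;
    }

    /** Sends payload through in-memory streams and returns what was received. */
    static byte[] roundTrip(byte[] payload) {
        try {
            Region out = new Region(payload.length + 2);
            System.arraycopy(payload, 0, out.data, 2, payload.length);
            out.offset = 2;                     // valid region does not start at 0
            out.top = 2 + payload.length;
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            emit(os, out);

            InputStream is = new ByteArrayInputStream(os.toByteArray()) {
                @Override public synchronized int read(byte[] b, int off, int len) {
                    return super.read(b, off, Math.min(len, 2));  // at most 2 bytes per call
                }
            };
            Region in = new Region(payload.length);
            receive(is, in, payload.length);
            return Arrays.copyOfRange(in.data, in.offset, in.top);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```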

The newCltConnection(String host, int port, IpSession session) method is implemented by creating a new java.net.Socket; the constructor connects to the given host and port. The method returns a Connection encapsulating the new socket and associated with the provided session.

The (internal) class SrvConnectionFactory implements the interface TcpIpSrvConnectionFactory and encapsulates a java.net.ServerSocket. The main method is newSrvConnection(IpSession session), which essentially performs an accept on the server socket. It returns an instance of Connection, associated with the provided session, and encapsulating the socket created by accept.

Figures/delegation.gif
Figure 5.6: Delegation in the Connection Framework

The delegation relationships between the JConnectionMgr and IPv4ConnectionFactory classes are summarized in Figure 5.6. Here "delegation" means the following: a method call in JConnectionMgr is implemented by a call to a method of the same name in IPv4ConnectionFactory; (some) method calls in an internal class of JConnectionMgr are implemented by corresponding method calls of an internal class of the same name in IPv4ConnectionFactory. This organization shows the location of the code sequences to be modified if (part of) the lower layer of the connection framework should be reimplemented.

   Acknowledgments. Nicolas Rivierre (France Telecom R&D) provided useful material on the memory management framework.



