
The Resource framework defines abstractions for the management of various resources (threads, network connections, buffers), allowing the programmer of resource-constrained applications to introduce the most adequate resource management policies. For instance, the application programmer may choose the type of threads to be used, control their activity, use a scheduling policy based on task deadlines, etc. In the same way, it is possible to change the buffer or network connections management policies. In each case, a default policy is provided.
The following sections describe the components of the Jonathan resource framework. We have also included a section on Marshallers and Unmarshallers, which provide tools for message encoding and decoding. These tools are actually part of the Jeremie personality; however, they are presented here for two reasons: they provide a ready illustration of the use of the buffer management interface, and they are widely used in all communication protocols.
The scheduling framework is concerned with activity management. It is composed of the following classes and interfaces.
package org.objectweb.jonathan.apis.resources
package org.objectweb.jonathan.libs.resources
A Job is an abstraction
of an activity; a job is intended to execute a task, which is a
description of the activity. In most practical cases, a task is a
sequential program and the job may be thought of as a thread.
Jobs are created and managed by a
Scheduler, which
acts as a job factory and controls the execution of jobs. The
motivation for the introduction of jobs and schedulers is flexibility:
this framework allows an application designer to redefine the control of activities and the
scheduling policy. A scheduler has the responsibility to associate a job with some processing
resource (which may be a processor, or more simply a Java thread) when it is entitled to run.
A typical use of jobs is to execute runnable objects (instances of a class
that implements the Runnable interface). Assuming scheduler
designates a Scheduler,
and Task is a runnable class, the following code sequence may be used:
This sequence creates a new job and makes it execute the run()
method of the new object myTask. One may also replace the last
two instructions by:
if one does not want to keep track of the identity of the new job (which the
job may however retrieve by calling scheduler.getCurrent()). Likewise, the execution
of a runnable object can be started within a method of that object, by the instruction:
This allows a runnable object to be (re)started from the outside (if
it is not already active) without explicitly creating a new job, by
calling a method containing the above instruction.
A job is associated with a
Context (a set
of named, typed objects) whose elements
may be set or retrieved by the job, e.g. by calling
The context is created by a
ContextFactory
associated with the scheduler at configuration time. The context
allows specific attributes to be attached to a job.
A scheduler reimplements the activity control methods of the Object
class (i.e. notify, notifyAll, wait) and the
yield method of the Thread class.
The current default implementation of the Scheduler
interface is the JScheduler
class, which internally provides a simple (sequential) implementation of
Job as
JJob, an extension of Thread. This
implementation has the same behaviour as the default Java thread scheduling.
JScheduler manages a
pool of threads, whose size is defined at configuration time, and
reallocates these threads when a new job needs to be run (a new
thread is created if the pool is exhausted).
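The pooling behaviour described above can be pictured with a short sketch. It is not the JScheduler implementation: the Job interface below is a hypothetical one-method stand-in, and the pool is the standard java.util.concurrent cached thread pool, which reuses idle threads and creates a new one when none is free (it has no configured size, unlike the pool described above).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SchedulerSketch {

    /** Hypothetical stand-in for the Job interface: executes a runnable task. */
    interface Job {
        void run(Runnable task);
    }

    // Assumption: a cached pool stands in for the thread pool kept by the scheduler.
    private final ExecutorService pool = Executors.newCachedThreadPool();

    /** Acts as a job factory: each job hands its task to a pooled thread. */
    Job newJob() {
        return task -> pool.execute(task);
    }

    /** Stops accepting jobs and waits for running ones; returns false on timeout. */
    boolean shutdownAndWait(long seconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        SchedulerSketch scheduler = new SchedulerSketch();
        scheduler.newJob().run(() ->
            System.out.println("task running in " + Thread.currentThread().getName()));
        scheduler.shutdownAndWait(5);
    }
}
```

Replacing the executor with another policy (for instance one ordered by task deadlines) would not change the calling code, which is the flexibility the framework aims for.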
A memory management service should be both flexible and efficient.
In order to achieve these goals, the Jonathan memory management
framework is based on chunks. A chunk represents a part of an array of bytes. Chunks may be linked to form (non-contiguous) memory zones. Chunks are mainly used to build messages that may be sent
from one address space to another. Using chunks avoids unnecessary copying of byte arrays, and helps
recover these arrays without resorting to garbage collection. Chunk factories are the standard mechanism for building chunks; they may be modified to redefine the memory allocation policy.
The memory management framework is composed of the following classes and interfaces.
package org.objectweb.jonathan.apis.resources
package org.objectweb.jonathan.libs.resources
The duplicate() method creates a new chunk that duplicates (part of) the target chunk (for partial duplication, the offset and top of the duplicated region must be provided as parameters). The release() method releases the chunk, which will be recycled through the chunk factory mechanism (note that its contents may still be accessible in the buffer).
It should be noted that Chunk is a class, not an interface. Its default implementation is not optimized. Memory allocation is delegated to the chunk factories (Section 5.2.3): the default implementation of release() simply resets offset and top to zero, and the chunk factories override this method to reuse released chunks. Chunks should not be used concurrently, since access to their fields is not synchronized.
Chunks may be linked to form messages, as shown in Figure 5.1.
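As an illustration of the chunk model, here is a minimal sketch, assuming a simplified class with the four fields data, offset, top and next. It is not the Jonathan Chunk class: the constructor, the two-argument duplicate and the length helper are hypothetical, and duplicate is assumed to copy the selected region into a fresh array.

```java
import java.util.Arrays;

final class ChunkSketch {

    /** Hypothetical stand-in for Chunk: a window [offset, top) over a byte array. */
    static class Chunk {
        byte[] data;   // the associated array of bytes
        int offset;    // index of the first valid byte
        int top;       // index of the last valid byte + 1
        Chunk next;    // next chunk in the chain, or null

        Chunk(byte[] data, int offset, int top) {
            this.data = data;
            this.offset = offset;
            this.top = top;
        }

        /** Copying duplicate of the region [off, t): the new chunk owns its array. */
        Chunk duplicate(int off, int t) {
            byte[] copy = Arrays.copyOfRange(data, off, t);
            return new Chunk(copy, 0, copy.length);
        }

        /** Default release: resets the window; the bytes stay in the array. */
        void release() {
            offset = 0;
            top = 0;
        }
    }

    /** Total number of valid bytes in a chain of chunks. */
    static int length(Chunk first) {
        int n = 0;
        for (Chunk c = first; c != null; c = c.next) {
            n += c.top - c.offset;
        }
        return n;
    }

    public static void main(String[] args) {
        Chunk header = new Chunk(new byte[] {1, 2, 3, 4}, 0, 4);
        Chunk body = new Chunk(new byte[] {9, 9, 5, 6, 7, 9}, 2, 5);
        header.next = body;                          // non-contiguous message: 4 + 3 bytes
        System.out.println(length(header));          // prints 7
        Chunk part = body.duplicate(2, 5);           // private copy of the 3 valid bytes
        body.release();
        System.out.println(part.top - part.offset);  // prints 3
    }
}
```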
public interface Job
public interface Scheduler
public final class JScheduler extends Object
implements Scheduler
public class JSchedulerFactory extends GenericFactory
Task myTask = new Task(...);
Job myJob = scheduler.newJob();
myJob.run(myTask);
scheduler.newJob().run(myTask);
scheduler.newJob().run(this);
scheduler.getCurrent().getContext().addElement(name, type, value, separator);
5.2 Memory Management: Chunks
public interface ChunkFactory
public interface ChunkProvider
public class Chunk
public final class JChunkFactory extends Object implements ChunkFactory
public class JChunkFactoryFactory extends GenericFactory
public final class SimpleChunkFactory extends Object implements ChunkFactory
public class SimpleChunkFactoryFactory extends Object implements Factory
5.2.1 Chunk APIs
The API of the memory management service is composed of the Chunk
class and the ChunkFactory interface.
Chunk provides the following fields:
| byte[] | data | The associated array of bytes |
| Chunk | next | The next chunk in the chain |
| int | offset | The index of the first valid (written) byte in this chunk |
| int | top | The index of the last valid (written) byte in this chunk + 1 |
The ChunkFactory interface provides the newChunk() method, which creates a new chunk. If the desired size is not specified, a default size is chosen.
As an example of the use of chunks, consider the ChunkProvider interface, which encapsulates an input stream. It defines two methods: prepare(), which delivers a chunk containing data to read from, and close(), which should be called when the provider is no longer used. A specific implementation is the TcpIpChunkProvider class, which encapsulates a socket input stream. The role of TcpIpChunkProvider in the TCP/IP protocol stack is further described in the communication framework tutorial.
The prepare() method is implemented as follows.
The main data structure is the cache (a chunk), into which input data are read from the socket input stream. Note that the chunk returned by TcpIpChunkProvider is "aliased" to the cache (i.e. its data structures point to those of the cache), which avoids data copying (Figure 5.2). The field max points to the end of the current (cache) buffer. The variable to_read is the size of the available data in the input stream (i.e. the number of bytes that may be read before blocking). If there is not enough room in the cache to read these data, the current cache is released and a new cache is created. The data are then read from the input stream. If there are no data to read (or if there is a single byte), and no space is left in the cache, then again the cache is released and a new one is created for further use.
final class TcpIpChunkProvider extends Chunk implements ChunkProvider {
    ...
    TcpIpProtocol.Session session; // Reader associated with the chunk
    IpConnection connection;       // The network connection
    int max;
    Chunk cache;                   // Contains the data of this chunk provider
    ...
    public Chunk prepare() throws JonathanException {
        TcpIpProtocol protocol = session.getProtocol();
        if (top == offset) { // there is nothing left to read from this message
            try {
                int to_read = connection.available();
                // number of bytes that can be read without blocking
                if (to_read > 1) {
                    if (to_read > max - top) { // not enough room in current cache
                        cache.release(); // release it and create a new one
                        cache = protocol.chunk_factory.newChunk(to_read);
                        data = cache.data; // aliasing this chunk to the cache
                        offset = cache.offset;
                        top = cache.top;
                        max = data.length;
                    }
                    connection.receive(this,to_read); // reads to_read bytes into cache
                    return this; // returns contents of cache
                } else { // at most one byte to read
                    if (max - top == 0) { // no more room
                        cache.release();
                        cache = protocol.chunk_factory.newChunk(); // create new cache
                        data = cache.data; // aliasing this chunk to the cache
                        offset = cache.offset;
                        top = cache.top;
                        max = data.length;
                    }
                    connection.receive(this,1); // try to read one byte into cache
                    return this; // returns contents of cache
                }
            } catch (IOException e) {
                delete();
                connection.delete();
                connection = null;
                throw new JonathanException(e);
            }
        } else { // top != offset, there are still unread data in this chunk
            return this; // return these data
        }
    }
The close method is implemented as follows.
The closeNotify method restarts the reader session (i.e. reactivates its thread) to allow it to read the data that remain in the cache.
Other examples of the use of chunks may be found in the description of marshallers (Section 5.3).
Chunks are managed by a chunk factory that implements the ChunkFactory interface. Designers may provide their
own chunk factories implementing specific allocation policies. The Jonathan distribution includes the JChunkFactory class, which
provides two internal optimizations (invisible to the users):
JChunkFactory manages two pools of chunks of different sizes:
SmallPoolChunk (size small_size) and BigPoolChunk (size big_size).
SmallPoolChunk and BigPoolChunk both extend a class PoolChunk, which itself
extends Chunk. When a chunk of size s is requested, JChunkFactory
looks up the pool of the next higher size for an available chunk and returns
it. If no available chunk is found, a new chunk of the adequate size
(small or big) is created. If s > big_size, a new "non-pool"
chunk, of exactly size s, is created. When a PoolChunk is freed,
it is added to the pool corresponding to its size. The number of
preallocated chunks in the two pools, and the two standard sizes, are fixed at configuration time.
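The two-pool policy can be sketched as follows. This is an illustration under stated assumptions, not the JChunkFactory code: plain byte arrays stand in for chunks, the class and method names are hypothetical, the two sizes are arbitrary constants (they are configuration parameters in Jonathan), and no buffers are preallocated.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PoolFactorySketch {
    // Assumed values; in Jonathan the two standard sizes are fixed at configuration time.
    static final int SMALL_SIZE = 64;
    static final int BIG_SIZE = 1024;

    private final Deque<byte[]> smallPool = new ArrayDeque<>();
    private final Deque<byte[]> bigPool = new ArrayDeque<>();

    /** Returns a buffer of at least the given size, reusing a pooled one when possible. */
    byte[] newBuffer(int size) {
        if (size > BIG_SIZE) {
            return new byte[size];              // "non-pool" buffer of exactly the requested size
        }
        boolean small = size <= SMALL_SIZE;
        Deque<byte[]> pool = small ? smallPool : bigPool;
        byte[] pooled = pool.poll();            // null when the pool is empty
        if (pooled != null) {
            return pooled;
        }
        return new byte[small ? SMALL_SIZE : BIG_SIZE];
    }

    /** Puts a buffer back into the pool matching its size; other sizes are left to the GC. */
    void release(byte[] buffer) {
        if (buffer.length == SMALL_SIZE) {
            smallPool.push(buffer);
        } else if (buffer.length == BIG_SIZE) {
            bigPool.push(buffer);
        }
    }

    public static void main(String[] args) {
        PoolFactorySketch factory = new PoolFactorySketch();
        byte[] a = factory.newBuffer(10);
        factory.release(a);
        byte[] b = factory.newBuffer(20);
        System.out.println(a == b);                          // true: the small buffer was reused
        System.out.println(factory.newBuffer(5000).length);  // 5000: too big for either pool
    }
}
```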
The Chunk class provides a duplicate() method whose standard
implementation copies the data into a new byte array. PoolChunk provides
an optimized duplicate() method which avoids copying, by aliasing the
data field of the duplicate to the data field of the original chunk (the target).
Thus several chunks may share the same data, each managing its own offset,
top and next fields (Figure 5.3).
public void close() {
    if (cache != null) {
        session.closeNotify(this);
    }
}
5.2.3 Chunk Factories Implementation
A replication counter (initialized to 1 at creation) is maintained by the target PoolChunk. It is incremented when a new duplicate is created, and decremented by each release() operation. The chunk is freed when the counter returns to 0.
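The counting scheme can be illustrated with a small sketch. It simplifies the description above in one respect, which is an assumption of this example: the counter lives in a shared holder object rather than in the target chunk itself. All names are hypothetical, and the freed flag merely stands in for returning the chunk to its pool. Like chunks themselves, the sketch is not synchronized.

```java
final class SharedChunkSketch {

    /** Hypothetical holder for the byte array shared by a chunk and its duplicates. */
    static class SharedData {
        final byte[] bytes;
        int refs = 1;           // replication counter, initialized to 1 at creation
        boolean freed = false;  // stands in for "returned to the pool"

        SharedData(int size) {
            bytes = new byte[size];
        }
    }

    /** Simplified pool chunk: duplicates alias the same data, each with its own window. */
    static class PoolChunk {
        final SharedData shared;
        int offset;
        int top;

        PoolChunk(SharedData shared, int offset, int top) {
            this.shared = shared;
            this.offset = offset;
            this.top = top;
        }

        /** Non-copying duplicate: shares the data and increments the counter. */
        PoolChunk duplicate() {
            shared.refs++;
            return new PoolChunk(shared, offset, top);
        }

        /** Decrements the counter; the data are freed when it returns to 0. */
        void release() {
            if (--shared.refs == 0) {
                shared.freed = true;
            }
        }
    }

    public static void main(String[] args) {
        PoolChunk original = new PoolChunk(new SharedData(16), 0, 8);
        PoolChunk copy = original.duplicate();
        original.release();
        System.out.println(original.shared.freed);  // false: the duplicate still uses the data
        copy.release();
        System.out.println(original.shared.freed);  // true: last user released
    }
}
```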
There are two aspects to marshalling: collecting the data, which are not necessarily stored in contiguous memory locations, and performing the actual conversion. Likewise, unmarshalling entails both a conversion and a possible reorganization in storage.
The marshalling framework is composed of the following classes and interfaces.
package org.objectweb.jonathan.apis.presentation
package org.objectweb.jeremie.apis.presentation
package org.objectweb.jeremie.libs.presentation.std
Jonathan provides the following interfaces:
In addition, marshallers and unmarshallers may be used to encapsulate, respectively, an ObjectOutputStream and an ObjectInputStream,
in order to send and receive Serializable and Externalizable objects. The methods provided are writeValue, writeMethod,
writeParameters, etc. This use is specific to stubs, and is examined in the chapters on the Jeremie and David personalities.
In order to use a marshaller, one needs to have a marshaller factory, which should be defined in the configuration phase (see configuration tutorial).
A common use case is answering a message. Typically, an object representing a server contains a method
reply(UnMarshaller request, Session_High sender), in which request is a request received from a client and sender is a session
which represents that client (i.e. reply messages to the client should be sent back to sender). The method reply is organized as follows,
assuming the request contains an integer followed by a string and the reply consists of a string.
A slightly more elaborate use of marshallers is illustrated by the internal working of a protocol, in which a header is prefixed to an incoming message
(this function is performed, for instance, by the prepare() method of Session_High).
This use takes advantage of the internal structure of a marshaller as a chain of chunks. The method send takes an incoming message (in the form
of a marshaller) and sends it to a reply destination after having added a header (a string).
The method m.getState() returns the first chunk of a marshaller m. The method m.write(Chunk c) appends the chunk c to m.
Thus, in the example, the header is prefixed to the incoming message without any string copying (Figure 5.4).
public interface Marshaller
public interface MarshallerFactory
public interface UnMarshaller
public interface JRMIStream
public interface JRMIMarshaller extends Marshaller
public interface JRMIUnMarshaller extends UnMarshaller
public class StdMarshallerFactory
public class StdMarshallerFactoryFactory
5.3.1 Using Marshallers
public void reply(UnMarshaller request, Session_High sender) {
    try {
        int inputInt = request.readInt();
        String inputString = request.readString8();
        // now comes the specific function of the server
        String outputString = <some function of inputInt and inputString>
        request.close();
        // marshaller_factory has been created at configuration
        Marshaller reply = marshaller_factory.newMarshaller();
        sender.prepare(reply); // inserts protocol-specific header
        reply.writeString8(outputString);
        // send the message down the protocol stack
        sender.send(reply);
    } catch (JonathanException e) {
        System.out.println("Exception");
        e.printStackTrace();
    }
}
public void send(Marshaller incoming_message, Session_High sender) {
    Marshaller new_message = marshaller_factory.newMarshaller();
    Chunk incomingState = incoming_message.getState();
    // gets first chunk of message
    new_message.writeString8(header);
    new_message.write(incomingState); // chains incomingState to new_message
    sender.send(new_message); // sends message down the protocol stack
    incoming_message.reset(); // since original chunks are reused
}
A marshaller is represented by a chain of chunks. As shown in Figure 5.5, there are four (protected) fields:
| Chunk | first | points to the first chunk of the chain |
| Chunk | current | points to the chunk currently being filled |
| int | offset | points to the first available space in the current chunk |
| int | top | points to the last available space in the current chunk + 1 |
In addition, a field called os of type JRMIOutputStream points to the encapsulated ObjectOutputStream (to be used in stubs, not presented here).
Marshalling a value consists in writing the value into the current chunk. If the available space is exhausted (i.e. offset >= top), a method prepare() is called. Its effect is to allocate a new chunk, which is chained to current and becomes the new current. After use, a marshaller should be closed, which releases all its chunks.
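The write path just described can be sketched as follows. This is a simplified model, not the Jeremie marshaller: the nested Chunk class, the writeByte and chunkCount methods, and the deliberately tiny fixed chunk size are assumptions made for the example (the real marshaller obtains its chunks from a chunk factory).

```java
final class MarshallerSketch {

    /** Hypothetical minimal chunk: only the fields needed by this example. */
    static class Chunk {
        final byte[] data;
        int top;      // index of the last written byte + 1
        Chunk next;   // next chunk in the chain, or null

        Chunk(int size) {
            data = new byte[size];
        }
    }

    static final int CHUNK_SIZE = 4;   // tiny on purpose, so that chaining is visible

    Chunk first = new Chunk(CHUNK_SIZE);   // first chunk of the chain
    Chunk current = first;                 // chunk currently being filled
    int offset = 0;                        // first available space in current
    int top = CHUNK_SIZE;                  // last available space in current + 1

    /** Writes one byte, chaining a new chunk when the current one is full. */
    void writeByte(int b) {
        if (offset >= top) {
            prepare();
        }
        current.data[offset++] = (byte) b;
        current.top = offset;
    }

    /** Allocates a new chunk, chains it to current and makes it the new current. */
    private void prepare() {
        Chunk c = new Chunk(CHUNK_SIZE);
        current.next = c;
        current = c;
        offset = 0;
        top = c.data.length;
    }

    /** Number of chunks in the chain. */
    int chunkCount() {
        int n = 0;
        for (Chunk c = first; c != null; c = c.next) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        MarshallerSketch m = new MarshallerSketch();
        for (int i = 0; i < 10; i++) {
            m.writeByte(i);
        }
        System.out.println(m.chunkCount());   // prints 3 (4 + 4 + 2 bytes)
    }
}
```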
An unmarshaller has a similar structure, i.e. it encapsulates a chain of chunks and an ObjectInputStream. The interpretation of some data (e.g. integers, floats, etc.) depends on whether they have been written using the "little endian" or the "big endian" order. This is specified by a boolean little_endian which must be set according to the convention in use (default value is true). An unmarshaller may be associated with a chunk provider, in which case the implementation of the prepare() and close() methods is different (prepare calls the prepare() method of the chunk provider, and close must also close the chunk provider).
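To make the effect of the byte order concrete, here is a minimal sketch of decoding a 32-bit integer from a byte array under either convention. The class and method names are hypothetical; the actual unmarshaller reads from its chain of chunks rather than from a single array.

```java
final class EndianSketch {

    /** Decodes the 4 bytes starting at offset as an int, in the given byte order. */
    static int readInt(byte[] data, int offset, boolean littleEndian) {
        int b0 = data[offset] & 0xFF;
        int b1 = data[offset + 1] & 0xFF;
        int b2 = data[offset + 2] & 0xFF;
        int b3 = data[offset + 3] & 0xFF;
        if (littleEndian) {
            // least significant byte comes first
            return b0 | (b1 << 8) | (b2 << 16) | (b3 << 24);
        }
        // big endian: most significant byte comes first
        return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
    }

    public static void main(String[] args) {
        byte[] bytes = {1, 0, 0, 0};
        System.out.println(readInt(bytes, 0, true));    // prints 1
        System.out.println(readInt(bytes, 0, false));   // prints 16777216
    }
}
```

The same four bytes yield two different values, which is why both ends of a connection must agree on the convention.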
Recall (see the communication framework tutorial) that the main abstraction for communication is a session, which represents a communication channel. Sessions are created by protocols and may be organized in a stack or an acyclic graph reflecting the protocol hierarchy. At the lowest level, a session relies on a basic communication mechanism called a connection. A connection encapsulates the low-level communication mechanism provided by the underlying layers. In the current implementation, a connection encapsulates a socket.
The connection framework is organized in two levels. At the upper level, a connection manager provides connections (i.e. client sockets) and server connection factories (i.e. server sockets) to client and server sessions, respectively. To do this, it manages a pool of server connection factories and a pool of connections. If the requested resource is not available in the pool, it creates a new one, by calling a connection factory at the lower level. This organization separates the functions of connection management and connection creation and implementation, allowing the latter to be easily changed.
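The two-level organization can be sketched as a manager that consults its pool first and delegates creation to a lower-level factory. The interfaces below are hypothetical simplifications (no sessions, no sockets, one idle connection per target), and the pooling policy shown is an assumption made for the example, not the behaviour of JConnectionMgr.

```java
import java.util.HashMap;
import java.util.Map;

final class ConnectionMgrSketch {

    /** Hypothetical connection: only knows which "host:port" it is bound to. */
    interface Connection {
        String target();
    }

    /** Lower level: creates connections (a socket-based factory in the real framework). */
    interface ConnectionFactory {
        Connection newCltConnection(String host, int port);
    }

    private final ConnectionFactory factory;
    private final Map<String, Connection> idle = new HashMap<>();

    ConnectionMgrSketch(ConnectionFactory factory) {
        this.factory = factory;
    }

    /** Upper level: reuses an idle connection if any, otherwise delegates creation. */
    Connection newCltConnection(String host, int port) {
        Connection pooled = idle.remove(host + ":" + port);
        return pooled != null ? pooled : factory.newCltConnection(host, port);
    }

    /** Returns a connection to the pool for later reuse. */
    void release(Connection c) {
        idle.put(c.target(), c);
    }

    public static void main(String[] args) {
        ConnectionMgrSketch mgr = new ConnectionMgrSketch((host, port) -> {
            String target = host + ":" + port;
            return () -> target;   // stand-in for a connection wrapping a socket
        });
        Connection c1 = mgr.newCltConnection("example.org", 80);
        mgr.release(c1);
        Connection c2 = mgr.newCltConnection("example.org", 80);
        System.out.println(c1 == c2);   // true: taken from the pool, not recreated
    }
}
```

Because the manager only depends on the factory interface, the lower layer can be swapped without touching the pooling logic.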
The connection framework relies on the following interfaces and classes.
package org.objectweb.jonathan.apis.protocols.ip
package org.objectweb.jonathan.libs.resources.tcpip
The JConnectionMgr class provides the following methods.
JConnectionMgr includes a class JConnectionMgr.Connection which implements a TCP/IP connection. The implementation of this class is
delegated to the underlying factory.
The IPv4ConnectionFactory class provides an implementation of connections,
using sockets.
The (internal) class Connection implements the interface IpConnection and
encapsulates a java.net.Socket. An instance of Connection is associated with a session. The main methods provided are emit(Chunk c) and
receive(Chunk c, int sz), which are implemented by calling write and read, respectively, on the java.io.OutputStream and
java.io.InputStream associated with the socket, as shown below.
These methods illustrate the use of chunks. In the case of receive, data are read as available, until either the specified amount has been
read or end of file is encountered.
The newCltConnection(String host, int port, IpSession session) method is implemented by creating a new java.net.Socket; the
constructor calls connect to the given host and port. The method returns a Connection encapsulating the new socket and
associated with the provided session.
The (internal) class SrvConnectionFactory implements the interface
TcpIpSrvConnectionFactory and encapsulates a java.net.ServerSocket.
The main method is newSrvConnection(IpSession session), which essentially performs an accept on the server socket. It returns an instance of
Connection, associated with the provided session, and encapsulating the socket created by accept.
public interface IpConnection
public interface TcpIpSrvConnectionFactory
public interface TcpIpConnectionMgr
public interface UdpConnectionMgr
public class IPv4ConnectionFactory extends Object implements TcpIpConnectionMgr
public class IPv4ConnectionFactoryFactory extends GenericFactory
public class JConnectionMgr extends Object implements TcpIpConnectionMgr
public class JConnectionMgrFactory extends GenericFactory
5.4.1 Connection Management
5.4.2 Connection Implementation
...
is = socket.getInputStream();
os = socket.getOutputStream();
...
public void emit(Chunk c) throws IOException {
    synchronized (os) {
        // the third argument of write is a length, i.e. the number of valid bytes
        os.write(c.data, c.offset, c.top - c.offset);
    }
}
public void receive(Chunk c, int sz) throws IOException {
    byte[] data = c.data;
    int top = c.top;
    while (sz > 0) {
        int a = is.read(data, top, sz); // actual number of bytes read
        if (a > 0) {
            top += a;
            sz -= a;
        } else {
            throw new EOFException();
        }
    }
    c.top = top;
}
The delegation relationships between the JConnectionMgr and IPv4ConnectionFactory classes are summarized in Figure 5.6. Here "delegation" means the following: a method call in JConnectionMgr is implemented by a call to a method of the same name in IPv4ConnectionFactory; (some) method calls in an internal class of JConnectionMgr are implemented by corresponding method calls of an internal class of the same name in IPv4ConnectionFactory. This organization shows the location of the code sequences to be modified if (part of) the lower layer of the connection framework should be reimplemented.
Acknowledgments. Nicolas Rivierre (France Telecom R&D) provided useful material on the memory management framework.