Chapter 3
The Communication Framework

[pdf version]
This document is made available under the terms of the GNU Free Documentation License as published by the Free Software Foundation (http://www.gnu.org/copyleft/fdl.html)

3.1  Principles of the Communication Framework

3.1.1  Protocols and Related Notions

A protocol (a basic abstraction for communication) provides a communication service for exchanging messages through a network. A protocol typically works by using the services provided by a lower level protocol, down to the hardwired communication interface. This defines a layered organization such as a protocol stack or acyclic graph. In Jonathan, the basic protocol layer is that provided by the operating system, in the form of Java sockets, and we are not concerned about the lower levels.

The main communication abstraction provided by Jonathan is a session. A session is an object that represents a communication channel. It provides an interface for sending and receiving messages; actually two different interfaces (Session_Low and Session_High) are respectively  provided for incoming and outgoing messages. A protocol is essentially a session manager: it creates sessions, acts as a naming and binding context for these sessions, and provides them with communication resources. Like protocols, sessions are organized in a hierarchy. At the lowest level, a session relies on a basic communication mechanism called a connection, which provides an interface to send and to receive elementary messages (sequences of bytes). For instance, in the TCP-IP protocol suite, a connection provides the IpConnection interface and encapsulates a socket.

The main communication primitives are message sending and receiving. They operate in different ways, due to the asynchronous nature of receiving. A read operation (implemented by a receive() method on a connection) blocks the executing thread until data is available on the input channel associated with the connection. When data becomes available (a message has arrived), the thread is unblocked, causing the message to be passed up the protocol stack by calling the ``lower'' interfaces of the sessions, in ascending order. On the other hand, an application process sends an outgoing message by calling the ``higher'' interface provided by a session. The message is then sent down the protocol stack by calling ``higher'' interfaces in descending order, down to the call of an emit method on the connection. Figure 3.1 gives an overview of this mechanism, which is described in further detail in Section 3.2.

Figures/session-stack.gif
Figure 3.1: Sending and receiving messages

Sessions are set up according to the Jonathan binding framework. On the server side, a protocol graph is first constructed by assembling elementary protocols. The protocol graph is a naming context, which provides the export method. The exported interface (srv_itf) is the ``lower'' interface of a session (of type Session_Low), which provides the functionality of the server. The export method returns a session identifier (a name for the exported interface), which contains all the information needed to set up a communication with the server (e.g.,for TCP/IP, the IP address of the server and a port number). This information may be transmitted over the network and decoded by a client.

In order to be able to access the interface exported by a server, a client must call the bind method provided by a session identifier that designates the server, passing the client application's ``lower'' interface (clt_itf) as a parameter. The session identifier may be obtained from the network (e.g. through a name service), or it may be constructed locally using the server address and port number if these are known. The bind method returns an interface session of type Session_High, which may be used by the client to call the server. Messages from the server are directed to the client application, through the interface clt_itf provided as a parameter of the call to bind.

A general picture of the export-bind mechanism is outlined on Figure 3.2. Many details are omitted; these are provided in Section 3.2.

Figures/export-bind.gif
Figure 3.2: The export-bind pattern for session setup

Actual communication relies on two services: chunks and (un)marshallers, that are provided, respectively, by the Jonathan resource library and the Jeremie presentation library. We describe these services briefly. A full description may be found in the resource framework tutorial.

Typically, marshallers and unmarshallers are used as follows (this is a simplified example).

Sending a message composed of an integer i followed by a 8-byte string str followed by an object obj.

 
   Session_High session ...
   StdMarshallerFactory marshaller_factory ...
   ...
   Marshaller m = marshaller\_factory.newMarshaller();
   marshaller.writeInt(i);
   marshaller.writeString8(str);
   marshaller.writeValue(obj);
   session.send(marshaller);
   ...

Receiving the message sent by the above program sequence; the following sequence is supposed to be part of a method having Unmarshaller unmarshaller as a parameter.

   i=unmarshaller.readInt();}
   str=unmarshaller.writeString8();}
   obj=unmarshaller.readValue();}
   unmarshaller.close();}
   ...

   Marshallers and unmarshallers are created by marshaller factories. A marshaller factory is usually provided in the bootstrap configuration of Jonathan (see the configuration framework tutorial).

3.1.2   The Communication Infrastructure: Java Sockets

The first example (using Java sockets) does not involve Jonathan at all. It illustrates, at a fairly low level, the export-bind pattern of interaction that is further expanded in the following use cases. Consider a server that provides a service to a single client at a time (multiple clients are considered later on). The server selects a port (port 3456 in this example) and creates a server socket associated with that port. It then waits for client connections by calling accept() on the socket. When a client connects to port 3456 (this is done in the Socket constructor), accept() returns a new socket dedicated to exchanges with the client. The original socket remains available for new connections (if we do not create a new thread per client, only one client connection may be opened at a time).

Server

// create a new server socket associated with a specified port
   server_socket = new ServerSocket(3456);        
// wait for client connections:~a ``pseudo-export'' operation
   Socket socket = server_socket.accept();            
// socket is now available for communication with client                                                                         

Client

// connecting to server: a ``pseudo-bind'' operation
   Socket socket = new Socket(hostname, 3456);  
// socket is now available for communication with server

In effect, the accept() call in the server program is equivalent to our export primitive, while the connect() implicitly called in the Socket constructor in the client program is equivalent to our bind primitive.

Note that the binding process always relies on an information shared by the client and the server (here, the hostname and the port number). In the present case, this shared information is hardwired in the code. More elaborate methods are introduced in further examples.

The complete code of an example using this pattern (a simple echo server) may be found in the Sun Java tutorial (http://java.sun.com/docs/books/tutorial/networking/sockets/)

3.2  The TCP-IP Protocol

3.2.1  Overview of the TCP-IP Framework

The TCP-IP framework is composed of the following interfaces and classes (actually the interfaces in the apis.protocols package are common to all communication protocols).

package org.objectweb.jonathan.apis.protocols
public interface Protocol
public interface ProtocolGraph
public interface ReplyInterface
public interface ReplySession
public interface RequestSession
public class ServerException extends JonathanException
public interface SessionIdentifier
public interface Session_High
public interface Session_Low

package org.objectweb.jonathan.libs.protocols.tcpip
final public class TcpIpProtocol implements Protocol
final class TcpIpProtocolGraph implements ProtocolGraph
final class CltSessionIdentifier extends IpSessionIdentifier
final class SrvSessionId extends IpSessionIdentifier
abstract class Session implements IpSession, Runnable
final class CltSession extends Session
final class SrvSession extends Session
final class SrvSessionFactory implements Runnable
final class TcpIpChunkProvider extends Chunk implements ChunkProvider

package org.objectweb.jonathan.apis.protocols.ip
public interface TcpIpSrvConnectionFactory
public interface IpConnection
public interface IpSession extends Session_High
abstract public class IpSessionIdentifier implements SessionIdentifier
public interface TcpIpConnectionMgr
public interface UdpConnectionMgr

package org.objectweb.jonathan.libs.resources.tcpip
public class IPv4ConnectionFactory implements TcpIpConnectionMgr
class SrvConnectionFactory implements TcpIpSrvConnectionFactory
public class JConnectionMgr implements TcpIpConnectionMgr
class Connection implements IpConnection
class SrvConnectionFactory implements TcpIpSrvConnectionFactory

In addition, the TCP-IP framework uses the following interfaces and classes.

package org.objectweb.jonathan.apis.presentation
public class EndOfMessageException extends JonathanException
public class MarshalException extends JonathanException
public interface Marshaller
public interface MarshallerFactory
public class UnMarshalException extends JonathanException
public interface UnMarshaller

package org.objectweb.jeremie.libs.presentation.std
public class StdMarshallerFactory implements MarshallerFactory
public class StdMarshallerFactoryFactory extends GenericFactory

The libs.protocols.tcpip package implements the session level, together with the ``chunk provider'' which allows a session to get input data from a connection. The libs.resources.tcpip package implements the connection level. The session and connection levels are described in the following sections.

3.2.2   The Session Level

Since sessions play a central part in the communication framework, it is important to understand the interplay between sessions at different levels. We illustrate this by the example of TCP-IP (Figure 3.3). The general pattern outlined on this figure applies both on the client and on the server side. The main difference is that the server-side sessions are typically created by export, while the client-side sessions are created by bind.

Figures/general-session.gif
Figure 3.3: Sessions in the TCP-IP Jonathan framework

At the lower level, we have a TcpIp session, which essentially encapsulates a connection to the network. It has two functions:

  • to receive messages from the network and to pass them up to the upper level (``application'') session;
  • to implement an interface (called Session_High) allowing the application session to send messages through the network to its ``sibling'' application session (i.e. client to server and server to client). In this sense, the TcpIp session acts as a surrogate to a (remote) application session.
The TcpIp session has two slightly different forms ( TcpIpProtocol.CltSession and TcpIpProtocol.SrvSession) on the client and server side.

At the upper level, we have an application session, which provides the client or server functionality. The application session

  • sends messages on the network by calling the send method provided by the TcpIp session in its Session_High interface.
  • receives messages from a lower level session through the Session_Low interface that it implements. However, there is no explicit receive operation; instead, the TcpIp session delivers an incoming message to the application session by calling the send method of that session's Session_Low interface.

It is important to emphasize the difference between the Session_High and Session_Low interfaces (especially since both interfaces include a method called send, which may seem confusing at first sight).

  • Session_High is used by the application session to send messages ``downwards''. If lower is a variable that designates a TcpIp session in the application session, lower.send(message) sends a message down to the network (eventually to the remote application session for which the TcpIp session is a surrogate).
  • Session_Low is used by the TcpIp session to send messages ``upwards''. If hls (standing for ``higher level session'') is the variable that designates an application session in the TcpIp session, hls.send(message) sends a (presumably incoming) message up to the application session.

The classes ServerSession and ClientSession that implement the server and client application sessions of the Echo application have the following general outline.

class ServerSession implements Session_Low{

   ServerSession();

   static MarshallerFactory marshaller_factory;
   private int counter;             //internal state of session

   // the server method for accepting requests:
   //    - unmarshaller: the request message
   //    - sender: the local interface to the client

   public void send(UnMarshaller unmarshaller, Session_High sender){
      String theOutput = null;
      String theInput = unmarshaller.readString8();
      theOutput = counter + ":" + theInput;
      unmarshaller.close();
      Marshaller marshaller = marshaller_factory.newMarshaller();
      sender.prepare(marshaller);
      marshaller.writeString8(theOutput);
      sender.send(marshaller);
   }
}


class ClientSession implements Session_Low {

   static MarshallerFactory marshaller_factory;
   BufferedReader reader;           // for terminal input by client
   ClientSession(BufferedReader reader);
   this.reader = reader;}
   
   // the client method for accepting messages from server   
   //    - unmarshaller: the message   
   //    - session: the local interface to the server   

   public void send(UnMarshaller unmarshaller, Session_High session){
      String fromServer,input;
      System.out.print("Client: "); // prompting client   
      System.out.flush();   
      input = reader.readLine();   
      Marshaller marshaller = marshaller_factory.newMarshaller();   
      session.prepare(marshaller);   
      marshaller.writeString8(input);   
      session.send(marshaller);   
      fromServer = unmarshaller.readString8();   
      unmarshaller.close();
   }
}

The actual programs must include, in addition, provision for exception handling and for nice termination of client sessions. The complete programs may be found in Client.java, Server.java (the programs actually contain provision for multiple clients, to be explained later on (cf. Section 3.2.6)).

3.2.3   Setting up sessions

The mechanism for session setup uses the binding framework based on the export-bind pattern.

Both the server and the client start by an initial configuration phase (see the configuration tutorial), and create an instance of TcpIpProtocol. Then each side instantiates a session as follows.

  • On the server side, an instance of ServerSession (the application session) is created. Then, a protocol graph is created with a single node (the instance of TcpIpProtocol). Finally, this graph exports the newly created application session: it creates an instance of SrvSession, with  the ServerSession instance as its higher level session, and  returns a session identifier that designates the exported session. Here is the code sequence that does this:

    Server:
    // configuring the system: creating factories
    // (described in the configuration tutorial)
    // creating a protocol instance (a naming context for sessions)
       TcpIpProtocol protocol = 
          new TcpIpProtocol(<parameters, to be described later>);
    
    // creating and exporting a new session
       SessionIdentifier session_id =
          protocol.newProtocolGraph(port).export (new ServerSession());   
    // if no port specified, selects an unused port
    
    

  • On the client side, a new session identifier (participant) is created to designate the remote server (In this version, we still assume that the name of the server host and the server port are known by the client). An instance of ClientSession (the application session) is created. Finally, the bind method is called on the participant identifier: it creates a new instance of CltSession, with the ClientSession instance as its higher level session, and returns a session identifier that designates the exported session. Here is the code sequence that does this:

    Client:
    // configuring the system: creating factories
    // (described in the configuration tutorial)
    // creating reader, getting server hostname and port
    
    // creating a protocol instance (a naming context for sessions)
       TcpIpProtocol protocol =
          new TcpIpProtocol(<parameters, to be described later>);
    
    // preparing for connection to server
       IpSessionIdentifier participant =
          protocol.newSessionIdentifier(hostname,port) ;
    
    // creating client-side session and connecting to server
       Session_High session = participant.bind (new ClientSession(reader)) ;
    // session is now available for communication with server}
    
    

From this point on, the core of the program runs in the application programs, i.e. the ClientSession and ServerSession classes, as described above.

3.2.4   The Connection Level

The interfaces provided by the session level abstract away (in the send methods) the low-level message transmission mechanism. This mechanism is defined at the connection level and (in the current implementation) relies on two classes: JConnectionMgr defines generic mechanisms for using socket-based connections, and IPv4ConnectionFactory provides a specific implementation of these mechanisms. The main abstraction at this level is the connection (instance of IpConnection), which encapsulates a socket.

A full description of the connection level is given in the resource framework tutorial.

For completeness, we now give a summary explanation of the mechanisms for message input. Recall that CltSession and SrvSession are the client and server incarnations, respectively, of the generic TcpIp session described above (in the code, both classes derive from a common abstract class, TcpIpProtocol.Session). This class extends Runnable, i.e. its instances are executed as independent threads activated by a run() method, which is called when a message is received. This is done through the TcpIpProtocol.TcpChunkProvider class, which encapsulates a socket input stream (through an IpConnection), and delivers messages as ``chunks'' (a Chunk is the abstraction provided by Jonathan to efficiently use data of variable length). This class has two main methods, prepare() and close(), which are respectively called as a prelude and postlude of all input operationsperformed through an Unmarshaller on the input stream. A TcpChunkProvider contains a data cache,which is used as follows.

  • prepare() delivers the contents of the cache (if not empty) and attempts to read further data into the cache from the underlying connection(the input stream);
  • close() is used to close the chunk provider if it is no longer used; if the cache is not empty, the session thread is reactivated, so the session may read the remaining data.

Thus the chunk provider effectively acts as a data pump that injects incoming messages into the TcpIp session, which in turn sends them to the upper level application session.

3.2.5   Putting it all together

We now describe in detail the internal workings of the export-bind operations. An overview of these operations is given in Figure 3.4 which gives a more detailed picture of the process outlined on Figure 3.2.

Figures/session-setup.gif
Figure 3.4: Creating client and server sessions

Calling the export method on ProtocolGraph has the following effect (s1, c1, etc. refer to the tags that designate the server and client operations on Figure 3.4).

  • The newSrvConnectionFactory(port) method is called on JConnectionMgr (s1). This creates a new instance of a TcpIpSrvConnectionFactory (s2), which encapsulates a server socket bound to the port provided as parameter (if 0, an available port is selected).
  • A new instance (session_id) of SrvSessionId is created (s3); it contains the hostname of the server and the port number of the server socket.
  • A new instance of SrvSessionFactory is created (s4); it has references to session_id, to the exported ServerSession and to the TcpIpProtocol.
  • A new thread is started to execute the run() method of the SrvSessionFactory. The first action of this method is to create a new instance of SrvSession (s5).
  • The newConnection method is called on the TcpIpSrvConnectionFactory. This method actually calls an accept() on the underlying server socket (s6). This is a blocking call. The server now waits for a connect() operation from a client.
  • When connect() is called from a client (see client description below, step c4), a new socket is created and connected to the client socket (s7-c6).
  • A new thread is created to execute the run() method of SrvSession (s8). This in turns starts reading messages from the socket, as explained in the description of connections.

Calling the bind method on CltSessionIdentifier has the following effect.

  • A new instance of CltSession is created (c1).
  • The newCltConnection method is called on JConnectionMgr (c2). This creates a new socket (c3), encapsulated in a Connection, an implementation of the IpConnection interface.
  • The socket tries to connect() to the remote server, whose hostname and port number are included in the CltSessionIdentifier (c4).
  • Finally, a new thread is created to execute the run() method of CltSession (c5). This in turns starts reading messages from the socket, as explained in the description of connections.

3.2.6  Serving multiple clients

Two patterns may be used for serving multiple clients, according to whether the server maintains a common state shared by all clients or a distinct state for each client.

Multiple connections with shared state

The mechanism described above allows several clients to connect to a single server, through the connection factory mechanism. If a new client binds to the server, a new connection is created (using the socket accept mechanism), as well as a new  SrvSession instance encapsulating this connection, together with a new thread. However, there is still a unique application session (ServerSession) , whose state is shared between all clients (Figure 3.5). This is illustrated in the example programs by adding state to the application session, in the form of an integer variable counter that is incremented after each client call. Multiple clients see a single instance on this variable. The text of the programs may be found in Client.java, Server.java..

Figures/shared-state.gif
Figure 3.5: Multiple clients sharing a session state

If the application needs a per-client session state, then it is necessary to explicitly manage multiple sessions at the application level. This is done in the following example.

Multiple connections with private state

In this example, each client is associated with a distinct application-level session that maintains the client's own version of the state (in this case, the counter variable). This is achieved, on the server side by an instance of SrvProtocol, which has two functions: it acts as a factory that creates a new instance of the client session, OwnSession, when a new client connects; it acts as a demultiplexer that forwards the incoming messages to the appropriate OwnSession according to the identity of the sender. The factory is implemented as a hashtable that contains an entry per session. The body of the application (in this case, the incrementation of the counter) is implemented by OwnSession.

Figures/private-state.gif
Figure 3.6: Multiple clients, with a server session per client

The client side is identical to that of the previous application. The text of the programs may be found in Client.java, Server.java.

3.3  Other Communication Protocols

In this section, we examine the implementation in Jonathan of two other protocols: the IP Multicast Protocol and the Real Time Protocol (RTP). These implementations conform to the export-bind pattern as decribed in Section 3.1.1, with local variations. Then we present the Event Channel use case, which combines both protocols. In addition, a presentation of the GIOP protocol may be found in the Jonathan binding tutorial.

3.3.1  The IP Multicast Protocol

As defined in IETF RFC 1112, ``IP multicasting is the transmission of an IP datagram to a ``host group'', a set of zero or more hosts identified by a single IP destination address''. IP addresses starting with 1110 (i.e. 224.0.0.1 to 239.255.255.255) are reserved for multicast. Groups are dynamic, i.e. a host may join or leave a group at any time; a host may be a member of several groups. A host need not be a member of a group to send a message to that group.

A particular class of Java sockets, java.net.MulticastSocket, is used in Jonathan as the base layer for implementing IP Multicast. A multicast socket s may subscribe to a host group g by executing s.joinGroup(g) and unsubscribe by executing s.leaveGroup(g). In order to send a message msg to a group g, a datagram must first be created by DatagramPacket d = new DatagramPacket(msg, msg.length, g, port), where port is the port to which socket s is bound. The datagram is then sent by executing s.send(d).

The Jonathan MulticastIpProtocol class manages MulticastIpSession sessions. Each session is dedicated to a (IP multicast address, port) network endpoint. A session may optionally be associated with an upper level session. In that case, it may send and receive messages, and a per session thread is used to wait for incoming messages. Otherwise, the session is only used to send messages.

In the IP Multicast protocol, there are no separate client and server roles; therefore there is no need to separate protocol graphs (which export servers) from session identifiers (which are used by clients to bind to servers). A single data structure, MulticastIpSessionIdentifier, is used for both functions (it implements SessionIdentifier and ProtocolGraph and thus provides both export and bind).

The sessions managed by this protocols are instances of class MulticastIpSession, which essentially provides two methods, send and run. In the current implementation, a single thread is created with each instance and waits on the multicast socket. The constructor is as follows:

  MulticastIpSession(MulticastIpSessionIdentifier sid, 
                Session_Low prev_protocol) throws IOException {
    socket=new MulticastSocket(sid.port());
    socket.joinGroup(sid.InetAddress());
    this.sid=sid;
    this.prev_protocol=prev_protocol;
    if(prev_protocol!=null) {
      reader =new Thread(this);
      cont = true;
      reader.start();
    }
  }

The reader threads executes the run() method which is a loop with the following overall structure (exceptions not shown):

  while (true) {
    socket.receive(packet); \\ a DatagramPacket
    extract message from packet
    send message to upper session, i.e. prev_protocol;
  }

The send(message) method does essentially this:

  encapsulate message into a DatagramPacket packet;
  socket.send(packet);

A MulticastIpSessionIdentifier contains three fields: address, port, and session. As explained above, the export and bind methods are very similar. Both create a new socket with an associated session. They only differ by the type of the returned value (an identifier for export, a session for bind); in addition, export needs to supply an upper level interface.

 118   public SessionIdentifier export(Session_Low hls) throws JonathanException {
 119      if (hls != null) {
 120        try {
 121               session = new MulticastIpSession(this,hls);
 122               return this;
 123        } catch (IOException e) {
 124               throw new ExportException(e);
 125        }
 126      } else {
 127        throw new ExportException("MulticastIpSessionIdentifier: no protocol low interface specified.");
 128      } 
 129   }  

 135   public Session_High bind(Session_Low hls)  throws JonathanException {
 136      try {
 137             return new MulticastIpSession(this,hls);
 138      } catch (IOException e) {
 139             throw new org.objectweb.jonathan.apis.binding.BindException(e);
 140      }
 141   }

As shown above, this creates a multicast socket using the port associated with the identifier, and spawns the reader thread if an upper (receiving) interface (hls) is provided (if not, the socket is only used for sending and does not need a waiting thread).

3.3.2  The RTP Protocol

RTP (Real-time Transport Protocol) is the Internet standard protocol for the transport of real-time data, including audio and video. RTP works on top of an existing transport protocol (usually UDP) and is designed to be independent of that protocol. RTP is composed of a real-time transport protocol (RTP proper), and of an RTP control protocol, RTCP, which monitors the quality of service.

Jonathan provides a partial implementation, RTPProtocol, of the RTP protocol (not including RTCP). RTP packets have a 12 byte header (defined by the RTPHeader class), which includes such information as sequence number and timestamp.

RTPProtocol provides the usual export and bind operations to its users:

  • export(Session_Low hls) is borne by a protocol graph built over the underlying protocol, and is called by a client providing hls interface for receiving messages. It returns a new RTPSessionIdentifier, which identifies an RTP session. It also creates a receiving front end for the hls interface, in the form of a decoder, an instance of the internal class RTPDecoder. The decoder extracts the header from an incoming message and forwards the message to the upper level interface (here hls).
  • bind(Session_Low client) is borne by an RTPSessionIdentifier. Its effect is to bind client to a new RTPDecoder (for receiving messages), and to return a coder implementing the Session_High interface for sending messages. This coder (an instance of the internal class RTPCoder) allows the client to prepare a header to be prepended to an outgoing message, incrementing the sequence number and timestamp as needed.

3.3.3  Use Case: Event Channel

Introduction

An event channel is a communication channel on which two types of entities may be connected: event sources and event consumers. When a source produces an event, in the form of a message, this message is delivered to all the consumers connected to the channel. The channel itself may be regarded as both an event source and consumer: it consumes events from the sources and delivers them to the consumers.

Two communication patterns may be used:

  • the ``push'' pattern, in which events are pushed by the sources into the channel, which in turn pushes them into the consumers;
  • the ``pull'' pattern, in which events are explicitly requested (pulled) by the consumers from the channel, which in turn tries to pull them from a source (this operation may block until events are produced).

In this example, we use the push pattern. The interface provided by the channel to event sources is that of a representative (a proxy) of the interface provided by the event consumers (Figure 3.7). Particular implementations of the event channel may provide specific guarantees for message delivery, in terms of reliability, ordering, or timeliness.

Figures/event-channel.gif
Figure 3.7: An event channel based on the ``push'' pattern

Jonathan provides two implementations of a simple event channel, using David (CORBA) and Jeremie (Java RMI), respectively. Both rely on the same binder and event model; they essentially differ by the communication protocol. This presentation is based on the Jeremie version of the event channel.

The event channel provides the following interface, which defines the methods needed by event sources and consumers to connect to the channel using the ``push'' pattern:

public interface EventChannel extends java.rmi.Remote { // for use by RMI
  void addConsumer(Object consumer)
    throws JonathanException;  // adds a new consumer to the event channel
  void removeConsumer(Object consumer)
    throws JonathanException;  // removes a consumer (will no longer receive events)
  Object getConsumerProxy()
    throws JonathanException;  // returns a consumer proxy to be used by a source
}

Event channels are built by event channel factories. Class EventChannelFactory provides, among others, the following method:

  public EventChannel newEventChannel(String address, int port, String type) 
      throws JonathanException

which creates a new event channel built on the supplied host address and port. It also provides a class that implements the EventChannel interface defined above.

Using an Event Channel

Like in the ``Hello World'' case described in the binding tutorial, a name server (in Jeremie, a registry) is used to register event channels. In this example, an event source creates the channel, registers it under a symbolic name of the form <// < registry host > < channel name > , and starts producing events. A prospective consumer retrieves a channel from the name server using its symbolic name, and subscribes to the channel; it then starts receiving messages transmitted on this channel.

The core of the main method of NewsSource, the program for event sources, is:

  EventChannel channel;
  EventChannelFactory channelFactory = 
     EventChannelFactoryFactory.newEventChannelFactory(NewsSource.class);
  channel=channelFactory.newEventChannel(address,port,"NewsChannel");
  Naming.rebind("//"+args[3]+"/"+args[2], channel); // //<registry host><channel name>
  System.out.println("Ready...");

  NewsTicker ticker=(NewsTicker)channel.getConsumerProxy(); 
  NewsSource producer=new NewsSource(ticker);
  producer.produce();// produce is the event generator method of NewSource class
                     // it includes a call to the NewsTicker method: ticker.latestNews

The core of the main method of NewsConsumer, the program for event consumers, is:

  System.setSecurityManager(new RMISecurityManager());                   
  channelName=args[0];
  EventChannel channel=
     (EventChannel) Naming.lookup("//" + args[1] + "/" + channelName);
  if(channel==null) {
     System.err.println("Channel "+ channelName + " not found");
     System.exit(1);
  }
  channel.addConsumer(new NewsConsumer());

The NewsTicker interface shared by source and consumer classes consists of the method latestNews(String msgs[]) activated when an event is produced. This method is implemented in the NewsConsumer class; it simply displays the incoming messages on the screen:

  public void latestNews(String msgs[]) {
     for(int i=0;i<msgs.length;i++)
      System.out.println(msgs[i]);
  }

Event Channel Implementation

The implementation of the event channel relies on two components that closely interact: the event channel factory, EventChannelFactory, and the event binder, EBinder. The role of the factory is to deliver implementations of the EventChannel interface, both in the form of actual instances and in the form of stubs. The role of the binder is to provide an interface allowing both sources and consumers to connect to an event channel. The current implementation is hardwired to work with the RTP protocol on top of the IP Multicast protocol.

EBinder provides a specific class of identifiers, EIds. An EId designates an event channel built on a particular IP address and port number used by the underlying IP Multicast protocol. The two main methods are getProtocolGraph() and bind(), which are used as follows:

  • An event source that needs to connect to an event channel designated by EId channel_id executes channel_id.bind(), which returns a (Session_High) session on which the source will send events.
  • An event consumer that needs to connect to an event channel designated by EId channel_id executes EBinder.bindConsumer (consumer, channel_id), where consumer is the (Session_Low) interface provided by the consumer to receive events (note that this is a ``push'' interface, including a method send). bindConsumer is implemented as follows:

        ProtocolGraph protocol_graph = channel_id.getProtocolGraph();
        protocol_graph.export(consumer);
    
    

The usual export-bind pattern is again used here; getProtocolGraph() returns the protocol graph of the underlying RTP protocol, itself relying on IP Multicast; bind() returns a stub, created by a stub factory using the underlying RTP session.

An EventChannelFactory is created by an EventChannelFactoryFactory, which associates it with an EBinder. It implements special instances of stubs and skeletons targeted towards one-way invocation, and provides a specific implementation of EventChannel, that works as follows.

An instance of EventChannel is created by the constructor:

289  EventChannelImpl(String address, int port, String type, 
290        EventChannelFactory binder) throws JonathanException  {
291    id=(EBinder.EId)binder.getEBinder().newId(address,port,type);
292    SessionIdentifier ep=id.getSessionIdentifier();
293    Context hints = JContextFactory.instance.newContext();
294    hints.addElement("interface_type",String.class,type,(char) 0);
295    proxy=binder.newStub(ep,new Identifier[] {id}, hints);
296    hints.release();
297    this.binder=binder;
298  }

On line 291, a new EId is created by the EBinder, with the given address and port. On line 292, a new session is associated with this EId, using the underlying RTP and IP Multicast protocols. Finally, a proxy (stub) is created using this session. This proxy is ready to be delivered to any source willing to send events to the channel, through the following method:

328  public Object  getConsumerProxy() throws JonathanException {
329     return proxy;
330  }

A consumer connects to the event channel by calling the following method:

301    public void addConsumer(Object consumer) throws JonathanException {
302       a: if(binder==null) {
303          if (proxy instanceof StdStub) {
304             Identifier[] ids =
305                ((JRMIRef) ((StdStub) proxy).getRef()).getIdentifiers();
306             Object cid;
307             for (int i = 0; i < ids.length; i++) {
308                cid = ids[i];
309                while (cid instanceof Identifier) {
310                   if (cid instanceof EBinder.EId) {
311                      id = (EBinder.EId) cid;
312                      binder = (EventChannelFactory) id.getContext();
313                      break a;
314                   } else {
315                      cid = ((Identifier) cid).resolve();
316                   }     
317                }
318             }
319          }
320          throw new BindException("Unbound channel");
321       }
322       binder.getEBinder().bindConsumer(new OneWaySkeleton(consumer),id);
323    }

The key operation here is on line 322: the consumer builds a skeleton that will act as an event receiver interface for it, then binds that skeleton to the event channel using the bindConsumer method of the EBinder, as described above. Lines 302 to 321 illustrate another (classical) situation in the binding process: if the factory associated with the channel is not known (e.g. because the event channel reference has been sent over the network), the method tries to retrieve it using one of the identifiers associated with the stub, by iteratively resolving the identifier chain until the factory (binder) is found, or until the search fails.

   Acknowledgments. Thanks to Marc Lacoste (France Telecom R&D) for providing useful material on the Jonathan TCP-IP communication framework .




File translated from TEX by TTH, version 3.05.
On 10 Jun 2002, 12:24.