Main Page   Packages   Class Hierarchy   Compound List   File List   Compound Members  

TcpIpProtocol.java

Go to the documentation of this file.
00001 /***
00002  * Jonathan: an Open Distributed Processing Environment 
00003  * Copyright (C) 1999-2000 France Telecom R&D
00004  * 
00005  * This library is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU Lesser General Public
00007  * License as published by the Free Software Foundation; either
00008  * version 2 of the License, or (at your option) any later version.
00009  * 
00010  * This library is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013  * Lesser General Public License for more details.
00014  * 
00015  * You should have received a copy of the GNU Lesser General Public
00016  * License along with this library; if not, write to the Free Software
00017  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018  * 
00019  * Release: 3.0
00020  *
00021  * Contact: jonathan@objectweb.org
00022  *
00023  * Author: Bruno Dumant
00024  * 
00025  */
00026 
00027 package org.objectweb.jonathan.libs.protocols.tcpip;
00028 
00029 import java.io.IOException;
00030 
00031 import org.objectweb.jonathan.apis.kernel.Context;
00032 import org.objectweb.jonathan.apis.kernel.InternalException;
00033 import org.objectweb.jonathan.apis.kernel.JonathanException;
00034 import org.objectweb.jonathan.apis.presentation.EndOfMessageException;
00035 import org.objectweb.jonathan.apis.presentation.Marshaller;
00036 import org.objectweb.jonathan.apis.presentation.MarshallerFactory;
00037 import org.objectweb.jonathan.apis.protocols.CommunicationException;
00038 import org.objectweb.jonathan.apis.protocols.Protocol;
00039 import org.objectweb.jonathan.apis.protocols.ProtocolGraph;
00040 import org.objectweb.jonathan.apis.protocols.ReplyInterface;
00041 import org.objectweb.jonathan.apis.protocols.SessionIdentifier;
00042 import org.objectweb.jonathan.apis.protocols.Session_High;
00043 import org.objectweb.jonathan.apis.protocols.Session_Low;
00044 import org.objectweb.jonathan.apis.protocols.ip.IpConnection;
00045 import org.objectweb.jonathan.apis.protocols.ip.IpSession;
00046 import org.objectweb.jonathan.apis.protocols.ip.IpSessionIdentifier;
00047 import org.objectweb.jonathan.apis.protocols.ip.TcpIpConnectionMgr;
00048 import org.objectweb.jonathan.apis.protocols.ip.TcpIpSrvConnectionFactory;
00049 import org.objectweb.jonathan.apis.resources.Chunk;
00050 import org.objectweb.jonathan.apis.resources.ChunkFactory;
00051 import org.objectweb.jonathan.apis.resources.ChunkProvider;
00052 import org.objectweb.jonathan.apis.resources.Scheduler;
00053 
00059 final public class TcpIpProtocol implements Protocol {
00060    Scheduler scheduler;
00061    ChunkFactory chunk_factory;
00062    MarshallerFactory marshaller_factory;
00063    
00070    public boolean verbose;
00071 
00073    SrvSessionFactory[] srv_session_factories = new SrvSessionFactory[4];
00074    int num_srv_sessions = 0;
00075 
00077    TcpIpConnectionMgr connection_mgr;
00078    
00089    public TcpIpProtocol(Context _c, TcpIpConnectionMgr connection_mgr,
00090                         Scheduler scheduler, ChunkFactory chunk_factory,
00091                         MarshallerFactory mf)
00092       throws JonathanException {
00093          this.scheduler = scheduler;
00094          this.chunk_factory = chunk_factory;
00095          this.connection_mgr = connection_mgr;
00096          this.marshaller_factory = mf;
00097 
00098          verbose = false;
00099          Object prop = _c.getValue("/jonathan/tcpip/verbose",'/');
00100          if (prop!=Context.NO_VALUE){
00101             verbose = ((Boolean) prop).booleanValue();
00102          }
00103    }
00104 
00109    public final boolean isAnInvocationProtocol() {
00110       return false;
00111    }
00112 
00117    public final ProtocolGraph newProtocolGraph() {
00118       return new TcpIpProtocolGraph();
00119    }
00120    
00126    public final ProtocolGraph newProtocolGraph(int port) {
00127       return new TcpIpProtocolGraph(port);
00128    }   
00129    
00136    public final IpSessionIdentifier newSessionIdentifier(String host,
00137                                                          int port) {
00138       return new CltSessionIdentifier(host,port);
00139    }
00140    
00149    public boolean isLocal(IpSessionIdentifier tcp_session_id) {
00150       String hostname = 
00151          connection_mgr.getCanonicalHostName(tcp_session_id.hostname);
00152       tcp_session_id.hostname = hostname;
00153       return isLocal(hostname,tcp_session_id.port);
00154    }
00155 
00164    boolean isLocal(String host,int port) {
00165       synchronized(srv_session_factories) {
00166          for (int i = 0; i < num_srv_sessions; i++) {
00167             if (srv_session_factories[i].session_id.port == port &&
00168                 srv_session_factories[i].session_id.hostname.equals(host)) {
00169                return true;
00170             }
00171          }
00172          return false;
00173       }
00174    }
00175 
00177    void remove(SrvSessionFactory factory) {
00178       int i = 0;
00179       while(i < num_srv_sessions && srv_session_factories[i] != factory) {
00180          i++;
00181       }
00182       if (i < num_srv_sessions) {
00183          num_srv_sessions--;
00184          System.arraycopy(srv_session_factories,i+1,
00185                           srv_session_factories,i,num_srv_sessions - i);
00186          srv_session_factories[num_srv_sessions] = null;
00187       }
00188    }
00189 
00190    void send(Marshaller message,IpConnection connection)
00191       throws IOException {
00192       Chunk first = message.getState(), portion = null;
00193       int size = 0, len;
00194       boolean onechunk = true;
00195       Chunk c = first;
00196 
00197       while (c != null) {
00198          len = c.top - c.offset;
00199          if (len != 0) {
00200             if (size != 0) {
00201                onechunk = false;
00202             } else {
00203                portion = c;
00204             }
00205             size += len;
00206          }
00207          c = c.next;
00208       }
00209       if (onechunk) {
00210          if (portion != null) {
00211 //               String str = "TCP a: ";
00212 //               for (int i = 0; i <  portion.top - portion.offset; i++) {
00213 //                  str = str + portion.data[i] + " ";
00214 //               }
00215 //               System.out.println(str);
00216             connection.emit(portion);
00217          }
00218          message.close();                  
00219       } else {
00220          try {
00221             portion = chunk_factory.newChunk(size);
00222             int off = portion.offset;
00223             c = first;
00224             while (c != null) {
00225                if ((len = c.top - c.offset) > 0) {
00226                   System.arraycopy(c.data,c.offset,portion.data,off,len);
00227                   off += len;
00228                }
00229                c = c.next; 
00230             }
00231             portion.top = off;
00232 //             String str = "TCP b: ";            
00233 //              for (int i = 0; i < size; i++) {
00234 //                 str = portion.data[i] + " ";
00235 //              }
00236 //              System.out.println(str);
00237             connection.emit(portion);
00238             message.close();
00239          } finally {
00240             portion.release();
00241          }
00242       }
00243    }
00244 
00245    final class TcpIpProtocolGraph implements ProtocolGraph {
00246 
00247       int port;
00248       
00249       TcpIpProtocolGraph(int port) {
00250          this.port = port;
00251       }
00252       
00253       TcpIpProtocolGraph() {
00254          this(0);
00255       }
00256 
00257       public SessionIdentifier export(Session_Low hls)
00258          throws JonathanException {
00259          SrvSessionId session_id;
00260          synchronized (srv_session_factories) {
00261             // try to reuse an existing server socket factory.
00262             for (int i = 0; i < num_srv_sessions; i++) {
00263                if ((session_id = 
00264                     srv_session_factories[i].register(hls,this)) != null) {
00265                   return session_id;
00266                }
00267             }
00268             // it has not been possible to reuse an existing server socket
00269             // factory. create a new session identifier and srv session factory.
00270             TcpIpSrvConnectionFactory srv_connection_factory = 
00271                connection_mgr.newSrvConnectionFactory(port);
00272             session_id = new SrvSessionId(srv_connection_factory);
00273             SrvSessionFactory srv_fac =
00274                new SrvSessionFactory(session_id,hls,TcpIpProtocol.this);
00275             scheduler.newJob().run(srv_fac);
00276             int len = srv_session_factories.length;
00277             if (num_srv_sessions == len) {
00278                SrvSessionFactory[] new_srv_session_factories =
00279                   new SrvSessionFactory[len + 4];
00280                System.arraycopy(srv_session_factories,0,new_srv_session_factories,
00281                                 0,len);
00282                srv_session_factories = new_srv_session_factories;
00283             }
00284             srv_session_factories[num_srv_sessions++] = srv_fac;
00285             return session_id;
00286          }
00287       }
00288    }   
00289 
00290    final class CltSessionIdentifier extends IpSessionIdentifier {
00291       
00292       CltSessionIdentifier(String hostname, int port) {
00293          super(hostname,port);
00294       }
00295 
00296       public Protocol getProtocol() {
00297          return TcpIpProtocol.this;
00298       }
00299 
00300       public void unexport() {}
00301       
00302       public Session_High bind(Session_Low hls)
00303          throws JonathanException {
00304           CltSession session = new CltSession(hls,this);
00305           IpConnection connection =
00306              connection_mgr.newCltConnection(hostname,port,session);
00307           session = (CltSession) connection.getSession();
00308           session.acquire();
00309           session.connect(connection);
00310           return session;
00311       }
00312    }
00313    
00314    final class SrvSessionId extends IpSessionIdentifier {
00315       
00316       TcpIpSrvConnectionFactory connection_factory;
00317 
00318       SrvSessionId(TcpIpSrvConnectionFactory connection_factory) {
00319          super(connection_factory.getHostName(),
00320                connection_factory.getPort());
00321          this.connection_factory = connection_factory;
00322       }
00323       
00324       public Protocol getProtocol() {
00325          return TcpIpProtocol.this;
00326       }
00327 
00328       public void unexport() {
00329          connection_factory.close();
00330       }
00331       
00332       public Session_High bind(Session_Low hls)
00333          throws JonathanException {
00334          throw new InternalException("Meaningless operation");
00335       }
00336    }
00337    
00338    abstract class Session implements IpSession, Runnable {
00343       IpConnection connection;
00345       TcpIpChunkProvider tcp_message;
00347       Session_Low hls;   
00348 
00349       Session(Session_Low hls) {
00350          super();
00351          this.hls = hls;
00352       }
00353 
00354       final public void prepare(Marshaller m) {}
00355       
00356       final public ReplyInterface prepareInvocation(Marshaller m)
00357          throws JonathanException {
00358             throw new InternalException("TCP session don't handle invocations.");
00359       }
00360 
00361       final public boolean direct() {
00362          return true;
00363       }
00364       
00365       public final Session_Low getHls() {
00366          return hls;
00367       }
00368 
00369       public synchronized void connect(IpConnection connection)
00370          throws JonathanException {
00371          if (this.connection != null) {
00372             return;
00373          }
00374          this.connection = connection;
00375          if (tcp_message == null) {
00376             // If tcp_message != null, a thread should already be started and 
00377             // running.
00378             tcp_message = new TcpIpChunkProvider(this);
00379             scheduler.newJob().run(this);
00380          }
00381       }
00382    
00383 
00384       public final IpConnection getConnection() {
00385          return connection;
00386       }
00387    
00389       final public void run() {
00390          TcpIpChunkProvider message = null;
00391          IpConnection connection = null;
00392          try {
00393             synchronized (this) {
00394                if (this.connection != null) {
00395                   message = tcp_message;
00396                   connection = this.connection;
00397                } else {
00398                   return;
00399                }
00400             }
00401             hls.send(marshaller_factory.newUnMarshaller(message),this);
00402          } catch (JonathanException e) {
00403             if (verbose) {
00404                System.err.println("Exception caught by TcpIpProtocol.");
00405             }
00406             Throwable f = e.represents();
00407             if (f instanceof IOException || f instanceof EndOfMessageException) {
00408                synchronized (this) {
00409                   if (connection == this.connection) {
00410                      unbind();
00411                      return;
00412                   } else {
00413                      message.delete();
00414                   }
00415                }
00416             }
00417             hls.send(e,this);
00418          }
00419       }
00420 
00422       final synchronized void closeNotify(TcpIpChunkProvider message) {
00423          if (connection != null && message == tcp_message) {
00424             tcp_message = new TcpIpChunkProvider(message);
00425             scheduler.newJob().run(this);
00426          } else {
00427             message.delete();
00428          }
00429       }
00430 
00432        final synchronized void deleteNotify(TcpIpChunkProvider message) {
00433            if (message == tcp_message) {
00434                tcp_message = null; // rebinding should work better now
00435            }
00436        }
00437 
00439       void unbind() {
00440          if (connection != null) {
00441             connection.delete();
00442             connection = null;
00443          }
00444       }
00445 
00446       TcpIpProtocol getProtocol() {
00447          return TcpIpProtocol.this;
00448       }
00449    }
00450 
00451    final class SrvSession extends Session {
00452       SrvSession(Session_Low hls) {
00453          super(hls);
00454       }
00455 
00456       final public void close() {}
00457       
00459       public final void send(Marshaller message)
00460          throws JonathanException {
00461             try {
00462                TcpIpProtocol.this.send(message,connection);
00463             } catch (IOException e) {
00464                try {
00465                   synchronized (this) {
00466                      if (connection == null) {
00467                         throw new CommunicationException("Session is closed");
00468                      }
00469                      unbind();
00470                      throw new CommunicationException(e);
00471                   }
00472                } finally {
00473                   message.close();
00474                }     
00475             }
00476       }
00477    }
00478 
00479    final class CltSession extends Session {
00480       IpSessionIdentifier session_id;
00481       int acquired;
00482 
00483 
00484       CltSession(Session_Low hls,IpSessionIdentifier session_id)
00485          throws JonathanException {
00486          super(hls);
00487          this.session_id = session_id;
00488          acquired = 0;
00489       }
00490    
00492       public final void send(Marshaller message)
00493          throws JonathanException {
00494          try {
00495             TcpIpProtocol.this.send(message,connection);
00496          } catch (IOException e) {
00497             try {
00498                synchronized (this) {
00499                   if (connection == null) {
00500                      throw new CommunicationException("Session is closed");
00501                   }
00502                   try {
00503                      rebind();
00504                   } catch (JonathanException g) {
00505                      throw new org.objectweb.jonathan.apis.binding.BindException("Can't rebind TCP session");
00506                   }
00507                   try {
00508                      TcpIpProtocol.this.send(message,connection);
00509                   } catch (IOException f) {
00510                      unbind();
00511                      throw new CommunicationException(f);
00512                   }
00513                }
00514             } finally {
00515                message.close();
00516             }
00517          }
00518       }
00519       
00521       void rebind() throws JonathanException {
00522          connection.delete();
00523          connection = null;
00524          String hostname = session_id.hostname;
00525          int port = session_id.port;
00526          IpConnection new_connection = 
00527             connection_mgr.newCltConnection(hostname,port,this);
00528          connect(new_connection);
00529       }
00530 
00534       final public synchronized void close() {
00535          acquired--;
00536          if (acquired == 0 && connection != null) {
00537             connection.release();
00538             connection = null;
00539          }
00540       }
00541 
00542       final synchronized void acquire() {
00543          acquired++;
00544       }
00545       
00546       public boolean equals(Object object) {
00547          if (object instanceof CltSession) {
00548             CltSession other = (CltSession) object;
00549             return other.session_id.equals(session_id) && other.hls.equals(hls);
00550          }
00551          return false;
00552       }
00553 
00554       public int hashCode() {
00555          return session_id.hashCode() + hls.hashCode();
00556       }
00557    }
00558 
00562    final class SrvSessionFactory implements Runnable {
00563       SrvSessionId session_id;
00564       Session_Low hls;
00565       boolean cont;
00566       TcpIpProtocol protocol;
00567       Thread runner;
00568    
00573       SrvSessionFactory(SrvSessionId session_id, Session_Low hls,
00574                         TcpIpProtocol protocol) {
00575          this.hls = hls;
00576          this.session_id = session_id;
00577          this.protocol = protocol;
00578          cont = true;
00579       }
00580    
00581       SrvSessionId register(Session_Low hls,
00582                             TcpIpProtocolGraph protocol_graph) {
00583          int port = protocol_graph.port;
00584          if ((port == 0 || port == session_id.port) && this.hls.equals(hls)) {
00585             return session_id;
00586          } else {
00587             return null;
00588          }
00589       }
00590    
00591       public void run() {
00592          runner = Thread.currentThread();
00593          TcpIpSrvConnectionFactory connection_factory = 
00594             session_id.connection_factory;
00595          while (cont) {
00596             try {
00597                SrvSession session = new  SrvSession(hls);
00598                IpConnection connection = 
00599                   connection_factory.newSrvConnection(session);
00600                session.connect(connection);
00601             } catch (JonathanException e) {
00602                if (cont) {
00603                   if (verbose) {
00604                      System.err.println("Stopping server socket on exception.");
00605                      e.printStackTrace();
00606                   }
00607                   session_id.unexport();
00608                   remove(this);
00609                   cont = false;
00610                }
00611             }
00612          }
00613       }
00614    
00615       void release() {
00616          cont = false;
00617          runner.interrupt();
00618          session_id.unexport();
00619          remove(this);
00620       }
00621    }   
00622 }
00623 
00624 
00629 final class TcpIpChunkProvider extends Chunk implements ChunkProvider  {
00630    static final byte[] empty_data = new byte[0];
00631    static final Chunk empty_chunk = new Chunk(empty_data,0,0);
00632    
00634    TcpIpProtocol.Session session;
00636    IpConnection connection;
00637    int max;
00638 
00640    Chunk cache;
00641    
00642    TcpIpChunkProvider(TcpIpProtocol.Session session) {
00643       super(empty_data,0,0);
00644       max = 0;
00645       this.session = session;
00646       connection = session.connection;
00647       cache = empty_chunk;
00648    }
00649 
00650    TcpIpChunkProvider(TcpIpChunkProvider message) {
00651       super(message.data,message.offset,message.top);
00652       cache = message.cache;
00653       max = message.max;
00654       session = message.session;
00655       connection = message.connection;
00656       message.cache = null;
00657    }
00658    
00659    public Chunk prepare() throws JonathanException {
00660       TcpIpProtocol protocol = session.getProtocol();
00661       if (top == offset) {
00662          // there is nothing left to read from this message
00663          try {
00664             int to_read = connection.available();
00665             if (to_read > 1) {
00666                if (to_read > max - top) {
00667                   cache.release();
00668                   cache = protocol.chunk_factory.newChunk(to_read);
00669                   data = cache.data;
00670                   offset = cache.offset;
00671                   top = cache.top;
00672                   max = data.length;
00673                }
00674                connection.receive(this,to_read);
00675                return this;
00676             } else {
00677                if (max - top == 0) {
00678                   cache.release();
00679                   cache = protocol.chunk_factory.newChunk();
00680                   data = cache.data;
00681                   offset = cache.offset;
00682                   top = cache.top;
00683                   max = data.length;
00684                }
00685                connection.receive(this,1);
00686                return this;
00687             }
00688          } catch (IOException e) {
00689             delete();
00690             connection.delete();
00691             connection = null;
00692             throw new JonathanException(e);
00693          } 
00694       } else {
00695          return this;
00696       }
00697    }
00698 
00699    public void close() {
00700       if (cache != null) {
00701          session.closeNotify(this);
00702       }
00703    }
00704 
00705    protected void finalize() {
00706       if (cache != null) {
00707          System.err.println("Resource management error. message " + this + 
00708                             " has not been properly closed.");
00709          delete();
00710       }
00711    }   
00712 
00713    final void delete() {
00714       if (cache != null) {
00715          cache.release();
00716          cache = null;
00717       }
00718       
00719       session.deleteNotify(this);
00720    }
00721 
00722    public Chunk duplicate() throws JonathanException {
00723       cache.top = top;
00724       return cache.duplicate(offset,top);
00725    }
00726 
00727    public Chunk duplicate(int off,int t) throws JonathanException {
00728       cache.top = top;
00729       return cache.duplicate(off,t);
00730    }
00731    
00732    public void release() {}   
00733 }

Generated at Fri May 31 19:23:35 2002 for Jonathan by doxygen1.2.6 written by Dimitri van Heesch, © 1997-2001