00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 package org.objectweb.jonathan.libs.protocols.tcpip;
00028
00029 import java.io.IOException;
00030
00031 import org.objectweb.jonathan.apis.kernel.Context;
00032 import org.objectweb.jonathan.apis.kernel.InternalException;
00033 import org.objectweb.jonathan.apis.kernel.JonathanException;
00034 import org.objectweb.jonathan.apis.presentation.EndOfMessageException;
00035 import org.objectweb.jonathan.apis.presentation.Marshaller;
00036 import org.objectweb.jonathan.apis.presentation.MarshallerFactory;
00037 import org.objectweb.jonathan.apis.protocols.CommunicationException;
00038 import org.objectweb.jonathan.apis.protocols.Protocol;
00039 import org.objectweb.jonathan.apis.protocols.ProtocolGraph;
00040 import org.objectweb.jonathan.apis.protocols.ReplyInterface;
00041 import org.objectweb.jonathan.apis.protocols.SessionIdentifier;
00042 import org.objectweb.jonathan.apis.protocols.Session_High;
00043 import org.objectweb.jonathan.apis.protocols.Session_Low;
00044 import org.objectweb.jonathan.apis.protocols.ip.IpConnection;
00045 import org.objectweb.jonathan.apis.protocols.ip.IpSession;
00046 import org.objectweb.jonathan.apis.protocols.ip.IpSessionIdentifier;
00047 import org.objectweb.jonathan.apis.protocols.ip.TcpIpConnectionMgr;
00048 import org.objectweb.jonathan.apis.protocols.ip.TcpIpSrvConnectionFactory;
00049 import org.objectweb.jonathan.apis.resources.Chunk;
00050 import org.objectweb.jonathan.apis.resources.ChunkFactory;
00051 import org.objectweb.jonathan.apis.resources.ChunkProvider;
00052 import org.objectweb.jonathan.apis.resources.Scheduler;
00053
00059 final public class TcpIpProtocol implements Protocol {
/** Scheduler used to start the jobs that accept connections and read incoming data. */
Scheduler scheduler;
/** Factory of the chunks used to gather outgoing data and to buffer incoming data. */
ChunkFactory chunk_factory;
/** Factory of the unmarshallers handed to the upper session when a message is read. */
MarshallerFactory marshaller_factory;

/** When true, diagnostic messages are printed on System.err. */
public boolean verbose;

/**
 * Registered server session factories; only the first num_srv_sessions
 * slots are in use. The array is also the monitor on which lookups and
 * registrations synchronize.
 */
SrvSessionFactory[] srv_session_factories = new SrvSessionFactory[4];
/** Number of used slots in srv_session_factories. */
int num_srv_sessions = 0;

/** Connection manager used to open client connections and server connection factories. */
TcpIpConnectionMgr connection_mgr;
00078
00089 public TcpIpProtocol(Context _c, TcpIpConnectionMgr connection_mgr,
00090 Scheduler scheduler, ChunkFactory chunk_factory,
00091 MarshallerFactory mf)
00092 throws JonathanException {
00093 this.scheduler = scheduler;
00094 this.chunk_factory = chunk_factory;
00095 this.connection_mgr = connection_mgr;
00096 this.marshaller_factory = mf;
00097
00098 verbose = false;
00099 Object prop = _c.getValue("/jonathan/tcpip/verbose",'/');
00100 if (prop!=Context.NO_VALUE){
00101 verbose = ((Boolean) prop).booleanValue();
00102 }
00103 }
00104
00109 public final boolean isAnInvocationProtocol() {
00110 return false;
00111 }
00112
00117 public final ProtocolGraph newProtocolGraph() {
00118 return new TcpIpProtocolGraph();
00119 }
00120
00126 public final ProtocolGraph newProtocolGraph(int port) {
00127 return new TcpIpProtocolGraph(port);
00128 }
00129
00136 public final IpSessionIdentifier newSessionIdentifier(String host,
00137 int port) {
00138 return new CltSessionIdentifier(host,port);
00139 }
00140
00149 public boolean isLocal(IpSessionIdentifier tcp_session_id) {
00150 String hostname =
00151 connection_mgr.getCanonicalHostName(tcp_session_id.hostname);
00152 tcp_session_id.hostname = hostname;
00153 return isLocal(hostname,tcp_session_id.port);
00154 }
00155
00164 boolean isLocal(String host,int port) {
00165 synchronized(srv_session_factories) {
00166 for (int i = 0; i < num_srv_sessions; i++) {
00167 if (srv_session_factories[i].session_id.port == port &&
00168 srv_session_factories[i].session_id.hostname.equals(host)) {
00169 return true;
00170 }
00171 }
00172 return false;
00173 }
00174 }
00175
00177 void remove(SrvSessionFactory factory) {
00178 int i = 0;
00179 while(i < num_srv_sessions && srv_session_factories[i] != factory) {
00180 i++;
00181 }
00182 if (i < num_srv_sessions) {
00183 num_srv_sessions--;
00184 System.arraycopy(srv_session_factories,i+1,
00185 srv_session_factories,i,num_srv_sessions - i);
00186 srv_session_factories[num_srv_sessions] = null;
00187 }
00188 }
00189
00190 void send(Marshaller message,IpConnection connection)
00191 throws IOException {
00192 Chunk first = message.getState(), portion = null;
00193 int size = 0, len;
00194 boolean onechunk = true;
00195 Chunk c = first;
00196
00197 while (c != null) {
00198 len = c.top - c.offset;
00199 if (len != 0) {
00200 if (size != 0) {
00201 onechunk = false;
00202 } else {
00203 portion = c;
00204 }
00205 size += len;
00206 }
00207 c = c.next;
00208 }
00209 if (onechunk) {
00210 if (portion != null) {
00211
00212
00213
00214
00215
00216 connection.emit(portion);
00217 }
00218 message.close();
00219 } else {
00220 try {
00221 portion = chunk_factory.newChunk(size);
00222 int off = portion.offset;
00223 c = first;
00224 while (c != null) {
00225 if ((len = c.top - c.offset) > 0) {
00226 System.arraycopy(c.data,c.offset,portion.data,off,len);
00227 off += len;
00228 }
00229 c = c.next;
00230 }
00231 portion.top = off;
00232
00233
00234
00235
00236
00237 connection.emit(portion);
00238 message.close();
00239 } finally {
00240 portion.release();
00241 }
00242 }
00243 }
00244
00245 final class TcpIpProtocolGraph implements ProtocolGraph {
00246
00247 int port;
00248
00249 TcpIpProtocolGraph(int port) {
00250 this.port = port;
00251 }
00252
00253 TcpIpProtocolGraph() {
00254 this(0);
00255 }
00256
00257 public SessionIdentifier export(Session_Low hls)
00258 throws JonathanException {
00259 SrvSessionId session_id;
00260 synchronized (srv_session_factories) {
00261
00262 for (int i = 0; i < num_srv_sessions; i++) {
00263 if ((session_id =
00264 srv_session_factories[i].register(hls,this)) != null) {
00265 return session_id;
00266 }
00267 }
00268
00269
00270 TcpIpSrvConnectionFactory srv_connection_factory =
00271 connection_mgr.newSrvConnectionFactory(port);
00272 session_id = new SrvSessionId(srv_connection_factory);
00273 SrvSessionFactory srv_fac =
00274 new SrvSessionFactory(session_id,hls,TcpIpProtocol.this);
00275 scheduler.newJob().run(srv_fac);
00276 int len = srv_session_factories.length;
00277 if (num_srv_sessions == len) {
00278 SrvSessionFactory[] new_srv_session_factories =
00279 new SrvSessionFactory[len + 4];
00280 System.arraycopy(srv_session_factories,0,new_srv_session_factories,
00281 0,len);
00282 srv_session_factories = new_srv_session_factories;
00283 }
00284 srv_session_factories[num_srv_sessions++] = srv_fac;
00285 return session_id;
00286 }
00287 }
00288 }
00289
00290 final class CltSessionIdentifier extends IpSessionIdentifier {
00291
00292 CltSessionIdentifier(String hostname, int port) {
00293 super(hostname,port);
00294 }
00295
00296 public Protocol getProtocol() {
00297 return TcpIpProtocol.this;
00298 }
00299
00300 public void unexport() {}
00301
00302 public Session_High bind(Session_Low hls)
00303 throws JonathanException {
00304 CltSession session = new CltSession(hls,this);
00305 IpConnection connection =
00306 connection_mgr.newCltConnection(hostname,port,session);
00307 session = (CltSession) connection.getSession();
00308 session.acquire();
00309 session.connect(connection);
00310 return session;
00311 }
00312 }
00313
00314 final class SrvSessionId extends IpSessionIdentifier {
00315
00316 TcpIpSrvConnectionFactory connection_factory;
00317
00318 SrvSessionId(TcpIpSrvConnectionFactory connection_factory) {
00319 super(connection_factory.getHostName(),
00320 connection_factory.getPort());
00321 this.connection_factory = connection_factory;
00322 }
00323
00324 public Protocol getProtocol() {
00325 return TcpIpProtocol.this;
00326 }
00327
00328 public void unexport() {
00329 connection_factory.close();
00330 }
00331
00332 public Session_High bind(Session_Low hls)
00333 throws JonathanException {
00334 throw new InternalException("Meaningless operation");
00335 }
00336 }
00337
00338 abstract class Session implements IpSession, Runnable {
00343 IpConnection connection;
00345 TcpIpChunkProvider tcp_message;
00347 Session_Low hls;
00348
00349 Session(Session_Low hls) {
00350 super();
00351 this.hls = hls;
00352 }
00353
00354 final public void prepare(Marshaller m) {}
00355
00356 final public ReplyInterface prepareInvocation(Marshaller m)
00357 throws JonathanException {
00358 throw new InternalException("TCP session don't handle invocations.");
00359 }
00360
00361 final public boolean direct() {
00362 return true;
00363 }
00364
00365 public final Session_Low getHls() {
00366 return hls;
00367 }
00368
00369 public synchronized void connect(IpConnection connection)
00370 throws JonathanException {
00371 if (this.connection != null) {
00372 return;
00373 }
00374 this.connection = connection;
00375 if (tcp_message == null) {
00376
00377
00378 tcp_message = new TcpIpChunkProvider(this);
00379 scheduler.newJob().run(this);
00380 }
00381 }
00382
00383
00384 public final IpConnection getConnection() {
00385 return connection;
00386 }
00387
00389 final public void run() {
00390 TcpIpChunkProvider message = null;
00391 IpConnection connection = null;
00392 try {
00393 synchronized (this) {
00394 if (this.connection != null) {
00395 message = tcp_message;
00396 connection = this.connection;
00397 } else {
00398 return;
00399 }
00400 }
00401 hls.send(marshaller_factory.newUnMarshaller(message),this);
00402 } catch (JonathanException e) {
00403 if (verbose) {
00404 System.err.println("Exception caught by TcpIpProtocol.");
00405 }
00406 Throwable f = e.represents();
00407 if (f instanceof IOException || f instanceof EndOfMessageException) {
00408 synchronized (this) {
00409 if (connection == this.connection) {
00410 unbind();
00411 return;
00412 } else {
00413 message.delete();
00414 }
00415 }
00416 }
00417 hls.send(e,this);
00418 }
00419 }
00420
00422 final synchronized void closeNotify(TcpIpChunkProvider message) {
00423 if (connection != null && message == tcp_message) {
00424 tcp_message = new TcpIpChunkProvider(message);
00425 scheduler.newJob().run(this);
00426 } else {
00427 message.delete();
00428 }
00429 }
00430
00432 final synchronized void deleteNotify(TcpIpChunkProvider message) {
00433 if (message == tcp_message) {
00434 tcp_message = null;
00435 }
00436 }
00437
00439 void unbind() {
00440 if (connection != null) {
00441 connection.delete();
00442 connection = null;
00443 }
00444 }
00445
00446 TcpIpProtocol getProtocol() {
00447 return TcpIpProtocol.this;
00448 }
00449 }
00450
00451 final class SrvSession extends Session {
00452 SrvSession(Session_Low hls) {
00453 super(hls);
00454 }
00455
00456 final public void close() {}
00457
00459 public final void send(Marshaller message)
00460 throws JonathanException {
00461 try {
00462 TcpIpProtocol.this.send(message,connection);
00463 } catch (IOException e) {
00464 try {
00465 synchronized (this) {
00466 if (connection == null) {
00467 throw new CommunicationException("Session is closed");
00468 }
00469 unbind();
00470 throw new CommunicationException(e);
00471 }
00472 } finally {
00473 message.close();
00474 }
00475 }
00476 }
00477 }
00478
00479 final class CltSession extends Session {
00480 IpSessionIdentifier session_id;
00481 int acquired;
00482
00483
00484 CltSession(Session_Low hls,IpSessionIdentifier session_id)
00485 throws JonathanException {
00486 super(hls);
00487 this.session_id = session_id;
00488 acquired = 0;
00489 }
00490
00492 public final void send(Marshaller message)
00493 throws JonathanException {
00494 try {
00495 TcpIpProtocol.this.send(message,connection);
00496 } catch (IOException e) {
00497 try {
00498 synchronized (this) {
00499 if (connection == null) {
00500 throw new CommunicationException("Session is closed");
00501 }
00502 try {
00503 rebind();
00504 } catch (JonathanException g) {
00505 throw new org.objectweb.jonathan.apis.binding.BindException("Can't rebind TCP session");
00506 }
00507 try {
00508 TcpIpProtocol.this.send(message,connection);
00509 } catch (IOException f) {
00510 unbind();
00511 throw new CommunicationException(f);
00512 }
00513 }
00514 } finally {
00515 message.close();
00516 }
00517 }
00518 }
00519
00521 void rebind() throws JonathanException {
00522 connection.delete();
00523 connection = null;
00524 String hostname = session_id.hostname;
00525 int port = session_id.port;
00526 IpConnection new_connection =
00527 connection_mgr.newCltConnection(hostname,port,this);
00528 connect(new_connection);
00529 }
00530
00534 final public synchronized void close() {
00535 acquired--;
00536 if (acquired == 0 && connection != null) {
00537 connection.release();
00538 connection = null;
00539 }
00540 }
00541
00542 final synchronized void acquire() {
00543 acquired++;
00544 }
00545
00546 public boolean equals(Object object) {
00547 if (object instanceof CltSession) {
00548 CltSession other = (CltSession) object;
00549 return other.session_id.equals(session_id) && other.hls.equals(hls);
00550 }
00551 return false;
00552 }
00553
00554 public int hashCode() {
00555 return session_id.hashCode() + hls.hashCode();
00556 }
00557 }
00558
00562 final class SrvSessionFactory implements Runnable {
00563 SrvSessionId session_id;
00564 Session_Low hls;
00565 boolean cont;
00566 TcpIpProtocol protocol;
00567 Thread runner;
00568
00573 SrvSessionFactory(SrvSessionId session_id, Session_Low hls,
00574 TcpIpProtocol protocol) {
00575 this.hls = hls;
00576 this.session_id = session_id;
00577 this.protocol = protocol;
00578 cont = true;
00579 }
00580
00581 SrvSessionId register(Session_Low hls,
00582 TcpIpProtocolGraph protocol_graph) {
00583 int port = protocol_graph.port;
00584 if ((port == 0 || port == session_id.port) && this.hls.equals(hls)) {
00585 return session_id;
00586 } else {
00587 return null;
00588 }
00589 }
00590
00591 public void run() {
00592 runner = Thread.currentThread();
00593 TcpIpSrvConnectionFactory connection_factory =
00594 session_id.connection_factory;
00595 while (cont) {
00596 try {
00597 SrvSession session = new SrvSession(hls);
00598 IpConnection connection =
00599 connection_factory.newSrvConnection(session);
00600 session.connect(connection);
00601 } catch (JonathanException e) {
00602 if (cont) {
00603 if (verbose) {
00604 System.err.println("Stopping server socket on exception.");
00605 e.printStackTrace();
00606 }
00607 session_id.unexport();
00608 remove(this);
00609 cont = false;
00610 }
00611 }
00612 }
00613 }
00614
00615 void release() {
00616 cont = false;
00617 runner.interrupt();
00618 session_id.unexport();
00619 remove(this);
00620 }
00621 }
00622 }
00623
00624
00629 final class TcpIpChunkProvider extends Chunk implements ChunkProvider {
00630 static final byte[] empty_data = new byte[0];
00631 static final Chunk empty_chunk = new Chunk(empty_data,0,0);
00632
00634 TcpIpProtocol.Session session;
00636 IpConnection connection;
00637 int max;
00638
00640 Chunk cache;
00641
00642 TcpIpChunkProvider(TcpIpProtocol.Session session) {
00643 super(empty_data,0,0);
00644 max = 0;
00645 this.session = session;
00646 connection = session.connection;
00647 cache = empty_chunk;
00648 }
00649
00650 TcpIpChunkProvider(TcpIpChunkProvider message) {
00651 super(message.data,message.offset,message.top);
00652 cache = message.cache;
00653 max = message.max;
00654 session = message.session;
00655 connection = message.connection;
00656 message.cache = null;
00657 }
00658
00659 public Chunk prepare() throws JonathanException {
00660 TcpIpProtocol protocol = session.getProtocol();
00661 if (top == offset) {
00662
00663 try {
00664 int to_read = connection.available();
00665 if (to_read > 1) {
00666 if (to_read > max - top) {
00667 cache.release();
00668 cache = protocol.chunk_factory.newChunk(to_read);
00669 data = cache.data;
00670 offset = cache.offset;
00671 top = cache.top;
00672 max = data.length;
00673 }
00674 connection.receive(this,to_read);
00675 return this;
00676 } else {
00677 if (max - top == 0) {
00678 cache.release();
00679 cache = protocol.chunk_factory.newChunk();
00680 data = cache.data;
00681 offset = cache.offset;
00682 top = cache.top;
00683 max = data.length;
00684 }
00685 connection.receive(this,1);
00686 return this;
00687 }
00688 } catch (IOException e) {
00689 delete();
00690 connection.delete();
00691 connection = null;
00692 throw new JonathanException(e);
00693 }
00694 } else {
00695 return this;
00696 }
00697 }
00698
00699 public void close() {
00700 if (cache != null) {
00701 session.closeNotify(this);
00702 }
00703 }
00704
00705 protected void finalize() {
00706 if (cache != null) {
00707 System.err.println("Resource management error. message " + this +
00708 " has not been properly closed.");
00709 delete();
00710 }
00711 }
00712
00713 final void delete() {
00714 if (cache != null) {
00715 cache.release();
00716 cache = null;
00717 }
00718
00719 session.deleteNotify(this);
00720 }
00721
00722 public Chunk duplicate() throws JonathanException {
00723 cache.top = top;
00724 return cache.duplicate(offset,top);
00725 }
00726
00727 public Chunk duplicate(int off,int t) throws JonathanException {
00728 cache.top = top;
00729 return cache.duplicate(off,t);
00730 }
00731
00732 public void release() {}
00733 }