1 25 40 41 package org.datashare.client; 42 43 import java.net.Socket ; 44 import java.net.DatagramSocket ; 45 import java.net.MulticastSocket ; 46 import java.net.InetAddress ; 47 import java.io.ObjectOutputStream ; 48 import java.io.ObjectInputStream ; 49 import java.io.ByteArrayInputStream ; 50 import java.io.ByteArrayOutputStream ; 51 import java.io.BufferedInputStream ; 52 import java.io.BufferedOutputStream ; 53 import java.io.InputStream ; 54 import java.io.OutputStream ; 55 import java.io.IOException ; 56 import java.io.StreamCorruptedException ; 57 import java.io.UTFDataFormatException ; 58 import java.net.DatagramPacket ; 59 60 import org.datashare.objects.DataShareObject; 61 import org.datashare.objects.DataShareConnectionDescriptor; 62 import org.datashare.objects.ChannelDescription; 63 import org.datashare.objects.ActivateConnectionObject; 64 import org.datashare.SessionUtilities; 65 66 71 public class DataShareConnection implements Runnable 72 { 73 Socket tcpSocket = null; 74 DatagramSocket udpSocket = null; 75 MulticastSocket multicastSocket = null; 76 ObjectOutputStream oos = null; 77 ObjectInputStream ois = null; 78 private boolean running = true; private boolean closeAllCalled = false; 80 public DataShareConnectionDescriptor dscd = null; 81 ClientDataReceiverInterface cdri = null; 82 DatagramPacket sndPacket = null; DatagramPacket rcvPacket = null; protected int type; 85 private boolean completed = false; 86 public String keyValue; 87 private int sndBuffSize = -1; 88 private int rcvBuffSize = -1; 89 private byte[] byteArrayInstance = null; 90 91 public DataShareConnection(DataShareConnectionDescriptor dscd1, ClientDataReceiverInterface cdri) throws Exception 92 { 93 type = dscd1.type; 94 dscd = dscd1; 95 this.cdri = cdri; 96 97 switch(type) 98 { 99 case ChannelDescription.TCP: 100 { 101 System.out.println("Creating a TCP DataShareConnection instance for " + dscd.serverIP +":"+dscd.serverPort); 102 try{ 103 if(dscd.completelySpecified) tcpSocket = new Socket (dscd.serverIP, dscd.serverPort, dscd.clientIP, dscd.clientPort); 105 else 106 { 107 tcpSocket = new Socket (dscd.serverIP, dscd.serverPort); 108 dscd.clientIP = tcpSocket.getLocalAddress(); 109 dscd.clientPort = tcpSocket.getLocalPort(); 110 dscd.completelySpecified = true; 111 } 112 oos = new ObjectOutputStream (tcpSocket.getOutputStream()); 113 ois = new ObjectInputStream (tcpSocket.getInputStream()); 114 Thread me = new Thread (this,"DataShareConnectionFor"+dscd.channelDescription.channelName+"OnServerPort"+dscd.serverPort); 115 me.start(); 116 keyValue = "TCP-" + dscd.serverIP.getHostAddress()+":"+dscd.serverPort+"-"+dscd.clientIP.getHostAddress()+":"+dscd.clientPort; 117 while(!completed) 118 { 119 SessionUtilities.delay(100); 121 } 122 } 123 catch(Exception e) 124 { 125 System.out.println("Problems creating our tcpSocket..."); 126 System.out.println(e); 128 throw e; 130 } 131 break; 132 } 133 case ChannelDescription.UDP: 134 { 135 System.out.println("Creating a UDP DataShareConnection instance for " + dscd.serverIP +":"+dscd.serverPort); 136 try{ 137 udpSocket = new DatagramSocket (); 138 udpSocket.setReceiveBufferSize(15000); 139 udpSocket.setSendBufferSize(10000); 140 rcvBuffSize = udpSocket.getReceiveBufferSize(); 141 if(SessionUtilities.getVerbose()) 142 System.out.println("UDP socket max receive size is " + rcvBuffSize); 143 sndBuffSize = udpSocket.getSendBufferSize(); 144 if(SessionUtilities.getVerbose()) 145 System.out.println("UDP socket max send size is " + sndBuffSize); 146 dscd.clientIP = InetAddress.getLocalHost(); 147 dscd.clientPort = udpSocket.getLocalPort(); 149 dscd.completelySpecified = true; 150 Thread me = new Thread (this,"DataShareConnectionFor"+dscd.channelDescription.channelName); 151 me.start(); 152 keyValue = "UDP-" + dscd.serverIP.getHostAddress()+":"+dscd.serverPort+"-"+dscd.clientIP.getHostAddress()+":"+dscd.clientPort; 153 while(!completed) 154 { 155 SessionUtilities.delay(100); 157 } 158 } 159 catch(Exception e2) 160 { 161 System.out.println("Problems creating our udpSocket..."); 162 e2.printStackTrace(); 163 throw new Exception (e2.toString()); 164 } 165 break; 166 } 167 case ChannelDescription.MULTICAST: 168 { 169 System.out.println("Creating a Multicast DataShareConnection instance for " + dscd.serverIP +":"+dscd.serverPort); 170 try{ 171 multicastSocket = new MulticastSocket (dscd.serverPort); multicastSocket.setReceiveBufferSize(15000); 173 multicastSocket.setSendBufferSize(10000); 174 multicastSocket.joinGroup(dscd.serverIP); if(SessionUtilities.getVerbose()) 176 { 177 rcvBuffSize = multicastSocket.getReceiveBufferSize(); 178 sndBuffSize = multicastSocket.getSendBufferSize(); 179 if(SessionUtilities.getVerbose()) 180 { 181 System.out.println("Multicast socket max receive size is " + rcvBuffSize); 182 System.out.println("Multicast socket max send size is " + sndBuffSize); 183 } 184 } 185 dscd.clientIP = dscd.serverIP; 186 dscd.clientPort = multicastSocket.getLocalPort(); 188 dscd.completelySpecified = true; 189 Thread me = new Thread (this,"DataShareConnectionFor"+dscd.channelDescription.channelName); 190 me.start(); 191 keyValue = "Multicast-" + dscd.serverIP.getHostAddress()+":"+dscd.serverPort+"-"+dscd.clientIP.getHostAddress()+":"+dscd.clientPort; 192 while(!completed) 193 { 194 SessionUtilities.delay(100); 196 } 197 } 198 catch(Exception e2) 199 { 200 System.out.println("Problems creating our multcastSocket..."); 201 e2.printStackTrace(); 202 throw new Exception (e2.toString()); 203 } 204 break; 205 } 206 default: 207 { 208 System.out.println("DataShareConnection...unknown channel type!!!"); 209 } 210 } 211 if(SessionUtilities.getVerbose()) 212 System.out.println("DataShareConnection ready for "+ dscd.channelDescription.channelName); 213 } 214 215 public int getType() 216 { 217 return type; 218 } 219 220 public void 221 sendToOthers(Object object) 222 { 223 sendToOthers(object, false); 224 } 225 226 public void 227 sendToAll(Object object) 228 { 229 sendToAll(object, false); 230 } 231 232 public void 233 sendToClient(Object object, String clientUniqueName) 234 { 235 sendToClient(object, clientUniqueName, false); 236 } 237 238 public void 239 sendToOthers(Object object, boolean isControl) 240 { 241 try{ 242 DataShareObject dsObject = new DataShareObject(SessionUtilities.convertObjectToByteArray(object), 243 DataShareObject.SENDTOOTHERS, dscd.clientKeyValue); 244 dsObject.isControlObject = isControl; 245 sendDSObject(dsObject); 246 } 247 catch(Exception e) 248 { 249 e.printStackTrace(); 250 } 251 } 252 253 public void 254 sendToAll(Object object, boolean isControl) 255 { 256 try{ 257 DataShareObject dsObject = new DataShareObject(SessionUtilities.convertObjectToByteArray(object), 258 DataShareObject.SENDTOALL, dscd.clientKeyValue); 259 dsObject.isControlObject = isControl; 260 sendDSObject(dsObject); 261 } 262 catch(Exception e) 263 { 264 e.printStackTrace(); 265 } 266 } 267 268 public void 269 sendToClient(Object object, String clientUniqueName, boolean isControl) 270 { 271 try{ 272 DataShareObject dsObject = new DataShareObject(SessionUtilities.convertObjectToByteArray(object), 273 dscd.clientKeyValue, clientUniqueName); 274 dsObject.isControlObject = isControl; 275 sendDSObject(dsObject); 276 } 277 catch(Exception e) 278 { 279 e.printStackTrace(); 280 } 281 } 282 283 private synchronized void 284 sendDSObject(DataShareObject dsObject) 285 { 286 switch(type) 287 { 288 case ChannelDescription.TCP: 289 { 290 try{ 291 if(running) 292 { 293 if(SessionUtilities.getVerbose()) 294 System.out.println("Sending " + dsObject.objectBytes.length + " bytes over TCP socket " + dscd.clientPort + "->ServerPort"+dscd.serverPort); 295 oos.writeObject(dsObject); 296 oos.flush(); 297 } 298 } 299 catch(IOException ioe) 300 { 301 if(!closeAllCalled) 302 { 303 ioe.printStackTrace(); 304 closeAll(); 305 } 306 } 307 break; 308 } 309 case ChannelDescription.UDP: 310 { 311 try{ 312 if(running) 313 { 314 byte[] sndBytes = SessionUtilities.convertObjectToByteArray(dsObject); 315 sndPacket = new DatagramPacket (sndBytes, sndBytes.length, dscd.serverIP, dscd.serverPort); 316 if(SessionUtilities.getVerbose()) 317 System.out.println("Sending "+ sndBytes.length+ " bytes over UDP socket " + dscd.clientPort + "->ServerPort"+dscd.serverPort); 318 udpSocket.send(sndPacket); 319 } 320 } 321 catch(IOException ioe2) 322 { 323 if(!closeAllCalled) 324 { 325 ioe2.printStackTrace(); 326 closeAll(); 327 } 328 } 329 break; 330 } 331 case ChannelDescription.MULTICAST: 332 { 333 try{ 334 if(running) 335 { 336 byte[] sndBytes = SessionUtilities.convertObjectToByteArray(dsObject); 337 sndPacket = new DatagramPacket (sndBytes, sndBytes.length, dscd.serverIP, dscd.serverPort); 338 if(SessionUtilities.getVerbose()) 339 System.out.println("Sending "+ sndBytes.length+ " bytes over Multicast socket " + dscd.clientPort + "->ServerPort"+dscd.serverPort); 340 multicastSocket.send(sndPacket); 341 } 342 } 343 catch(IOException ioe2) 344 { 345 if(!closeAllCalled) 346 { 347 ioe2.printStackTrace(); 348 closeAll(); 349 } 350 } 351 break; 352 } 353 default: 354 System.out.println("*** Trying to send to an unknown type of Connection ***"); 355 break; 356 } 357 } 358 359 362 public int 363 getRcvBuffSize() 364 { 365 return rcvBuffSize; 366 } 367 368 371 public int 372 getSndBuffSize() 373 { 374 return sndBuffSize; 375 } 376 377 public void run() 379 { 380 if(SessionUtilities.getVerbose()) 381 System.out.println("waiting for data on " + dscd.channelDescription.channelName); 382 System.out.flush(); 383 Object object; 384 int objectCount = 0; 385 try{ 387 completed = true; while(running) 389 { 390 switch(type) 391 { 392 case ChannelDescription.TCP: 393 { 394 object = ois.readObject(); if(SessionUtilities.getVerbose()) 396 System.out.println("received " + dscd.channelDescription.channelName + " object #" + ++objectCount); 397 try{ 398 DataShareObject dso = (DataShareObject)object; 399 if(dso.type != DataShareObject.KEEPALIVE) 401 cdri.dataReceived(dso); 402 } 403 catch(ClassCastException cce) 404 { 405 System.out.println("Not DataShareObject for object #" + objectCount + ", data ignored"); 406 } 407 break; 408 } 409 case ChannelDescription.UDP: 410 { 411 try{ 412 rcvPacket = new DatagramPacket (new byte[9998], 9998); 413 udpSocket.receive(rcvPacket); 414 object = (Object )SessionUtilities.retrieveObject(rcvPacket.getData()); 416 try{ 417 DataShareObject dso = (DataShareObject)object; 418 Object o = SessionUtilities.retrieveObject(dso.objectBytes); 420 if(SessionUtilities.getVerbose()) 421 System.out.println("DSO info: " + dso.objectBytes.length + " bytes in " + o.getClass().toString() ); 422 cdri.dataReceived(dso); 424 } 425 catch(ClassCastException cce) 426 { 427 System.out.println("Not DataShareObject for object #" + objectCount + ", data ignored"); 428 cce.printStackTrace(); 429 } 430 } 431 catch(StreamCorruptedException sce) 432 { 433 System.out.println("Discarding UDP packet-> " + sce.getMessage()); 434 } 435 catch(UTFDataFormatException udfe) 436 { 437 System.out.println("Discarding UDP packet-> " + udfe.getMessage()); 438 } 439 break; 440 } 441 case ChannelDescription.MULTICAST: 442 { 443 try{ 444 rcvPacket = new DatagramPacket (new byte[9998], 9998); 445 multicastSocket.receive(rcvPacket); 446 object = (Object )SessionUtilities.retrieveObject(rcvPacket.getData()); 448 try{ 449 DataShareObject dso = (DataShareObject)object; 450 Object o = SessionUtilities.retrieveObject(dso.objectBytes); 452 if(SessionUtilities.getVerbose()) 453 System.out.println("DSO info: " + dso.objectBytes.length + " bytes in " + o.getClass().toString() ); 454 if(!dso.isControlObject) 458 cdri.dataReceived(dso); 459 } 460 catch(ClassCastException cce) 461 { 462 System.out.println("Not DataShareObject for object #" + objectCount + ", data ignored"); 463 cce.printStackTrace(); 464 } 465 } 466 catch(StreamCorruptedException sce) 467 { 468 System.out.println("Discarding Multicast packet-> " + sce.getMessage()); 469 } 470 catch(UTFDataFormatException udfe) 471 { 472 System.out.println("Discarding Multicast packet-> " + udfe.getMessage()); 473 } 474 break; 475 } 476 } 477 } 478 } 479 catch(Exception e) 480 { 481 if(!closeAllCalled) 482 { 483 System.out.println("DataShareConnection trouble for " + this.keyValue); 484 e.printStackTrace(); 485 closeAll(); 486 } 487 } 488 } 489 490 491 494 public void 495 closeAll() 496 { 497 if(!closeAllCalled) 498 { 499 closeAllCalled = true; 500 running = false; if(tcpSocket != null) 502 { 503 try{ 504 tcpSocket.close(); 505 tcpSocket = null; 506 } 507 catch(Exception e){} 508 } 509 else if(udpSocket != null) 510 { 511 try{ 512 udpSocket.close(); 513 udpSocket = null; 514 } 515 catch(Exception e){} 516 } 517 else if(multicastSocket != null) 518 { 519 try{ 520 multicastSocket.leaveGroup(dscd.serverIP); 521 multicastSocket.close(); 522 multicastSocket = null; 523 } 524 catch(Exception e){} 525 } 526 cdri.connectionLost(this); 527 } 528 } 529 530 } 531 | Popular Tags |