| 1 21 package com.presumo.jms; 22 23 import com.presumo.util.config.Configuration; 24 import com.presumo.util.config.Preferences; 25 import com.presumo.jms.persistence.PersistentQueue; 26 import com.presumo.jms.plugin.implementation.MemoryMessageQueue; 27 import com.presumo.jms.plugin.MessageQueue; 28 import com.presumo.jms.plugin.transport.ServerTransport; 29 import com.presumo.jms.plugin.transport.Transport; 30 import com.presumo.jms.resources.Resources; 31 import com.presumo.jms.router.RemoteSession; 32 import com.presumo.jms.router.ConnectionListener; 33 import com.presumo.jms.router.Router; 34 35 import com.presumo.util.log.Logger; 36 import com.presumo.util.log.LoggerFactory; 37 38 import java.io.BufferedReader ; 39 import java.io.InputStreamReader ; 40 import java.io.IOException ; 41 import java.io.File ; 42 import java.io.FileInputStream ; 43 44 import java.util.HashMap ; 45 import java.util.Map ; 46 import java.util.Set ; 47 import java.util.StringTokenizer ; 48 import java.util.Properties ; 49 50 import javax.jms.JMSException ; 51 52 86 public class JmsServer 87 { 88 89 95 protected final String persistentDir; 96 97 101 protected final String persistentPrefix; 102 103 108 protected final int persistentLogSize; 109 110 private final Logger logger; 111 private Map serverMap; 112 private Map clientMap; 113 private Router router; 114 115 119 125 public JmsServer() 126 { 127 this(null); 128 } 129 130 140 public JmsServer(String persistentDir) 141 { 142 this(persistentDir, "PresumoJms"); 143 } 144 145 146 147 166 public JmsServer(String persistentDir, 167 String persistentPrefix) 168 { 169 this(persistentDir, persistentPrefix, 100000); 170 } 171 172 173 200 public JmsServer(String persistentDir, 201 String persistentPrefix, 202 int persistentLogSize) 203 { 204 this.persistentDir = persistentDir; 205 this.persistentPrefix = persistentPrefix; 206 this.persistentLogSize = persistentLogSize; 207 208 serverMap = new HashMap (); 209 clientMap = new HashMap (); 210 logger = LoggerFactory.getLogger(JmsServer.class, Resources.getBundle()); 211 } 212 213 214 218 219 227 public synchronized void startup() throws JMSException  228 { 229 logger.entry("startup"); 230 231 if (router == null) { 232 try { 233 MessageQueue queue = null; 234 235 if (persistentDir == null) { 236 queue = new MemoryMessageQueue(); 237 } else { 238 File dir = new File (persistentDir); 239 PersistentQueue pqueue = 240 new PersistentQueue(dir, persistentPrefix, persistentLogSize); 241 pqueue.open(); 242 queue = pqueue; 243 } 244 245 router = new Router(queue); 246 247 } catch (IOException ioe) { 248 JMSException jmsex = new JMSException ("Unable to initialize queue"); 249 jmsex.setLinkedException(ioe); 250 throw jmsex; 251 } 252 } 253 254 logger.exit("startup"); 255 } 256 257 261 public synchronized void shutdown() 262 { 263 logger.entry("shutdown"); 264 265 Set serverSet = serverMap.keySet(); 268 Set clientSet = clientMap.keySet(); 269 270 String [] servers = new String [serverSet.size()]; 271 String [] clients = new String [clientSet.size()]; 272 273 serverSet.toArray(servers); 274 clientSet.toArray(clients); 275 276 int i; 277 for (i=0; i < servers.length; ++i) { 278 stopServerTransport(servers[i]); 279 } 280 281 for (i=0; i < clients.length; ++i) { 282 stopClientTransport(clients[i]); 283 } 284 285 router.closeRouter(); 286 router = null; 287 288 logger.exit("shutdown"); 289 } 290 291 292 316 public synchronized void startServerTransport(String url) throws JMSException  317 { 318 logger.entry("startServerTransport", url); 319 320 if (! url.toLowerCase().startsWith("tcp")) { 322 throw new JMSException (Resources.getResourceString("PJMSE0004", url)); 323 } 324 else if (serverMap.containsKey(url)) { 325 throw new JMSException (Resources.getResourceString("PJMSE0002", url)); 326 } 327 else { 328 ServerTransport transport = new 329 com.presumo.jms.plugin.implementation.transport.tcp.ServerTransportImpl(); 330 transport.setURL(url); 331 transport.setRouter(router); 332 transport.start(); 333 serverMap.put(url, transport); 334 } 335 336 logger.exit("startServerTransport"); 337 } 338 339 342 public synchronized void stopServerTransport(String url) 343 { 344 logger.entry("stopServerTransport", url); 345 346 ServerTransport transport = (ServerTransport) serverMap.get(url); 347 348 if (transport != null) { 349 transport.close(); 350 serverMap.remove(url); 351 } 352 353 logger.exit("stopServerTransport"); 354 } 355 356 357 360 public synchronized void startClientTransport(String url) 361 throws JMSException  362 { 363 logger.entry("startClientTransport", url); 364 365 if (! url.toLowerCase().startsWith("tcp")) { 367 throw new JMSException (Resources.getResourceString("PJMSE0004", url)); 368 } 369 else if (clientMap.containsKey(url)) { 370 throw new JMSException (Resources.getResourceString("PJMSE0002", url)); 371 } 372 else { 373 374 String host = getHost(url); 375 int port = getPort(url); 376 377 try { 378 Transport transport = new 379 com.presumo.jms.plugin.implementation.transport.tcp.TransportImpl(host, port); 380 ConnectionListener cl = new ConnectionListener() { 381 public void connectionLost(RemoteSession session) { 382 } 384 }; 385 RemoteSession session = new RemoteSession(router, transport, cl); 386 session.start(); 387 388 clientMap.put(url, transport); 389 } catch (java.io.IOException ioe) { 390 JMSException jmsex = new JMSException ("Unable to connect to server " + host+":"+port); 391 jmsex.setLinkedException(ioe); 392 throw jmsex; 393 } 394 } 395 396 logger.exit("startClientTransport"); 397 } 398 399 402 public void stopClientTransport(String url) 403 { 404 logger.entry("stopClientTransport", url); 405 406 Transport transport = (Transport) clientMap.get(url); 407 408 if (transport != null) { 409 transport.close(); 410 clientMap.remove(url); 411 } 412 413 logger.exit("stopClientTransport"); 414 } 415 416 417 421 425 protected void finalize() throws Throwable  426 { 427 shutdown(); 428 super.finalize(); 429 } 430 431 435 private String getHost(String url) throws JMSException  436 { 437 logger.entry("getHost", url); 438 439 int loc = url.lastIndexOf('/'); 440 int loc2 = url.lastIndexOf(':'); 441 442 if (loc == -1 || loc2 == -1) { 443 throw new JMSException ("Malformed URL: " + url); 444 } 445 446 String retval = url.substring(loc+1, loc2); 447 logger.exit("getHost", retval); 448 return retval; 449 } 450 451 private int getPort(String url) throws JMSException  452 { 453 logger.entry("getPort", url); 454 455 int loc = url.lastIndexOf(':'); 456 if (loc == -1) { 457 throw new JMSException ("Malformed URL: " + url); 458 } 459 460 int retval = 0; 461 try { 462 retval = Integer.parseInt(url.substring(loc+1)); 463 } catch (NumberFormatException nfe) { 464 JMSException jmsex = new JMSException ("Malformed URL: " + url); 465 jmsex.setLinkedException(nfe); 466 throw jmsex; 467 } 468 469 logger.exit("getPort", new Integer (retval)); 470 return retval; 471 } 472 473 474 478 488 public static void main(String [] args) throws Exception  489 { 490 if (args.length > 1) { 491 System.err.println("Usage: JmsServer <server.properties>"); 492 System.exit(-1); 493 } 494 495 try { 496 Properties props = new Properties (); 500 if (args.length == 1) { 501 FileInputStream fis = null; 502 try { 503 fis = new FileInputStream (args[0]); 504 props.load(fis); 505 } finally { 506 if (fis != null) fis.close(); 507 } 508 } 509 510 String persistentDir = props.getProperty("PersistentDirectory"); 511 String serverTransports = props.getProperty("ServerTransports", 512 "tcp://localhost:2323"); 513 String clientTransports = props.getProperty("ClientTransports"); 514 515 516 JmsServer server = new JmsServer(persistentDir); 519 server.startup(); 520 521 if (serverTransports != null && serverTransports.length() > 0) { 522 StringTokenizer tokens = 523 new StringTokenizer (serverTransports.trim(), ";"); 524 while(tokens.hasMoreTokens()) { 525 String serverTransport = tokens.nextToken().trim(); 526 server.startServerTransport(serverTransport); 527 } 528 } 529 530 if (clientTransports != null && clientTransports.length() > 0) { 531 StringTokenizer tokens = 532 new StringTokenizer (clientTransports.trim(), ";"); 533 while(tokens.hasMoreTokens()) { 534 String clientTransport = tokens.nextToken().trim(); 535 server.startClientTransport(clientTransport); 536 } 537 } 538 539 System.out.println(Resources.getResourceString("STARTUP_COMPLETE")+"\n"); 540 541 542 BufferedReader input = new BufferedReader (new InputStreamReader (System.in)); 545 String choice = ""; 546 while(! choice.toLowerCase().equals("exit")) { 547 System.out.println(Resources.getResourceString("SHUTDOWN_INSTRUCTIONS")); 548 choice = input.readLine(); 549 } 550 551 server.shutdown(); 554 System.out.println(Resources.getResourceString("SHUTDOWN_COMPLETE")); 555 556 } catch (Throwable t) { 557 System.err.println("The following error occured while starting the server:"); 558 t.printStackTrace(); 559 System.exit(-1); 560 } 561 562 } 563 564 } 565 | Popular Tags |