1 package com.ubermq.jms.server; 2 import java.io.*; 3 import java.lang.reflect.*; 4 import java.net.*; 5 import java.nio.channels.*; 6 import java.util.*; 7 8 import org.apache.log4j.*; 9 10 import com.ubermq.jms.client.*; 11 import com.ubermq.jms.client.unicast.*; 12 import com.ubermq.jms.common.datagram.*; 13 import com.ubermq.jms.common.datagram.impl.*; 14 import com.ubermq.jms.server.cluster.*; 15 import com.ubermq.jms.server.journal.*; 16 import com.ubermq.jms.server.journal.impl.*; 17 import com.ubermq.jms.server.proc.*; 18 import com.ubermq.jms.server.ssl.*; 19 import com.ubermq.kernel.*; 20 21 41 public class MessageServer extends KernelBasedServer implements Runnable , PipeEndpoint 42 { 43 private static final Logger log = Logger.getLogger(MessageServer.class); 44 private boolean started = false; 45 46 private IMessageProcessor datagramProcessor; 48 private Set protocols = new LinkedHashSet(); 49 50 private ReadWriteTransformThread[] read, write; 52 53 private static final String DEFAULT_RW_THREAD_COUNT = 54 String.valueOf(2 * Runtime.getRuntime().availableProcessors()); 55 private static final int RW_THREAD_COUNT = 56 Integer 57 .valueOf(Configurator.getProperty(ServerConfig.RW_THREAD_COUNT, DEFAULT_RW_THREAD_COUNT)) 58 .intValue(); 59 60 private static final String DATAGRAM_FACTORY_CLASS = 62 Configurator.getProperty(ServerConfig.DATAGRAM_FACTORY_CLASS, ServerDatagramFactory.class.getName()); 63 private static final String DATAGRAM_INSTANCE_METHOD = "getInstance"; 64 65 private ClusterMembership clusterMembership; 67 68 public MessageServer(String [] args) 69 { 70 super(args.length > 0 ? args[0] : null); 71 } 72 73 public MessageServer(Properties props) 74 { 75 super(props); 76 } 77 78 protected void init() 79 { 80 try 81 { 82 ISettingsRepository fj = new BinarySettingsRepository(); 84 85 read = new ReadWriteTransformThread[RW_THREAD_COUNT]; 87 write = new ReadWriteTransformThread[RW_THREAD_COUNT]; 88 log.debug("Creating read/write pool of size " + RW_THREAD_COUNT); 89 for (int i = 0; i < RW_THREAD_COUNT; i++) 90 { 91 read[i] = new ReadWriteTransformThread(SelectionKey.OP_READ); 92 read[i].start(); 93 94 write[i] = new ReadWriteTransformThread(SelectionKey.OP_WRITE); 95 write[i].start(); 96 } 97 98 this.datagramProcessor = createDatagramProc(fj); 100 101 if (Boolean 103 .valueOf(Configurator.getProperty(ServerConfig.CLUSTER_ENABLE, "false")) 104 .booleanValue()) 105 { 106 try 107 { 108 Class clusterImpl = 109 Class.forName( 110 Configurator.getProperty( 111 ServerConfig.CLUSTER_IMPLEMENTATION, 112 JGroupsClusterMembership.class.getName())); 113 this.clusterMembership = (ClusterMembership)clusterImpl.newInstance(); 114 115 clusterMembership.join(new PipeConnectionFactory(this)); 117 } 118 catch (ClassCastException cce) 119 { 120 log.fatal( 121 "Your cluster provider must implement " + ClusterMembership.class.getName(), 122 cce); 123 } 124 } 125 126 } 127 catch (Exception x) 128 { 129 log.fatal("Could not initialize the server", x); 130 throw new IllegalStateException (x.getMessage()); 131 } 132 } 133 134 139 public void resetProtocols() 140 { 141 protocols.clear(); 142 } 143 144 150 public void add(Protocol p) 151 { 152 protocols.add(p); 153 } 154 155 161 public void addStandardProtocols() 162 { 163 add(new DefaultProtocol(getDatagramFactory(), DefaultProtocol.getConfiguredBindAddress())); 165 add(new AdminProtocol()); 166 add(new SSLProtocol(getDatagramFactory(), this, SSLProtocol.getConfiguredBindAddress())); 167 } 168 169 174 public Set getProtocols() 175 { 176 return protocols; 177 } 178 179 184 protected URI exec() 185 { 186 URI serviceURI = null; 187 188 try 189 { 190 Iterator iter = protocols.iterator(); 192 while (iter.hasNext()) 193 { 194 Protocol p = (Protocol)iter.next(); 195 if (p.isEnabled()) 196 { 197 p.start(datagramProcessor, new IConnectionInfo.ConnectionAcceptor() 198 { 199 public void acceptIncomingConnection(IConnectionInfo incoming) 200 { 201 int i = new Random().nextInt(read.length); 202 write[i].register((ConnectionInfo)incoming, true); 203 read[i].register((ConnectionInfo)incoming, true); 204 } 205 }); 206 log.info("Protocol " + p.toString() + " started"); 207 208 if (serviceURI == null) 209 serviceURI = p.getServiceURI(); 210 } 211 } 212 213 started = true; 215 } 216 catch (Exception x) 217 { 218 log.fatal("Initialization failed", x); 219 } 220 221 if (datagramProcessor instanceof DatagramProc && serviceURI != null) 223 { 224 ((DatagramProc)datagramProcessor).setServerName(serviceURI.toString()); 225 } 226 return serviceURI; 227 } 228 229 233 public void stop() throws InterruptedException 234 { 235 for (int i = 0; i < read.length; i++) 236 { 237 read[i].interrupt(); 238 } 239 for (int i = 0; i < write.length; i++) 240 { 241 write[i].interrupt(); 242 } 243 244 Iterator iter = protocols.iterator(); 246 while (iter.hasNext()) 247 { 248 Protocol p = (Protocol)iter.next(); 249 p.stop(); 250 } 251 } 252 253 protected IMessageProcessor createDatagramProc(ISettingsRepository settings) throws IOException 254 { 255 DatagramProc dp = new DatagramProc(settings, getDatagramFactoryHolder()); 256 return dp; 257 } 258 259 264 protected IMessageProcessor getDatagramProc() 265 { 266 return datagramProcessor; 267 } 268 269 283 protected DatagramFactoryHolder getDatagramFactoryHolder() 284 { 285 return new DatagramFactoryHolder(getDatagramFactory(), ServerDatagramFactory.getInstance()); 286 } 287 288 296 public DatagramFactoryHolder getClientDatagramFactoryHolder() 297 { 298 return DatagramFactory.getHolder(); 299 } 300 301 309 protected ClusterMembership getClusterMembership() 310 { 311 return clusterMembership; 312 } 313 314 330 public PipeConnectionInfo connectPipes(Pipe upstream, Pipe downstream, IDatagramFactory df) throws java.io.IOException 331 { 332 return doConnectPipes(upstream, downstream, df, datagramProcessor); 333 } 334 335 protected PipeConnectionInfo doConnectPipes( 336 Pipe upstream, 337 Pipe downstream, 338 IDatagramFactory df, 339 IMessageProcessor dp) 340 throws IOException 341 { 342 PipeConnectionInfo ci = new PipeConnectionInfo(upstream.source(), downstream.sink(), df, dp); 344 dp.accept(ci); 345 write[0].register(ci, true); 346 read[0].register(ci, true); 347 return ci; 348 } 349 350 360 public static void main(String [] args) 361 { 362 final MessageServer s = new MessageServer(args); 363 s.addStandardProtocols(); 364 365 s.run(); 367 log.info( 368 "UberMQ " 369 + com.ubermq.jms.client.impl.Connection.UBERMQ_PROVIDER_VERSION 370 + " running at " 371 + s.getServiceUrl()); 372 } 373 374 384 IDatagramFactory getDatagramFactory() 385 { 386 Class clazz = null; 387 try 388 { 389 clazz = Class.forName(DATAGRAM_FACTORY_CLASS); 390 return (IDatagramFactory)clazz.newInstance(); 391 } 392 catch (ClassNotFoundException e) 393 { 394 } 396 catch (ClassCastException e) 397 { 398 } 400 catch (Exception e) 401 { 402 try 403 { 404 Method getInstance = clazz.getMethod(DATAGRAM_INSTANCE_METHOD, null); 405 return (IDatagramFactory)getInstance.invoke(null, null); 406 } 407 catch (Exception e2) 408 { 409 log.error("", e2); 411 } 412 } 413 414 return DatagramFactory.getInstance(); 415 } 416 } 417 | Popular Tags |