1 4 package org.jfox.jms.connector; 5 6 import java.util.HashMap ; 7 import java.util.Iterator ; 8 import java.util.Map ; 9 10 import org.jfox.jms.message.JMSMessage; 11 12 15 16 public class SessionMeta implements Runnable { 17 private String sessionId; 18 private ConnectionMeta connectionMeta; 19 private Map <String , ConsumerMeta> consumers = new HashMap <String , ConsumerMeta>(); 20 21 private boolean closed = false; 22 23 public SessionMeta(final String sessionId, ConnectionMeta connMeta) { 24 this.sessionId = sessionId; 25 this.connectionMeta = connMeta; 26 new Thread (this, "Session - " + sessionId + " Async Sender").start(); 28 } 29 30 public String getConnectionId() { 31 return connectionMeta.getConnectionId(); 32 } 33 34 public ConsumerMeta registerCunsumer(String consumerId) { 35 ConsumerMeta consumerMeta = new ConsumerMeta(consumerId, this); 36 consumers.put(consumerId, consumerMeta); 37 return consumerMeta; 38 } 39 40 public ConsumerMeta getConsumer(String consumerId) { 41 return consumers.get(consumerId); 42 } 43 44 public void unregisterConsumer(String consumerId) { 45 consumers.remove(consumerId); 46 } 47 48 public String getSessionId() { 49 return sessionId; 50 } 51 52 public void close() { 53 closed = true; 54 connectionMeta.unregisterSession(sessionId); 55 synchronized (this) { 56 notifyAll(); 57 } 58 } 59 60 public void run() { 61 62 while (!closed) { 63 try { 64 if (beWaiting()) { 65 synchronized (this) { 66 wait(); 67 } 68 if (closed) break; 69 } 70 for (Iterator it = consumers.entrySet().iterator(); it.hasNext();) { 71 Map.Entry <String , ConsumerMeta> entry = (Map.Entry <String , ConsumerMeta>) it.next(); 72 String consumerId = entry.getKey(); 73 ConsumerMeta meta = entry.getValue(); 74 if (meta.isAsync()) { 75 JMSMessage msg = meta.popMessage(); 76 if (msg != null) { 77 connectionMeta.getJMSConnection().onMessage(sessionId, consumerId, msg); 78 } 79 } 80 } 81 } catch (Exception e) { 82 e.printStackTrace(); 83 } 84 } 85 86 } 87 88 private boolean beWaiting() { 89 if (!connectionMeta.isStarted() || consumers.isEmpty()) { 90 return true; 91 } else { 92 for (ConsumerMeta meta : consumers.values()) { 93 if (meta.isAsync() && meta.hasMessage()) { 94 return false; 95 } 96 } 97 return true; 98 } 99 } 100 101 102 public static void main(String [] args) { 103 104 } 105 } 106 107 | Popular Tags |