1 package com.ubermq.jms.ui.viewer; 2 3 import java.util.*; 4 import javax.jms.*; 5 6 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 7 import com.ubermq.jms.client.impl.LocalMessage; 8 import javax.swing.table.TableModel ; 9 import javax.swing.tree.DefaultTreeModel ; 10 import javax.swing.tree.TreeModel ; 11 import javax.swing.tree.TreeNode ; 12 13 17 public class MessageController 18 implements MessageListener 19 { 20 private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(MessageController.class); 21 22 private final DefaultTreeModel model; 23 private final MessageTreeNode root; 24 25 private final ReadOnlyTableModel listModel; 26 private static final Object [] columns = new Object [] { 27 "Msg #", 28 "Received", 29 "Sender ID", 30 "Sequence", 31 "Type" 32 }; 33 private static final Class [] columnTypes = new Class [] { 34 Integer .class, 35 Date.class, 36 Long .class, 37 Integer .class, 38 Class .class 39 }; 40 public static final int COL_MSG_NUMBER = 0, 41 COL_TIMESTAMP = 1, 42 COL_SENDER_ID = 2, 43 COL_SEQUENCE = 3, 44 COL_TYPE = 4; 45 46 49 private javax.jms.Connection conn; 50 private Session session; 51 private Session pubSession; 52 private MessageConsumer sub; 53 54 57 private final Map allMessages; 58 59 62 public MessageController() 63 { 64 root = new MessageTreeNode("", "Topic Namespace"); 65 model = new DefaultTreeModel (root); 66 67 listModel = new ReadOnlyTableModel(); 68 listModel.setColumnIdentifiers(columns); 69 listModel.setColumnClasses(columnTypes); 70 71 allMessages = new HashMap(250); 72 } 73 74 81 public void connect(TopicConnectionFactory f, 82 String subscription) 83 throws JMSException 84 { 85 this.conn = f.createConnection(); 87 this.session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 88 this.pubSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 89 this.sub = session.createConsumer(session.createTopic(subscription), "", false); 90 sub.setMessageListener(this); 91 92 conn.start(); 93 } 94 95 98 public void disconnect() 99 { 100 try 101 { 102 sub.close(); 103 session.close(); 104 conn.close(); 105 } 106 catch (JMSException e) { 107 } 109 } 110 111 public TreeModel getTreeModel() {return model;} 112 public TableModel getTableModel() {return listModel;} 113 114 117 void clear() 118 { 119 allMessages.clear(); 120 listModel.setRowCount(0); 121 122 root.removeAllChildren(); 123 model.nodeStructureChanged(root); 124 } 125 126 133 void publish(String topic, String text) 134 throws JMSException 135 { 136 MessageProducer p = pubSession.createProducer(session.createTopic(topic)); 137 Message msg = pubSession.createTextMessage(text); 138 try 139 { 140 msg.setStringProperty("sent-from", java.net.InetAddress.getLocalHost().toString()); 141 } 142 catch (Exception e) {} 143 p.send(msg); 144 p.close(); 145 } 146 147 151 void populateList(TreeNode tn) 152 { 153 MessageTreeNode selection = (MessageTreeNode)tn; 154 155 Collection c = (Collection)allMessages.get(selection.getTopic()); 157 if (c == null) 158 c = Collections.EMPTY_LIST; 159 160 int n = c.size(); 162 Object [][] x = new Object [n][columns.length]; 163 Object [] user = new Object [n]; 164 165 Iterator iter = c.iterator(); 167 int i=0; 168 while (iter.hasNext() && 169 i < n) 170 { 171 try { 172 MessageHolder mh = (MessageHolder)iter.next(); 173 Message m = mh.getMessage(); 174 175 x[i][COL_MSG_NUMBER] = new Integer (mh.getMessageNumber()); 176 x[i][COL_TIMESTAMP] = new Date(mh.getTimestamp()); 177 178 if (m instanceof BytesMessage) { 179 x[i][COL_TYPE] = "Bytes"; 180 } else if (m instanceof ObjectMessage) { 181 x[i][COL_TYPE] = "Object"; 182 } else if (m instanceof StreamMessage) { 183 x[i][COL_TYPE] = "Stream"; 184 } else if (m instanceof MapMessage) { 185 x[i][COL_TYPE] = "Map"; 186 } else if (m instanceof TextMessage) { 187 x[i][COL_TYPE] = "Text"; 188 } else { 189 x[i][COL_TYPE] = "Message"; 190 } 191 192 try 193 { 194 LocalMessage lm = (LocalMessage)m; 195 x[i][COL_SENDER_ID] = new Long (lm.getSenderId()); 196 x[i][COL_SEQUENCE] = new Integer (lm.getSequence()); 197 198 user[i] = m; 200 } 201 catch (ClassCastException cce) 202 { 203 x[i][COL_SENDER_ID] = x[i][COL_SEQUENCE] = "?"; 204 } 205 } 206 catch (Exception e) { 207 log.error("",e); 208 } 209 210 i++; 212 } 213 214 listModel.setRows(x); 216 listModel.setUserData(user); 217 } 218 219 223 String getMessageDescription(int row) 224 { 225 Message m = (Message)listModel.getUserData(row); 226 if (m instanceof LocalMessage) { 227 return ((LocalMessage)m).toHtml(); 228 } else { 229 return m.toString(); 230 } 231 } 232 233 private String saveMessage(Message p0) 234 { 235 try 236 { 237 String topic = ((Topic)p0.getJMSDestination()).getTopicName(); 238 239 Collection c = (Collection)allMessages.get(topic); 240 if (c == null) { 241 c = new LinkedList(); 242 allMessages.put(topic, c); 243 } 244 c.add(new MessageHolder(p0)); 245 246 return topic; 247 } 248 catch (JMSException e) { 249 return "(empty)"; 250 } 251 } 252 253 public void onMessage(Message p0) 254 { 255 String topic = saveMessage(p0); 257 258 LinkedList l = new LinkedList(); 260 StringTokenizer st = new StringTokenizer(topic, "."); 261 StringBuffer fullTopic = new StringBuffer (); 262 while(st.hasMoreTokens()) 263 { 264 String partial = st.nextToken(); 265 266 if (fullTopic.length() > 0) { 268 fullTopic.append("."); 269 fullTopic.append(partial); 270 } else { 271 fullTopic.append(partial); 272 } 273 274 l.addLast(new MessageTreeNode(fullTopic.toString(), 276 partial)); 277 } 278 279 MessageTreeNode mine = root; 281 while(l.size() > 0 && 282 mine != null) 283 { 284 MessageTreeNode lookingFor = (MessageTreeNode)l.removeFirst(); 286 MessageTreeNode found = mine.findChild(lookingFor); 287 288 if (found != null) { 289 mine = found; 290 } else { 291 model.insertNodeInto(lookingFor, mine, 0); 293 mine = lookingFor; 294 } 295 } 296 297 mine.newMessage(); 299 this.model.nodeChanged(mine); 300 } 301 302 private static class MessageHolder 303 { 304 private Message m; 305 private long timestamp; 306 private int number; 307 308 private static final SynchronizedInt nextNumber = new SynchronizedInt(0); 309 310 private MessageHolder(Message m) 311 { 312 this.m = m; 313 this.timestamp = System.currentTimeMillis(); 314 this.number = nextNumber.increment(); 315 } 316 317 public Message getMessage() {return m;} 318 public long getTimestamp() {return timestamp;} 319 public int getMessageNumber() {return number;} 320 } 321 322 } 323 324 | Popular Tags |