package org.exolab.jms.messagemgr;

import java.io.File;
import java.io.FilenameFilter;
import java.sql.Connection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.TreeSet;
import java.util.Vector;

import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.exolab.jms.service.BasicService;
import org.exolab.jms.service.ServiceException;
import org.exolab.jms.service.ServiceState;
import org.exolab.jms.common.uuid.UUID;
import org.exolab.jms.message.MessageImpl;
import org.exolab.jms.persistence.DatabaseService;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.SQLHelper;
import org.exolab.jms.tranlog.ExternalXid;
import org.exolab.jms.tranlog.StateTransactionLogEntry;
import org.exolab.jms.tranlog.TransactionLog;
import org.exolab.jms.tranlog.TransactionLogException;
import org.exolab.jms.tranlog.TransactionState;
import org.exolab.jms.tranlog.DataTransactionLogEntry;


/**
 * Service that tracks transactions for the message manager. It records
 * transaction state changes and transactional data (published and received
 * messages) in a set of {@link TransactionLog} files kept in a log directory,
 * and keeps an in-memory list of the records belonging to every active
 * transaction.
 * <p>
 * The class exposes methods shaped like those of
 * {@link javax.transaction.xa.XAResource} ({@code start}, {@code end},
 * {@code prepare}, {@code commit}, {@code rollback}, {@code recover}, ...),
 * although it does not itself declare that it implements the interface.
 * <p>
 * Locking: the transaction-facing methods are {@code synchronized} on this
 * instance; the set of log files is additionally guarded by its own monitor
 * ({@code _logs}) and the xid/log caches by {@code _cacheLock}.
 */
public class ResourceManager
    extends BasicService {

    /**
     * Name under which this service registers with {@link BasicService}.
     */
    private final static String RM_SERVICE_NAME = "XAResourceManager";

    /**
     * File name prefix of every transaction log file; the sequence number
     * follows it directly.
     */
    private final static String RM_LOGFILE_PREFIX = "ojmsrm";

    /**
     * File name extension of every transaction log file.
     */
    public final static String RM_LOGFILE_EXTENSION = ".log";

    /**
     * Garbage collection mode: closed log files are never destroyed by
     * {@link #logTransactionState}.
     */
    public static final int GC_DISABLED = 0;

    /**
     * Garbage collection mode: a log file is destroyed as soon as the last
     * transaction referencing it is closed (see
     * {@link #logTransactionState}).
     */
    public static final int GC_SYNCHRONOUS = 1;

    /**
     * Garbage collection mode accepted by {@link #setGCMode}. This class does
     * not act on it directly; presumably an external caller invokes
     * {@link #garbageCollect()} periodically - TODO confirm.
     */
    public static final int GC_ASYNCHRONOUS = 2;

    /**
     * The lazily created singleton; guarded by {@link #_initializer}.
     */
    private static ResourceManager _instance = null;

    /**
     * Monitor guarding creation of {@link #_instance}.
     */
    private static final Object _initializer = new Object();

    /**
     * Threshold compared against {@link TransactionLog#size()}; once the
     * current log exceeds it a new log file is started.
     */
    private int _logFileSize = 1000000;

    /**
     * All known transaction logs, ordered by the sequence number embedded in
     * their names. Guarded by its own monitor.
     */
    private TreeSet _logs = new TreeSet(new TranLogFileComparator());

    /**
     * Maps an {@link ExternalXid} to the {@link TransactionLog} holding its
     * records. Guarded by {@link #_cacheLock}.
     */
    private HashMap _tridToLogCache = new HashMap();

    /**
     * Maps a {@link TransactionLog} to the {@link Vector} of xids that
     * reference it. Guarded by {@link #_cacheLock}.
     */
    private HashMap _logToTridCache = new HashMap();

    /**
     * Monitor guarding {@link #_tridToLogCache} and {@link #_logToTridCache}.
     */
    private final Object _cacheLock = new Object();

    /**
     * Maps the {@link ExternalXid} of every active transaction to the
     * {@link LinkedList} of records (data wrappers and states) logged for it.
     */
    private HashMap _activeTransactions = new HashMap();

    /**
     * Directory holding the transaction log files.
     */
    private String _logDirectory = ".";

    /**
     * Not referenced anywhere in this class; retained for compatibility.
     */
    private long _lastLogNumber = 0;

    /**
     * Transaction timeout in seconds (see {@link #setTransactionTimeout}).
     * It is multiplied by 1000 before being passed to the transaction log.
     */
    private int _txExpiryTime = 120;

    /**
     * Current garbage collection mode; one of the {@code GC_*} constants.
     */
    private int _gcMode = GC_SYNCHRONOUS;

    /**
     * Identifier of this resource manager, written with every log record.
     */
    private String _rid = UUID.next();

    /**
     * The logger.
     */
    private static final Log _log = LogFactory.getLog(ResourceManager.class);


    /**
     * Returns the singleton resource manager, creating it on first use with
     * the default log directory {@code ./logs}.
     * <p>
     * The whole lookup runs under a lock so that a half-constructed instance
     * can never be observed by another thread.
     *
     * @return the singleton instance
     * @throws ResourceManagerException if the instance cannot be created
     */
    public static ResourceManager instance()
        throws ResourceManagerException {
        synchronized (_initializer) {
            if (_instance == null) {
                _instance = new ResourceManager();
            }
            return _instance;
        }
    }

    /**
     * Creates a resource manager using {@code ./logs} as the log directory.
     *
     * @throws ResourceManagerException if the directory is unusable or
     *         recovery fails
     */
    private ResourceManager()
        throws ResourceManagerException {
        this("./logs");
    }

    /**
     * Creates a resource manager over the given log directory, loads the
     * existing log files and runs recovery on each of them.
     *
     * @param dir the directory holding the transaction log files
     * @throws ResourceManagerException if {@code dir} does not exist, is not
     *         a directory, or recovery fails
     */
    public ResourceManager(String dir)
        throws ResourceManagerException {
        super(RM_SERVICE_NAME);
        _logDirectory = dir;
        File file = new File(dir);
        if ((!file.exists()) ||
            (!file.isDirectory())) {
            throw new ResourceManagerException(dir +
                " does not exist or is not a directory");
        }

        buildLogFileList();
        recover();
    }

    /**
     * Sets the directory in which new transaction logs are created.
     *
     * @param dir the log directory
     * @throws IllegalArgumentException if {@code dir} is not a directory
     */
    public void setLogDirectory(String dir)
        throws IllegalArgumentException {
        if (!(new File(dir)).isDirectory()) {
            throw new IllegalArgumentException(dir + " is not a directory");
        }
        _logDirectory = dir;
    }

    /**
     * Returns the directory holding the transaction logs.
     *
     * @return the log directory
     */
    public String getLogDirectory() {
        return _logDirectory;
    }

    /**
     * Sets the size threshold beyond which a new log file is started.
     *
     * @param size the threshold, compared against {@link TransactionLog#size()}
     */
    public void setLogFileSize(int size) {
        _logFileSize = size;
    }

    /**
     * Returns the size threshold beyond which a new log file is started.
     *
     * @return the threshold
     */
    public int getLogFileSize() {
        return _logFileSize;
    }

    /**
     * Sets the garbage collection mode.
     *
     * @param mode one of {@link #GC_DISABLED}, {@link #GC_SYNCHRONOUS} or
     *        {@link #GC_ASYNCHRONOUS}
     * @return {@code true} if the mode was valid and has been applied;
     *         {@code false} if it was rejected and the mode is unchanged
     */
    public boolean setGCMode(int mode) {
        boolean valid = (mode == GC_DISABLED)
            || (mode == GC_SYNCHRONOUS)
            || (mode == GC_ASYNCHRONOUS);
        if (valid) {
            _gcMode = mode;
        }
        return valid;
    }

    /**
     * Returns the current garbage collection mode.
     *
     * @return one of the {@code GC_*} constants
     */
    public int getGCMode() {
        return _gcMode;
    }

    /**
     * Indicates whether garbage collection is disabled.
     *
     * @return {@code true} if the mode is {@link #GC_DISABLED}
     */
    public boolean gcDisabled() {
        return _gcMode == GC_DISABLED;
    }

    /**
     * Prepares a published message through the {@link MessageMgr} and logs
     * it as data of the given transaction.
     *
     * @param xid the transaction the message was published under
     * @param message the published message
     * @throws TransactionLogException if writing to the log fails
     * @throws ResourceManagerException if the transaction is not active
     * @throws JMSException if the message cannot be prepared
     */
    public synchronized void logPublishedMessage(Xid xid, MessageImpl message)
        throws TransactionLogException, ResourceManagerException, JMSException {
        MessageMgr.instance().prepare(message);
        logTransactionData(new ExternalXid(xid), _rid,
            createPublishedMessageWrapper(message));
    }

    /**
     * Logs a received message as data of the given transaction.
     *
     * @param xid the transaction the message was received under
     * @param id the consumer identifier stored alongside the handle
     * @param handle the handle of the received message
     * @throws TransactionLogException if writing to the log fails
     * @throws ResourceManagerException if the transaction is not active
     */
    public synchronized void logReceivedMessage(Xid xid, long id, MessageHandle handle)
        throws TransactionLogException, ResourceManagerException {
        logTransactionData(new ExternalXid(xid), _rid,
            createReceivedMessageWrapper(id, handle));
    }

    /**
     * Records a state transition of a transaction.
     * <ul>
     * <li>OPENED: the state is written to the current log and the
     *     transaction becomes active;</li>
     * <li>PREPARED: the state is appended to the in-memory record list of
     *     the (active) transaction;</li>
     * <li>CLOSED: the state is written to the transaction's log, the
     *     transaction is removed from the caches and the active set, and the
     *     log is released if nothing references it any more.</li>
     * </ul>
     *
     * @param xid the transaction identifier
     * @param state the new state
     * @throws TransactionLogException if writing to the log fails
     * @throws ResourceManagerException if the state is not supported, or the
     *         transaction is not active when preparing
     */
    public synchronized void logTransactionState(Xid xid, TransactionState state)
        throws TransactionLogException, ResourceManagerException {
        ExternalXid txid = new ExternalXid(xid);
        switch (state.getOrd()) {
            case TransactionState.OPENED_ORD:
                {
                    TransactionLog log = getCurrentTransactionLog();
                    addTridLogEntry(txid, log);
                    log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
                        state);
                    _activeTransactions.put(txid, new LinkedList());
                }
                break;

            case TransactionState.PREPARED_ORD:
                {
                    LinkedList list = (LinkedList) _activeTransactions.get(txid);
                    if (list == null) {
                        throw new ResourceManagerException("Trasaction " + txid +
                            " is not active.");
                    }
                    list.add(state);
                }
                break;

            case TransactionState.CLOSED_ORD:
                {
                    TransactionLog log = getTransactionLog(txid);
                    log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
                        state);
                    removeTridLogEntry(txid, log);
                    releaseLogIfUnused(log);
                    _activeTransactions.remove(txid);
                }
                break;

            default:
                throw new ResourceManagerException("Cannot process tx state " +
                    state);
        }
    }

    /**
     * Closes a log that is no longer referenced by any transaction and is not
     * the current log; in {@link #GC_SYNCHRONOUS} mode it is also destroyed.
     * A failure to destroy the log is logged and otherwise ignored.
     *
     * @param log the log to release
     * @throws TransactionLogException if the log cannot be closed
     */
    private void releaseLogIfUnused(TransactionLog log)
        throws TransactionLogException {
        synchronized (_cacheLock) {
            if ((_logToTridCache.get(log) == null) &&
                (!isCurrentTransactionLog(log))) {
                log.close();

                if (_gcMode == GC_SYNCHRONOUS) {
                    try {
                        log.destroy();
                    } catch (TransactionLogException exception) {
                        // best effort: the log can still be collected later
                        _log.error("Failed to destroy transaction log",
                            exception);
                    }
                }
            }
        }
    }

    /**
     * Writes a data record for an active transaction to its log and appends
     * it to the transaction's in-memory record list.
     * <p>
     * The transaction is checked before anything is written, so that no
     * record or cache entry is created for a transaction that is not active.
     *
     * @param txid the transaction identifier
     * @param rid the resource manager identifier to log
     * @param data the record to log
     * @throws ResourceManagerException if the transaction is not active
     * @throws TransactionLogException if writing to the log fails
     */
    synchronized void logTransactionData(ExternalXid txid, String rid,
                                         Object data)
        throws ResourceManagerException, TransactionLogException {
        LinkedList list = (LinkedList) _activeTransactions.get(txid);
        if (list == null) {
            throw new ResourceManagerException("Trasaction " + txid +
                " is not active.");
        }

        getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,
            rid, data);
        list.add(data);
    }

    /**
     * Destroys every log file, other than the most recent one, for which
     * {@link TransactionLog#canGarbageCollect()} returns {@code true}.
     * Failures are logged and end the collection run.
     */
    public void garbageCollect() {
        try {
            TreeSet candidates;
            synchronized (_logs) {
                if (_logs.isEmpty()) {
                    return;
                }
                // work on a snapshot; the current (last) log is never collected
                candidates = new TreeSet(_logs);
                candidates.remove(_logs.last());
            }

            int gcfiles = 0;
            Iterator iter = candidates.iterator();
            while (iter.hasNext()) {
                TransactionLog log = (TransactionLog) iter.next();
                if (log.canGarbageCollect()) {
                    log.destroy();

                    synchronized (_logs) {
                        _logs.remove(log);
                    }

                    ++gcfiles;
                }
            }

            _log.info("[RMGC] Collected " + gcfiles + " files.");
        } catch (Exception exception) {
            _log.error("[RMGC] Garbage collection failed", exception);
        }
    }

    /**
     * Commits the transaction: published messages are added through the
     * {@link MessageMgr} and received message handles are destroyed, all on
     * one database connection which is then committed. The transaction is
     * closed afterwards whether or not the commit succeeded.
     *
     * @param id the transaction identifier
     * @param onePhase not used by this implementation
     * @throws XAException {@code XAER_NOTA} if {@code id} is null,
     *         {@code XAER_PROTO} if the transaction is not active, or
     *         {@code XAER_RMERR} if the commit or the close fails
     */
    public synchronized void commit(Xid id, boolean onePhase)
        throws XAException {
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        ExternalXid xid = new ExternalXid(id);
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        Connection connection = null;
        boolean committed = false;
        try {
            connection = DatabaseService.getConnection();

            Object[] records = getTransactionRecords(xid, _rid);
            for (int index = 0; index < records.length; index++) {
                if (!(records[index] instanceof TransactionalObjectWrapper)) {
                    // state entries carry nothing to apply
                    continue;
                }
                TransactionalObjectWrapper wrapper =
                    (TransactionalObjectWrapper) records[index];
                if (wrapper.isPublishedMessage()) {
                    MessageMgr.instance().add(connection,
                        (MessageImpl) wrapper.getObject());
                } else if (wrapper.isReceivedMessage()) {
                    MessageHandle handle =
                        ((ReceivedMessageWrapper) wrapper).getMessageHandle();
                    if (handle.isPersistent()) {
                        handle.destroy(connection);
                    } else {
                        handle.destroy();
                    }
                }
            }
            connection.commit();
            committed = true;
        } catch (Exception exception) {
            // undo partial database work whatever the kind of failure
            if (connection != null) {
                SQLHelper.rollback(connection);
            }
            throw rmError("Failed in ResourceManager.commit : " +
                exception.toString(), exception);
        } finally {
            SQLHelper.close(connection);
            closeTransaction(xid, "commit", committed);
        }
    }

    /**
     * Ends the association with a transaction branch.
     * <p>
     * For all three supported flags the transaction must be active, since
     * {@code end} precedes prepare/commit/rollback. With {@code TMFAIL} the
     * transaction is additionally rolled back (and thereby closed).
     *
     * @param id the transaction identifier
     * @param flags one of {@code TMSUCCESS}, {@code TMFAIL} or
     *        {@code TMSUSPEND}
     * @throws XAException {@code XAER_NOTA} if {@code id} is null,
     *         {@code XAER_PROTO} if the flag is not supported or the
     *         transaction is not active, or any error raised by
     *         {@link #rollback}
     */
    public synchronized void end(Xid id, int flags)
        throws XAException {
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        ExternalXid xid = new ExternalXid(id);

        // reject anything that is not exactly one of the supported flags
        if ((flags != XAResource.TMSUSPEND) &&
            (flags != XAResource.TMSUCCESS) &&
            (flags != XAResource.TMFAIL)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        if (flags == XAResource.TMFAIL) {
            rollback(xid);
        }
    }

    /**
     * Forgets a transaction; implemented by rolling it back.
     *
     * @param id the transaction identifier
     * @throws XAException {@code XAER_NOTA} if {@code id} is null,
     *         {@code XAER_PROTO} if the transaction is not active, or any
     *         error raised by {@link #rollback}
     */
    public synchronized void forget(Xid id)
        throws XAException {
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        ExternalXid xid = new ExternalXid(id);
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        rollback(id);
    }

    /**
     * Returns the transaction timeout.
     *
     * @return the timeout in seconds
     * @throws XAException never thrown by this implementation
     */
    public synchronized int getTransactionTimeout()
        throws XAException {
        return _txExpiryTime;
    }

    /**
     * Determines whether the given resource represents this resource manager,
     * i.e. is this instance or another {@code ResourceManager} with the same
     * resource manager identifier.
     *
     * @param xares the resource to compare with
     * @return {@code true} if both denote the same resource manager
     * @throws XAException never thrown by this implementation
     */
    public synchronized boolean isSameRM(XAResource xares)
        throws XAException {
        if (xares == this) {
            return true;
        }
        return (xares instanceof ResourceManager)
            && ((ResourceManager) xares)._rid.equals(_rid);
    }

    /**
     * Marks an active transaction as prepared.
     *
     * @param id the transaction identifier
     * @return {@link XAResource#XA_OK}
     * @throws XAException {@code XAER_NOTA} if {@code id} is null,
     *         {@code XAER_PROTO} if the transaction is not active, or
     *         {@code XAER_RMERR} if the state cannot be recorded
     */
    public synchronized int prepare(Xid id)
        throws XAException {
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        ExternalXid xid = new ExternalXid(id);
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        try {
            logTransactionState(xid, TransactionState.PREPARED);
        } catch (Exception exception) {
            throw rmError("Error processing prepare : " + exception,
                exception);
        }

        return XAResource.XA_OK;
    }

    /**
     * Returns the identifiers of all active transactions whose most recent
     * record is a prepared state.
     *
     * @param flag {@code TMNOFLAGS}, {@code TMSTARTRSCAN} or
     *        {@code TMENDRSCAN}; for any other value an empty array is
     *        returned
     * @return the prepared transaction identifiers; never {@code null}
     * @throws XAException never thrown by this implementation
     */
    public synchronized Xid[] recover(int flag)
        throws XAException {
        if ((flag != XAResource.TMNOFLAGS) &&
            (flag != XAResource.TMSTARTRSCAN) &&
            (flag != XAResource.TMENDRSCAN)) {
            return new Xid[0];
        }

        LinkedList xids = new LinkedList();
        Iterator iter = _activeTransactions.keySet().iterator();
        while (iter.hasNext()) {
            Xid xid = (Xid) iter.next();
            LinkedList list = (LinkedList) _activeTransactions.get(xid);
            if ((list != null) && (!list.isEmpty())
                && isPreparedRecord(list.getLast())) {
                xids.add(xid);
            }
        }

        // a typed array is required; toArray() alone yields Object[]
        return (Xid[]) xids.toArray(new Xid[xids.size()]);
    }

    /**
     * Determines whether a transaction record denotes the prepared state.
     * Both forms are recognised: the raw {@link TransactionState} that
     * {@link #logTransactionState} appends, and a
     * {@link StateTransactionLogEntry} wrapping such a state.
     *
     * @param record the record to examine
     * @return {@code true} if the record represents the prepared state
     */
    private boolean isPreparedRecord(Object record) {
        if (record instanceof TransactionState) {
            return ((TransactionState) record).getOrd()
                == TransactionState.PREPARED_ORD;
        }
        if (record instanceof StateTransactionLogEntry) {
            return ((StateTransactionLogEntry) record).getState().isPrepared();
        }
        return false;
    }

    /**
     * Rolls back the transaction: every received message handle is returned
     * to the {@link DestinationCache} of its destination; published messages
     * are simply dropped. The transaction is closed afterwards whether or
     * not the rollback succeeded.
     *
     * @param id the transaction identifier
     * @throws XAException {@code XAER_NOTA} if {@code id} is null,
     *         {@code XAER_PROTO} if the transaction is not active, or
     *         {@code XAER_RMERR} if the rollback or the close fails
     */
    public synchronized void rollback(Xid id)
        throws XAException {
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        ExternalXid xid = new ExternalXid(id);
        if (!isTransactionActive(xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        Connection connection = null;
        boolean completed = false;
        try {
            connection = DatabaseService.getConnection();

            Object[] records = getTransactionRecords(xid, _rid);
            for (int index = 0; index < records.length; index++) {
                if (!(records[index] instanceof TransactionalObjectWrapper)) {
                    continue;
                }
                TransactionalObjectWrapper wrapper =
                    (TransactionalObjectWrapper) records[index];
                // published messages were never made visible: nothing to undo
                if (wrapper.isReceivedMessage()) {
                    MessageHandle handle =
                        ((ReceivedMessageWrapper) wrapper).getMessageHandle();
                    DestinationManager mgr = DestinationManager.instance();
                    DestinationCache cache =
                        mgr.getDestinationCache(handle.getDestination());
                    cache.returnMessageHandle(handle);
                }
            }

            connection.commit();
            completed = true;
        } catch (Exception exception) {
            if (connection != null) {
                SQLHelper.rollback(connection);
            }
            throw rmError("Failed in ResourceManager.rollback : " +
                exception.toString(), exception);
        } finally {
            if (connection != null) {
                SQLHelper.close(connection);
            }
            closeTransaction(xid, "rollback", completed);
        }
    }

    /**
     * Logs the CLOSED state for a transaction at the end of a commit or
     * rollback. If the operation itself already failed, a failure to close
     * is only logged so that it does not mask the original error.
     *
     * @param xid the transaction identifier
     * @param operation the name of the operation, used in messages
     * @param succeeded whether the operation completed without error
     * @throws XAException {@code XAER_RMERR} if the operation succeeded but
     *         the transaction could not be closed
     */
    private void closeTransaction(ExternalXid xid, String operation,
                                  boolean succeeded)
        throws XAException {
        try {
            logTransactionState(xid, TransactionState.CLOSED);
        } catch (Exception exception) {
            if (succeeded) {
                throw rmError("Error processing " + operation + " : " +
                    exception, exception);
            }
            _log.error("Failed to close transaction " + xid +
                " after failed " + operation, exception);
        }
    }

    /**
     * Creates an {@link XAException} with the given message, the error code
     * {@code XAER_RMERR} and the given cause.
     *
     * @param message the detail message
     * @param cause the underlying failure
     * @return the exception, ready to be thrown
     */
    private static XAException rmError(String message, Throwable cause) {
        XAException error = new XAException(message);
        error.errorCode = XAException.XAER_RMERR;
        error.initCause(cause);
        return error;
    }

    /**
     * Sets the transaction timeout.
     *
     * @param seconds the timeout in seconds
     * @return {@code true}, always
     * @throws XAException never thrown by this implementation
     */
    public synchronized boolean setTransactionTimeout(int seconds)
        throws XAException {
        _txExpiryTime = seconds;
        return true;
    }

    /**
     * Starts work on behalf of a transaction branch. With {@code TMNOFLAGS}
     * a new transaction is opened; with {@code TMJOIN} or {@code TMRESUME}
     * the transaction must already be active.
     *
     * @param id the transaction identifier
     * @param flags one of {@code TMNOFLAGS}, {@code TMJOIN} or
     *        {@code TMRESUME}
     * @throws XAException {@code XAER_NOTA} if {@code id} is null,
     *         {@code XAER_PROTO} if the flag is not supported or a
     *         joined/resumed transaction is not active, {@code XAER_DUPID}
     *         if a new transaction is already active, or {@code XAER_RMERR}
     *         if the OPENED state cannot be recorded
     */
    public synchronized void start(Xid id, int flags)
        throws XAException {
        if (id == null) {
            throw new XAException(XAException.XAER_NOTA);
        }

        ExternalXid xid = new ExternalXid(id);

        // reject anything that is not exactly one of the supported flags
        if ((flags != XAResource.TMNOFLAGS) &&
            (flags != XAResource.TMJOIN) &&
            (flags != XAResource.TMRESUME)) {
            throw new XAException(XAException.XAER_PROTO);
        }

        if (flags == XAResource.TMNOFLAGS) {
            if (isTransactionActive(xid)) {
                throw new XAException(XAException.XAER_DUPID);
            }

            try {
                logTransactionState(xid, TransactionState.OPENED);
            } catch (Exception exception) {
                throw rmError("Error processing start : " + exception,
                    exception);
            }
        } else if (!isTransactionActive(xid)) {
            // TMJOIN / TMRESUME require an existing transaction
            throw new XAException(XAException.XAER_PROTO);
        }
    }

    /**
     * Marks the service as running.
     *
     * @throws ServiceException if the state change is rejected
     */
    public void start()
        throws ServiceException {
        this.setState(ServiceState.RUNNING);
    }

    /**
     * Marks the service as stopped.
     *
     * @throws ServiceException if the state change is rejected
     */
    public void stop()
        throws ServiceException {
        this.setState(ServiceState.STOPPED);
    }

    /**
     * Does nothing; this service performs no background work of its own.
     */
    public void run() {
    }

    /**
     * Returns the identifier of this resource manager.
     *
     * @return the resource manager identifier
     */
    public String getResourceManagerId() {
        return _rid;
    }

    /**
     * Creates the next transaction log file in the log directory and adds it
     * to the set of known logs. Its sequence number is one greater than that
     * of the most recent log (or than 1 when there is no log yet).
     *
     * @return the newly created log
     * @throws ResourceManagerException if the log cannot be created
     */
    protected TransactionLog createNextTransactionLog()
        throws ResourceManagerException {
        TransactionLog newlog = null;

        synchronized (_logs) {
            try {
                long last = 1;
                if (!_logs.isEmpty()) {
                    last = getSequenceNumber(
                        ((TransactionLog) _logs.last()).getName());
                }

                String name = _logDirectory + File.separator +
                    RM_LOGFILE_PREFIX + Long.toString(++last) +
                    RM_LOGFILE_EXTENSION;

                newlog = new TransactionLog(name, true);
                _logs.add(newlog);
            } catch (TransactionLogException exception) {
                throw new ResourceManagerException(
                    "Error in createNextTransactionLog " + exception);
            }
        }

        return newlog;
    }

    /**
     * Scans the log directory for files named with the log prefix and
     * extension and registers a {@link TransactionLog} for each of them.
     * A file that cannot be opened or ordered is logged and skipped so that
     * it does not prevent the remaining logs from being loaded.
     *
     * @throws IllegalArgumentException if the log directory does not exist
     *         or is not a directory
     */
    protected void buildLogFileList() {
        File dir = new File(_logDirectory);
        if ((!dir.exists()) ||
            (!dir.isDirectory())) {
            throw new IllegalArgumentException(_logDirectory +
                " is not a directory");
        }

        File[] list;
        try {
            list = dir.listFiles(new FilenameFilter() {

                public boolean accept(File dir, String name) {
                    return name.startsWith(RM_LOGFILE_PREFIX)
                        && name.endsWith(RM_LOGFILE_EXTENSION);
                }
            });
        } catch (SecurityException exception) {
            _log.error("Cannot list transaction logs in " + _logDirectory,
                exception);
            return;
        }

        if (list == null) {
            // listFiles() yields null on an I/O error
            _log.error("Cannot list transaction logs in " + _logDirectory);
            return;
        }

        synchronized (_logs) {
            for (int index = 0; index < list.length; index++) {
                String path = list[index].getPath();
                try {
                    _logs.add(new TransactionLog(path, false));
                } catch (Exception exception) {
                    _log.error("Cannot load transaction log " + path,
                        exception);
                }
            }
        }
    }

    /**
     * Runs {@link TransactionLog#recover()} on every known log.
     * <p>
     * NOTE(review): the records returned by the logs are not used here, so
     * transactions found in the logs are not re-registered as active -
     * confirm whether that is intended.
     *
     * @throws ResourceManagerException if recovery of any log fails
     */
    private synchronized void recover()
        throws ResourceManagerException {
        try {
            Iterator iter = _logs.iterator();
            while (iter.hasNext()) {
                ((TransactionLog) iter.next()).recover();
            }
        } catch (Exception exception) {
            throw new ResourceManagerException("Error in recover " +
                exception.toString());
        }
    }

    /**
     * Returns the log associated with a transaction. If the transaction has
     * no log yet, it is bound to the current log.
     *
     * @param txid the transaction identifier
     * @return the transaction's log
     * @throws TransactionLogException if the current log cannot be sized
     * @throws ResourceManagerException if a new log cannot be created
     */
    private TransactionLog getTransactionLog(ExternalXid txid)
        throws TransactionLogException, ResourceManagerException {
        TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);
        if (log == null) {
            log = getCurrentTransactionLog();
            addTridLogEntry(txid, log);
        }

        return log;
    }

    /**
     * Returns the log that new transactions are written to: the most recent
     * log, or a newly created one if there is none or the most recent one
     * has grown beyond the configured log file size.
     *
     * @return the current log
     * @throws TransactionLogException if the log size cannot be determined
     * @throws ResourceManagerException if a new log cannot be created
     */
    private TransactionLog getCurrentTransactionLog()
        throws TransactionLogException, ResourceManagerException {
        synchronized (_logs) {
            TransactionLog log = null;
            if (!_logs.isEmpty()) {
                log = (TransactionLog) _logs.last();
            }

            if ((log == null) ||
                (log.size() > _logFileSize)) {
                log = createNextTransactionLog();
            }
            return log;
        }
    }

    /**
     * Records in both caches that a transaction is bound to a log.
     *
     * @param trid the transaction identifier
     * @param log the log holding the transaction's records
     */
    private void addTridLogEntry(ExternalXid trid, TransactionLog log) {
        synchronized (_cacheLock) {
            _tridToLogCache.put(trid, log);

            Vector trids = (Vector) _logToTridCache.get(log);
            if (trids == null) {
                trids = new Vector();
                _logToTridCache.put(log, trids);
            }
            trids.addElement(trid);
        }
    }

    /**
     * Determines whether the given log is the most recent one.
     *
     * @param log the log to check
     * @return {@code true} if {@code log} equals the last log in the set
     */
    private boolean isCurrentTransactionLog(TransactionLog log) {
        synchronized (_logs) {
            return (!_logs.isEmpty()) && log.equals(_logs.last());
        }
    }

    /**
     * Removes the binding between a transaction and a log from both caches.
     * When the log is no longer referenced by any transaction its entry is
     * dropped altogether.
     *
     * @param trid the transaction identifier
     * @param log the log the transaction was bound to
     */
    private void removeTridLogEntry(ExternalXid trid, TransactionLog log) {
        synchronized (_cacheLock) {
            _tridToLogCache.remove(trid);

            Vector trids = (Vector) _logToTridCache.get(log);
            if (trids != null) {
                trids.remove(trid);
                if (trids.size() == 0) {
                    _logToTridCache.remove(log);
                }
            }
        }
    }

    /**
     * Returns a snapshot of the records held in memory for a transaction.
     *
     * @param xid the transaction identifier
     * @param rid the resource manager identifier; not used
     * @return the records in the order they were logged, or an empty array
     *         if the transaction is not active
     */
    protected Object[] getTransactionRecords(ExternalXid xid, String rid) {
        LinkedList list = (LinkedList) _activeTransactions.get(xid);
        if (list == null) {
            return new Object[0];
        }
        return list.toArray();
    }

    /**
     * Extracts the sequence number from a log file name, i.e. the digits
     * between the log prefix and the log extension.
     * <p>
     * The last occurrence of the prefix and of the extension are used so
     * that a directory component which happens to contain either of them
     * does not confuse the parsing.
     *
     * @param name the log file name, optionally including its path
     * @return the sequence number
     * @throws ResourceManagerException if the name does not have the form
     *         {@code <prefix><number><extension>}
     */
    protected long getSequenceNumber(String name)
        throws ResourceManagerException {
        int prefix = name.lastIndexOf(RM_LOGFILE_PREFIX);
        int end = name.lastIndexOf(RM_LOGFILE_EXTENSION);
        int start = prefix + RM_LOGFILE_PREFIX.length();

        if ((prefix < 0) || (end < start)) {
            throw new ResourceManagerException(
                "Invalid name assigned to resource manager file " + name);
        }

        try {
            return Long.parseLong(name.substring(start, end));
        } catch (NumberFormatException exception) {
            throw new ResourceManagerException(
                "Invalid name assigned to resource manager file " + name);
        }
    }

    /**
     * Determines whether a transaction is active, i.e. has been opened and
     * not yet closed.
     *
     * @param xid the transaction identifier
     * @return {@code true} if the transaction is active
     */
    private synchronized boolean isTransactionActive(ExternalXid xid) {
        return _activeTransactions.containsKey(xid);
    }

    /**
     * Debugging aid: prints every recovered record to {@code System.err}.
     *
     * @param records a map of {@link ExternalXid} to the {@link LinkedList}
     *        of records recovered for it
     */
    private void dumpRecovered(HashMap records) {
        Iterator iter = records.keySet().iterator();
        while (iter.hasNext()) {
            ExternalXid txid = (ExternalXid) iter.next();
            LinkedList list = (LinkedList) records.get(txid);
            Iterator oiter = list.iterator();
            while (oiter.hasNext()) {
                Object object = oiter.next();
                if (object instanceof StateTransactionLogEntry) {
                    System.err.println("Recovered [" + txid + "] Class " +
                        object.getClass().getName() + " [" +
                        ((StateTransactionLogEntry) object).getState().toString() + "]");
                } else {
                    System.err.println("Recovered [" + txid + "] Class " +
                        object.getClass().getName());
                }
            }
        }
    }

    /**
     * Wraps a published message for logging.
     *
     * @param message the published message
     * @return the wrapper
     */
    private PublishedMessageWrapper createPublishedMessageWrapper(
        MessageImpl message) {
        return new PublishedMessageWrapper(message);
    }

    /**
     * Wraps a received message handle for logging.
     *
     * @param id the consumer identifier
     * @param handle the handle of the received message
     * @return the wrapper
     */
    private ReceivedMessageWrapper createReceivedMessageWrapper(
        long id, MessageHandle handle) {
        return new ReceivedMessageWrapper(id, handle);
    }

    /**
     * Orders {@link TransactionLog} instances by the sequence number
     * embedded in their names.
     */
    private class TranLogFileComparator
        implements Comparator {

        /**
         * Compares two transaction logs by sequence number.
         *
         * @param o1 the first log
         * @param o2 the second log
         * @return a negative value, zero or a positive value as the first
         *         sequence number is less than, equal to or greater than the
         *         second
         * @throws RuntimeException if either argument is not a
         *         {@link TransactionLog} or has an invalid name
         */
        public int compare(Object o1, Object o2) {
            try {
                if (!((o1 instanceof TransactionLog) &&
                    (o2 instanceof TransactionLog))) {
                    throw new ClassCastException("o1 = " +
                        o1.getClass().getName() + " and o2 = " +
                        o2.getClass().getName());
                }

                long seq1 = getSequenceNumber(((TransactionLog) o1).getName());
                long seq2 = getSequenceNumber(((TransactionLog) o2).getName());

                if (seq1 > seq2) {
                    return 1;
                }
                return (seq1 < seq2) ? -1 : 0;
            } catch (Exception exception) {
                throw new RuntimeException("Error in ResourceManager.compare " +
                    exception.toString());
            }
        }

        /**
         * All comparators of this type impose the same ordering.
         *
         * @param obj the object to compare with
         * @return {@code true} if {@code obj} is a
         *         {@code TranLogFileComparator}
         */
        public boolean equals(Object obj) {
            return obj instanceof TranLogFileComparator;
        }

        /**
         * Returns the same value for every instance, consistent with
         * {@link #equals}.
         *
         * @return the hash code
         */
        public int hashCode() {
            return TranLogFileComparator.class.hashCode();
        }
    }

    /**
     * Base class of the records logged as transactional data; holds the
     * wrapped object.
     */
    private abstract class TransactionalObjectWrapper {

        /**
         * The wrapped object.
         */
        private Object _object;

        /**
         * Creates a wrapper around the given object.
         *
         * @param object the object to wrap
         */
        public TransactionalObjectWrapper(Object object) {
            _object = object;
        }

        /**
         * Indicates whether this wraps a published message.
         *
         * @return {@code true} if this is a {@link PublishedMessageWrapper}
         */
        public boolean isPublishedMessage() {
            return this instanceof PublishedMessageWrapper;
        }

        /**
         * Indicates whether this wraps a received message.
         *
         * @return {@code true} if this is a {@link ReceivedMessageWrapper}
         */
        public boolean isReceivedMessage() {
            return this instanceof ReceivedMessageWrapper;
        }

        /**
         * Returns the wrapped object.
         *
         * @return the wrapped object
         */
        public Object getObject() {
            return _object;
        }
    }

    /**
     * Wraps a message published within a transaction.
     */
    private class PublishedMessageWrapper extends TransactionalObjectWrapper {

        /**
         * Creates a wrapper around a published message.
         *
         * @param message the published message
         */
        public PublishedMessageWrapper(MessageImpl message) {
            super(message);
        }

        /**
         * Returns the published message.
         *
         * @return the wrapped message
         */
        public MessageImpl getMessage() {
            return (MessageImpl) super.getObject();
        }
    }

    /**
     * Wraps the handle of a message received within a transaction, together
     * with the identifier of the consumer that received it.
     */
    private class ReceivedMessageWrapper extends TransactionalObjectWrapper {

        /**
         * The identifier of the consumer that received the message.
         */
        private long _consumerId;

        /**
         * Creates a wrapper around a received message handle.
         *
         * @param id the consumer identifier
         * @param handle the handle of the received message
         */
        public ReceivedMessageWrapper(long id, MessageHandle handle) {
            super(handle);
            _consumerId = id;
        }

        /**
         * Returns the consumer identifier.
         *
         * @return the consumer identifier
         */
        public long getConsumerId() {
            return _consumerId;
        }

        /**
         * Returns the handle of the received message.
         *
         * @return the wrapped handle
         */
        public MessageHandle getMessageHandle() {
            return (MessageHandle) super.getObject();
        }
    }

}