1 package org.mr.kernel.services.queues.msmq; 2 3 4 public class Queue 5 { 6 public Queue(String queueName) 7 throws MessageQueueException 8 { 9 _init(queueName, 0x03); } 11 12 public Queue(String queueName, int access) 13 throws MessageQueueException 14 { 15 _init(queueName, access); 16 } 17 18 19 void _init(String queueName, int access) 20 throws MessageQueueException 21 { 22 int rc = 0; 24 if (access == 0x01) { 26 rc= nativeOpenQueueForReceive(queueName); 27 } 28 else 29 if (access == 0x02) { 31 rc= nativeOpenQueueForSend(queueName); 32 } 33 else 34 if (access == 0x03) { 36 rc= nativeOpenQueue(queueName); 37 } 38 else { rc= 0xC00E0006; } 39 40 if (rc!=0) throw new MessageQueueException("Cannot open queue.", rc); 41 42 _name= queueName; 43 _formatName= "unknown"; 44 _label= "need to set this"; 45 _isTransactional= false; } 47 48 49 public static Queue create(String queuePath, String queueLabel, boolean isTransactional) 50 throws MessageQueueException 51 { 52 int rc= nativeCreateQueue( queuePath, queueLabel, (isTransactional)?1:0); 53 if (rc!=0) 54 throw new MessageQueueException("Cannot create queue.", rc); 55 String a1= "OS"; 57 char[] c= queuePath.toCharArray(); 58 if ((c[0]>='1') 59 && (c[0]<='9')) a1= "TCP"; 61 Queue q= new Queue("DIRECT=" + a1 + ":" + queuePath); 62 q._name= queuePath; 63 q._label=queueLabel; 65 q._isTransactional= isTransactional; 66 return q; 67 } 68 69 70 public static void delete(String queuePath) 71 throws MessageQueueException 72 { 73 int rc= nativeDeleteQueue( queuePath ); 74 if (rc!=0) 75 throw new MessageQueueException("Cannot delete queue.", rc); 76 } 77 78 79 public void send(Message msg) 80 throws MessageQueueException 81 { 82 int rc= nativeSend(msg.getMessage(), 83 msg.getMessage().length, 84 msg.getLabel(), 85 msg.getCorrelationId(), 86 msg.getTransactionFlag()); 87 if (rc!=0) 88 throw new MessageQueueException("Cannot send.", rc); 89 } 90 91 92 public void send(byte[] s) 93 throws MessageQueueException 94 { 95 int rc= nativeSend(s, 96 s.length, 97 "", "", 0 ); 101 if (rc!=0) 102 throw new MessageQueueException("Cannot send.", rc); 103 } 104 105 107 private Message receiveEx(int timeout, int ReadOrPeek) 108 throws MessageQueueException 109 { 110 int rc = nativeReceive(timeout, ReadOrPeek); 111 112 if (rc!=0) 113 throw new MessageQueueException("Cannot receive.", rc); 114 115 return new Message(_lastMessageRetrieved_MessageBytes, 116 _lastMessageRetrieved_MessageLabel, 117 _lastMessageRetrieved_CorrelationId, 118 0); 119 120 } 121 122 public Message receive(int timeout) 123 throws MessageQueueException 124 { 125 return receiveEx(timeout, 1); 126 } 127 128 public Message receive() 129 throws MessageQueueException 130 { 131 return receiveEx(0,1); } 133 134 public Message peek() 135 throws MessageQueueException 136 { 137 return receiveEx(0,0); } 139 140 public Message peek(int timeout) 141 throws MessageQueueException 142 { 143 return receiveEx(timeout,0); 144 } 145 146 public void close() 147 throws MessageQueueException 148 { 149 150 int rc=nativeClose(); 151 if (rc!=0) 152 throw new MessageQueueException("Cannot close.", rc); 153 } 154 155 156 public String getName(){ return _name; } 159 public String getLabel(){ return _label; } 160 public String getFormatName(){ return _formatName; } 161 public boolean isTransactional(){ return _isTransactional; } 162 163 164 private static native int nativeInit(); 167 private static native int nativeCreateQueue(String queuePath, String queueLabel, int isTransactional); 168 private static native int nativeDeleteQueue(String queuePath); 169 private native int nativeOpenQueue(String queueString); 170 private native int nativeOpenQueueForSend(String queueString); 171 private native int nativeOpenQueueForReceive(String queueString); 172 private native int nativeReceive(int timeout, int ReadOrPeek); 173 private native int nativeSend(byte[] message, int length, String label, String correlationString, int transactionFlag); 175 private native int nativeClose(); 176 177 178 int _queueSlot = 0; 181 String _name; 182 String _formatName; 183 String _label; 184 boolean _isTransactional; 185 186 byte[] _lastMessageRetrieved_MessageBytes; 188 String _lastMessageRetrieved_MessageLabel; 189 String _lastMessageRetrieved_CorrelationId; 190 191 192 static { 195 System.loadLibrary("JNIMSMQ"); 196 nativeInit(); 197 } 198 } 199 | Popular Tags |