KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > kernel > services > queues > msmq > Queue


1 package org.mr.kernel.services.queues.msmq;
2
3
4 public class Queue
5 {
6   public Queue(String JavaDoc queueName)
7     throws MessageQueueException
8   {
9     _init(queueName, 0x03); // open with both SEND and RECEIVE access
10
}
11
12   public Queue(String JavaDoc queueName, int access)
13     throws MessageQueueException
14   {
15     _init(queueName, access);
16   }
17
18
19   void _init(String JavaDoc queueName, int access)
20     throws MessageQueueException
21   {
22     // the openQueue native method causes the _queueSlot to be set.
23
int rc = 0;
24     if (access == 0x01) // RECEIVE
25
{
26      rc= nativeOpenQueueForReceive(queueName);
27     }
28     else
29     if (access == 0x02) // SEND
30
{
31      rc= nativeOpenQueueForSend(queueName);
32     }
33     else
34     if (access == 0x03) // SEND+RECEIVE
35
{
36      rc= nativeOpenQueue(queueName);
37     }
38     else { rc= 0xC00E0006; /* MQ_INVALID_PARAMETER */ }
39
40     if (rc!=0) throw new MessageQueueException("Cannot open queue.", rc);
41
42     _name= queueName;
43     _formatName= "unknown";
44     _label= "need to set this";
45     _isTransactional= false; // TODO: get actual value in "openQueue"
46
}
47
48
49   public static Queue create(String JavaDoc queuePath, String JavaDoc queueLabel, boolean isTransactional)
50     throws MessageQueueException
51   {
52     int rc= nativeCreateQueue( queuePath, queueLabel, (isTransactional)?1:0);
53     if (rc!=0)
54       throw new MessageQueueException("Cannot create queue.", rc);
55     // DIRECT=OS ? or DIRECT=TCP ?
56
String JavaDoc a1= "OS";
57     char[] c= queuePath.toCharArray();
58     if ((c[0]>='1')
59     && (c[0]<='9')) a1= "TCP"; // assume ip address
60

61     Queue q= new Queue("DIRECT=" + a1 + ":" + queuePath);
62     q._name= queuePath;
63     // q._formatName=queueFormatName;
64
q._label=queueLabel;
65     q._isTransactional= isTransactional;
66     return q;
67   }
68
69
70   public static void delete(String JavaDoc queuePath)
71     throws MessageQueueException
72   {
73     int rc= nativeDeleteQueue( queuePath );
74     if (rc!=0)
75       throw new MessageQueueException("Cannot delete queue.", rc);
76   }
77
78
79   public void send(Message msg)
80     throws MessageQueueException
81   {
82     int rc= nativeSend(msg.getMessage(),
83                msg.getMessage().length,
84                msg.getLabel(),
85                msg.getCorrelationId(),
86                msg.getTransactionFlag());
87     if (rc!=0)
88       throw new MessageQueueException("Cannot send.", rc);
89   }
90
91
92   public void send(byte[] s)
93     throws MessageQueueException
94   {
95     int rc= nativeSend(s,
96                s.length,
97                "", // empty label
98
"", // empty correlationId
99
0 // outside any transaction
100
);
101     if (rc!=0)
102       throw new MessageQueueException("Cannot send.", rc);
103   }
104
105   // TODO: add sendMessage() methods for other types: byte[]? object?
106

107   private Message receiveEx(int timeout, int ReadOrPeek)
108     throws MessageQueueException
109   {
110     int rc = nativeReceive(timeout, ReadOrPeek);
111
112     if (rc!=0)
113       throw new MessageQueueException("Cannot receive.", rc);
114
115     return new Message(_lastMessageRetrieved_MessageBytes,
116                _lastMessageRetrieved_MessageLabel,
117                _lastMessageRetrieved_CorrelationId,
118                0);
119
120   }
121
122   public Message receive(int timeout)
123     throws MessageQueueException
124   {
125     return receiveEx(timeout, 1);
126   }
127
128   public Message receive()
129     throws MessageQueueException
130   {
131     return receiveEx(0,1); // infinite timeout
132
}
133
134   public Message peek()
135     throws MessageQueueException
136   {
137     return receiveEx(0,0); // infinite timeout
138
}
139
140   public Message peek(int timeout)
141     throws MessageQueueException
142   {
143     return receiveEx(timeout,0);
144   }
145
146   public void close()
147     throws MessageQueueException
148   {
149
150     int rc=nativeClose();
151     if (rc!=0)
152       throw new MessageQueueException("Cannot close.", rc);
153   }
154
155
156   // --------------------------------------------
157
// getters on the Queue properties
158
public String JavaDoc getName(){ return _name; }
159   public String JavaDoc getLabel(){ return _label; }
160   public String JavaDoc getFormatName(){ return _formatName; }
161   public boolean isTransactional(){ return _isTransactional; }
162
163
164   // --------------------------------------------
165
// native methods
166
private static native int nativeInit();
167   private static native int nativeCreateQueue(String JavaDoc queuePath, String JavaDoc queueLabel, int isTransactional);
168   private static native int nativeDeleteQueue(String JavaDoc queuePath);
169   private native int nativeOpenQueue(String JavaDoc queueString);
170   private native int nativeOpenQueueForSend(String JavaDoc queueString);
171   private native int nativeOpenQueueForReceive(String JavaDoc queueString);
172   private native int nativeReceive(int timeout, int ReadOrPeek);
173 // private native int nativeSend(String messageString, int length, String label, String correlationString, int transactionFlag);
174
private native int nativeSend(byte[] message, int length, String JavaDoc label, String JavaDoc correlationString, int transactionFlag);
175   private native int nativeClose();
176
177
178   // --------------------------------------------
179
// private members
180
int _queueSlot = 0;
181   String JavaDoc _name;
182   String JavaDoc _formatName;
183   String JavaDoc _label;
184   boolean _isTransactional;
185
186 // String _lastMessageRetrieved_MessageString;
187
byte[] _lastMessageRetrieved_MessageBytes;
188   String JavaDoc _lastMessageRetrieved_MessageLabel;
189   String JavaDoc _lastMessageRetrieved_CorrelationId;
190
191
192   // --------------------------------------------
193
// static initializer
194
static {
195     System.loadLibrary("JNIMSMQ");
196     nativeInit();
197   }
198 }
199
Popular Tags