1 7 package org.jboss.messaging.channel.plugins.handler; 8 9 import org.jboss.messaging.interfaces.*; 10 import org.jboss.messaging.interfaces.Consumer; 11 import org.jboss.messaging.interfaces.MessageReference; 12 13 19 public abstract class AbstractChannelHandler implements ChannelHandler 20 { 21 23 25 26 protected MessageSet messages; 27 28 30 32 37 public AbstractChannelHandler(MessageSet messages) 38 { 39 this.messages = messages; 40 messages.setConsumer(this); 41 } 42 43 45 47 public boolean accepts(MessageReference reference, boolean active) 48 { 49 return true; 51 } 52 53 public void onMessage(MessageReference reference) 54 { 55 Consumer consumer; 56 messages.lock(); 57 try 58 { 59 consumer = findConsumer(reference); 60 } 61 finally 62 { 63 messages.unlock(); 64 } 65 66 if (consumer != null) 67 consumer.onMessage(reference); 68 } 69 70 72 public void addMessage(MessageReference reference) 73 { 74 Consumer consumer; 75 messages.lock(); 76 try 77 { 78 consumer = findConsumer(reference); 79 if (consumer == null) 80 messages.add(reference); 81 } 82 finally 83 { 84 messages.unlock(); 85 } 86 87 if (consumer != null) 88 consumer.onMessage(reference); 89 } 90 91 public MessageReference removeMessage(Consumer consumer) 92 { 93 messages.lock(); 94 try 95 { 96 return messages.remove(consumer); 97 } 98 finally 99 { 100 messages.unlock(); 101 } 102 } 103 104 public void waitMessage(Consumer consumer, long wait) 105 { 106 MessageReference message; 107 messages.lock(); 108 try 109 { 110 message = messages.remove(consumer); 111 if (message == null) 113 addConsumer(consumer, wait); 114 } 115 finally 116 { 117 messages.unlock(); 118 } 119 120 if (message != null) 122 consumer.onMessage(message); 123 } 124 125 public void stopWaitMessage(Consumer consumer) 126 { 127 messages.lock(); 128 try 129 { 130 removeConsumer(consumer); 131 } 132 finally 133 { 134 messages.unlock(); 135 } 136 } 137 138 140 146 protected abstract void addConsumer(Consumer consumer, long wait); 147 148 153 protected abstract void removeConsumer(Consumer consumer); 154 155 161 protected abstract Consumer findConsumer(MessageReference reference); 162 163 165 167 } 169 | Popular Tags |