KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mr > core > groups > GroupReactor


1 /*
2  * Copyright 2002 by
3  * <a HREF="http://www.coridan.com">Coridan</a>
4  * <a HREF="mailto: support@coridan.com ">support@coridan.com</a>
5  *
6  * The contents of this file are subject to the Mozilla Public License Version
7  * 1.1 (the "License"); you may not use this file except in compliance with the
8  * License. You may obtain a copy of the License at
9  * http://www.mozilla.org/MPL/
10  *
11  * Software distributed under the License is distributed on an "AS IS" basis,
12  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13  * for the specific language governing rights and limitations under the
14  * License.
15  *
16  * The Original Code is "MantaRay" (TM).
17  *
18  * The Initial Developer of the Original Code is Amir Shevat.
19  * Portions created by the Initial Developer are Copyright (C) 2006
20  * Coridan Inc. All Rights Reserved.
21  *
22  * Contributor(s): all the names of the contributors are added in the source
23  * code where applicable.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * LGPL license (the "GNU LESSER GENERAL PUBLIC LICENSE"), in which case the
27  * provisions of LGPL are applicable instead of those above. If you wish to
28  * allow use of your version of this file only under the terms of the LGPL
29  * License and not to allow others to use your version of this file under
30  * the MPL, indicate your decision by deleting the provisions above and
31  * replace them with the notice and other provisions required by the LGPL.
32  * If you do not delete the provisions above, a recipient may use your version
33  * of this file under either the MPL or the GNU LESSER GENERAL PUBLIC LICENSE.
34  
35  *
36  * This library is free software; you can redistribute it and/or modify it
37  * under the terms of the MPL as stated above or under the terms of the GNU
38  * Lesser General Public License as published by the Free Software Foundation;
39  * either version 2.1 of the License, or any later version.
40  *
41  * This library is distributed in the hope that it will be useful, but WITHOUT
42  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
43  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
44  * License for more details.
45  */

46 package org.mr.core.groups;
47
48 import java.io.IOException JavaDoc;
49 import java.net.DatagramPacket JavaDoc;
50 import java.net.MulticastSocket JavaDoc;
51 import java.nio.ByteBuffer JavaDoc;
52 import java.util.ArrayList JavaDoc;
53 import java.util.HashMap JavaDoc;
54 import java.util.List JavaDoc;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58 import org.mr.MantaAgent;
59 import org.mr.core.protocol.MantaBusMessage;
60 import org.mr.core.protocol.MessageTransformer;
61
62 /**
63  * Converts the receive behavior to push behavior, pushes messages to the listeners
64  * @author Amir Shevat
65  *
66  */

67 public class GroupReactor extends Thread JavaDoc {
68 // this map will hold list of listeners per subject
69
HashMap JavaDoc subjectListenerMap = new HashMap JavaDoc();
70     // the socket to pull
71
MulticastSocket JavaDoc socket ;
72     
73     boolean go = true;
74     boolean logEveryMessage = false;
75     byte[] buf = new byte[100000];
76     // the group key for this socket
77
GroupKey key;
78     
79     private Log log;
80     
81     GroupReactor(MulticastSocket JavaDoc s, GroupKey key){
82         log=LogFactory.getLog("GroupReactor");
83         this.socket = s;
84         this.key = key;
85         this.setName("GroupReactor");
86         logEveryMessage = MantaAgent.getInstance().getSingletonRepository().getConfigManager().getBooleanProperty("multicast.log", false);
87         
88     }
89     
90     public void setSocket(MulticastSocket JavaDoc s) {
91         this.socket = s;
92     }
93
94     public void run(){
95         while(go){
96             DatagramPacket JavaDoc recv = new DatagramPacket JavaDoc(buf, buf.length);
97             try {
98                 socket.receive(recv);
99                 ByteBuffer JavaDoc buff = ByteBuffer.wrap(recv.getData() ,0,recv.getLength() );
100             
101                 MantaBusMessage message = MessageTransformer.fromBuffer(buff);
102                 if(log.isDebugEnabled() && logEveryMessage){
103                     log.debug( "Got group message "+message);
104                 }
105                 String JavaDoc sender = null;
106                 String JavaDoc subject = null;
107                 List JavaDoc listeners =null;
108                 if(message != null && (subject=(String JavaDoc)message.getHeader(MutlicastGroupManager.GRUOP_SUBJECT_KEY )) != null) {
109                     listeners=(List JavaDoc)subjectListenerMap.get(subject);
110                     if(listeners == null) {
111                         if(log.isDebugEnabled()){
112                             log.debug( "Received a messages tagged with subject=" + subject + ", but there is no registration for that subject. Will drop message.");
113                         }
114                         continue;
115                     }
116                     sender = message.getHeader(MutlicastGroupManager.GRUOP_SENDER_KEY);
117                     for (int i = 0; i < listeners.size(); i++) {
118                         ((GroupMessageListener)listeners.get(i)).onMessage(key, subject, message);
119                     }
120                 }
121                 
122                 
123                 
124             } catch (IOException JavaDoc e) {
125                 log.error("Problem while getting group message",e);
126             }
127             
128         }
129     }//run
130

131     /**
132      * here is where the miltiplexing is done over the jchannel
133      * @param subject the multiplex key
134      * @param mlistener the message listener to this subject on this channel
135      */

136     public synchronized void registerListenerToSubject(String JavaDoc subject, GroupMessageListener mlistener) {
137         List JavaDoc listeners =(List JavaDoc) subjectListenerMap.get(subject);
138         if(listeners == null){
139             listeners = new ArrayList JavaDoc();
140             subjectListenerMap.put(subject ,listeners );
141         }
142         listeners.add(mlistener);
143         
144     }
145
146     /**
147      * removes the listener from the subject
148      * @param subject the key is listener was registered on
149      * @param mlistener the listener object
150      */

151     public synchronized void unregisterListenerToSubject(String JavaDoc subject, GroupMessageListener mlistener) {
152         List JavaDoc listeners =(List JavaDoc) subjectListenerMap.get(subject);
153         if(listeners != null)
154             listeners.remove(mlistener);
155         
156         
157     }
158     
159
160 }
161
Popular Tags