1 6 7 package com.quikj.application.web.talk.plugin; 8 9 import java.net.*; 10 import java.util.*; 11 import java.io.*; 12 import com.quikj.server.framework.*; 13 import com.quikj.server.web.*; 14 import com.quikj.application.web.talk.messaging.*; 15 16 20 public class ConferenceBridge extends AceThread implements EndPointInterface 21 { 22 private String identifier; 23 private static int counter = 0; 24 private static Object identifierLock = new Object (); 25 private static String hostName = null; 26 27 private LinkedList endPointList = new LinkedList(); 28 private long sessionId = -1; 29 30 private HashMap keyValuePair = new HashMap(); 31 32 33 public ConferenceBridge(String name) 34 throws IOException 35 { 36 super(name); 37 38 if (hostName == null) 39 { 40 try 41 { 42 hostName = InetAddress.getLocalHost().getHostName(); 43 } 44 catch (UnknownHostException ex) 45 { 46 hostName = "unknown"; 47 } 48 } 49 50 synchronized(identifierLock) 51 { 52 identifier = hostName + ":conference:" + (new Date()).getTime() + 53 ":" + counter++ ; 54 } 55 56 } 57 58 public void dispose() 59 { 60 super.dispose(); 61 } 62 63 public void run() 64 { 65 while (true) 66 { 67 AceMessageInterface message = waitMessage(); 68 if (message == null) 69 { 70 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG, 72 getName() 73 + "- ConferenceBridge.run() -- A null message was received while waiting for a message - " 74 + getErrorMessage()); 75 break; 76 } 77 78 if ((message instanceof AceSignalMessage) == true) 79 { 80 82 break; 90 } 91 else if ((message instanceof MessageEvent) == true) 92 { 93 processMessageEvent((MessageEvent)message); 94 } 95 else if ((message instanceof InformationEvent) == true) 96 { 97 processInformationEvent((InformationEvent)message); 98 } 99 else 100 { 101 AceLogger.Instance().log(AceLogger.WARNING, AceLogger.SYSTEM_LOG, 103 getName() 104 + "- ConferenceBridge.run() -- An unexpected event is received : " 105 + message.messageType()); 106 } 107 } 108 109 dispose(); 110 } 111 112 public boolean sendEvent(AceMessageInterface message) 113 { 114 return sendMessage(message); 115 } 116 117 public boolean addEndPoint(EndPointInterface ep) 118 { 119 synchronized (endPointList) 120 { 121 InformationEvent ev = new InformationEvent(); 123 ev.setFrom(this); 124 ev.setRequest(true); 125 126 if (ep.sendEvent(ev) == false) 127 { 128 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG, 130 Thread.currentThread().getName() 131 + "- ConferenceBridge.addEndPoint() -- Error sending information event to the client " 132 + ep); 133 134 135 } 137 138 return endPointList.add(new ConferencedEndPoints(ep)); 139 } 140 } 141 142 public void removeAllEndPoints() 143 { 144 synchronized (endPointList) 145 { 146 endPointList.clear(); 147 148 notifyEndPoint(); 149 } 150 } 151 152 public boolean removeEndPoint(EndPointInterface ep) 153 { 154 synchronized (endPointList) 155 { 156 Iterator iterator = endPointList.iterator(); 157 while (iterator.hasNext() == true) 158 { 159 ConferencedEndPoints cep = (ConferencedEndPoints)iterator.next(); 160 if (cep.getEndpoint() == ep) 161 { 162 iterator.remove(); 163 notifyEndPoint(); 164 return true; 165 } 166 } 167 } 168 169 return false; 170 } 171 172 public EndPointInterface[] listEndPoints() 173 { 174 synchronized (endPointList) 175 { 176 int size = numEndPoints(); 177 EndPointInterface[] elements = new EndPointInterface[size]; 178 Iterator iter = endPointList.iterator(); 179 180 int count = 0; 181 while (iter.hasNext() == true) 182 { 183 elements[count++] = ((ConferencedEndPoints)(iter.next())).getEndpoint(); 184 } 185 186 return elements; 187 } 188 } 189 190 public int numEndPoints() 191 { 192 synchronized (endPointList) 193 { 194 return endPointList.size(); 195 } 196 } 197 198 public boolean containsEndPoint(EndPointInterface ep) 199 { 200 synchronized (endPointList) 201 { 202 Iterator iter = endPointList.iterator(); 203 while (iter.hasNext() == true) 204 { 205 ConferencedEndPoints cep = (ConferencedEndPoints)iter.next(); 206 if (cep.getEndpoint() == ep) 207 { 208 return true; 209 } 210 } 211 } 212 213 return false; 214 } 215 216 private void processMessageEvent(MessageEvent e) 217 { 218 if (e.getEventType() == MessageEvent.RTP_MESSAGE) 219 { 220 EndPointInterface from = e.getFrom(); 221 RTPMessage packet = (RTPMessage)e.getMessage(); 222 223 synchronized (endPointList) 224 { 225 ListIterator iter = endPointList.listIterator(); 226 227 while (iter.hasNext() == true) 228 { 229 EndPointInterface element = ((ConferencedEndPoints)(iter.next())).getEndpoint(); 230 if (element != from) 231 { 232 if (element.sendEvent(new MessageEvent(MessageEvent.RTP_MESSAGE, 233 from, 234 packet, 235 null)) == false) 236 { 237 AceLogger.Instance().log(AceLogger.ERROR, 239 AceLogger.SYSTEM_LOG, 240 Thread.currentThread().getName() 241 + "- ConferenceBridge.processMessageEvent() -- Could not send RTP message to the endpoint " 242 + element); 243 } 245 } 246 } 247 248 } 249 250 } 251 } 253 254 private void notifyEndPoint() 255 { 256 synchronized (endPointList) 257 { 258 ConferenceInformationMessage conf_msg = new ConferenceInformationMessage(); 259 conf_msg.setSessionId(sessionId); 260 261 Iterator iter = endPointList.iterator(); 262 while (iter.hasNext() == true) 263 { 264 ConferencedEndPoints conf = (ConferencedEndPoints)(iter.next()); 265 266 if (conf.getInformation() != null) 267 { 268 conf_msg.addEndPoint(conf.getInformation()); 269 } 270 } 272 273 iter = endPointList.iterator(); 275 while (iter.hasNext() == true) 276 { 277 EndPointInterface to = ((ConferencedEndPoints)iter.next()).getEndpoint(); 278 279 MessageEvent me = new MessageEvent( 280 MessageEvent.CLIENT_REQUEST_MESSAGE, 281 this, 282 conf_msg, 283 null); 284 if (to.sendEvent(me) == false) 285 { 286 AceLogger.Instance().log(AceLogger.ERROR, AceLogger.SYSTEM_LOG, 288 Thread.currentThread().getName() 289 + "- ConferenceBridge.processTimerEvent() -- Error sending conference information message to the client"); 290 } 291 } 292 } 293 } 294 295 private void processInformationEvent(InformationEvent information) 296 { 297 if (information.isRequest() == false) 298 { 299 EndPointInterface ep = information.getFrom(); 300 301 synchronized (endPointList) 302 { 303 Iterator iter = endPointList.iterator(); 304 while (iter.hasNext() == true) 305 { 306 ConferencedEndPoints cep = (ConferencedEndPoints)iter.next(); 307 if (cep.getEndpoint() == ep) 308 { 309 cep.setInformation(information.getInformation()); 310 notifyEndPoint(); 311 return; 312 } 313 } 314 } 315 } 316 } 318 319 322 public long getSessionId() 323 { 324 return sessionId; 325 } 326 327 330 public void setSessionId(long sessionId) 331 { 332 this.sessionId = sessionId; 333 } 334 335 public String getIdentifier() 336 { 337 return identifier; 338 339 } 340 341 public void setParam (String key, String value) 342 { 343 synchronized (keyValuePair) 344 { 345 keyValuePair.put(key, value); 346 } 347 } 348 349 public String getParam(String key) 350 { 351 synchronized (keyValuePair) 352 { 353 return (String )keyValuePair.get(key); 354 } 355 } 356 357 public void setParamObject (String key, Object value) 358 { 359 synchronized (keyValuePair) 360 { 361 keyValuePair.put(key, value); 362 } 363 } 364 365 public Object getParamObject(String key) 366 { 367 synchronized (keyValuePair) 368 { 369 return keyValuePair.get(key); 370 } 371 } 372 373 public void removeParam(String key) 374 { 375 synchronized (keyValuePair) 376 { 377 keyValuePair.remove(key); 378 } 379 } 380 } 381 | Popular Tags |