1 19 20 package com.sslexplorer.notification; 21 22 import java.io.DataInputStream ; 23 import java.io.DataOutputStream ; 24 import java.io.File ; 25 import java.io.FileFilter ; 26 import java.io.FileInputStream ; 27 import java.io.FileOutputStream ; 28 import java.io.IOException ; 29 import java.text.DateFormat ; 30 import java.text.SimpleDateFormat ; 31 import java.util.ArrayList ; 32 import java.util.Collection ; 33 import java.util.Collections ; 34 import java.util.Date ; 35 import java.util.HashMap ; 36 import java.util.Iterator ; 37 import java.util.List ; 38 import java.util.Map ; 39 import java.util.Properties ; 40 import java.util.TreeMap ; 41 42 import org.apache.commons.logging.Log; 43 import org.apache.commons.logging.LogFactory; 44 45 import com.sslexplorer.core.CoreAttributeConstants; 46 import com.sslexplorer.core.CoreEvent; 47 import com.sslexplorer.core.CoreEventConstants; 48 import com.sslexplorer.core.CoreServlet; 49 import com.sslexplorer.core.UserDatabaseManager; 50 import com.sslexplorer.policyframework.Policy; 51 import com.sslexplorer.policyframework.PolicyDatabaseFactory; 52 import com.sslexplorer.policyframework.Principal; 53 import com.sslexplorer.security.LogonControllerFactory; 54 import com.sslexplorer.security.Role; 55 import com.sslexplorer.security.User; 56 import com.sslexplorer.security.UserDatabase; 57 58 public class Notifier { 59 final static Log log = LogFactory.getLog(Notifier.class); 60 private List <MessageWrapper> messages; 61 private boolean started; 62 private File queueDirectory; 63 private long messageId; 64 private MessageConsumer messageConsumer; 65 private boolean stop; 66 private List <MessageSink> sinks; 67 private HashMap <String ,Boolean > sinkEnabled; 68 69 public Notifier(File queueDirectory) throws IOException { 70 messages = new ArrayList <MessageWrapper>(); 71 this.queueDirectory = queueDirectory; 72 sinks = new ArrayList <MessageSink>(); 73 sinkEnabled = new HashMap <String ,Boolean >(); 74 loadFromDisk(); 75 } 76 77 public List <MessageSink> getSinks() { 78 return sinks; 79 } 80 81 public Collection <MessageSink> getEnabledSinks() { 82 Collection <MessageSink> enabledSinks = new ArrayList <MessageSink>(); 83 for (MessageSink sink : sinks) { 84 if(isEnabled(sink.getName())) { 85 enabledSinks.add(sink); 86 } 87 } 88 return enabledSinks; 89 } 90 91 public void addSink(MessageSink sink, boolean enabled) { 92 if(enabled && started) { 93 94 try { 95 sink.start(this); 96 } catch(Exception ex) { 97 log.error("Failed to start " + sink.getName() + " message sink", ex); 98 return; 99 } 100 } 101 102 sinks.add(sink); 103 sinkEnabled.put(sink.getName(), Boolean.valueOf(enabled)); 104 } 105 106 public void removeSink(MessageSink sink) { 107 sinks.remove(sink); 108 sinkEnabled.remove(sink.getName()); 109 } 110 111 public boolean isEnabled(String sinkName) { 112 return Boolean.TRUE.equals(sinkEnabled.get(sinkName)); 113 } 114 115 public void start() throws IllegalStateException { 116 if (started) { 117 throw new IllegalStateException ("Already started."); 118 } 119 messageConsumer = new MessageConsumer(); 120 if (sinks.size() == 0) { 121 throw new IllegalStateException ( 122 "At least one message sink must have been registered for the notfication queue to be started."); 123 } 124 for (Iterator i = sinks.iterator(); i.hasNext();) { 125 MessageSink sink = (MessageSink) i.next(); 126 try { 127 if (log.isDebugEnabled()) 128 log.debug("Starting message sink " + sink.getName()); 129 sink.start(this); 130 } catch (Exception e) { 131 log.error("Failed to start sink " + sink.getName() + ".", e); 132 } 133 } 134 started = true; 135 messageConsumer.start(); 136 if (log.isInfoEnabled()) 137 log.info("Notifier started"); 138 } 139 140 void loadFromDisk() throws IOException { 141 File [] f = queueDirectory.listFiles(new FileFilter () { 142 public boolean accept(File f) { 143 return f.getName().endsWith(".msg"); 144 } 145 }); 146 if (f == null) { 149 throw new IOException ("Could not list queue directory " + queueDirectory.getAbsolutePath()); 150 } 151 for (int i = 0; i < f.length; i++) { 152 FileInputStream fin = new FileInputStream (f[i]); 153 try { 154 DataInputStream din = new DataInputStream (fin); 155 long id = din.readLong(); 156 String sinkName = din.readUTF(); 157 messageId = Math.max(id, messageId); 158 boolean urgent = din.readBoolean(); 159 String subject = din.readUTF(); 160 List <Recipient> recipientList = new ArrayList <Recipient>(); 161 while (true) { 162 int recipientType = din.readInt(); 163 if (recipientType == Recipient.EOF) { 164 break; 165 } else { 166 String recipientAlias = din.readUTF(); 167 String realmName = din.readUTF(); 168 Recipient recipient = new Recipient(recipientType, recipientAlias, realmName); 169 recipientList.add(recipient); 170 } 171 } 172 Properties parameters = new Properties (); 173 while (true) { 174 int parameterType = din.readInt(); 175 if (parameterType < 1) { 176 break; 177 } else { 178 String key = din.readUTF(); 179 String val = din.readUTF(); 180 parameters.setProperty(key, val); 181 } 182 } 183 String content = din.readUTF(); 184 String lastMessage = din.readUTF(); 185 Message msg = new Message(subject, content, urgent); 186 msg.setId(id); 187 msg.setRecipients(recipientList); 188 msg.setSinkName(sinkName); 189 msg.setLastMessage(lastMessage); 190 queue(msg); 191 } finally { 192 fin.close(); 193 } 194 } 195 } 196 197 public void stop() throws IllegalStateException { 198 if (!started) { 199 throw new IllegalStateException ("Notifier is not started."); 200 } 201 stop = true; 202 queueNotify(); 203 started = false; 204 if (log.isInfoEnabled()) 205 log.info("Waiting for up to 120 seconds message consumer to stop"); 206 try { 207 messageConsumer.join(120000); 208 } catch (InterruptedException ie) { 209 } 210 for (Iterator i = sinks.iterator(); i.hasNext();) { 211 MessageSink sink = (MessageSink) i.next(); 212 try { 213 sink.stop(); 214 } catch (Exception e) { 215 log.error("Failed to stop sink " + sink.getName() + ".", e); 216 } 217 } 218 if (log.isInfoEnabled()) 219 log.info("Notifier stopped"); 220 } 221 222 public boolean isStarted() { 223 return started; 224 } 225 226 public void clearAllMessages() { 227 synchronized (messages) { 228 try { 229 messages.clear(); 230 File [] f = queueDirectory.listFiles(new FileFilter () { 231 public boolean accept(File f) { 232 return f.getName().endsWith(".msg"); 233 } 234 }); 235 if (f != null) { 236 for (int i = 0; i < f.length; i++) { 237 f[i].delete(); 238 } 239 } 240 CoreServlet.getServlet().fireCoreEvent(new CoreEvent(this, CoreEventConstants.MESSAGE_QUEUE_CLEARED, null, null, CoreEvent.STATE_SUCCESSFUL)); 241 } catch (Exception e) { 242 log.error("Failed to clear messages from queue", e); 243 CoreServlet.getServlet().fireCoreEvent(new CoreEvent(this, CoreEventConstants.MESSAGE_QUEUE_CLEARED, null, null, CoreEvent.STATE_UNSUCCESSFUL)); 244 } 245 } 246 } 247 248 public void sendToAll(Message message) { 249 sendToSink("*", message); 250 } 251 252 public void sendToAllExcept(Message message, String except) { 253 sendToSink("!" + except, message); 254 } 255 256 public void sendToFirst(Message message) { 257 sendToSink("^", message); 258 } 259 260 public void sendToSink(String sinkName, Message message) throws IllegalArgumentException { 261 messageId++; 262 message.setSinkName(sinkName); 263 message.setId(messageId); 264 if (log.isDebugEnabled()) 265 log.debug("Sending message " + message.getId() + " '" + message.getSubject() + "' to sink '" + sinkName + "'"); 266 queue(message); 267 try { 268 write(message); 269 } catch (IOException ioe) { 270 } 271 } 272 273 public void setEnabled(String sinkName, boolean enabled) { 274 sinkEnabled.put(sinkName, Boolean.valueOf(enabled)); 275 if (enabled) { 276 queueNotify(); 277 } 278 } 279 280 public MessageWrapper getMessage(long messageId) { 281 for (MessageWrapper wrapper : getMessages()) { 282 if(wrapper.getMessage().getId() == messageId) 283 return wrapper; 284 } 285 return null; 286 } 287 288 public List <MessageWrapper> getMessages() { 289 return messages; 290 } 291 292 public List getFullRecipientListAsUsers(List recipients) throws Exception { 293 Map <String ,Recipient> h = new TreeMap <String ,Recipient>(); 294 for (Iterator i = recipients.iterator(); i.hasNext();) { 295 Recipient r = (Recipient) i.next(); 296 if (r.getRecipientType() == Recipient.USER) { 297 h.put(r.getRecipientAlias(), r); 298 } else if (r.getRecipientType() == Recipient.POLICY) { 299 UserDatabase udb = UserDatabaseManager.getInstance().getUserDatabase(r.getRealmName()); 300 User[] users = udb.listAllUsers("*"); 301 Policy pol = PolicyDatabaseFactory.getInstance().getPolicyByName(r.getRecipientAlias(), udb.getRealm().getResourceId()); 302 List principals = PolicyDatabaseFactory.getInstance().getPrincipalsGrantedPolicy(pol, udb.getRealm()); 303 for (Iterator j = principals.iterator(); j.hasNext();) { 304 Principal p = (Principal) j.next(); 305 if (p instanceof Role) { 306 for (int x = 0; x < users.length; x++) { 307 Role[] roles = users[x].getRoles(); 308 for (int y = 0; y < roles.length; y++) { 309 if (roles[y].getPrincipalName().equals(p.getPrincipalName())) { 310 h.put(users[x].getPrincipalName(), new Recipient(Recipient.USER, users[x].getPrincipalName(), users[x].getRealm().getResourceName())); 311 break; 312 } 313 } 314 } 315 } else { 316 h.put(p.getPrincipalName(), new Recipient(Recipient.USER, p.getPrincipalName(), p.getRealm().getResourceName())); 317 } 318 } 319 } else if (r.getRecipientType() == Recipient.ROLE) { 320 UserDatabase udb = UserDatabaseManager.getInstance().getUserDatabase(r.getRealmName()); 321 Role role = udb.getRole(r.getRecipientAlias()); 322 User[] users = udb.listAllUsers("*"); 323 for (int x = 0; x < users.length; x++) { 324 Role[] roles = users[x].getRoles(); 325 for (int y = 0; y < roles.length; y++) { 326 if (roles[y].getPrincipalName().equals(role.getPrincipalName())) { 327 h.put(users[x].getPrincipalName(), new Recipient(Recipient.USER, users[x].getPrincipalName(), users[x].getRealm().getResourceName())); 328 break; 329 } 330 } 331 } 332 } else if (r.getRecipientType() == Recipient.ADMINS) { 333 UserDatabase udb = UserDatabaseManager.getInstance().getUserDatabase(r.getRealmName()); 334 User[] users = udb.listAllUsers("*"); 335 for (int j = 0; j < users.length; j++) { 336 if (LogonControllerFactory.getInstance().isAdministrator(users[j])) { 337 h.put(users[j].getPrincipalName(), new Recipient(Recipient.USER, users[j].getPrincipalName(), users[j].getRealm().getResourceName())); 338 } 339 } 340 } 341 } 342 List <Recipient> l = new ArrayList <Recipient>(); 343 for (Map.Entry <String , Recipient> entry : h.entrySet()) { 344 l.add(entry.getValue()); 345 } 346 return l; 347 } 348 349 MessageSink getSink(String sinkName) { 350 MessageSink s; 351 for (Iterator i = sinks.iterator(); i.hasNext();) { 352 s = (MessageSink) i.next(); 353 if (s.getName().equals(sinkName)) { 354 return s; 355 } 356 } 357 return null; 358 } 359 360 void write(Message message) throws IOException { 361 if (log.isDebugEnabled()) 362 log.debug("Writing message " + message.getId() + " '" + message.getSubject() + "' to disk"); 363 FileOutputStream fout = new FileOutputStream (new File (queueDirectory, String.valueOf(message.getId()) + ".msg")); 364 try { 365 DataOutputStream dout = new DataOutputStream (fout); 366 dout.writeLong(message.getId()); 367 dout.writeUTF(message.getSinkName()); 368 dout.writeBoolean(message.isUrgent()); 369 dout.writeUTF(message.getSubject()); 370 for (Iterator i = message.getRecipients().iterator(); i.hasNext();) { 371 Recipient r = (Recipient) i.next(); 372 dout.writeInt(r.getRecipientType()); 373 dout.writeUTF(r.getRecipientAlias() == null ? "" : r.getRecipientAlias()); 374 dout.writeUTF(r.getRealmName() == null ? "" : r.getRealmName()); 375 } 376 dout.writeInt(0); 377 for (Iterator i = message.getParameterNames(); i.hasNext();) { 378 String key = (String ) i.next(); 379 dout.writeInt(1); 380 dout.writeUTF(key); 381 dout.writeUTF(message.getParameter(key)); 382 } 383 dout.writeInt(0); 384 dout.writeUTF(message.getContent()); 385 dout.writeUTF(message.getLastMessage()); 386 } finally { 387 fout.close(); 388 } 389 } 390 391 void queue(Message message) { 392 if (log.isDebugEnabled()) 393 log.debug("Queueing message " + message.getId() + " '" + message.getSubject() + "'"); 394 MessageWrapper wrapper = new MessageWrapper(message); 395 if (message.isUrgent()) { 396 messages.add(0, wrapper); 397 fireQueuedEvent(message); 398 queueNotify(); 399 } else { 400 messages.add(wrapper); 401 fireQueuedEvent(message); 402 } 403 } 404 405 void fireQueuedEvent(Message message) { 406 fireMessageEvent(message, new CoreEvent(this, CoreEventConstants.MESSAGE_QUEUED, message, null, CoreEvent.STATE_SUCCESSFUL)); 407 } 408 409 void fireSentEvent(Message message, int state) { 410 fireMessageEvent(message, new CoreEvent(this, CoreEventConstants.MESSAGE_SENT, message, null, state)); 411 } 412 413 void fireMessageEvent(Message message, CoreEvent evt) { 414 List l = message.getRecipients(); 415 StringBuffer userR = new StringBuffer (); 416 StringBuffer roleR = new StringBuffer (); 417 StringBuffer policyR = new StringBuffer (); 418 for (Iterator i = l.iterator(); i.hasNext();) { 419 Recipient r = (Recipient) i.next(); 420 StringBuffer rb = userR; 421 if (r.getRecipientType() == Recipient.ROLE) { 422 rb = roleR; 423 } else if (r.getRecipientType() == Recipient.POLICY) { 424 rb = policyR; 425 } 426 if (rb.length() > 0) { 427 rb.append(","); 428 } 429 rb.append(r.getRecipientAlias()); 430 } 431 CoreServlet.getServlet().fireCoreEvent( 432 evt.addAttribute(CoreAttributeConstants.EVENT_ATTR_MESSAGE_ID, String.valueOf(message.getId())) 433 .addAttribute(CoreAttributeConstants.EVENT_ATTR_MESSAGE_URGENT, 434 String.valueOf(message.isUrgent())).addAttribute( 435 CoreAttributeConstants.EVENT_ATTR_MESSAGE_SUBJECT, 436 String.valueOf(message.getSubject())) 437 .addAttribute(CoreAttributeConstants.EVENT_ATTR_MESSAGE_ROLE_RECIPIENTS, roleR.toString()) 438 .addAttribute(CoreAttributeConstants.EVENT_ATTR_MESSAGE_POLICY_RECIPIENTS, 439 policyR.toString())); 440 } 441 442 void queueNotify() { 443 if (log.isDebugEnabled()) 444 log.debug("Notify queue"); 445 synchronized (messages) { 446 messages.notifyAll(); 447 } 448 if (log.isDebugEnabled()) 449 log.debug("Queue notified"); 450 } 451 452 public boolean doSend(String sinkName, Message message) { 453 message.setSinkName(sinkName); 454 return doSend(message); 455 } 456 457 boolean doSend(Message message) { 458 459 if(log.isDebugEnabled()) 460 log.debug("Sending message with subject of " + message.getSubject() + " urgent = " + message.isUrgent()); 461 462 if (log.isDebugEnabled()) { 463 for (Iterator i = message.getRecipients().iterator(); i.hasNext();) { 464 Recipient r = (Recipient) i.next(); 465 log.debug(" " + r.getRecipientType() + "/" + r.getRecipientAlias()); 466 } 467 log.debug("Content = " + message.getContent()); 468 log.debug("Sink name = " + message.getSinkName()); 469 } 470 471 boolean sent = false; 472 if (message.getSinkName().equals("*")) { 473 for (Iterator i = sinks.iterator(); i.hasNext();) { 474 MessageSink sink = (MessageSink) i.next(); 475 if (Boolean.TRUE == sinkEnabled.get(sink.getName())) { 476 try { 477 if (sink.send(message)) { 478 sent = true; 479 } 480 } catch (Exception e) { 481 log.error("Failed to send message " + message.getId() + ".", e); 482 } 483 } 484 } 485 } else if (message.getSinkName().startsWith("!")) { 486 String [] except = message.getSinkName().substring(1).split(","); 487 for (Iterator i = sinks.iterator(); i.hasNext();) { 488 MessageSink sink = (MessageSink) i.next(); 489 boolean found = false; 490 for (int j = 0; j < except.length && !found; j++) { 491 if (sink.getName().equals(except[j])) { 492 found = true; 493 } 494 } 495 if (!found && Boolean.TRUE == sinkEnabled.get(sink.getName())) { 496 try { 497 if (sink.send(message)) { 498 sent = true; 499 } 500 } catch (Exception e) { 501 log.error("Failed to send message " + message.getId() + ".", e); 502 } 503 } 504 } 505 } else if (message.getSinkName().equals("^")) { 506 for (Iterator i = sinks.iterator(); !sent && i.hasNext();) { 507 MessageSink sink = (MessageSink) i.next(); 508 if (Boolean.TRUE == sinkEnabled.get(sink.getName())) { 509 try { 510 if (sink.send(message)) { 511 sent = true; 512 } 513 } catch (Exception e) { 514 log.error("Failed to send message " + message.getId() + ".", e); 515 } 516 } 517 } 518 } else { 519 MessageSink s = getSink(message.getSinkName()); 520 if (s == null) { 521 log.error("No message sink named " + message.getSinkName()); 522 } else { 523 if (Boolean.TRUE == sinkEnabled.get(s.getName())) { 524 try { 525 sent = s.send(message); 526 } catch (Exception e) { 527 message.setLastMessage(e.getMessage()); 528 log.error("Failed to send message " + message.getId() + ".", e); 529 ; 530 } 531 } 532 } 533 } 534 if (!sent) { 535 log.error("No message sink sent message " + message.getId()); 536 fireSentEvent(message, CoreEvent.STATE_UNSUCCESSFUL); 537 } 538 else { 539 fireSentEvent(message, CoreEvent.STATE_SUCCESSFUL); 540 } 541 return sent; 542 } 543 544 class MessageConsumer extends Thread { 545 MessageConsumer() { 546 super("Notification Message Consumer"); 547 } 548 549 public void run() { 550 stop = false; 551 MessageWrapper msg = null; 552 int valid = -1; 553 boolean waitOnce = false; 554 while (!stop) { 555 synchronized (messages) { 556 while ((waitOnce || messages.size() == 0) && !stop) { 557 try { 558 messages.wait(30000); 559 } catch (InterruptedException ie) { 560 log.error("MessageConsumer interrupted.", ie); 561 } 562 waitOnce = false; 563 } 564 if (!stop) { 565 if (log.isDebugEnabled()) 566 log.debug("Checking message queue"); 567 int i = 0; 568 valid = -1; 569 while (i < messages.size()) { 570 msg = (MessageWrapper) messages.get(i); 571 if (log.isDebugEnabled()) 572 log.debug("Checking if message " + msg.getMessage().getId() + " is valid"); 573 if (msg.attempt == 0 || (msg.attempt > 0 && (msg.time.getTime() + 60000) < System.currentTimeMillis())) { 576 valid = i; 577 break; 578 } else { 579 i++; 580 } 581 } 582 if (valid != -1) { 583 messages.remove(valid); 584 } 585 } 586 } 587 if (!stop) { 588 boolean sent = false; 589 if (valid != -1) { 590 sent = doSend(msg.message); 591 if (!sent) { 592 msg.attempt++; 593 msg.time = new Date (); 594 try { 595 Thread.sleep(1); 596 } catch (InterruptedException ie) { 597 } 598 log.error("Failed to send message. " + msg.getMessage().getLastMessage()); 599 messages.add(msg); 600 } else { 601 new File (queueDirectory, msg.message.getId() + ".msg").delete(); 602 } 603 } else { 604 waitOnce = true; 605 } 606 } 607 } 608 } 609 } 610 611 public class MessageWrapper { 612 private final Message message; 613 private Date time; 614 private int attempt; 615 616 private MessageWrapper(Message message) { 617 this.message = message; 618 this.time = new Date (); 619 } 620 621 public Message getMessage() { 622 return message; 623 } 624 625 public int getAttempt() { 626 return attempt; 627 } 628 629 public Date getTime() { 630 return time; 631 } 632 633 public String getFormattedTime() { 634 DateFormat sdf = SimpleDateFormat.getTimeInstance(SimpleDateFormat.SHORT); 635 return sdf.format(getTime()); 636 } 637 638 public String getUserRecipients() { 639 return getRecipients(Recipient.USER); 640 } 641 642 public String getRoleRecipients() { 643 return getRecipients(Recipient.ROLE); 644 } 645 646 public String getPolicyRecipients() { 647 return getRecipients(Recipient.POLICY); 648 } 649 650 private String getRecipients(int type) { 651 List <String > recipients = new ArrayList <String >(); 652 for (Recipient recipient : message.getRecipients()) { 653 if(recipient.getRecipientType() == type) { 654 recipients.add(recipient.getRecipientAlias()); 655 } 656 } 657 Collections.sort(recipients); 658 659 StringBuffer buffer = new StringBuffer (); 660 for (String value : recipients) { 661 buffer.append(value).append(", "); 662 } 663 return (buffer.length() == 0 ) ? "" : buffer.substring(0, buffer.length() - 2); 664 } 665 } 666 } | Popular Tags |