KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > routing > inbound > IdempotentReceiver


1 /*
2  * $Id: IdempotentReceiver.java 3937 2006-11-20 16:04:25Z lajos $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.routing.inbound;
12
13 import org.mule.MuleManager;
14 import org.mule.config.i18n.Message;
15 import org.mule.config.i18n.Messages;
16 import org.mule.umo.MessagingException;
17 import org.mule.umo.UMOEvent;
18 import org.mule.umo.routing.RoutingException;
19 import org.mule.util.FileUtils;
20
21 import java.io.BufferedReader JavaDoc;
22 import java.io.File JavaDoc;
23 import java.io.FileReader JavaDoc;
24 import java.io.IOException JavaDoc;
25 import java.util.HashSet JavaDoc;
26 import java.util.Set JavaDoc;
27
28 /**
29  * <code>IdempotentReceiver</code> ensures that only unique messages are received
30  * by a component. It does this by checking the unique id of the incoming message.
31  * Note that the underlying endpoint must support unique message Ids for this to
32  * work, otherwise a <code>UniqueIdNotSupportedException</code> is thrown. This
33  * implementation is simple and not suitable in a failover environment, this is
34  * because previously received message Ids are stored in memory and not persisted.
35  *
36  */

37
38 public class IdempotentReceiver extends SelectiveConsumer
39 {
40     private static String JavaDoc DEFAULT_STORE_PATH = "./idempotent";
41
42     private Set JavaDoc messageIds;
43     private File JavaDoc idStore;
44     private String JavaDoc componentName;
45     private boolean disablePersistence = false;
46     private String JavaDoc storePath;
47
48     public IdempotentReceiver()
49     {
50         messageIds = new HashSet JavaDoc();
51         DEFAULT_STORE_PATH = MuleManager.getConfiguration().getWorkingDirectory() + "/idempotent";
52         setStorePath(DEFAULT_STORE_PATH);
53     }
54
55     public boolean isMatch(UMOEvent event) throws MessagingException
56     {
57         if (idStore == null)
58         {
59             // we need to load this of fist request as we need the component
60
// name
61
load(event);
62         }
63         return !messageIds.contains(event.getMessage().getUniqueId());
64     }
65
66     public UMOEvent[] process(UMOEvent event) throws MessagingException
67     {
68         if (isMatch(event))
69         {
70             try
71             {
72                 checkComponentName(event.getComponent().getDescriptor().getName());
73             }
74             catch (IllegalArgumentException JavaDoc e)
75             {
76                 throw new RoutingException(event.getMessage(), event.getEndpoint());
77             }
78             String JavaDoc id = event.getMessage().getUniqueId();
79             try
80             {
81                 storeId(id);
82                 return new UMOEvent[]{event};
83             }
84             catch (IOException JavaDoc e)
85             {
86                 throw new RoutingException(new Message(Messages.FAILED_TO_WRITE_X_TO_STORE_X, id,
87                     idStore.getAbsolutePath()), event.getMessage(), event.getEndpoint(), e);
88             }
89         }
90         else
91         {
92             return null;
93         }
94     }
95
96     private void checkComponentName(String JavaDoc name) throws IllegalArgumentException JavaDoc
97     {
98         if (!componentName.equals(name))
99         {
100             throw new IllegalArgumentException JavaDoc("This receiver is assigned to component: " + componentName
101                                                + " but has received an event for component: " + name
102                                                + ". Please check your config to make sure each component"
103                                                + "has its own instance of IdempotentReceiver");
104         }
105     }
106
107     protected synchronized void load(UMOEvent event) throws RoutingException
108     {
109         this.componentName = event.getComponent().getDescriptor().getName();
110         idStore = FileUtils.newFile(storePath + "/muleComponent_" + componentName + ".store");
111         if (disablePersistence)
112         {
113             return;
114         }
115         try
116         {
117             if (idStore.exists())
118             {
119                 BufferedReader JavaDoc reader = null;
120                 try
121                 {
122                     reader = new BufferedReader JavaDoc(new FileReader JavaDoc(idStore));
123                     String JavaDoc id;
124                     while ((id = reader.readLine()) != null)
125                     {
126                         messageIds.add(id);
127                     }
128                 }
129                 finally
130                 {
131                     if (reader != null)
132                     {
133                         reader.close();
134                     }
135                 }
136             }
137             else
138             {
139                 idStore = FileUtils.createFile(idStore.getAbsolutePath());
140             }
141         }
142         catch (IOException JavaDoc e)
143         {
144             throw new RoutingException(new Message(Messages.FAILED_TO_READ_FROM_STORE_X,
145                 idStore.getAbsolutePath()), event.getMessage(), event.getEndpoint(), e);
146         }
147     }
148
149     protected synchronized void storeId(Object JavaDoc id) throws IOException JavaDoc
150     {
151         messageIds.add(id);
152         if (disablePersistence)
153         {
154             return;
155         }
156         FileUtils.stringToFile(idStore.getAbsolutePath(), id.toString(), true, true);
157     }
158
159     public boolean isDisablePersistence()
160     {
161         return disablePersistence;
162     }
163
164     public void setDisablePersistence(boolean disablePersistence)
165     {
166         this.disablePersistence = disablePersistence;
167     }
168
169     public String JavaDoc getStorePath()
170     {
171         return storePath;
172     }
173
174     public void setStorePath(String JavaDoc storePath)
175     {
176         if (storePath == null)
177         {
178             this.storePath = DEFAULT_STORE_PATH;
179         }
180         else if (storePath.endsWith("/"))
181         {
182             storePath = storePath.substring(0, storePath.length() - 1);
183             this.storePath = storePath;
184         }
185         else
186         {
187             this.storePath = storePath;
188         }
189     }
190 }
191
Popular Tags