My favorites | Sign in
Project Home Downloads Wiki Issues Source
Checkout   Browse   Changes    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package cn.org.rapid_framework.util.concurrent.async;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.swing.SwingUtilities;

/**
* 该类主要用于得到异步方法的执行结果.
* 使用示例如下.
* <pre>
* public void testSendEmail() {
* final String address = "badqiu(a)gmail.com";
* final String subject = "test";
* final String content = "async token test";
*
* //返回的token,包含token.addResponder()用于监听异步方法的执行结果
* AsyncToken token = sendAsyncEmail(address,subject,content);
*
* //token可以继续传递给外部,以便外面感兴趣的listener监听这个异步方法的执行结果
* token.addResponder(new IResponder() {
* public void onFault(Exception fault) {
* System.out.println("email send fail,cause:"+fault);
* //此处可以直接引用address,subject,content,如,我们可以再次发送一次
* sendAsyncEmail(address,subject,content);
* }
* public void onResult(Object result) {
* System.out.println("email send success,result:"+result);
* }
* });
* }
*
* public AsyncToken sendAsyncEmail(String address,String subject,String content) {
* final AsyncToken token = new AsyncToken();
*
* Thread thread = new Thread(new Runnable() {
* public void run() {
* try {
* //do send email job...
* token.setComplete(executeResult); //通知Responder token执行完
* }catch(Exception e) {
* token.setFault(e); //通知Responder token发生错误
* }
* }
* });
* thread.start();
*
* return token;
* }
*
* </pre>
* 生成token请查看AsyncTokenTemplate
* @see AsyncTokenTemplate
* @author badqiu
*/
public class AsyncToken<T> {
public static final String DEFAULT_TOKEN_GROUP = "default";
private static AtomicLong tokenIdSequence = new AtomicLong(1);

//tokenGroup tokenName tokenId
private String tokenGroup = DEFAULT_TOKEN_GROUP;
private String tokenName;
private long tokenId;

private List<IResponder> _responders = new ArrayList(2);

private UncaughtExceptionHandler uncaughtExceptionHandler;
private T _result;
private Exception _fault;
private boolean _isFiredResult;

private CountDownLatch awaitResultSignal = null;

public AsyncToken(){
this(null);
}

public AsyncToken(UncaughtExceptionHandler uncaughtExceptionHandler) {
this(DEFAULT_TOKEN_GROUP,null);
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
}

public AsyncToken(String tokenGroup,String tokenName) {
setTokenGroup(tokenGroup);
setTokenName(tokenName);
this.tokenId = tokenIdSequence.getAndIncrement();
}

public String getTokenGroup() {
return tokenGroup;
}

public void setTokenGroup(String tokenGroup) {
if(tokenGroup == null) throw new IllegalArgumentException("'tokenGroup' must be not null");
this.tokenGroup = tokenGroup;
}

public String getTokenName() {
return tokenName;
}

public void setTokenName(String tokenName) {
this.tokenName = tokenName;
}

public long getTokenId() {
return tokenId;
}

/**
* 增加监听器
* addResponder(responder,false);
* @param responder
*/
public void addResponder(final IResponder<T> responder) {
addResponder(responder,false);
}
/**
* 增加监听器,如果AsyncToken已经拥有token的执行结果,
* 则token会根据invokeResponderInOtherThread参数决定是否在异步线程调用responder
* @param responder 监听器
* @param invokeResponderInOtherThread true则另起线程调用responder
*/
public void addResponder(final IResponder<T> responder,boolean invokeResponderInOtherThread) {
_responders.add(responder);

if(_isFiredResult) {
if(invokeResponderInOtherThread) {
SwingUtilities.invokeLater(new Runnable() {
public void run() {
fireResult2Responder(responder);
}
});
}else {
fireResult2Responder(responder);
}
}
}

public List<IResponder> getResponders() {
return _responders;
}

public boolean hasResponder() {
return _responders != null && _responders.size() > 0;
}

public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return uncaughtExceptionHandler;
}

public void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
}

private void fireResult2Responder(IResponder responder) {
try {
if(_fault != null) {
responder.onFault(_fault);
}else {
responder.onResult(_result);
}
}catch(RuntimeException e) {
if(getUncaughtExceptionHandler() != null) {
getUncaughtExceptionHandler().uncaughtException(responder, e);
}else {
throw e;
}
}catch(Error e) {
if(getUncaughtExceptionHandler() != null) {
getUncaughtExceptionHandler().uncaughtException(responder, e);
}else {
throw e;
}
}
}

private void fireResult2Responders() {
synchronized (this) {
_isFiredResult = true;
if(awaitResultSignal != null) {
awaitResultSignal.countDown();
}
}

for(IResponder r : _responders) {
fireResult2Responder(r);
}
}

public void setComplete(){
setComplete(null);
}

public void setComplete(T result) {
if(_isFiredResult) throw new IllegalStateException("token already fired");
this._result = result;
fireResult2Responders();
}

public void setFault(Exception fault) {
if(fault == null) throw new NullPointerException();
if(_isFiredResult) throw new IllegalStateException("token already fired");
this._fault = fault;
fireResult2Responders();
}

public boolean isDone() {
synchronized (this) {
return _isFiredResult;
}
}

/**
* 等待得到token结果,测试一般使用此方法,因为jdk有相同功能的Future.get()可以使用
* @see Future
*/
@Deprecated
public Object waitForResult() throws InterruptedException,Exception {
return waitForResult(-1, null);
}
/**
* 等待得到token结果,测试一般使用此方法,因为jdk有相同功能的Future.get()可以使用
* @see Future
*/
@Deprecated
public Object waitForResult(long timeout,TimeUnit timeUnit) throws InterruptedException,Exception {
synchronized(this) {
if(_isFiredResult) {
if(_fault != null) {
throw _fault;
}else {
return _result;
}
}

awaitResultSignal = new CountDownLatch(1);
}

if(timeout > 0) {
awaitResultSignal.await(timeout,timeUnit);
}else {
awaitResultSignal.await();
}

if(_fault != null) {
throw _fault;
}else {
return _result;
}
}

}

Change log

r3743 by badqiu on Jun 27, 2010   Diff
[No log message]
Go to: 
Project members, sign in to write a code review

Older revisions

r3657 by badqiu on Jun 25, 2010   Diff
[No log message]
r2996 by badqiu on Jun 8, 2010   Diff
[No log message]
r2157 by badqiu on Mar 12, 2010   Diff
重构
All revisions of this file

File info

Size: 7217 bytes, 257 lines
Powered by Google Project Hosting