|
Examples
Some very simple examples how to use AsyncFutures
ExamplesHTTP ClientThis example shows how to write an asynchronous HTTP client with Apache HTTP Core 4.0.1 and NIO. The main() methodpublic static void main(String[] args) {
AsyncExecutorService executor
= AsyncExecutors.newCachedThreadPool();
// 1. Create the HTTP request
AsyncHttpRequest request
= new AsyncHttpRequest("http://www.google.com/");
// 2. Send the HTTP request
AsyncFuture<HttpResponse> future
= executor.submit(request);
// 3. Wait for the HTTP response
future.addAsyncFutureListener(new AsyncFutureListener<HttpResponse>() {
@Override
public void operationComplete(AsyncFuture<HttpResponse> future) {
try {
System.out.println("HttpResponse: "
+ future.get().getStatusLine());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
});
// Or you can call future.get() and wait for the HttpResponse.
}Pseudo CodeThis is just pseudo code that demonstrates the basic idea. public class AsyncHttpRequest implements AsyncProcess<HttpResponse> {
private final String url;
public AsyncHttpRequest(String url) {
this.url = url;
}
@Override
public void start(final AsyncFuture<HttpResponse> future) {
HttpGet httpGet = new HttpGet(url);
HttpCallback callback = new HttpCallback() {
public void handleResponse(HttpResponse response) {
future.setValue(response);
}
public void handleException(Exception exception) {
future.setException(exception);
}
};
// Send the HTTP request. We assume write() is a non-blocking
// operation and will return immediately. The connection Object
// will notify the callback as soon as the response is available
// or an error occurred.
connection.write(httpGet, callback);
}
}HttpCore 4.0.1 with NIOThis is a working example of Apache HttpCore 4.0.1 with NIO. public class AsyncHttpRequest implements AsyncProcess<HttpResponse> {
private static final ConnectingIOReactor REACTOR;
static {
HttpParams params = new BasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 10000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.USER_AGENT, "HttpComponents/1.1");
try {
REACTOR = new DefaultConnectingIOReactor(1, params);
} catch (IOReactorException err) {
throw new IllegalStateException(
"IOReactorException", err);
}
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
BufferingHttpClientHandler handler
= new BufferingHttpClientHandler(
httpproc,
new ExecutionHandler(),
new DefaultConnectionReuseStrategy(),
params);
final IOEventDispatch dispatcher
= new DefaultClientIOEventDispatch(handler, params);
Runnable task = new Runnable() {
@Override
public void run() {
try {
REACTOR.execute(dispatcher);
} catch (InterruptedIOException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
System.out.println("Shutdown");
}
};
new Thread(task, "IoEventDispatcherThread").start();
}
private final String url;
private volatile AsyncFuture<HttpResponse> future;
private volatile SessionRequest request;
public AsyncHttpRequest(String url) {
this.url = url;
}
@Override
public void start(final AsyncFuture<HttpResponse> future) {
this.future = future;
future.addAsyncFutureListener(new AsyncFutureListener<HttpResponse>() {
@Override
public void operationComplete(AsyncFuture<HttpResponse> future) {
// Try to cancel the request if the AsyncFuture timed out
// or got cancelled by the user before we managed to send
// it to the remote server.
if (future.isTimeout() || future.isCancelled()) {
request.cancel();
}
}
});
SessionRequestCallback callback = new SessionRequestCallback() {
@Override
public void completed(SessionRequest request) {
}
@Override
public void timeout(SessionRequest request) {
future.setException(new TimeoutException(url));
}
@Override
public void failed(SessionRequest request) {
future.setException(new ConnectException(url));
}
@Override
public void cancelled(SessionRequest request) {
future.cancel(true);
}
};
URI uri = URI.create(url);
String host = uri.getHost();
int port = uri.getPort();
if (port == -1) {
port = 80;
}
request = REACTOR.connect(
new InetSocketAddress(host, port),
null, this, callback);
}
private static class ExecutionHandler
implements HttpRequestExecutionHandler {
private static final String HTTP_REQUEST_KEY
= ExecutionHandler.class.getName() + ".HTTP_REQUEST_KEY";
@Override
public void initalizeContext(HttpContext context, Object attachment) {
context.setAttribute(HTTP_REQUEST_KEY, (AsyncHttpRequest)attachment);
}
@Override
public HttpRequest submitRequest(HttpContext context) {
AsyncHttpRequest request = (AsyncHttpRequest)context.getAttribute(HTTP_REQUEST_KEY);
URI uri = URI.create(request.url);
return new BasicHttpRequest("GET", uri.getPath());
}
@Override
public void handleResponse(HttpResponse response, HttpContext context)
throws IOException {
AsyncHttpRequest request = (AsyncHttpRequest)context.getAttribute(HTTP_REQUEST_KEY);
request.future.setValue(response);
}
@Override
public void finalizeContext(HttpContext context) {
AsyncHttpRequest request = (AsyncHttpRequest)context.getAttribute(HTTP_REQUEST_KEY);
request.future.cancel(true);
}
}
}Apache MINA SumUp ClientThis example is based on MINA 2.0 SumUp Server. main()import java.net.SocketAddress;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.example.sumup.codec.SumUpProtocolCodecFactory;
import org.apache.mina.example.sumup.message.AddMessage;
import org.apache.mina.example.sumup.message.ResultMessage;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.ardverk.concurrent.AsyncFuture;
import org.ardverk.concurrent.AsyncFutureListener;
import org.ardverk.concurrent.AsyncProcess;
public class AsyncSumUpProcess implements AsyncProcess<Integer> {
private static final String KEY
= AsyncSumUpProcess.class.getName() + ".KEY";
private static final NioSocketConnector CONNECTOR
= new NioSocketConnector();
private static final long CONNECT_TIMEOUT = 30*1000L;
private static final boolean USE_CUSTOM_CODEC = true;
static {
CONNECTOR.setConnectTimeoutMillis(CONNECT_TIMEOUT);
if (USE_CUSTOM_CODEC) {
CONNECTOR.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(
new SumUpProtocolCodecFactory(false)));
} else {
CONNECTOR.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(
new ObjectSerializationCodecFactory()));
}
CONNECTOR.getFilterChain().addLast("logger", new LoggingFilter());
CONNECTOR.setHandler(new ClientSessionHandler());
}
private final SocketAddress address;
private final int[] values;
private volatile AsyncFuture<Integer> future;
private volatile ConnectFuture connectFuture;
private volatile IoSession session;
public AsyncSumUpProcess(SocketAddress address, int[] values) {
this.address = address;
this.values = values;
}
@Override
public void start(final AsyncFuture<Integer> future) {
this.future = future;
future.addAsyncFutureListener(new AsyncFutureListener<Integer>() {
@Override
public void operationComplete(AsyncFuture<Integer> future) {
if (connectFuture != null) {
connectFuture.cancel();
}
if (session != null) {
session.close(true);
}
}
});
final IoFutureListener<IoFuture> onSessionClose
= new IoFutureListener<IoFuture>() {
@Override
public void operationComplete(IoFuture iof) {
if (!future.isDone()) {
future.cancel(true);
}
}
};
IoFutureListener<IoFuture> onConnection
= new IoFutureListener<IoFuture>() {
@Override
public void operationComplete(IoFuture iof) {
try {
session = iof.getSession();
CloseFuture closeFuture = session.getCloseFuture();
closeFuture.addListener(onSessionClose);
session.setAttribute(KEY, AsyncSumUpProcess.this);
for (int i = 0; i < values.length; i++) {
AddMessage m = new AddMessage();
m.setSequence(i);
m.setValue(values[i]);
session.write(m);
}
} catch (Exception err) {
future.setException(err);
}
}
};
connectFuture = CONNECTOR.connect(address);
connectFuture.addListener(onConnection);
}
private static class ClientSessionHandler extends IoHandlerAdapter {
@Override
public void messageReceived(IoSession session, Object message) {
AsyncSumUpProcess process = (AsyncSumUpProcess)session.getAttribute(KEY);
ResultMessage rm = (ResultMessage) message;
if (rm.isOk()) {
if (rm.getSequence() == process.values.length - 1) {
process.future.setValue(rm.getValue());
session.close(true);
}
} else {
session.close(true);
}
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
session.close(true);
}
}
}
|
► Sign in to add a comment