My favorites | Sign in
Project Home Downloads Wiki Issues Source
Search
for
Examples  
Some very simple examples how to use AsyncFutures
Updated Nov 30, 2009 by rka...@gmail.com

Examples

  1. HTTP Client
  2. Apache MINA SumUp Client

HTTP Client

This example shows how to write an asynchronous HTTP client with Apache HTTP Core 4.0.1 and NIO.

The main() method

public static void main(String[] args) {
    
    AsyncExecutorService executor 
        = AsyncExecutors.newCachedThreadPool();
    
    // 1. Create the HTTP request
    AsyncHttpRequest request 
        = new AsyncHttpRequest("http://www.google.com/");
    
    // 2. Send the HTTP request
    AsyncFuture<HttpResponse> future 
        = executor.submit(request);
    
    // 3. Wait for the HTTP response
    future.addAsyncFutureListener(new AsyncFutureListener<HttpResponse>() {
        @Override
        public void operationComplete(AsyncFuture<HttpResponse> future) {
            try {
                System.out.println("HttpResponse: " 
                        + future.get().getStatusLine());
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
            }
        }
    });
    
    // Or you can call future.get() and wait for the HttpResponse.
}

Pseudo Code

This is just pseudo code that demonstrates the basic idea.

public class AsyncHttpRequest implements AsyncProcess<HttpResponse> {
    
    private final String url;
    
    public AsyncHttpRequest(String url) {
        this.url = url;
    }
    
    @Override
    public void start(final AsyncFuture<HttpResponse> future) {
        
        HttpGet httpGet = new HttpGet(url);
        HttpCallback callback = new HttpCallback() {
            public void handleResponse(HttpResponse response) {
                future.setValue(response);
            }
            
            public void handleException(Exception exception) {
                future.setException(exception);
            }
        };
        
        // Send the HTTP request. We assume write() is a non-blocking
        // operation and will return immediately. The connection Object
        // will notify the callback as soon as the response is available
        // or an error occurred.
        connection.write(httpGet, callback);
    }
}

HttpCore 4.0.1 with NIO

This is a working example of Apache HttpCore 4.0.1 with NIO.

public class AsyncHttpRequest implements AsyncProcess<HttpResponse> {

    private static final ConnectingIOReactor REACTOR;
    
    static {
        HttpParams params = new BasicHttpParams();
        
        params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 10000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpComponents/1.1");
        
        try {
            REACTOR = new DefaultConnectingIOReactor(1, params);
        } catch (IOReactorException err) {
            throw new IllegalStateException(
                    "IOReactorException", err);
        }
        
        BasicHttpProcessor httpproc = new BasicHttpProcessor();
        httpproc.addInterceptor(new RequestContent());
        httpproc.addInterceptor(new RequestTargetHost());
        httpproc.addInterceptor(new RequestConnControl());
        httpproc.addInterceptor(new RequestUserAgent());
        httpproc.addInterceptor(new RequestExpectContinue());
        
        BufferingHttpClientHandler handler 
            = new BufferingHttpClientHandler(
                httpproc,
                new ExecutionHandler(),
                new DefaultConnectionReuseStrategy(),
                params);
        
        final IOEventDispatch dispatcher 
            = new DefaultClientIOEventDispatch(handler, params);
        
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    REACTOR.execute(dispatcher);
                } catch (InterruptedIOException ex) {
                    ex.printStackTrace();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
                System.out.println("Shutdown");
            }
        };
        
        new Thread(task, "IoEventDispatcherThread").start();
    }
    
    private final String url;
    
    private volatile AsyncFuture<HttpResponse> future;
    
    private volatile SessionRequest request;
    
    public AsyncHttpRequest(String url) {
        this.url = url;
    }
    
    @Override
    public void start(final AsyncFuture<HttpResponse> future) {
        this.future = future;
        
        future.addAsyncFutureListener(new AsyncFutureListener<HttpResponse>() {
            @Override
            public void operationComplete(AsyncFuture<HttpResponse> future) {
                // Try to cancel the request if the AsyncFuture timed out
                // or got cancelled by the user before we managed to send
                // it to the remote server.
                if (future.isTimeout() || future.isCancelled()) {
                    request.cancel();
                }
            }
        });
        
        SessionRequestCallback callback = new SessionRequestCallback() {
            @Override
            public void completed(SessionRequest request) {
            }
            
            @Override
            public void timeout(SessionRequest request) {
                future.setException(new TimeoutException(url));
            }
            
            @Override
            public void failed(SessionRequest request) {
                future.setException(new ConnectException(url));
            }
            
            @Override
            public void cancelled(SessionRequest request) {
                future.cancel(true);
            }
        };
        
        URI uri = URI.create(url);
        String host = uri.getHost();
        int port = uri.getPort();
        if (port == -1) {
            port = 80;
        }
        
        request = REACTOR.connect(
                new InetSocketAddress(host, port), 
                null, this, callback);
    }
    
    private static class ExecutionHandler 
            implements HttpRequestExecutionHandler {
    
        private static final String HTTP_REQUEST_KEY 
            = ExecutionHandler.class.getName() + ".HTTP_REQUEST_KEY";
        
        @Override
        public void initalizeContext(HttpContext context, Object attachment) {
            context.setAttribute(HTTP_REQUEST_KEY, (AsyncHttpRequest)attachment);
        }
        
        @Override
        public HttpRequest submitRequest(HttpContext context) {
            AsyncHttpRequest request = (AsyncHttpRequest)context.getAttribute(HTTP_REQUEST_KEY);
            URI uri = URI.create(request.url);
            return new BasicHttpRequest("GET", uri.getPath());
        }
        
        @Override
        public void handleResponse(HttpResponse response, HttpContext context)
                throws IOException {
            AsyncHttpRequest request = (AsyncHttpRequest)context.getAttribute(HTTP_REQUEST_KEY);
            request.future.setValue(response);
        }
        
        @Override
        public void finalizeContext(HttpContext context) {
            AsyncHttpRequest request = (AsyncHttpRequest)context.getAttribute(HTTP_REQUEST_KEY);
            request.future.cancel(true);
        }
    }
}

Apache MINA SumUp Client

This example is based on MINA 2.0 SumUp Server.

main()

import java.net.SocketAddress;

import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.example.sumup.codec.SumUpProtocolCodecFactory;
import org.apache.mina.example.sumup.message.AddMessage;
import org.apache.mina.example.sumup.message.ResultMessage;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.ardverk.concurrent.AsyncFuture;
import org.ardverk.concurrent.AsyncFutureListener;
import org.ardverk.concurrent.AsyncProcess;

public class AsyncSumUpProcess implements AsyncProcess<Integer> {

    private static final String KEY 
        = AsyncSumUpProcess.class.getName() + ".KEY";
    
    private static final NioSocketConnector CONNECTOR 
        = new NioSocketConnector();
    
    private static final long CONNECT_TIMEOUT = 30*1000L;
    
    private static final boolean USE_CUSTOM_CODEC = true;
    
    static {
        CONNECTOR.setConnectTimeoutMillis(CONNECT_TIMEOUT);
        
        if (USE_CUSTOM_CODEC) {
            CONNECTOR.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter(
                        new SumUpProtocolCodecFactory(false)));
        } else {
            CONNECTOR.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter(
                    new ObjectSerializationCodecFactory()));
        }
        
        CONNECTOR.getFilterChain().addLast("logger", new LoggingFilter());
        CONNECTOR.setHandler(new ClientSessionHandler());
    }
    
    private final SocketAddress address;
    
    private final int[] values;
    
    private volatile AsyncFuture<Integer> future;
    
    private volatile ConnectFuture connectFuture;
    
    private volatile IoSession session;
    
    public AsyncSumUpProcess(SocketAddress address, int[] values) {
        this.address = address;
        this.values = values;
    }
    
    @Override
    public void start(final AsyncFuture<Integer> future) {
        this.future = future;
        
        future.addAsyncFutureListener(new AsyncFutureListener<Integer>() {
            @Override
            public void operationComplete(AsyncFuture<Integer> future) {
                if (connectFuture != null) {
                    connectFuture.cancel();
                }
                
                if (session != null) {
                    session.close(true);
                }
            }
        });
        
        final IoFutureListener<IoFuture> onSessionClose 
                = new IoFutureListener<IoFuture>() {
            @Override
            public void operationComplete(IoFuture iof) {
                if (!future.isDone()) {
                    future.cancel(true);
                }
            }
        };

        IoFutureListener<IoFuture> onConnection 
                = new IoFutureListener<IoFuture>() {
            @Override
            public void operationComplete(IoFuture iof) {
                try {
                    session = iof.getSession();
                    
                    CloseFuture closeFuture = session.getCloseFuture();
                    closeFuture.addListener(onSessionClose);
                    
                    session.setAttribute(KEY, AsyncSumUpProcess.this);                    
                    for (int i = 0; i < values.length; i++) {
                        AddMessage m = new AddMessage();
                        m.setSequence(i);
                        m.setValue(values[i]);
                        session.write(m);
                    }
                    
                } catch (Exception err) {
                    future.setException(err);
                }
            }
        };
        
        connectFuture = CONNECTOR.connect(address);
        connectFuture.addListener(onConnection);
    }
    
    private static class ClientSessionHandler extends IoHandlerAdapter {
        
        @Override
        public void messageReceived(IoSession session, Object message) {
            
            AsyncSumUpProcess process = (AsyncSumUpProcess)session.getAttribute(KEY);
            
            ResultMessage rm = (ResultMessage) message;
            if (rm.isOk()) {
                if (rm.getSequence() == process.values.length - 1) {
                    process.future.setValue(rm.getValue());
                    session.close(true);
                }
            } else {
                session.close(true);
            }
        }

        @Override
        public void exceptionCaught(IoSession session, Throwable cause) {
            session.close(true);
        }
    }
}

Sign in to add a comment
Powered by Google Project Hosting