Reactor design pattern
The Reactor pattern is described by Douglas C. Schmidt in "Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events". An overview with examples in C++ (using the Poco libraries) and in Java is provided below.
/* Reactor.cpp ----------- Use of the Reactor design pattern. Compile with g++ -I /usr/local/poco-1.3.5/include -L /usr/local/poco-1.3.5/lib/ -lPocoFoundation -lPocoNet -oReactor Reactor.cpp */ #include <cstdlib> #include <iostream> #include <string> #include <Poco/Timespan.h> #include <Poco/Thread.h> #include <Poco/Observer.h> #include <Poco/Net/SocketReactor.h> #include <Poco/Net/ServerSocket.h> #include <Poco/Net/StreamSocket.h> #include <Poco/Net/SocketNotification.h> using std::cout; using std::endl; using std::string; using Poco::Timespan; using Poco::Thread; using Poco::Observer; using Poco::Net::SocketReactor; using Poco::Net::ServerSocket; using Poco::Net::StreamSocket; using Poco::Net::SocketNotification; using Poco::Net::ReadableNotification; using Poco::Net::WritableNotification; class ClientHandler { public: ClientHandler(SocketReactor& reactor, StreamSocket socket) : _reactor(reactor), _socket(socket) { } void onReadable(ReadableNotification* notification) { char buf[100]; int no = _socket.receiveBytes(buf, 99); if (no > 0) { buf[no] = '\0'; cout << "ClientHandler::onReadable(): buf=" << buf << endl; _request = buf; } else { // destroy handlers cout << "ClientHandler::onReadable(): destroying handlers" << endl; Observer<ClientHandler, ReadableNotification> readObserver(*this, &ClientHandler::onReadable); _reactor.removeEventHandler(_socket, readObserver); Observer<ClientHandler, WritableNotification> writeObserver(*this, &ClientHandler::onWritable); _reactor.removeEventHandler(_socket, writeObserver); _socket.close(); delete this; } } void onWritable(WritableNotification* notification) { if (!_request.empty()) { cout << "ClientHandler::onWritable(): _request=" << _request << endl; _socket.sendBytes(_request.c_str(), _request.length()); _request.clear(); } } private: SocketReactor& _reactor; StreamSocket _socket; string _request; }; class ServerHandler { public: ServerHandler(const ServerSocket& socket, SocketReactor& reactor) : _reactor(reactor) { 
Observer<ServerHandler, ReadableNotification> readObserver(*this, &ServerHandler::onReadable); _reactor.addEventHandler(socket, readObserver); } void onReadable(ReadableNotification* notification) { ServerSocket server(notification->socket()); StreamSocket client = server.acceptConnection(); cout << "ServerHandler::onReadable(): accepted client from " << client.address().toString() << endl; ClientHandler* handler = new ClientHandler(_reactor, client); Observer<ClientHandler, ReadableNotification> readObserver(*handler, &ClientHandler::onReadable); _reactor.addEventHandler(client, readObserver); Observer<ClientHandler, WritableNotification> writeObserver(*handler, &ClientHandler::onWritable); _reactor.addEventHandler(client, writeObserver); } private: SocketReactor& _reactor; }; int main() { ServerSocket socket(9000); SocketReactor reactor(Timespan(0, 0, 0, 1, 0)); ServerHandler handler(socket, reactor); Thread thread; thread.start(reactor); thread.join(); return EXIT_SUCCESS; }
/*
 Reactor.java
 ------------
 Reactor design pattern: a single-threaded TCP echo server on port 9000
 built on a java.nio Selector.

 Compile with
 javac Reactor.java
*/
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;

/**
 * Handler for one client connection. Each chunk read from the client is
 * stored in {@link #request} and echoed back.
 *
 * The key's interest set alternates between OP_READ (nothing pending) and
 * OP_WRITE (an echo is pending). This keeps select() from returning
 * immediately on an always-writable socket and prevents a pending echo from
 * being overwritten by the next read.
 */
class ClientHandler {
    private static final int BUFFER_SIZE = 32;
    private static final Charset CHARSET = Charset.forName("ISO-8859-1");

    /** Bytes received and not yet fully echoed; null when nothing is pending. */
    ByteBuffer request;

    /**
     * Reads up to BUFFER_SIZE bytes from the client. On end-of-stream the
     * channel is closed and the key cancelled; on data the key is switched
     * to OP_WRITE so the data is echoed on the next writable event.
     *
     * @param key selection key of the client channel
     * @throws IOException if reading from or closing the channel fails
     */
    void onReadable(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int result = client.read(buffer);
        if (result == -1) {
            client.close();
            key.cancel();
            System.out.println("ClientHandler.onReadable(): key cancelled");
            return;
        }
        if (result == 0) {
            // Nothing was read: leave 'request' untouched so an empty,
            // unflipped buffer is never sent to the client.
            return;
        }
        buffer.flip();
        System.out.println("ClientHandler.onReadable(): request=" + decode(buffer));
        this.request = buffer;
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Writes the pending request back to the client. If the channel accepts
     * only part of it, the remainder stays pending and is written on the
     * next writable event; once everything is sent the key goes back to
     * OP_READ.
     *
     * @param key selection key of the client channel
     * @throws IOException if writing to the channel fails
     */
    void onWritable(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer response = this.request;
        if (response != null) {
            System.out.println("ClientHandler.onWritable(): response=" + decode(response));
            client.write(response);
            if (response.hasRemaining()) {
                return; // partial write: keep OP_WRITE and the remaining bytes
            }
            this.request = null;
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /** Decodes the remaining bytes of 'buffer' for logging without consuming them. */
    private static String decode(ByteBuffer buffer) throws IOException {
        CharsetDecoder decoder = CHARSET.newDecoder();
        CharBuffer chars = decoder.decode(buffer.duplicate());
        return chars.toString();
    }
}

/**
 * Handler for the listening channel: accepts connections and registers each
 * client with the selector, attaching a fresh ClientHandler to its key.
 */
class ServerHandler {
    private final Reactor reactor;
    private final ServerSocketChannel server;
    private final Selector selector;

    public ServerHandler(Reactor reactor, ServerSocketChannel server, Selector selector) {
        this.reactor = reactor;
        this.server = server;
        this.selector = selector;
    }

    /**
     * Accepts a pending connection, if there is one, and registers it for
     * read events.
     *
     * @param key selection key of the server channel
     * @throws IOException if accepting or configuring the client fails
     */
    void onAcceptable(SelectionKey key) throws IOException {
        SocketChannel client = this.server.accept();
        if (client != null) {
            System.out.println("ServerHandler.onAcceptable(): client accepted");
            client.configureBlocking(false);
            // Start with OP_READ only; ClientHandler enables OP_WRITE when
            // it has something to send.
            client.register(this.selector, SelectionKey.OP_READ, new ClientHandler());
        }
    }
}

/**
 * The reactor: waits on the selector and dispatches ready keys to the
 * server handler or to the ClientHandler attached to the key.
 */
class Reactor implements Runnable {
    private ServerSocketChannel server;
    private Selector selector;
    ServerHandler serverHandler;

    public static void main(String[] args) throws IOException, InterruptedException {
        Thread t = new Thread(new Reactor());
        t.start();
        t.join();
    }

    /**
     * Opens a non-blocking server channel on port 9000 and registers it with
     * a new selector for accept events.
     *
     * @throws IOException if the channel or selector cannot be set up
     */
    public Reactor() throws IOException, ClosedChannelException {
        this.server = ServerSocketChannel.open();
        this.server.socket().bind(new InetSocketAddress(9000));
        if (!server.socket().isBound())
            throw new IOException();
        this.server.configureBlocking(false);
        this.selector = Selector.open();
        this.server.register(selector, SelectionKey.OP_ACCEPT);
        this.serverHandler = new ServerHandler(this, this.server, this.selector);
    }

    /** Event loop; ends only when the selector or the server channel fails. */
    public void run() {
        try {
            while (true) {
                this.selector.select();
                Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
                while (keyIter.hasNext()) {
                    SelectionKey key = keyIter.next();
                    // The selector never removes keys from the selected-key
                    // set itself; without this, keys with stale readiness
                    // would be dispatched again on every iteration.
                    keyIter.remove();
                    dispatch(key);
                }
            }
        } catch (IOException e) {
            System.out.println("main(): e=" + e);
        }
    }

    /**
     * Dispatches one ready key. A failure on the server channel is
     * propagated to the caller; a failure on a client channel only closes
     * that client, so one broken connection cannot stop the server.
     */
    private void dispatch(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            this.serverHandler.onAcceptable(key);
            return;
        }
        ClientHandler h = (ClientHandler) key.attachment();
        try {
            if (key.isReadable()) {
                h.onReadable(key);
            }
            // onReadable() may have cancelled the key.
            if (key.isValid() && key.isWritable()) {
                h.onWritable(key);
            }
        } catch (IOException e) {
            System.out.println("Reactor.dispatch(): closing client, e=" + e);
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException ignored) {
                // The connection is being discarded anyway.
            }
        }
    }
}