There are two RPC servers (org.apache.hadoop.ipc.Server) inside the NameNode. One is mainly for communication with clients, while the other handles communication among the DataNodes, the BackupNode, and the NameNode.
The RPC server has a Listener thread that listens for incoming connections. The Listener thread creates a server socket in non-blocking mode and uses a Selector to listen for connection-accept events. At the same time, it instantiates an array of Reader threads, each of which is given its own read Selector. This allows multiple Reader threads to read incoming remote RPC method calls.
The Server start method starts the Responder thread, the Listener thread, and an array of Handler threads:
    public synchronized void start() {
      responder.start();
      listener.start();
      handlers = new Handler[handlerCount];

      for (int i = 0; i < handlerCount; i++) {
        handlers[i] = new Handler(i);
        handlers[i].start();
      }
    }
The Listener constructor configures a ServerSocketChannel to listen for new incoming connections and initializes an array of Reader threads, each with its own read Selector:
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
In the Listener thread's run method, whenever an event is detected, the thread iterates over the Selector's selectedKeys and checks whether each key represents a connection that is ready to be accepted.
    public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;

        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }
When there is a new incoming network connection, the following doAccept method is called. One Reader is selected (round robin), and the new channel is registered with that Reader's read Selector to listen for read events.
    void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());
        } finally {
          reader.finishAdd();
        }
      }
    }
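The body of getReader() is not shown in this post, so here is a minimal sketch of what a round-robin choice over a fixed set of readers could look like. The class name RoundRobinPicker and its next() method are hypothetical and are not taken from the Hadoop source; plain strings stand in for Reader threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of round-robin selection; not Hadoop code.
class RoundRobinPicker<T> {
    private final List<T> items;
    private int current = -1; // index of the item returned most recently

    RoundRobinPicker(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("need at least one item");
        }
        this.items = new ArrayList<>(items);
    }

    // Advance to the next item, wrapping around at the end of the list.
    // Synchronized so the sketch stays safe if several threads call it.
    synchronized T next() {
        current = (current + 1) % items.size();
        return items.get(current);
    }

    public static void main(String[] args) {
        // Strings stand in for Reader threads here.
        RoundRobinPicker<String> picker = new RoundRobinPicker<>(
                Arrays.asList("Socket Reader #1", "Socket Reader #2", "Socket Reader #3"));
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " -> " + picker.next());
        }
    }
}
```

With three readers, the fourth connection wraps back to the first reader, so new connections are spread evenly regardless of how busy each reader currently is.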
When a reader receives a read event from the read Selector, the following method is called.
    void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;
      }
      c.setLastContact(System.currentTimeMillis());

      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e +
            ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " +
              c.getHostAddress() + ". Number of active connections: "+
              numConnections);
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(System.currentTimeMillis());
      }
    }
The use of multiple Reader threads with separate read Selectors is an improvement introduced in Hadoop 0.21.0. Prior to 0.21.0, the Selector used to accept connections was also used to detect read events, so accepting connections and reading requests all went through a single shared Selector object. To learn more, please refer to the ticket
https://issues.apache.org/jira/browse/HADOOP-6713
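To make the accept/read split concrete, here is a small single-threaded sketch that uses one Selector for OP_ACCEPT and a second one for OP_READ over a loopback connection. It is only an illustration under simplifying assumptions: the class TwoSelectorSketch and the method roundTrip are hypothetical names, everything runs in one thread, and the 5-second timeouts are arbitrary. In the real server, the accept Selector belongs to the Listener thread and each read Selector belongs to a Reader thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not Hadoop code: one Selector for accepts, another for reads.
class TwoSelectorSketch {

    // Sends `message` over loopback and returns what the "reader" side received.
    static String roundTrip(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (Selector acceptSelector = Selector.open();
             Selector readSelector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {

            // Listener role: non-blocking server socket registered for OP_ACCEPT only.
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(acceptSelector, SelectionKey.OP_ACCEPT);
            int port = server.socket().getLocalPort(); // ephemeral port

            try (SocketChannel client = SocketChannel.open(
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port))) {
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out);
                }

                // Wait for the accept event on the accept Selector.
                if (acceptSelector.select(5000) == 0) {
                    throw new IOException("timed out waiting for a connection");
                }
                acceptSelector.selectedKeys().clear();

                try (SocketChannel accepted = server.accept()) {
                    if (accepted == null) {
                        throw new IOException("no connection to accept");
                    }
                    // Reader role: the accepted channel goes to the separate read Selector.
                    accepted.configureBlocking(false);
                    accepted.register(readSelector, SelectionKey.OP_READ);

                    ByteBuffer in = ByteBuffer.allocate(payload.length);
                    while (in.hasRemaining()) {
                        if (readSelector.select(5000) == 0) {
                            throw new IOException("timed out waiting for data");
                        }
                        readSelector.selectedKeys().clear();
                        if (accepted.read(in) < 0) {
                            break; // peer closed the connection
                        }
                    }
                    return new String(in.array(), 0, in.position(), StandardCharsets.UTF_8);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Because the accepted channel is never registered with the accept Selector, a slow or busy read path does not delay the accept loop, which is the point of giving each Reader its own Selector.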
Whenever a new connection is established, a Reader thread is ready to read data (a method call) from the client. After receiving the input bytes, it parses them to determine the method name and the required parameters. It then creates a Call object encapsulating the method name and parameters and enqueues it into the call queue.
Multiple Handler threads wait on this call queue (BlockingQueue callQueue) and pull Call objects from it for processing.
The next question is: can we do even better?
HBase RPC is basically a copy of Hadoop RPC and retains most of the Hadoop RPC code.
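The hand-off between Readers and Handlers is a producer/consumer pattern on a BlockingQueue. The sketch below shows the idea in isolation, assuming a much simplified Call that carries only a method name and one parameter. CallQueueSketch, process, and the method names in main are hypothetical; the real Handler invokes the requested method and passes the response to the Responder, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of Handler threads draining a shared call queue; not Hadoop code.
class CallQueueSketch {

    // Simplified stand-in for the Call object: a method name plus one parameter.
    static final class Call {
        final String method;
        final String param;
        Call(String method, String param) {
            this.method = method;
            this.param = param;
        }
    }

    // Enqueues all calls, lets handlerCount threads process them, and returns the results.
    static List<String> process(int handlerCount, List<Call> calls) {
        if (handlerCount < 1) {
            throw new IllegalArgumentException("need at least one handler");
        }
        final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
        final List<String> results = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(calls.size());

        Thread[] handlers = new Thread[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Call call = callQueue.take(); // blocks until a call is queued
                        // "Processing" here just formats the call as text.
                        results.add(call.method + "(" + call.param + ")");
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // Interrupted during shutdown: leave the loop.
                }
            }, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }

        try {
            // Reader role: enqueue each parsed call.
            for (Call call : calls) {
                callQueue.put(call);
            }
            done.await(); // wait until every call has been processed
            for (Thread h : handlers) h.interrupt();
            for (Thread h : handlers) h.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handlers", e);
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        List<Call> calls = Arrays.asList(
                new Call("ping", "a"), new Call("echo", "b"), new Call("ping", "c"));
        System.out.println(process(2, calls));
    }
}
```

The order of the results depends on thread scheduling, so callers should not rely on it. The queue decouples the rate at which Readers parse requests from the rate at which Handlers execute them.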