When one issues a get/scan query against an HBase table via HTable or HTablePool, the client first looks up the -ROOT- and .META. tables to locate the region servers that host the data. This is the phase where most of the data irrelevant to the query is skipped. That’s why row key design is so important when it comes to using HBase.
For example, one can use the following composite key to partition data by siteId, so that a query scanning all users belonging to a specific site can be done efficiently by specifying the siteId prefix as the scan's startRow.
Composite Key:
siteId|userId
The -ROOT- and .META. lookups locate all the region servers that contain data for the specified siteId. Then multiple get/scan RPC calls are issued against these region servers to retrieve the required results.
To be continued.
For example, when we call the HTable get(List) method, it calls the processBatchCallback method in HConnectionImplementation. This method finds the region servers that host the data and then issues calls to those servers.
public <R> void processBatchCallback( List<? extends Row> list, byte[] tableName, ExecutorService pool, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { // This belongs in HTable!!! Not in here. St.Ack // results must be the same size as list if (results.length != list.size()) { throw new IllegalArgumentException( "argument results must be the same size as argument list"); } if (list.isEmpty()) { return; } // Keep track of the most recent servers for any given item for better // exceptional reporting. We keep HRegionLocation to save on parsing. // Later below when we use lastServers, we'll pull what we need from // lastServers. HRegionLocation [] lastServers = new HRegionLocation[results.length]; List<Row> workingList = new ArrayList<Row>(list); boolean retry = true; // count that helps presize actions array int actionCount = 0; for (int tries = 0; tries < numRetries && retry; ++tries) { // sleep first, if this is a retry if (tries >= 1) { long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries); LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!"); Thread.sleep(sleepTime); } // step 1: break up into regionserver-sized chunks and build the data structs Map<HRegionLocation, MultiAction<R>> actionsByServer = new HashMap<HRegionLocation, MultiAction<R>>(); for (int i = 0; i < workingList.size(); i++) { Row row = workingList.get(i); if (row != null) { HRegionLocation loc = locateRegion(tableName, row.getRow()); byte[] regionName = loc.getRegionInfo().getRegionName(); MultiAction<R> actions = actionsByServer.get(loc); if (actions == null) { actions = new MultiAction<R>(); actionsByServer.put(loc, actions); } Action<R> action = new Action<R>(row, i); lastServers[i] = loc; actions.add(regionName, action); } } // step 2: make the requests Map<HRegionLocation, Future<MultiResponse>> futures = new HashMap<HRegionLocation, Future<MultiResponse>>( actionsByServer.size()); for (Entry<HRegionLocation, MultiAction<R>> e: 
actionsByServer.entrySet()) { futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); } // step 3: collect the failures and successes and prepare for retry for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer : futures.entrySet()) { HRegionLocation loc = responsePerServer.getKey(); try { Future<MultiResponse> future = responsePerServer.getValue(); MultiResponse resp = future.get(); if (resp == null) { // Entire server failed LOG.debug("Failed all for server: " + loc.getHostnamePort() + ", removing from cache"); continue; } for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) { byte[] regionName = e.getKey(); List<Pair<Integer, Object>> regionResults = e.getValue(); for (Pair<Integer, Object> regionResult : regionResults) { if (regionResult == null) { // if the first/only record is 'null' the entire region failed. LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache"); } else { // Result might be an Exception, including DNRIOE results[regionResult.getFirst()] = regionResult.getSecond(); if (callback != null && !(regionResult.getSecond() instanceof Throwable)) { callback.update(e.getKey(), list.get(regionResult.getFirst()).getRow(), (R)regionResult.getSecond()); } } } } } catch (ExecutionException e) { LOG.warn("Failed all from " + loc, e); } } // step 4: identify failures and prep for a retry (if applicable). // Find failures (i.e. null Result), and add them to the workingList (in // order), so they can be retried. retry = false; workingList.clear(); actionCount = 0; for (int i = 0; i < results.length; i++) { // if null (fail) or instanceof Throwable && not instanceof DNRIOE // then retry that row. else dont. 
if (results[i] == null || (results[i] instanceof Throwable && !(results[i] instanceof DoNotRetryIOException))) { retry = true; actionCount++; Row row = list.get(i); workingList.add(row); deleteCachedLocation(tableName, row.getRow()); } else { if (results[i] != null && results[i] instanceof Throwable) { actionCount++; } // add null to workingList, so the order remains consistent with the original list argument. workingList.add(null); } } } List<Throwable> exceptions = new ArrayList<Throwable>(actionCount); List<Row> actions = new ArrayList<Row>(actionCount); List<String> addresses = new ArrayList<String>(actionCount); for (int i = 0 ; i < results.length; i++) { if (results[i] == null || results[i] instanceof Throwable) { exceptions.add((Throwable)results[i]); actions.add(list.get(i)); addresses.add(lastServers[i].getHostnamePort()); } } if (!exceptions.isEmpty()) { throw new RetriesExhaustedWithDetailsException(exceptions, actions, addresses); } }