blog.tomoyat.dev

HBase Code Reading 1

2025-02-02

This post looks into what happens inside HBase when a region is moved.

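When you run hbase shell, the bin/hbase script most likely selects the class to run in the part below. For the shell on a binary install, that class is org.jruby.JarBootstrapMain.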

if [ "$COMMAND" = "shell" ] ; then
  # find the hbase ruby sources
  # assume we are in a binary install if lib/ruby exists
  if [ -d "$HBASE_HOME/lib/ruby" ]; then
    # We want jruby to consume these things rather than our bootstrap script;
    # jruby will look for the env variable 'JRUBY_OPTS'.
    JRUBY_OPTS="${JRUBY_OPTS} -X+O"
    export JRUBY_OPTS
    # hbase-shell.jar contains a 'jar-bootstrap.rb'
    # for more info see
    # https://github.com/jruby/jruby/wiki/StandaloneJarsAndClasses#standalone-executable-jar-files
    CLASS="org.jruby.JarBootstrapMain"
  # otherwise assume we are running in a source checkout
  else
    HBASE_OPTS="$HBASE_OPTS -Dhbase.ruby.sources=$HBASE_HOME/hbase-shell/src/main/ruby"
    CLASS="org.jruby.Main -X+O ${JRUBY_OPTS} ${HBASE_HOME}/hbase-shell/src/main/ruby/jar-bootstrap.rb"
  fi
  HBASE_OPTS="$HBASE_OPTS $HBASE_SHELL_OPTS"

The selected class is then launched here:

if [ "${HBASE_NOEXEC}" != "" ]; then
  "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $HEAP_SETTINGS $HBASE_OPTS $CLASS "$@"
else
  export JVM_PID="$$"
  exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $HEAP_SETTINGS $HBASE_OPTS $CLASS "$@"
fi
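A minimal Java sketch of the same branch, assuming the only input is whether $HBASE_HOME/lib/ruby exists. ShellLauncher and shellClassArgs are hypothetical names; the sketch only builds the tokens that end up in $CLASS (word-splitting JRUBY_OPTS the way the unquoted shell expansion would) and does not start a JVM.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how bin/hbase picks $CLASS when COMMAND is "shell". */
final class ShellLauncher {
  private ShellLauncher() {}

  /**
   * @param hasLibRuby whether $HBASE_HOME/lib/ruby exists (binary install)
   * @param hbaseHome  value of $HBASE_HOME
   * @param jrubyOpts  value of $JRUBY_OPTS
   * @return the tokens the script places in $CLASS
   */
  static List<String> shellClassArgs(boolean hasLibRuby, String hbaseHome, String jrubyOpts) {
    List<String> args = new ArrayList<>();
    if (hasLibRuby) {
      // Binary install: hbase-shell.jar carries jar-bootstrap.rb, so JarBootstrapMain runs it.
      // JRUBY_OPTS travels through the environment here, not the command line.
      args.add("org.jruby.JarBootstrapMain");
      return args;
    }
    // Source checkout: run jar-bootstrap.rb straight from the source tree with org.jruby.Main.
    args.add("org.jruby.Main");
    args.add("-X+O");
    // ${JRUBY_OPTS} is unquoted in the script, so the shell splits it on whitespace.
    for (String opt : jrubyOpts.trim().split("\\s+")) {
      if (!opt.isEmpty()) {
        args.add(opt);
      }
    }
    args.add(hbaseHome + "/hbase-shell/src/main/ruby/jar-bootstrap.rb");
    return args;
  }
}
```

For example, shellClassArgs(false, "/src/hbase", "--dev") yields org.jruby.Main, -X+O, --dev and the path to jar-bootstrap.rb, matching the source-checkout branch.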

I believe an irb (interactive Ruby) session is started at this point.

HBase commands can be run from that irb session, and they are probably defined here.

The commands are actually loaded and registered here:

require "shell/commands/#{name}"
klass_name = name.to_s.gsub(/(?:^|_)(.)/) { Regexp.last_match(1).upcase } # camelize
commands[name] = eval("Commands::#{klass_name}")
aliases.each do |an_alias|
  commands[an_alias] = commands[name]
end
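The registration loop above can be sketched in Java as follows. CommandRegistry is a hypothetical class that stores the "Commands::..." class name as a string, where the Ruby code evaluates it to the actual class object; the camelize step uses the same regular expression as the Ruby gsub.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical Java rendering of the shell's command-registration loop. */
final class CommandRegistry {
  // Same pattern as the Ruby gsub: the first character, or any character following '_'.
  private static final Pattern CAMELIZE = Pattern.compile("(?:^|_)(.)");

  private final Map<String, String> commands = new HashMap<>();

  /** "balance_switch" becomes "BalanceSwitch", like the Ruby camelize one-liner. */
  static String camelize(String name) {
    return CAMELIZE.matcher(name)
        .replaceAll(m -> Matcher.quoteReplacement(m.group(1).toUpperCase(Locale.ROOT)));
  }

  /** Registers a command and its aliases; every alias maps to the same class name. */
  void register(String name, List<String> aliases) {
    String klassName = "Commands::" + camelize(name);
    commands.put(name, klassName);
    for (String alias : aliases) {
      commands.put(alias, klassName);
    }
  }

  /** Returns the registered class name, or null for an unknown command. */
  String lookup(String name) {
    return commands.get(name);
  }
}
```

Registering "list_regions" with a hypothetical alias "my_alias" makes both names resolve to "Commands::ListRegions".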

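This requires the Ruby file for each command and registers the matching Commands:: class (and any aliases) in the commands table. The move command looks like this: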

def command(encoded_region_name, server_name = nil)
  admin.move(encoded_region_name, server_name)
end

This most likely calls the following admin.move:

https://github.com/apache/hbase/blob/88e9477f8b49df99552d1a0278b9edd0f47def1f/hbase-shell/src/main/ruby/hbase/admin.rb#L568

#----------------------------------------------------------------------------------------------
# Move a region
def move(encoded_region_name, server = nil)
  @admin.move(encoded_region_name.to_java_bytes, server ? server.to_java_bytes : nil)
end

@admin is obtained from the connection below; according to the comment, it is a Java Admin instance.

def initialize(connection)
  @connection = connection
  # Java Admin instance
  @admin = @connection.getAdmin
  @hbck = @connection.getHbck
  @conf = @connection.getConfiguration
end
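The Ruby move is a thin adapter: it turns Ruby strings into Java byte arrays and hands them to the Java Admin, passing nil (null) when no server is given. A minimal sketch of that adapter, assuming to_java_bytes yields UTF-8 bytes for ordinary shell input; MiniAdmin and ShellAdmin are hypothetical, and the real Admin interface has many more methods.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the Java Admin interface; only the move call is modeled. */
interface MiniAdmin {
  void move(byte[] encodedRegionName, byte[] destServerName);
}

/** Sketch of what hbase/admin.rb's move does: convert strings to bytes and delegate. */
final class ShellAdmin {
  private final MiniAdmin admin;

  ShellAdmin(MiniAdmin admin) {
    this.admin = admin;
  }

  void move(String encodedRegionName, String server) {
    // server ? server.to_java_bytes : nil -> a null destination is passed through as-is
    byte[] dest = server == null ? null : server.getBytes(StandardCharsets.UTF_8);
    admin.move(encodedRegionName.getBytes(StandardCharsets.UTF_8), dest);
  }
}
```

Because MiniAdmin has a single method, a test double can be a lambda that records the bytes it receives.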

@connection.getAdmin is provided by the Connection created by the factory below:

https://github.com/apache/hbase/blob/97a32318fdf2e36e6005e1c84605dc56481b116c/hbase-shell/src/main/ruby/hbase/hbase.rb#L35

module Hbase
  class Hbase
    attr_accessor :configuration

    def initialize(config = nil)
      # Create configuration
      if config
        self.configuration = config
      else
        self.configuration = HBaseConfiguration.create
        # Turn off retries in hbase and ipc.  Human doesn't want to wait on N retries.
        configuration.setInt('hbase.client.retries.number', 7)
        configuration.setInt('hbase.ipc.client.connect.max.retries', 3)
      end
      @connection = ConnectionFactory.createConnection(configuration)
    end

    # Returns ruby's Admin class from admin.rb
    def admin
      ::Hbase::Admin.new(@connection)
    end

ConnectionFactory is imported from Java, so it is the Java class:

include Java
java_import org.apache.hadoop.hbase.client.ConnectionFactory
java_import org.apache.hadoop.hbase.HBaseConfiguration

createConnection returns a Connection:

https://github.com/apache/hbase/blob/02dd2567037f15daaccc4c6c00a246eba2bbecd7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java#L215

public static Connection createConnection(Configuration conf, ExecutorService pool,
    final User user) throws IOException

The Connection interface is defined at the first link below, and getAdmin is the method at the second. A region move then calls the move method on the Admin it returns.

https://github.com/apache/hbase/blob/b9a13eba67433971c4590e7f999ccbfefd6315a0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java#L51
https://github.com/apache/hbase/blob/b9a13eba67433971c4590e7f999ccbfefd6315a0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java#L156

Admin getAdmin() throws IOException;

So the move that runs is the move method declared in the Admin interface.

The class that actually implements it is HBaseAdmin:

@Override
public void move(byte[] encodedRegionName) throws IOException {
  move(encodedRegionName, (ServerName) null);
}
public void move(final byte[] encodedRegionName, ServerName destServerName) throws IOException {
  executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
    @Override
    protected Void rpcCall() throws Exception {
      setPriority(encodedRegionName);
      MoveRegionRequest request =
        RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
      master.moveRegion(getRpcController(), request);
      return null;
    }
  });
}

So when you move a region from the HBase shell, the client ultimately sends a moveRegion RPC to the master.

executeCallable creates an RpcRetryingCaller, which appears to run the callable with retries:

static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
  RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
  throws IOException {
  RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
  try {
    return caller.callWithRetries(callable, operationTimeout);
  } finally {
    callable.close();
  }
}
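The executeCallable pattern (a fresh retrying caller per call, retries inside callWithRetries, and the callable closed in finally) can be sketched as below. SimpleRetryingCallable and SimpleRetryingCaller are hypothetical, heavily simplified stand-ins for RetryingCallable and RpcRetryingCallerImpl; the fixed pause and the attempt limit are assumptions of the sketch, not HBase's actual retry policy.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RetryingCallable: prepare, then call, possibly several times. */
interface SimpleRetryingCallable<V> {
  void prepare(boolean reload) throws IOException;

  V call() throws IOException;
}

/** Hypothetical retrying caller, loosely modeled on callWithRetries. */
final class SimpleRetryingCaller {
  private final int maxAttempts;
  private final long pauseMillis;

  SimpleRetryingCaller(int maxAttempts, long pauseMillis) {
    this.maxAttempts = maxAttempts;
    this.pauseMillis = pauseMillis;
  }

  <V> V callWithRetries(SimpleRetryingCallable<V> callable) throws IOException {
    List<IOException> failures = new ArrayList<>();
    for (int tries = 0; tries < maxAttempts; tries++) {
      try {
        callable.prepare(tries != 0); // refresh cached state (e.g. locations) on retries
        return callable.call();
      } catch (IOException e) {
        failures.add(e); // keep every failure, like the real loop's exceptions list
      }
      if (tries + 1 < maxAttempts) {
        try {
          Thread.sleep(pauseMillis); // fixed pause; the real caller backs off progressively
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("interrupted while waiting to retry", ie);
        }
      }
    }
    IOException exhausted = new IOException("retries exhausted after " + maxAttempts + " attempts");
    failures.forEach(exhausted::addSuppressed);
    throw exhausted;
  }

  /** Mirrors executeCallable: a new caller per call, and the callable is always closed. */
  static <C extends SimpleRetryingCallable<V> & Closeable, V> V executeCallable(
      C callable, int maxAttempts, long pauseMillis) throws IOException {
    SimpleRetryingCaller caller = new SimpleRetryingCaller(maxAttempts, pauseMillis);
    try {
      return caller.callWithRetries(callable);
    } finally {
      callable.close();
    }
  }
}
```

Unlike the real caller, this sketch retries every IOException; HBase stops early on a DoNotRetryIOException.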

https://github.com/apache/hbase/blob/fb2593b840bb3b53a0babe1a91a9613dc9c47a47/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java#L104

public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
  throws IOException, RuntimeException {
  List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<>();
  tracker.start();
  context.clear();
  for (int tries = 0;; tries++) {
    long expectedSleep;
    try {
      // bad cache entries are cleared in the call to RetryingCallable#throwable() in
      callable.prepare(tries != 0);
      interceptor.intercept(context.prepare(callable, tries));
      return callable.call(getTimeout(callTimeout));

The callable here is the MasterCallable, and its call method runs rpcCall:

public V call(int callTimeout) throws IOException {
  try {
    if (this.rpcController != null) {
      this.rpcController.reset();
      this.rpcController.setCallTimeout(callTimeout);
    }
    return rpcCall();
  } catch (Exception e) {
    throw ProtobufUtil.handleRemoteException(e);
  }
}

So master.moveRegion, called from the rpcCall inside move, is the call that actually asks the master to perform the move.
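MasterCallable.call is a template method: it resets the RPC controller, applies the timeout for this attempt, runs the subclass's rpcCall, and converts whatever is thrown into an IOException. A hypothetical sketch of that shape; FakeRpcController and SketchMasterCallable are illustrative names, and the plain IOException wrapping only stands in for ProtobufUtil.handleRemoteException, which does more unwrapping.

```java
import java.io.IOException;

/** Hypothetical RPC controller holding only what MasterCallable.call touches. */
final class FakeRpcController {
  int callTimeout = -1;

  void reset() {
    callTimeout = -1; // forget state left over from a previous attempt
  }

  void setCallTimeout(int timeout) {
    callTimeout = timeout;
  }
}

/** Template-method sketch: call() does the bookkeeping, subclasses supply rpcCall(). */
abstract class SketchMasterCallable<V> {
  protected final FakeRpcController controller;

  SketchMasterCallable(FakeRpcController controller) {
    this.controller = controller;
  }

  /** The actual RPC, e.g. master.moveRegion(...) in HBaseAdmin.move. */
  protected abstract V rpcCall() throws Exception;

  public V call(int callTimeout) throws IOException {
    try {
      if (controller != null) {
        controller.reset();
        controller.setCallTimeout(callTimeout);
      }
      return rpcCall();
    } catch (Exception e) {
      // Stand-in for ProtobufUtil.handleRemoteException: surface everything as IOException.
      throw e instanceof IOException ? (IOException) e : new IOException(e);
    }
  }
}
```

The move callable in HBaseAdmin corresponds to an anonymous subclass whose rpcCall sends the MoveRegionRequest and returns null.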

This master.moveRegion is a protobuf RPC, declared in the MasterService definition:

/** Move the region region to the destination server. */
rpc MoveRegion(MoveRegionRequest)
  returns(MoveRegionResponse);

The server-side handler for this RPC is probably the following moveRegion method (in MasterRpcServices):

public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
  throws ServiceException {
  final byte[] encodedRegionName = req.getRegion().getValue().toByteArray();
  RegionSpecifierType type = req.getRegion().getType();
  final byte[] destServerName = (req.hasDestServerName())
    ? Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName())
    : null;
  MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
  if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
    LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
      + " actual: " + type);
  }
  try {
    master.checkInitialized();
    master.move(encodedRegionName, destServerName);
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
  return mrr;
}
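The handler's shape (unpack the request, warn on an unexpected specifier type, call into the master, wrap IOException in a ServiceException) can be sketched in plain Java. All Sketch* types are hypothetical simplifications: the real code works on generated protobuf messages and converts the destination with ProtobufUtil.toServerName, while this sketch uses plain strings.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;

/** Hypothetical, flattened stand-in for the protobuf MoveRegionRequest. */
final class SketchMoveRequest {
  enum SpecifierType { REGION_NAME, ENCODED_REGION_NAME }

  final SpecifierType type;
  final String region;
  final String destServerName; // null when the client left the destination unset

  SketchMoveRequest(SpecifierType type, String region, String destServerName) {
    this.type = type;
    this.region = region;
    this.destServerName = destServerName;
  }
}

/** Stand-in for protobuf's ServiceException, the checked exception RPC handlers throw. */
final class SketchServiceException extends Exception {
  private static final long serialVersionUID = 1L;

  SketchServiceException(Throwable cause) {
    super(cause);
  }
}

/** Hypothetical interface for the two master calls the handler makes. */
interface SketchMaster {
  void checkInitialized() throws IOException;

  void move(byte[] encodedRegionName, byte[] destServerName) throws IOException;
}

final class SketchMoveRegionHandler {
  private static final Logger LOG = Logger.getLogger(SketchMoveRegionHandler.class.getName());
  private final SketchMaster master;

  SketchMoveRegionHandler(SketchMaster master) {
    this.master = master;
  }

  void moveRegion(SketchMoveRequest req) throws SketchServiceException {
    byte[] encodedRegionName = req.region.getBytes(StandardCharsets.UTF_8);
    byte[] dest = req.destServerName == null
        ? null
        : req.destServerName.getBytes(StandardCharsets.UTF_8);
    if (req.type != SketchMoveRequest.SpecifierType.ENCODED_REGION_NAME) {
      // Like the handler above: only warn, then treat the value as an encoded name anyway.
      LOG.warning("moveRegion specifier type: expected ENCODED_REGION_NAME actual: " + req.type);
    }
    try {
      master.checkInitialized();
      master.move(encodedRegionName, dest);
    } catch (IOException ioe) {
      // Checked IOExceptions cross the RPC boundary wrapped in a ServiceException.
      throw new SketchServiceException(ioe);
    }
  }
}
```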

The call that appears to do the real work is master.move(encodedRegionName, destServerName), which lives in HMaster.java.

After a series of checks, the part of HMaster.java that actually moves the region is this:

// Now we can do the move
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
assert rp.getDestination() != null : rp.toString() + " " + dest;
try {
  checkInitialized();
  if (this.cpHost != null) {
    this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
  }
  TransitRegionStateProcedure proc =
    this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
  // Warmup the region on the destination before initiating the move. this call
  // is synchronous and takes some time. doing it before the source region gets
  // closed
  serverManager.sendRegionWarmup(rp.getDestination(), hri);
  LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
  Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
  try {
    // Is this going to work? Will we throw exception on error?
    // TODO: CompletableFuture rather than this stunted Future.
    future.get();
  } catch (InterruptedException | ExecutionException e) {
    throw new HBaseIOException(e);
  }
  if (this.cpHost != null) {
    this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
  }
} catch (IOException ioe) {
  if (ioe instanceof HBaseIOException) {
    throw (HBaseIOException) ioe;
  }
  throw new HBaseIOException(ioe);
}
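The order of operations above (pre-hook, synchronous warmup on the destination, submit the procedure, block on its Future, post-hook only on success, wrap failures in an IOException) can be sketched with a plain ExecutorService. MoveFlowSketch is hypothetical: the executor stands in for the procedure executor and ProcedureSyncWait.submitProcedure, and hooks and warmup are only recorded as strings. Unlike the excerpt, the sketch restores the interrupt flag before wrapping an InterruptedException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of the order of operations in HMaster.move once checks have passed. */
final class MoveFlowSketch {
  final List<String> events = new ArrayList<>();

  // Daemon thread so the sketch never keeps the JVM alive; stands in for the procedure executor.
  private final ExecutorService procedureExecutor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "sketch-procedure-executor");
    t.setDaemon(true);
    return t;
  });

  void move(String region, String source, String dest, Runnable procedure) throws IOException {
    events.add("preMove " + region + " " + source + "->" + dest); // coprocessor pre-hook
    events.add("warmup " + region + " on " + dest); // synchronous, before the source closes
    Future<?> future = procedureExecutor.submit(procedure); // like submitProcedure
    try {
      future.get(); // block the caller until the procedure completes or fails
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
    events.add("postMove " + region); // post-hook runs only after a successful move
  }
}
```

If the procedure fails, the IOException propagates before postMove is recorded, mirroring how the post-hook is skipped in HMaster.move.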

The move relies on Procedure (Procedure V2), a framework that orchestrates multi-step tasks. The ProcedureV2b.pdf document attached to this ticket explains it clearly.

The work itself lives in proc, which is created by this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination()):

public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
  ServerName targetServer) throws HBaseIOException {
  RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
  if (regionNode == null) {
    throw new UnknownRegionException(
      "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
  }
  TransitRegionStateProcedure proc;
  regionNode.lock();
  try {
    preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
    regionNode.checkOnline();
    proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
    regionNode.setProcedure(proc);
  } finally {
    regionNode.unlock();
  }
  return proc;
}
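createMoveRegionProcedure follows a lock, check, attach pattern: take the region node's lock, verify the region may be moved, attach the new procedure, and unlock in finally. A hypothetical sketch of that pattern; the allowed-state set (only OPEN here) and the rule that a region with an attached procedure cannot start another are assumptions about what preTransitCheck and checkOnline enforce, and the procedure is just a descriptive string.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, minimal RegionStateNode: a state, an attached procedure, and a lock. */
final class SketchRegionNode {
  enum State { OPEN, OPENING, CLOSING, CLOSED }

  final String encodedName;
  State state;
  String procedure; // description of the attached procedure, null if none
  private final ReentrantLock lock = new ReentrantLock();

  SketchRegionNode(String encodedName, State state) {
    this.encodedName = encodedName;
    this.state = state;
  }

  void lock() {
    lock.lock();
  }

  void unlock() {
    lock.unlock();
  }
}

final class SketchAssignmentManager {
  // Assumption for the sketch: only an OPEN region may start a move.
  private static final Set<SketchRegionNode.State> MOVABLE =
      EnumSet.of(SketchRegionNode.State.OPEN);

  static String createMoveProcedure(SketchRegionNode node, String targetServer) {
    if (node == null) {
      throw new IllegalArgumentException("no region node found (closed/deleted?)");
    }
    node.lock();
    try {
      // Checks run under the lock so no other procedure can attach in between.
      if (node.procedure != null) {
        throw new IllegalStateException(node.encodedName + " already in transition: " + node.procedure);
      }
      if (!MOVABLE.contains(node.state)) {
        throw new IllegalStateException(node.encodedName + " is " + node.state + ", cannot move");
      }
      String proc = "MOVE " + node.encodedName + " -> "
          + (targetServer == null ? "<any server>" : targetServer);
      node.procedure = proc; // attach, like regionNode.setProcedure(proc)
      return proc;
    } finally {
      node.unlock();
    }
  }
}
```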

proc is obtained on this line:

proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);

TransitRegionStateProcedure.move looks like this:

public static TransitRegionStateProcedure move(MasterProcedureEnv env, RegionInfo region,
  @Nullable ServerName targetServer) {
  return setOwner(env, new TransitRegionStateProcedure(env, region, targetServer,
    targetServer == null, TransitionType.MOVE));
}

So it simply creates a TransitRegionStateProcedure with TransitionType.MOVE.

When TransitRegionStateProcedure.execute runs, it eventually calls executeFromState (via its state-machine base class), which drives the task like a state machine:

https://github.com/apache/hbase/blob/02dd2567037f15daaccc4c6c00a246eba2bbecd7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java#L336

protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionState state)
  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
  RegionStateNode regionNode = getRegionStateNode(env);
  try {
    switch (state) {
      case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE:
        // Need to do some sanity check for replica region, if the region does not exist at
        // master, do not try to assign the replica region, log error and return.
        if (!RegionReplicaUtil.isDefaultReplica(regionNode.getRegionInfo())) {
          RegionInfo defaultRI =
            RegionReplicaUtil.getRegionInfoForDefaultReplica(regionNode.getRegionInfo());
          if (
            env.getMasterServices().getAssignmentManager().getRegionStates()
              .getRegionStateNode(defaultRI) == null
          ) {
            LOG.error(
              "Cannot assign replica region {} because its primary region {} does not exist.",
              regionNode.getRegionInfo(), defaultRI);
            regionNode.unsetProcedure(this);
            return Flow.NO_MORE_STATE;
          }
        }
        queueAssign(env, regionNode);
        return Flow.HAS_MORE_STATE;
      case REGION_STATE_TRANSITION_OPEN:
        openRegion(env, regionNode);
        return Flow.HAS_MORE_STATE;
      case REGION_STATE_TRANSITION_CONFIRM_OPENED:
        return confirmOpened(env, regionNode);
      case REGION_STATE_TRANSITION_CLOSE:
        closeRegion(env, regionNode);
        return Flow.HAS_MORE_STATE;
      case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
        return confirmClosed(env, regionNode);
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  } catch (IOException e) {

As the class comment at the link below says, a region move proceeds through these states:

https://github.com/apache/hbase/blob/02dd2567037f15daaccc4c6c00a246eba2bbecd7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java#L88

CLOSE -----> CONFIRM_CLOSED -----> GET_ASSIGN_CANDIDATE ------> OPEN -----> CONFIRM_OPENED
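A toy version of this state machine, assuming the happy path only: each executeFromState call handles one state, picks the next one, and reports HAS_MORE_STATE until CONFIRM_OPENED finishes with NO_MORE_STATE. MoveStateMachineSketch is hypothetical and drops the REGION_STATE_TRANSITION_ prefix. In the real procedure the helpers (closeRegion, queueAssign, openRegion, and so on) presumably set the next state themselves, the confirm steps can loop back on failure, and the framework persists progress between steps; none of that is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical happy-path model of the MOVE transition in TransitRegionStateProcedure. */
final class MoveStateMachineSketch {
  enum State { CLOSE, CONFIRM_CLOSED, GET_ASSIGN_CANDIDATE, OPEN, CONFIRM_OPENED }

  enum Flow { HAS_MORE_STATE, NO_MORE_STATE }

  private State state = State.CLOSE; // a move starts by closing the region on the source
  private final List<State> visited = new ArrayList<>();

  /** Handles one state and chooses the next, like one executeFromState step. */
  Flow executeFromState(State s) {
    visited.add(s);
    switch (s) {
      case CLOSE: // ask the source region server to close the region
        state = State.CONFIRM_CLOSED;
        return Flow.HAS_MORE_STATE;
      case CONFIRM_CLOSED: // region is closed; now find where to open it
        state = State.GET_ASSIGN_CANDIDATE;
        return Flow.HAS_MORE_STATE;
      case GET_ASSIGN_CANDIDATE: // use the requested target, or let the balancer pick
        state = State.OPEN;
        return Flow.HAS_MORE_STATE;
      case OPEN: // ask the destination region server to open the region
        state = State.CONFIRM_OPENED;
        return Flow.HAS_MORE_STATE;
      case CONFIRM_OPENED: // region is open on the destination: done
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + s);
    }
  }

  /** Driver loop standing in for the procedure executor calling execute repeatedly. */
  List<State> run() {
    Flow flow;
    do {
      flow = executeFromState(state);
    } while (flow == Flow.HAS_MORE_STATE);
    return visited;
  }
}
```

Running new MoveStateMachineSketch().run() visits the five states in the order of the diagram above.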