Skip to content

セルフホスト HBase から TDSQL Boundless への移行

移行ツールの特徴

基本機能

  • マルチスレッドエクスポート:HBase パーティションテーブルの Region 単位でデータを分割可能。カスタム分割にも対応
  • グローバルレート制限:バイト数または行数ベースの速度制御をサポート
  • KV 構造エクスポート:設定を簡素化し、テーブルごとの Schema 定義が不要

エクスポートツールのデフォルト動作

デフォルト値の処理

  • date / varchar 型:空文字列を使用
  • その他のデータ型:"NULL" で埋める

範囲エクスポート

start_key と end_key は左閉右開区間を採用します。

移行方式の選択

移行方式の説明

移行方式説明
全量移行適用シナリオ:初回移行。すべての履歴データを一括で移行します
増分同期適用シナリオ:継続的なデータ同期。全量移行後に増分変更を継続的に同期します

移行に関する制限事項

  1. 現在、移行においてブレークポイント再開はサポートされていません。
  2. 増分移行には Kafka 環境を別途用意する必要があります。

環境の準備

  1. 移行ツールを取得してください。

説明:最新バージョンの移行ツールを使用することを推奨します。

  1. ツールを任意のディレクトリにアップロードし(本ドキュメントでは /data/tdsql-project/ を例に説明)、解凍します。

ツールのディレクトリ構造:

/data/tdsql-project/apache-seatunnel-2.3.8/
├── bin/                      # 実行スクリプトを格納するディレクトリ
├── config/                   # 設定ファイルディレクトリ(ログコンポーネント、JVM 設定など)
├── connectors/               # 依存 jar パッケージを格納するディレクトリ
├── lib/                      # 依存 jar パッケージを格納するディレクトリ
├── licenses/                 # 依存 jar パッケージを格納するディレクトリ
├── plugins/                  # 依存 jar パッケージを格納するディレクトリ
└── starter/                  # 依存 jar パッケージを格納するディレクトリ
    ├── logging/
  1. 移行対象の HBase クラスターと TDSQL Boundless インスタンスを用意します。

  2. Kafka 環境の準備(HBase 増分同期の場合のみ必要)

アクセス可能な Kafka 環境が必要です。詳細は公式サイトをご参照ください。Docker を使用して移行マシン上で素早く環境を起動できます。

bash
docker pull apache/kafka:4.1.0
docker run -p 9092:9092 apache/kafka:4.1.0

全量移行

seatunnel.sh スクリプトでエクスポートツールを実行します。

./bin/seatunnel.sh -h で実行スクリプトのヘルプ情報を確認できます。

ステップ1:設定ファイルの作成

設定ファイル(例:file_config)を作成し、config/ ディレクトリに保存します。設定ファイルは3つの主要部分で構成されます:

  • env:環境設定項目
  • source:HBase データソースの設定
  • sink:ターゲット側の設定
env {
  # 環境設定項目 - タスク実行環境とリソース制限を制御します。詳細は「環境設定(env)」を参照
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Hbase {
  # HBase データソースの接続情報、認証方式、データエクスポートルールを定義します。
  # 詳細は「HBase データソース設定(source)」を参照
  }
}

sink {
  # データの書き込み先を設定します。JDBC(推奨)と localfile の2つのモードをサポートします。
  # 詳細は「ターゲット側設定(sink)」を参照
}

1. 環境設定(env)

env はエクスポート/インポートタスクの全体設定で、主に並列度とレート制限を設定します(デフォルトではレート制限なし)。

env {
  parallelism = 1 # Source/Sink の並列スレッド数を指定します。実際の並列度は Source/Sink の設定に依存する場合があります
  job.mode = "BATCH"
  read_limit.bytes_per_second = 7000000 # 読み取りバイト数によるレート制限。未指定の場合は制限なし
  read_limit.rows_per_second = 200000 # 読み取り行数によるレート制限。未指定の場合は制限なし
}

2. HBase データソース設定(source)

HBase の1行のデータは最終的に複数の KV ペアに変換されます。変換後の各 KV は TDSQL Boundless の1行のデータとして格納されます。

TDSQL Boundless の KV 状態におけるデフォルトテーブル構造:

sql
CREATE TABLE IF NOT EXISTS `{TableName}`
(
    `K` varbinary(1024) not null,
    `Q` varbinary(256) not null,
    `T` bigint not null,
    `V` mediumblob,
    PRIMARY KEY (`K`, `Q`, `T`))
partition by key(`k`) partitions 6;

KV 形式の設定テンプレート:

source {
  Hbase {
    # HBase 接続オプション:
    # 必須
    zookeeper_quorum = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181" // HBase クラスターの ZooKeeper アドレスおよびポート

    hbase_extra_config={ # HBase 接続の追加設定。特に必要がなければ設定不要
      zookeeper.znode.parent=/bdphbs07/hbaseid
      hbase.client.ipc.pool.size=***
      hbase.client.scanner.timeout.period=6000000
      hbase.rpc.timeout=6000000
      hbase.cells.scanned.per.heartbeat.check=10000
    }

    SourceModeKV = {
     specified_namespace = "***" # エクスポート対象の Namespace を指定
     table_match_rules = ["***"] # エクスポート対象テーブルのマッチングルールを指定
    }
  }
}

説明マルチスレッドについて

  • マルチスレッドを使用するには、まず env 設定でスレッド数を指定する必要があります。
  • エクスポートタスクはデフォルトルールに従って分割されます。各テーブルについて、regionServer 上のテーブル分布に基づいてシャードに分割され、各シャードが1つのエクスポートタスクとなります。
  • 複数テーブルが存在するため、マルチスレッド間のエクスポートタスクシャードのスケジューリングポリシーはテーブル単位を優先し、テーブルのスキャン順序(辞書順)に従って同一テーブルのエクスポートタスクを優先的にスケジューリングします。

3. ターゲット側設定(sink)

データの書き込み先を設定します。JDBC(TDSQL Boundless データベースに直接書き込み)と LocalFile(ローカルファイルシステムにエクスポート)の2つのモードをサポートします。

方式1(推奨):JDBC によるインポート

sink {
    jdbc {
        # url で bulkload を有効にする必要があります。rewriteBatchedStatements=true を必ず設定してください。
        # 設定しない場合は単一 INSERT になり、インポート効率が極めて低下します。
        url = "jdbc:mysql://{ip}:{port}/?rewriteBatchedStatements=true&sessionVariables=tdsql_bulk_load='ON'"
        driver = "com.mysql.cj.jdbc.Driver"

        user = "test"
        password = "test123"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        enable_upsert = false           # UPSERT を無効化
        max_retries = 3                 # リトライ回数
        generate_sink_sql = "true"      # DB に基づいてインポート SQL を生成

        database = "hbase"              # hbase データベースにインポート
        batch_size = 500000             # ツールのバッチトランザクションサイズ。効率化のため指定数量をまとめてからフラッシュします。
                                        # ツールが稼働するマシンのメモリ制限を超えない範囲で大きいほど効果的です。
        batch_interval_ms = 500
        transaction_timeout = 60000     # トランザクションタイムアウトを60秒に設定(大規模バッチのタイムアウト回避のため)
    }
}

方式2:LocalFile

sink {
    LocalFile {
    # 変更不可:
    file_format_type = "text"         # エクスポートファイル形式。現在は text のみサポート
    field_delimiter = ","             # データ区切り文字。カンマのみサポート
    row_delimiter = "\n"              # 行区切り文字。改行のみサポート
    is_enable_transaction = false     # トランザクション非対応

    # 必須:
    path = "/root/dumper_test/apache-seatunnel-2.3.4"     # エクスポートパス
    tmp_path = "/root/dumper_test/apache-seatunnel-2.3.4" # エクスポート時にまず tmp_path に出力し、その後 path に移動します。
                                                           # tmp_path を必ず設定してください。設定しないとルートディレクトリの容量を消費します。
    result_table_name = "test_mytable"                    # エクスポートテーブル名
    result_database_name = "test_mydatabase"              # エクスポートデータベース名(ファイル名生成に使用)
    result_column_names = ["id", "name"]                  # エクスポート列名。source の schema の columns 数と一致する必要があります

    # 任意:
    parent_directory = "test_directory"         # 実際のエクスポートファイルの親ディレクトリ。指定した場合のパス:path/parent_directory/file、未指定の場合:path/file
    file_start_index = 1000                     # ファイル名の開始シーケンス番号。デフォルトは 0
  }
}

説明: LocalFile にエクスポートする場合、HBase データをローカル SQL ファイルに変換するだけです。myloader などのツールを使用して LocalFile を TDSQL Boundless にインポートする必要があります。 詳細なパラメータの説明は設定パラメータをご参照ください。

4. パラメータ調整

SeaTunnel ツールの JVM に割り当てるメモリを変更します。config/jvm_client_options を編集してください。

# 設定ファイルの初期値は小さく設定されています。高並列度では、メモリリソース不足により
# タスクが実行できない場合があります。マシンリソースに応じて適切に調整してください。
-Xms32g  # JVM 初期ヒープメモリサイズ
-Xmx128G

データ移行を実行する前に、TDSQL Boundless インスタンスにログインし、以下のコマンドでパラメータ設定を調整してください。

sql
set persist tdsql_bulk_load_allow_auto_organize_txn = on;   // bulkload の自動トランザクション編成を許可(増分移行では無効化が必要)
set persist tdsql_bulk_load_allow_unsorted = on;            // bulkload トランザクションの順序外書き込みを許可
set persist tdsql_bulk_load_commit_threshold = 4294967296;  // コミット閾値の調整
set persist tdsql_bulk_load_rpc_timeout = 7200000;          // RPC タイムアウト

ステップ2:データ移行の実行

設定ファイルの記入が完了したら、エクスポートを開始できます。各設定ファイルは1つの SeaTunnel タスクとして実行されます。以下では、SeaTunnel エクスポートタスクをローカルクラスターモードで起動する方法を説明します。

  1. 任意のディレクトリ(本ドキュメントでは /data/tdsql-project/apache-seatunnel-2.3.8/ を例に説明)で、記入済みの設定ファイルを ./config ディレクトリに保存します。

  2. 任意のディレクトリで以下のコマンドを実行し、この設定ファイルを使用して SeaTunnel ローカルクラスターを起動し、エクスポートを実行します。

bash
nohup ./bin/seatunnel.sh --config ./config/hbase_config -m local > seatunnel_hbase_local.log 2>&1 &

tail -f seatunnel_hbase_local.log

パラメータの説明:

  • --config:設定ファイルのパスを指定
  • -m local:ローカルモードで実行

ステップ3:データ検証

検証ルール:HBase と TDSQL Boundless からそれぞれデータシャードごとにデータを読み取り、各行の CRC32 チェックサム値を計算・累積し、最終的な累積結果を比較します。

データ検証の手順:

  1. 検証設定ファイル config/validation_config を作成・記入します。
HBaseDataValidation {
    parallelism = 10
    specified_namespace = "default"
    table_match_rules = []
    zookeeper_quorum = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"
    // ユーザー名、パスワード、IP、ポートを適宜置き換えてください
    mysql_jdbc_url = "jdbc:mysql://{ip}:{port}/hbase?user=test&password=test1234"
}
  1. データ検証を実行します。

検証プログラムは hbase-connector.jar にパッケージされています。パッケージのデプロイディレクトリで以下のスクリプトを実行してください:

bash
./bin/validateKV.sh ./config/validation_config

ツールのディレクトリ構造:

./apache-seatunnel-2.3.8/bin/
├── install-plugin.cmd
├── install-plugin.sh
├── seatunnel-cluster.cmd
├── seatunnel-cluster.sh
├── seatunnel.cmd
├── seatunnel-connector.cmd
├── seatunnel-connector.sh
├── seatunnel.sh
└── validateKV.sh

増分同期(スナップショット全量同期 + 増分同期を含む)

増分同期では、全量スナップショットデータの準備と増分データ受信の開始位置の設定が行われます。移行ツールはまずスナップショットの全量データを同期し、スナップショットデータの同期完了後に増分データの同期に移行します。全量データの手動同期は不要です。

注意: 増分同期モードでは、ツールが HBase クラスターに対して以下の操作を行います:

  1. テーブル自体の replication_scope 属性が有効になっていない場合、disable table; enable table; を実行する必要があります。
  2. スナップショットを作成し、スナップショットテーブルを復元して、全量スナップショットデータのエクスポートポイントとします。
  3. 元の HBase クラスターで add_peer を実行し、増分データをキャプチャします。

ステップ1:環境チェック

  1. 同期対象テーブルのカラムファミリ属性を確認します。replication_scope 属性が 1(同期有効)である必要があります。そうでない場合は、ツールの対応するスイッチ auto_enable_replication = true を有効にしてください。
bash
# 同期対象テーブルのカラムファミリの replication_scope 属性が 1 であることを確認
hbase shell
describe 'your_table_name'
  1. ネットワーク接続性の検証:HBase クラスターがドメイン名で移行ツールノードにアクセスできることを確認してください。

  2. Kafka クラスター:移行マシンが Kafka に正常にアクセスできることを確認してください。移行ツールは増分データを一時的に格納する必要があります。全量同期完了後に一時格納された増分データを順次消費します。増分データは Kafka クラスターに一時格納されます。データ量が大きい場合は、安定した Kafka 環境を用意することを推奨します。Docker を使用してローカルに Kafka クラスターを素早くデプロイすることも可能です:

bash
docker pull apache/kafka:4.1.0
docker run -p 9092:9092 apache/kafka:4.1.0

詳細は公式サイトをご参照ください。

  1. ポートの確認:増分移行ツールは HBase を自動起動します。その際、ローカルディレクトリおよび指定ポート(デフォルト:2181、16000、16010、16020、16030)を使用して、移行ツールが依存する HBase 構造を起動します。上記のポートが利用可能であることを確認してください。利用できない場合は設定ファイルでポートを指定する必要があります。

ステップ2:増分同期の開始

  1. 増分同期の設定ファイルを作成します。

注意

  1. job.modeSTREAMING に設定する必要があります。そうでない場合、増分モードではなく全量データエクスポートとして動作します。
  2. テーブルのレプリケーション属性を有効にする際、まずテーブルを disable してから enable します。この処理は通常短時間で完了しますが、auto_enable_replication のデフォルトは false に設定することを推奨します。テーブルのレプリケーション属性(replication_scope)が有効でない場合は、このスイッチを有効にするか、手動でテーブルのカラムファミリの replication_scope 属性を 1 に設定してください。
  3. Kafka クラスターの設定:上記の Docker で起動した場合は追加設定不要で、デフォルトのままで動作します。
  4. 詳細なパラメータの説明は設定パラメータをご参照ください。
env {
  parallelism = 4
  job.mode = "STREAMING" // 増分モードでは STREAMING に設定する必要があります
  job.retry.times = 0
}

source {
  Hbase {
    HbaseCDCConfig {
       auto_enable_replication = true
       kafka_bootstrap_servers = "localhost:9092" // Docker で起動した場合はデフォルトのまま
       peer_name = "incr_001" // HBase peer 名
       // 以下は増分移行ツールが自動起動する HBase のポート設定。
       // ポート競合がある場合は手動で設定可能。デフォルトは 2181、16000、16010、16020、16030
       zk_host = "ip:12181"
       hbase.master.port = 26000
       hbase.master.info.port = 26010
       hbase.regionserver.port = 26020
       hbase.regionserver.info.port = 26030
    }
    SourceModeKV = {
     specified_namespace = "***" // エクスポート対象の Namespace を指定
     table_match_rules = ["***"] // エクスポート対象テーブルのマッチングルールを指定

     dump_create_schema = true
     dump_source_split_metrics = false
    }

    zookeeper_quorum = "127.0.0.1:2181"

    # オプション設定
    is_kerberos_connection=true # Kerberos 認証が必要かどうか。デフォルトは false。true の場合のみ以下の4項目が必要です。
    kerberos_ms_name="xxx"      # hbase.master.kerberos.principal
    kerberos_rs_name="xxx"      # hbase.regionserver.kerberos.principal
    kerberos_user_name="xxx"
    kerberos_keytab="xxx"
  }
}

sink {
  jdbc {
    // bulk_load を有効化
    url = "jdbc:mysql://127.0.0.1:6050/?rewriteBatchedStatements=true&sessionVariables=tdsql_bulk_load='ON'"
    driver = "com.mysql.cj.jdbc.Driver"

    properties {
      autoReconnect = "true"
      failOverReadOnly = "false"
      maxReconnects = "10"
      tcpKeepAlive = "true"
      rewriteBatchedStatements = "true"
    }

    user = "test"
    password = "test123"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    enable_upsert = true
    max_retries = 1
    generate_sink_sql = true

    database = "hbase" // hbase データベースにインポート
    batch_size = 500000
  }
}
  1. パラメータ調整

SeaTunnel ツールの JVM に割り当てるメモリを変更します。config/jvm_client_options を編集してください。

# 設定ファイルの初期値は小さく設定されています。高並列度では、メモリリソース不足により
# タスクが実行できない場合があります。マシンリソースに応じて適切に調整してください。
-Xms32g  # JVM 初期ヒープメモリサイズ
-Xmx128G

データ移行を実行する前に、TDSQL Boundless インスタンスにログインし、以下のコマンドでパラメータ設定を調整してください。

sql
set persist tdsql_bulk_load_allow_unsorted = on;            // bulkload トランザクションの順序外書き込みを許可
set persist tdsql_bulk_load_commit_threshold = 4294967296;  // コミット閾値の調整
set persist tdsql_bulk_load_rpc_timeout = 7200000;          // RPC タイムアウト
  1. 同期タスクの起動

任意のディレクトリ(本ドキュメントでは /data/tdsql-project/apache-seatunnel-2.3.8/ を例に説明)で、以下のコマンドを実行して増分同期を開始します。

bash
./bin/seatunnel.sh --config ./config/incremental_config -m local

ステップ3:増分データ同期の進捗確認と適切なタイミングでの移行停止

ツールはまず全量データを同期します。このプロセスは通常時間がかかります。全量データの同期完了後、増分データの移行が開始されます。

ログ内の Progress Information を確認して、全体の同期状況を把握できます。

IncSplit Progress Information で増分同期の状況を確認できます。

ツールは増分同期時に、各テーブルの Kafka での LAG(増分データの消費状況)を出力します。

ステップ4:データ検証

  1. 移行完了後、データ検証を行います。まずビジネストラフィックを停止し、検証設定ファイル config/validation_config を作成してください。
HBaseDataValidation {
    parallelism = 10
    specified_namespace = "default" // HBase の Namespace
    table_match_rules = ["(?!.*snapshot_for_seatunnel_incremental_source).*"] // 検証対象のテーブル。検証ツールが作成したスナップショットテーブルを除外
    zookeeper_quorum = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"
    // ユーザー名、パスワード、IP、ポートを適宜置き換えてください
    mysql_jdbc_url = "jdbc:mysql://{ip}:{port}/hbase?user=test&password=test1234"
}
  1. 増分移行ツールは元の HBase にスナップショットテーブルを作成します。スナップショットテーブルはデフォルトで $(originTableName)_snapshot_for_seatunnel_incremental_source の形式で命名されます。検証を開始する前に、table_match_rules_snapshot_for_seatunnel_incremental_source で終わるテーブルをフィルタリングして影響を回避するか、ステップ5のクリーンアップツールを実行してこのテーブルを削除してください。

  2. 検証を実行します。検証プログラムは hbase-connector.jar にパッケージされています。パッケージのデプロイディレクトリで以下のスクリプトを実行してください。

bash
./bin/validateKV.sh ./config/validation_config

ステップ5:クリーンアップツールの実行

検証に合格したら、ツールスクリプトを使用して、HBase で作成されたレプリケーションピア(Peer)、Kafka トピックなどの一時リソースをクリーンアップします。

config/incr_migration_config を使用して増分移行を行った場合、以下のスクリプトでクリーンアップを実行します。新しい設定ファイルの作成は不要で、移行時の設定ファイルをそのまま再利用できます。

bash
./bin/cleanupIncrementResource.sh ./config/incr_migration_config

設定パラメータの説明

移行タイプパラメータ分類パラメータ名パラメータ説明設定例
全量/増分移行環境設定(env)parallelismSource/Sink の並列スレッド数を指定。実際の並列度はデータソースのシャード分割に影響される場合があります1、4、10
全量/増分移行環境設定(env)job.modeタスクの実行モード。全量移行はバッチモード、増分移行はストリーミングモードに指定する必要があります"BATCH"、"STREAMING"
全量/増分移行環境設定(env)read_limit.bytes_per_second読み取りバイト数によるレート制限。全量移行時の HBase クラスターへの負荷を制御します7000000(7MB/s)
全量/増分移行環境設定(env)read_limit.rows_per_second読み取り行数によるレート制限。移行速度の補助制御200000(20万行/s)
全量/増分移行HBase データソース(source)zookeeper_quorumHBase クラスターの ZooKeeper アドレスおよびポート。HBase 接続の確立に使用"127.0.0.1:2181,127.0.0.2:2181"
全量/増分移行HBase データソース(source)SourceModeKV.specified_namespaceエクスポート対象の HBase Namespace を指定"default"、"hbase_test"
全量/増分移行HBase データソース(source)SourceModeKV.table_match_rulesエクスポート対象テーブルのマッチングルールを指定(ワイルドカード対応)["table_*", "user_info"]
全量/増分移行ターゲット側(sink-jdbc)urlTDSQL Boundless の JDBC 接続アドレス"jdbc:mysql://{ip}:{port}/?rewriteBatchedStatements=true&sessionVariables=tdsql_bulk_load='ON'"
全量/増分移行ターゲット側(sink-jdbc)batch_sizeツールのバッチトランザクションサイズ。同期効率とメモリ使用量に影響します500000、1000000
増分移行HBase データソース(source)HbaseCDCConfig.auto_enable_replicationHBase テーブルのカラムファミリの replication_scope 属性を自動的に有効化(1 に設定)true、false
増分移行HBase データソース(source)HbaseCDCConfig.kafka_bootstrap_servers増分データ一時格納用の Kafka クラスターアドレス"localhost:9092"、"192.168.1.1:9092"
全量/増分移行HBase データソース(source)is_kerberos_connectionKerberos 認証による HBase 接続を有効にするかどうかtrue、false
全量/増分移行HBase データソース(source)kerberos_ms_nameHBase Master ノードの Kerberos プリンシパル名"hbase/master@EXAMPLE.COM"
全量/増分移行HBase データソース(source)kerberos_rs_nameHBase RegionServer ノードの Kerberos プリンシパル名"hbase/regionserver@EXAMPLE.COM"
全量/増分移行HBase データソース(source)kerberos_user_nameKerberos 認証ユーザー名"hbase_user"
全量/増分移行HBase データソース(source)kerberos_keytabKerberos 認証 keytab ファイルパス"/etc/kerberos/keytabs/hbase.keytab"
全量/増分移行ターゲット側(sink-jdbc)enable_upsertUPSERT セマンティクスを有効化(存在する場合は更新、存在しない場合は挿入)true、false
増分移行HBase ポート設定hbase.master.portHBase Master サービスポートを手動で指定(デフォルトポート 16000 の競合を解決)16001、16002
増分移行HBase ポート設定hbase.master.info.portHBase Master Web UI ポートを手動で指定(デフォルトポート 16010 の競合を解決)16011、16012
増分移行HBase ポート設定zk_hostHBase ZooKeeper ポートを手動で指定"ip:port"、"127.0.0.1:12181"
増分移行HBase ポート設定hbase.regionserver.portHBase RegionServer ポートを手動で指定(デフォルトポート 16020 の競合を解決)16021、16022

Tencent Cloud プロダクトドキュメント