Cloudera Enterprise 6.0.x

HBase Replication

If your data is already in an HBase cluster, replication is useful for getting the data into additional HBase clusters. In HBase, cluster replication refers to keeping one cluster state synchronized with that of another cluster, using the write-ahead log (WAL) of the source cluster to propagate the changes. Replication is enabled at column family granularity. Before enabling replication for a column family, create the table and all column families to be replicated, on the destination cluster.

Cluster replication uses an active-push methodology. An HBase cluster can be a source (also called active, meaning that it writes new data), a destination (also called passive, meaning that it receives data using replication), or can fulfill both roles at once. Replication is asynchronous, and the goal of replication is eventual consistency.

When data is replicated from one cluster to another, the original source of the data is tracked with a cluster ID, which is part of the metadata. All clusters that have already consumed the data are also tracked. This prevents replication loops.
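
The loop-prevention idea can be illustrated with a small Python model. This is only a sketch under stated assumptions, not how HBase implements it: the Edit class, the should_ship and ship functions, and the cluster names are hypothetical, and the only behavior modeled is that an edit remembers which clusters have already consumed it.

```python
from dataclasses import dataclass, field


@dataclass
class Edit:
    """Toy stand-in for a replicated change (hypothetical, not an HBase class)."""
    row: str
    origin_cluster: str                            # cluster ID where the write originated
    consumed_by: set = field(default_factory=set)  # cluster IDs that already applied it

    def __post_init__(self):
        # The originating cluster has, by definition, already applied the edit.
        self.consumed_by.add(self.origin_cluster)


def should_ship(edit: Edit, peer_cluster: str) -> bool:
    """Ship an edit only to peers that have not consumed it yet."""
    return peer_cluster not in edit.consumed_by


def ship(edit: Edit, peer_cluster: str) -> bool:
    """Deliver the edit to the peer if needed and record that it was consumed."""
    if not should_ship(edit, peer_cluster):
        return False
    edit.consumed_by.add(peer_cluster)
    return True


edit = Edit(row="r1", origin_cluster="san-jose")
print(ship(edit, "tokyo"))     # True: first delivery to tokyo
print(ship(edit, "san-jose"))  # False: pushing back to the origin is skipped
```

In this model, an edit that travels from one cluster to another and is offered back to its origin is dropped, which is the loop the cluster ID tracking is meant to prevent.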

Common Replication Topologies

  • A central source cluster might propagate changes to multiple destination clusters, for failover or due to geographic distribution.
  • A source cluster might push changes to a destination cluster, which might also push its own changes back to the original cluster.
  • Many different low-latency clusters might push changes to one centralized cluster for backup or resource-intensive data-analytics jobs. The processed data might then be replicated back to the low-latency clusters.
  • Multiple levels of replication can be chained together to suit your needs. The following diagram shows a hypothetical scenario. Use the arrows to follow the data paths.

At the top of the diagram, the San Jose and Tokyo clusters, shown in red, replicate changes to each other, and each also replicates changes to a User Data and a Payment Data cluster.

Each cluster in the second row, shown in blue, replicates its changes to the All Data Backup 1 cluster, shown in grey. The All Data Backup 1 cluster replicates changes to the All Data Backup 2 cluster (also shown in grey), as well as the Data Analysis cluster (shown in green). All Data Backup 2 also propagates any of its own changes back to All Data Backup 1.

The Data Analysis cluster runs MapReduce jobs on its data, and then pushes the processed data back to the San Jose and Tokyo clusters.

Notes about Replication

  • The timestamps of the replicated HLog entries are kept intact. In case of a collision (two entries identical as to row key, column family, column qualifier, and timestamp), only the entry that arrives later will be read.
  • Increment Column Values (ICVs) are treated as simple puts when they are replicated. In the case where each side of replication is active (new data originates from both sources, which then replicate to each other), this may be undesirable, creating identical counters that overwrite one another. (See https://issues.apache.org/jira/browse/HBase-2804.)
  • Make sure the source and destination clusters are time-synchronized with each other. Cloudera recommends you use Network Time Protocol (NTP).
  • Some changes are not replicated and must be propagated through other means, such as Snapshots or CopyTable.

    • Data that existed in the active cluster before replication was enabled.

    • Operations that bypass the WAL, such as when using BulkLoad or API calls such as writeToWal(false).

    • Table schema modifications.
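
The note about Increment Column Values can be made concrete with a toy Python model. This is a sketch under simplifying assumptions, not HBase code: each cluster is a plain dictionary, the increment and replicate_as_put helpers are hypothetical, and timestamps are ignored. It only shows why shipping the resulting value (a put) instead of the delta loses an increment when both sides are active.

```python
def increment(cluster: dict, counter: str, amount: int = 1) -> None:
    """Local increment on one cluster."""
    cluster[counter] = cluster.get(counter, 0) + amount


def replicate_as_put(source: dict, destination: dict, counter: str) -> None:
    """Replication ships the resulting value as a plain put, not the delta."""
    destination[counter] = source[counter]


cluster_a = {"hits": 5}
cluster_b = {"hits": 5}

# Both active clusters receive one increment at about the same time.
increment(cluster_a, "hits")
increment(cluster_b, "hits")

# Capture what each side ships before either applies the other's put.
shipped_from_a = dict(cluster_a)
shipped_from_b = dict(cluster_b)
replicate_as_put(shipped_from_a, cluster_b, "hits")
replicate_as_put(shipped_from_b, cluster_a, "hits")

print(cluster_a["hits"], cluster_b["hits"])  # 6 6, although two increments were issued
```

Both clusters end at 6 rather than 7, because each put overwrites the other side's counter with an identical value.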

Requirements

Before configuring replication, make sure your environment meets the following requirements:

  • You must manage ZooKeeper yourself. It must not be managed by HBase, and must be available throughout the deployment.
  • Each host in both clusters must be able to reach every other host, including those in the ZooKeeper cluster.
  • Both clusters must be running the same major version of CDH; for example CDH 5.
  • Every table that contains families that are scoped for replication must exist on each cluster and have exactly the same name. If your tables do not yet exist on the destination cluster, see Creating the Empty Table On the Destination Cluster.
  • HBase version 0.92 or greater is required for complex replication topologies, such as active-active.

Deploying HBase Replication

Follow these steps to enable replication from one cluster to another.
  Important: You cannot run replication-related HBase commands as an HBase administrator. To run replication-related HBase commands, you must have HBase user permissions. If ZooKeeper uses Kerberos, configure HBase Shell to authenticate to ZooKeeper using Kerberos before attempting to run replication-related commands. No replication-related ACLs are available at this time.
  1. Configure and start the source and destination clusters.
  2. Create tables with the same names and column families on both the source and destination clusters, so that the destination cluster knows where to store data it receives. All hosts in the source and destination clusters must be reachable from each other. See Creating the Empty Table On the Destination Cluster.
  3. On the source cluster, enable replication in Cloudera Manager, or by setting hbase.replication to true in hbase-site.xml.
  4. Obtain Kerberos credentials as the HBase principal. Substitute your fully.qualified.domain.name and realm in the following command:
    $ kinit -k -t /etc/hbase/conf/hbase.keytab hbase/fully.qualified.domain.name@YOUR-REALM.COM
  5. On the source cluster, in HBase Shell, add the destination cluster as a peer, using the add_peer command. The syntax is as follows:
    add_peer 'ID', 'CLUSTER_KEY'
    The ID must be a short integer. To compose the CLUSTER_KEY, use the following template:
    hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent

    If both clusters use the same ZooKeeper cluster, each must use a different zookeeper.znode.parent, because the two clusters cannot write to the same znode.

  6. On the source cluster, configure each column family to be replicated by setting its REPLICATION_SCOPE to 1, using commands such as the following in HBase Shell.
    hbase> disable 'example_table'
    hbase> alter 'example_table', {NAME => 'example_family', REPLICATION_SCOPE => '1'}
    hbase> enable 'example_table'
  7. Verify that replication is occurring by examining the logs on the source cluster for messages such as the following.
    Considering 1 rs, with ratio 0.1
    Getting 1 rs from peer cluster # 0
    Choosing peer 192.0.2.49:62020
  8. To verify the validity of replicated data, use the included VerifyReplication MapReduce job on the source cluster, providing it with the ID of the replication peer and table name to verify. Other options are available, such as a time range or specific families to verify.

    The command has the following form:

    hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication [--starttime=timestamp1] [--endtime=timestamp2] [--families=comma separated list of families] <peerId> <tablename>

    The VerifyReplication command prints GOODROWS and BADROWS counters to indicate rows that did and did not replicate correctly.
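
The CLUSTER_KEY template from step 5 can be sketched as a small Python helper that joins the three configuration values and prints the matching add_peer line. This is an illustrative sketch only: the function names and the ZooKeeper host names are hypothetical, and the values must come from the destination cluster's own configuration.

```python
def cluster_key(quorum: str, client_port: int, znode_parent: str) -> str:
    """Join hbase.zookeeper.quorum, the client port, and zookeeper.znode.parent."""
    if not znode_parent.startswith("/"):
        raise ValueError("zookeeper.znode.parent must be an absolute znode path")
    return f"{quorum}:{client_port}:{znode_parent}"


def add_peer_command(peer_id: str, key: str) -> str:
    """Format the HBase Shell command described in step 5."""
    return f"add_peer '{peer_id}', '{key}'"


# Hypothetical destination-cluster values, for illustration only.
key = cluster_key("zk1.example.com,zk2.example.com,zk3.example.com", 2181, "/hbase")
print(add_peer_command("1", key))
# add_peer '1', 'zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase'
```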

Replicating Across Three or More Clusters

When configuring replication among three or more clusters, Cloudera recommends you enable KEEP_DELETED_CELLS on column families in the destination cluster, where REPLICATION_SCOPE=1 in the source cluster. The following commands show how to enable this configuration using HBase Shell.
  • On the source cluster:
    create 't1',{NAME=>'f1', REPLICATION_SCOPE=>1}
  • On the destination cluster:
    create 't1',{NAME=>'f1', KEEP_DELETED_CELLS=>'true'}

Enabling Replication on a Specific Table

To enable replication for a specific table on the source cluster, run the enable_table_replication <table> command from the HBase shell on a cluster where a peer has been configured.

Running enable_table_replication <table> does the following:
  1. Verifies that the table exists on the source cluster.
  2. If the table does not exist on the remote cluster, uses the peer configuration to duplicate the table schema (including splits) on the remote cluster.
  3. Enables replication on that table.

Configuring Secure Replication

The following steps describe how to set up secure replication between clusters. The steps are the same whether your clusters are all in the same realm or not, with the exception of the last step.

The last step involves setting up custom secure replication configurations per peer. This can be convenient when you need to replicate to a cluster that uses different cross-realm authentication rules than the source cluster. For example, a cluster in Realm A may be allowed to replicate to Realm B and Realm C, but Realm B may not be allowed to replicate to Realm C. If you do not need this feature, skip the last step.

To use this feature, service-level principals and keytabs (specific to HBase) must be specified when you create the cluster peers using HBase Shell.

  Note: HBase peer-to-peer replication from a non-Kerberized cluster to a Kerberized cluster is not supported.
  1. Set up Kerberos on your cluster, as described in Enabling Kerberos Authentication for CDH.
  2. If necessary, configure Kerberos cross-realm authentication.
    • At the command line, use the list_principals command to list the kdc, admin_server, and default_domain for each realm.
    • Add this information to each cluster using Cloudera Manager. For each cluster, go to HDFS > Configuration > Trusted Kerberos Realms. Add the target and source. This requires a restart of HDFS.
  3. Configure ZooKeeper.
  4. Configure the following HDFS parameters on both clusters, in Cloudera Manager or in the listed files if you do not use Cloudera Manager:
      Note:

    If you use Cloudera Manager to manage your cluster, do not set these properties directly in configuration files, because Cloudera Manager will overwrite or ignore these settings. You must set these properties in Cloudera Manager.

    For brevity, the Cloudera Manager setting names are not listed here, but you can search by property name. For instance, in the HDFS service configuration screen, search for dfs.encrypt.data.transfer. The Enable Data Transfer Encryption setting is shown. Selecting the box is equivalent to setting the value to true.

    <!-- In hdfs-site.xml or advanced configuration snippet -->
    <property>
      <name>dfs.encrypt.data.transfer</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.data.transfer.protection</name>
      <value>privacy</value>
    </property>
    
    <!-- In core-site.xml or advanced configuration snippet -->
    <property>
      <name>hadoop.security.authorization</name>
      <value>true</value>
    </property>
    <property>
      <name>hadoop.rpc.protection</name>
      <value>privacy</value>
    </property>
    <property>
      <name>hadoop.security.crypto.cipher.suite</name>
      <value>AES/CTR/NoPadding</value>
    </property>
    <property>
      <name>hadoop.ssl.enabled</name>
      <value>true</value>
    </property>
  5. Configure the following HBase parameters on both clusters, using Cloudera Manager or in hbase-site.xml if you do not use Cloudera Manager.
    <!-- In hbase-site.xml -->
    <property>
      <name>hbase.rpc.protection</name>
      <value>privacy</value>
    </property>
    <property>
      <name>hbase.thrift.security.qop</name>
      <value>auth-conf</value>
    </property>
    <property>
      <name>hbase.thrift.ssl.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.rest.ssl.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.ssl.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.security.authentication</name>
      <value>kerberos</value>
    </property>
    <property>
      <name>hbase.security.authorization</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.secure.rpc.engine</name>
      <value>true</value>
    </property>
  6. Add the cluster peers using the simplified add_peer syntax, as described in Add Peer.
    add_peer 'ID', 'CLUSTER_KEY'
  7. If you need to add any peers which require custom security configuration, modify the add_peer syntax, using the following examples as a model.
    add_peer 'vegas', CLUSTER_KEY => 'zk1.vegas.example.com:2181:/hbase',
             CONFIG => {'hbase.master.kerberos.principal'           => 'hbase/_HOST@TO_VEGAS',
                        'hbase.regionserver.kerberos.principal'     => 'hbase/_HOST@TO_VEGAS',
                        'hbase.regionserver.keytab.file'            => '/keytabs/vegas_hbase.keytab',
                        'hbase.master.keytab.file'                  => '/keytabs/vegas_hbase.keytab'},
             TABLE_CFS => {"tbl" => ['cf1']}
    
    add_peer 'atlanta', CLUSTER_KEY => 'zk1.vegas.example.com:2181:/hbase',
             CONFIG => {'hbase.master.kerberos.principal'           => 'hbase/_HOST@TO_ATLANTA',
                        'hbase.regionserver.kerberos.principal'     => 'hbase/_HOST@TO_ATLANTA',
                        'hbase.regionserver.keytab.file'            => '/keytabs/atlanta_hbase.keytab',
                        'hbase.master.keytab.file'                  => '/keytabs/atlanta_hbase.keytab'},
             TABLE_CFS => {"tbl" => ['cf2']}

Disabling Replication at the Peer Level

Use the command disable_peer("<peerID>") to disable replication for a specific peer. This stops replication to the peer, but the logs are kept for future reference.
  Note: This log accumulation is a powerful side effect of the disable_peer command and can be used to your advantage. See Initiating Replication When Data Already Exists.

To re-enable the peer, use the command enable_peer("<peerID>"). Replication resumes.

Examples:

  • To disable peer 1:
    disable_peer("1")
  • To re-enable peer 1:
    enable_peer("1")

If you disable replication, and then later decide to enable it again, you must manually remove the old replication data from ZooKeeper by deleting the contents of the replication queue within the /hbase/replication/rs/ znode. If you fail to do so, and you re-enable replication, the source cluster cannot reassign previously-replicated regions. Instead, you will see logged errors such as the following:
2015-04-20 11:05:25,428 INFO  org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl:
    Won't  transfer the queue, another RS took care of it because of:  KeeperErrorCode = NoNode for
    /hbase/replication/rs/c856fqz.example.com,60020,1426225601879/lock

Stopping Replication in an Emergency

If replication is causing serious problems, you can stop it while the clusters are running.

Open the shell on the source cluster and use the disable_peer command for each peer, then the disable_table_replication command for each replicated table. For example, for peer 1 and a table named example_table:

hbase> disable_peer("1")
hbase> disable_table_replication 'example_table'

Already queued edits will be replicated after you use the disable_table_replication command, but new entries will not. See Understanding How WAL Rolling Affects Replication.

To start replication again, use the enable_peer command.

Creating the Empty Table On the Destination Cluster

If the table to be replicated does not yet exist on the destination cluster, you must create it. The easiest way to do this is to extract the schema using HBase Shell.

  1. On the source cluster, describe the table using HBase Shell. The output below has been reformatted for readability.
    hbase> describe 'acme_users'
    
    Table acme_users is ENABLED
    acme_users
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'user', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE',
    REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE',
    MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE',
    BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}
  2. Copy the output and make the following changes:
    • For the TTL, change 'FOREVER' to org.apache.hadoop.hbase.HConstants::FOREVER (without quotes).
    • Add the command create before the table name, enclose the table name in single quotes, and follow it with a comma.
    • Remove the line COLUMN FAMILIES DESCRIPTION and everything above the table name.
    The result is a command like the following:
    create 'acme_users',
    {NAME => 'user', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE',
    REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE',
    MIN_VERSIONS => '0', TTL => org.apache.hadoop.hbase.HConstants::FOREVER, KEEP_DELETED_CELLS => 'FALSE',
    BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}
  3. On the destination cluster, paste the command from the previous step into HBase Shell to create the table.
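
The manual edits in step 2 can also be scripted. The following Python sketch is one possible way to do it and makes several assumptions: the describe_to_create function is hypothetical, the describe output is laid out as in the reformatted sample above (real shell output may be wrapped differently), and the table has a single column family. It emits the lowercase create command that HBase Shell expects.

```python
FOREVER_CONSTANT = "org.apache.hadoop.hbase.HConstants::FOREVER"


def describe_to_create(describe_output: str) -> str:
    """Turn 'describe' output (sample layout, one column family) into a create command."""
    lines = [line.strip() for line in describe_output.splitlines() if line.strip()]
    marker = lines.index("COLUMN FAMILIES DESCRIPTION")
    table_name = lines[marker - 1]          # the table name is the line just above the marker
    family = " ".join(lines[marker + 1:])   # re-join the wrapped column family definition
    family = family.replace("TTL => 'FOREVER'", f"TTL => {FOREVER_CONSTANT}")
    return f"create '{table_name}', {family}"


sample = """
Table acme_users is ENABLED
acme_users
COLUMN FAMILIES DESCRIPTION
{NAME => 'user', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE',
REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE',
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE',
BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}
"""
print(describe_to_create(sample))
```

Review the generated command before pasting it into HBase Shell on the destination cluster, especially if the table has more than one column family.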

Initiating Replication When Data Already Exists

You may need to start replication from some point in the past. For example, suppose you have a primary HBase cluster in one location and are setting up a disaster-recovery (DR) cluster in another. To initialize the DR cluster, you need to copy over the existing data from the primary to the DR cluster, so that when you need to switch to the DR cluster you have a full copy of the data generated by the primary cluster. Once that is done, replication of new data can proceed as normal.

One way to do this is to take advantage of the write accumulation that happens when a replication peer is disabled.

  1. Start replication.
  2. Add the destination cluster as a peer and immediately disable it using disable_peer.
  3. On the source cluster, take a snapshot of the table and export it. The snapshot command flushes the table from memory for you.
  4. On the destination cluster, import and restore the snapshot.
  5. Run enable_peer to re-enable the destination cluster.

Replicating Pre-existing Data in an Active-Active Deployment

In the case of active-active replication, run the CopyTable job before starting the replication. (If you start the job after enabling replication, the second cluster will re-send the data to the first cluster, because CopyTable does not edit the clusterId in the mutation objects.) The following is one way to accomplish this:

  1. Run the CopyTable job and note the start timestamp of the job.
  2. Start replication.
  3. Run the CopyTable job again with a start time equal to the start time you noted in step 1.

This results in some data being pushed back and forth between the two clusters, but it minimizes the amount of data.

Understanding How WAL Rolling Affects Replication

When you add a new peer cluster, it only receives new writes from the source cluster since the last time the WAL was rolled.

The following diagram shows the consequences of adding and removing peer clusters with unpredictable WAL rolling occurring. Follow the timeline and notice which peer clusters receive which writes. Writes that occurred before the WAL was rolled are not retroactively replicated to new peers that were not participating in replication before the WAL was rolled.
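
The effect of WAL rolling on a newly added peer can be approximated with a toy Python model. This is a deliberately simplified sketch, not HBase's replication code: the SourceCluster class and its methods are hypothetical, there is a single WAL, and a new peer is assumed to start reading from the WAL segment that is current when it is added.

```python
class SourceCluster:
    """Toy model of WAL segments and peers (hypothetical, not an HBase class)."""

    def __init__(self):
        self.wal_segments = [[]]   # list of segments; the last one is the current WAL
        self.peer_start = {}       # peer ID -> index of the first segment it will read

    def write(self, edit: str) -> None:
        self.wal_segments[-1].append(edit)

    def roll_wal(self) -> None:
        self.wal_segments.append([])

    def add_peer(self, peer_id: str) -> None:
        # A new peer starts from the current WAL segment, not from older ones.
        self.peer_start[peer_id] = len(self.wal_segments) - 1

    def edits_for(self, peer_id: str) -> list:
        start = self.peer_start[peer_id]
        return [edit for segment in self.wal_segments[start:] for edit in segment]


source = SourceCluster()
source.write("w1")
source.roll_wal()
source.write("w2")          # written after the roll, before the peer exists
source.add_peer("1")
source.write("w3")
source.roll_wal()
source.write("w4")
print(source.edits_for("1"))  # ['w2', 'w3', 'w4']
```

In this model the peer never sees w1, which was written before the last roll that preceded add_peer, but it does see w2, which is still in the current WAL when the peer is added.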

Configuring Secure HBase Replication

If you want to make HBase Replication secure, follow the instructions under HBase Authentication.

Restoring Data From A Replica

One of the main reasons for replication is to be able to restore data, whether during disaster recovery or for other reasons. During restoration, the source and sink roles are reversed. The source is the replica cluster, and the sink is the cluster that needs restoration. This can be confusing, especially if you are in the middle of a disaster recovery scenario. The following image illustrates the role reversal between normal production and disaster recovery.


HBase replication roles in normal production versus disaster recovery

Follow these instructions to recover HBase data from a replicated cluster in a disaster recovery scenario.
  1. Change the value of the column family property REPLICATION_SCOPE on the sink to 0 for each column family to be restored, so that its data will not be replicated during the restore operation.
  2. Change the value of the column family property REPLICATION_SCOPE on the source to 1 for each column family to be restored, so that its data will be replicated.
  3. Use the CopyTable or distcp commands to import the data from the backup to the sink cluster, as outlined in Initiating Replication When Data Already Exists.
  4. Add the sink as a replication peer to the source, using the add_peer command as discussed in Deploying HBase Replication. If you used distcp in the previous step, restart or rolling restart both clusters, so that the RegionServers will pick up the new files. If you used CopyTable, you do not need to restart the clusters. New data will be replicated as it is written.
  5. When restoration is complete, change the REPLICATION_SCOPE values back to their values before initiating the restoration.

Verifying that Replication is Working

To verify that HBase replication is working, follow these steps to confirm data has been replicated from a source cluster to a remote destination cluster.

  1. Install and configure YARN on the source cluster.

    If YARN cannot be used in the source cluster, configure YARN on the destination cluster to verify replication. If neither the source nor the destination clusters can have YARN installed, you can configure the tool to use local mode; however, performance and consistency could be negatively impacted.

  2. Make sure that you have the required permissions:
    • You have sudo permissions to run commands as the hbase user, or a user with admin permissions on both clusters.
    • You are an hbase user configured for submitting jobs with YARN.
        Note: To use the hbase user in a secure cluster, use Cloudera Manager to add the hbase user as a YARN whitelisted user. For a new installation, the hbase user is already added to the whitelisted users. In addition, /user/hbase must exist on HDFS and be owned by the hbase user, because YARN creates a job staging directory there.
  3. Run the VerifyReplication command:
    src-node$ sudo -u hbase hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication peer1 table1
    ...
            org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
                    BADROWS=2
                    CONTENT_DIFFERENT_ROWS=1
                    GOODROWS=1
                    ONLY_IN_PEER_TABLE_ROWS=1
            File Input Format Counters
                    Bytes Read=0
            File Output Format Counters
                    Bytes Written=0
    The following table describes the VerifyReplication counters:
    Table 1. VerifyReplication Counters
    Counter                      Description
    GOODROWS                     Number of rows that are present on both clusters with identical values.
    CONTENT_DIFFERENT_ROWS       Rows whose key is the same on both the source and destination clusters, but whose values differ.
    ONLY_IN_SOURCE_TABLE_ROWS    Rows that are present only in the source cluster, not in the destination cluster.
    ONLY_IN_PEER_TABLE_ROWS      Rows that are present only in the destination cluster, not in the source cluster.
    BADROWS                      Total number of rows that differ between the source and destination clusters; the sum of CONTENT_DIFFERENT_ROWS + ONLY_IN_SOURCE_TABLE_ROWS + ONLY_IN_PEER_TABLE_ROWS.
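
The relationship between the counters can be checked mechanically. The following Python sketch parses counter lines in the NAME=value form shown in the sample output and confirms that BADROWS equals the sum of the three difference counters. The parse_counters and badrows_is_consistent helpers are hypothetical, and a counter that does not appear in the output is assumed to be 0.

```python
import re

DIFFERENCE_COUNTERS = (
    "CONTENT_DIFFERENT_ROWS",
    "ONLY_IN_SOURCE_TABLE_ROWS",
    "ONLY_IN_PEER_TABLE_ROWS",
)


def parse_counters(job_output: str) -> dict:
    """Collect upper-case NAME=value pairs, one per line, from the job output."""
    pairs = re.findall(r"^[ \t]*([A-Z_]+)=(\d+)[ \t]*$", job_output, re.MULTILINE)
    return {name: int(value) for name, value in pairs}


def badrows_is_consistent(counters: dict) -> bool:
    """BADROWS should equal the sum of the three difference counters."""
    expected = sum(counters.get(name, 0) for name in DIFFERENCE_COUNTERS)
    return counters.get("BADROWS", 0) == expected


sample_output = """
        org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
                BADROWS=2
                CONTENT_DIFFERENT_ROWS=1
                GOODROWS=1
                ONLY_IN_PEER_TABLE_ROWS=1
        File Input Format Counters
                Bytes Read=0
"""
counters = parse_counters(sample_output)
print(counters["BADROWS"], badrows_is_consistent(counters))  # 2 True
```

For the sample output, ONLY_IN_SOURCE_TABLE_ROWS is absent and treated as 0, so 1 + 0 + 1 matches BADROWS=2.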

By default, VerifyReplication compares the entire content of table1 on the source cluster against table1 on the destination cluster that is configured to use the replication peer peer1.

Use the following options to restrict verification to a period of time, a number of versions, or specific column families:
Table 2. VerifyReplication Options
Option                     Description
--starttime=<timestamp>    Beginning of the time range, in milliseconds. The time range is open-ended if no end time is defined.
--endtime=<timestamp>      End of the time range, in milliseconds.
--versions=<versions>      Number of cell versions to verify.
--families=<cf1,cf2,..>    Families to verify; separated by commas.
The following example verifies replication only for rows within a timestamp range of one day:
src-node$ sudo -u hbase hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication --starttime=1472499077000 --endtime=1472585477000 --families=c1 peer1 table1
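
The millisecond timestamps for a time range can be computed rather than typed by hand. The following Python sketch builds the --starttime and --endtime arguments for a one-day window. The helper names are hypothetical, and the start instant is simply the one used in the example above, interpreted as UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


def one_day_range_args(start: datetime) -> list:
    """Build the time-range options for a window of exactly one day."""
    end = start + timedelta(days=1)
    return [f"--starttime={to_epoch_millis(start)}", f"--endtime={to_epoch_millis(end)}"]


start = datetime.fromtimestamp(1472499077, tz=timezone.utc)
print(" ".join(one_day_range_args(start)))
# --starttime=1472499077000 --endtime=1472585477000
```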

Replication Caveats

  • Two variables govern replication: hbase.replication as described above under Deploying HBase Replication, and a replication znode. Stopping replication (using disable_table_replication as above) sets the znode to false. Two problems can result:
    • If you add a new RegionServer to the active cluster while replication is stopped, its current log will not be added to the replication queue, because the replication znode is still set to false. If you restart replication at this point (using enable_peer), entries in the log will not be replicated.
    • Similarly, if a log rolls on an existing RegionServer on the active cluster while replication is stopped, the new log will not be replicated, because the replication znode was set to false when the new log was created.
  • In the case of a long-running, write-intensive workload, the destination cluster may become unresponsive if its meta-handlers are blocked while performing the replication. CDH has three properties to deal with this problem:
    • hbase.regionserver.replication.handler.count - the number of replication handlers in the destination cluster (default is 3). Replication is now handled by separate handlers in the destination cluster to avoid the above-mentioned sluggishness. Increase it to a high value if the ratio of active to passive RegionServers is high.
    • replication.sink.client.retries.number - the number of times the HBase replication client at the sink cluster should retry writing the WAL entries (default is 1).
    • replication.sink.client.ops.timeout - the timeout for the HBase replication client at the sink cluster (default is 20 seconds).
  • For namespaces, tables, column families, or cells with associated ACLs, the ACLs themselves are not replicated. The ACLs need to be re-created manually on the target table. As a result, the ACLs can differ between the source and destination clusters.
Page generated July 25, 2018.