HBase High Performance Cookbook

By: Ruchir Choudhry
Overview of this book

Apache HBase is a non-relational NoSQL database management system that runs on top of HDFS. It is an open source, distributed, versioned, column-oriented store written in Java to provide random real-time access to big data. We'll start off by ensuring you have a solid understanding of the basics of HBase, followed by a thorough explanation of how to architect an HBase cluster as per our project specifications. Next, we will explore the scalable structure of tables and learn how to communicate with the HBase client. After this, we'll show you the intricacies of MapReduce and the art of performance tuning with HBase. Following this, we'll explain the concepts pertaining to scaling with HBase. Finally, you will get an understanding of how to integrate HBase with other tools such as ElasticSearch. By the end of this book, you will have learned enough to exploit HBase to boost system performance.

Using the filesystem


HBase depends on the Hadoop Distributed File System (HDFS).

HDFS is fundamentally a distributed file system that relies on the following core principles:

Getting ready

The following are the benefits of using HDFS:

  • It's designed to work as a fault-tolerant system and is rack aware.

  • It works on low-cost commodity hardware.

  • HDFS relaxes some core POSIX requirements to facilitate streaming access to file system data.

  • It's designed for write-once, read-many access. It also supports reading and processing data in parallel (read, write, and append). It doesn't support random writes of data.

  • It's designed to scale to a very large level, which means file sizes in the petabyte range.

  • It works with minimal data motion. MapReduce processes the data on the machine/node where the data actually resides. This avoids or minimizes network I/O and keeps the expensive I/O operations localized (to the local disk or within the same rack).

  • HDFS checksums data at the block level. If an inconsistency between the checksum and the block contents is observed, it is reported to the HDFS master, which schedules the creation of a new replica of the affected block and the removal of the corrupted block immediately.

A lot of work continuously happens on the core implementation of HDFS; some notable improvements are as follows:

  • More granular file-level permissions and authentication.

  • Rack awareness was added so that physical location is taken into account when scheduling tasks and allocating storage.

  • For administrative purposes, a new feature known as safemode was added.

  • In addition, a diagnostic utility like fsck was added for administrators; it enables analysis of the file system for missing blocks.

  • The rebalancer tool is an internal distribution mechanism that redistributes data across DataNodes when they become unbalanced due to continuous data movement between DataNodes.

  • An upgrade and rollback step was added for administrators, which now allows reverting to the old version of HDFS in case of any unforeseen situations caused by the upgrade; this allows a safe and painless recovery.

  • The concept of checkpoints by the secondary NameNode was introduced to make sure the file that holds the log of HDFS changes (the edit log) stays within the specified limits at the NameNode.

    More information can be obtained at http://hadoop.apache.org/.

    Tip

    We are not considering a local setup of HBase, as we are focused on the highly available (HA), larger-scale, fully distributed setup.

Data in HDFS is not placed homogeneously across the distributed DataNodes. The most obvious reason is the addition of new DataNodes to a preexisting cluster. Internally, the system (NameNode) applies various checks before it starts sending data/new blocks to the DataNodes, which are listed below:

  • One replica of a block is kept on the same node that is writing the block.

    To make sure the fault-tolerant design is complied with, replicas are kept across the distributed racks within the cluster.

  • To reduce cross-network chattiness, one replica is placed on the same rack as the node writing the file. This also helps to keep HDFS data homogeneous in a very large distributed DataNode cluster.

  • In some scenarios there can be competing considerations, and this may cause non-uniform data across DataNodes.

    To overcome this, the HDFS framework gives administrators tools that can be used to rebalance and check the data across different DataNodes, as shown in the sketch below.
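
    For example, the balancer can be invoked manually from the command line. This is a minimal sketch; the threshold value (how far a DataNode may deviate from the average utilization, in percent) is an illustrative assumption:

    # Move blocks until every DataNode is within 10% of the average utilization
    /u/HBaseB/hadoop-2.2.0/bin/hdfs balancer -threshold 10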

You will need to set up Hadoop 2.2.0 in fully distributed mode, as discussed in the previous section. The web interface can also be used for browsing the file system.

How to do it…

To use the file system, we proceed as per the following steps:

  1. Log in to the NameNode instance as follows:

    ssh hadoop@your-namenode
    (you can use the IP or the fully qualified machine name)
    cd /u/HBaseB/hadoop-2.2.0/bin
    
  2. Let's run some commands related to dfs:

    Note: this will make sure the setup is proper and that we are able to interact with it.
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -ls /
    drwxr-xr-x   - hadoop supergroup  0 2014-08-13 22:48 /nn01
    drwxr-xr-x   - hadoop supergroup  0 2014-08-17 23:28 /nn02
    

    To put a file into HDFS:

    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -put hello.txt /nn02/hello.txt
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -du /nn01/ /nn02
    0  /nn02/hello.txt
    0  /nn01/hello.txt
    

    For the recursive listing:

    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -lsr /
    drwxr-xr-x   - hadoop supergroup  0 2014-08-13 22:48 /nn01
    -rw-r--r--   3 hadoop supergroup  0 2014-08-13 22:48 /nn01/hello.txt
    drwxr-xr-x   - hadoop supergroup  0 2014-08-17 23:39 /nn02
    -rw-r--r--   3 hadoop supergroup  0 2014-08-17 23:39 /nn02/hello.txt
    

    Similarly, you can use the following commands:

    touchz, text, tail, stat, setrep, rmr, rm, put, mv, moveFromLocal, mkdir, lsr, ls, getmerge, get, dus, expunge, du, copyToLocal, chown, chmod, chgrp, cat.
    
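    As a quick illustration, here is how a few of these subcommands look in practice; the paths follow the /nn02 examples above, and the target files are assumptions made for the example:

    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -mkdir /nn02/data            # create a directory
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -cat /nn02/hello.txt         # print a file to stdout
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -copyToLocal /nn02/hello.txt /tmp/hello.txt
    /u/HBaseB/hadoop-2.2.0/bin/hadoop dfs -rm /nn02/hello.txt          # remove a file
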
  3. Let's take a look at the fsck command (an example follows the option list):

    hdfs fsck [GENERIC_OPTIONS] <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
    
    • -move: This moves the corrupted files to /lost+found

    • -delete: This deletes the corrupted files

    • -openforwrite: This prints out the files opened for write

    • -files: This prints out the files being checked

    • -blocks: This prints the block report

    • -locations: This prints the location of every block

    • -racks: This prints the network topology for the DataNode locations
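
    For instance, a basic health check might look like the following (a sketch; the directory used is an assumption):

    hdfs fsck /                        # check the entire file system
    hdfs fsck /nn02 -files -blocks     # check one directory and print its files and blocks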

  4. Let's take a look at some NameNode commands:

    hadoop namenode [-format] | [-upgrade] | [-rollback] | [-finalize] | [-importCheckpoint]
    hadoop namenode -format            Formats the NameNode.
    hadoop namenode -upgrade           Starts the NameNode with the upgrade option after the new version has been distributed.
    hadoop namenode -rollback          As the name suggests, rolls back the NameNode to the previous version. This should be used only after stopping the cluster and distributing the old Hadoop version.
    hadoop namenode -finalize          Makes the recent upgrade permanent.
    hadoop namenode -importCheckpoint  Loads the image from a checkpoint directory and saves it into the current one.
    
  5. Let's consider the secondarynamenode command:

    hadoop secondarynamenode [-checkpoint [force]] | [-geteditsize]
    hadoop secondarynamenode -geteditsize         Prints the edit log size.
    hadoop secondarynamenode -checkpoint [force]  Checkpoints the secondary NameNode if the edit log size >= fs.checkpoint.size. If -force is used, it checkpoints irrespective of the edit log size.
    
  6. We have discussed the DataNode and its functions; it supports a rollback option:

    hadoop datanode [-rollback]
    This rolls back the DataNode to the previous version. It should only be used after stopping all the DataNodes and distributing the old Hadoop version.
    
  7. The jobtracker command runs the MapReduce job tracker node:

    hadoop jobtracker
    

The HBase setup

Configuring HBase in a fully distributed environment:

  • Prerequisites: The Hadoop/HDFS cluster is healthy

  • The NameNode, DataNode, and secondary NameNode setup is done as discussed earlier

  • Passwordless SSH access exists between the NameNode, DataNodes, and secondary NameNode

  • The directory structure has appropriate access levels

  • Home paths are set as described earlier

Just to recap, you can run the following check, and it must show details like the ones below:
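
This is a minimal sketch assuming jps is used for the check; the process IDs are illustrative and the exact daemon list depends on what runs on the node:

[hadoop@your-namenode ~]$ jps
8467 NameNode
960 SecondaryNameNode
8699 ResourceManager
25318 Jps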

Tip

Please check the compatibility of Hadoop and HBase.

In this book, we used hadoop-2.2.0 and HBase 0.98.5-hadoop2.

  1. Let's go to the NameNode of Hadoop/HDFS by typing this command:

    vi /u/HBaseB/hadoop-2.2.0/etc/hadoop/hdfs-site.xml
    

    The setup should be like this:
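
    The exact values depend on your cluster; the following is a minimal sketch with commonly used properties, where the replication factor and directory paths are assumptions:

    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///u/HBaseB/hadoop-2.2.0/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///u/HBaseB/hadoop-2.2.0/dfs/data</value>
    </property>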

    The slaves file lists the data nodes that we will use as region servers later on. We will use the NameNode as the HBase master node.

    vi /u/HBaseB/hadoop-2.2.0/etc/hadoop/slaves
    
      It should list the nodes that will be used as data nodes:
      your-datanode01
      your-datanode02

The following steps will help you to implement the same:

  1. Copy the hdfs-site.xml from the Hadoop setup to:

    cd $HBASE_HOME/conf
    
  2. Also copy it to all the region servers. Edit the regionservers file on the HMaster server by typing:

    vi $HBASE_HOME/conf/regionservers
    
  3. Place the IP or the fully qualified name of each region server in it. Then edit hbase-env.sh:

    vi $HBASE_HOME/conf/hbase-env.sh and change export HBASE_MANAGES_ZK=true
    
  4. This will allow HBase to manage ZooKeeper internally on port 2181 (see the hbase-site.xml sketch below).
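
    In addition, $HBASE_HOME/conf/hbase-site.xml on the master and region servers needs to point HBase at HDFS and ZooKeeper. The following is a minimal sketch; the NameNode port and the ZooKeeper quorum hosts are assumptions based on the node names used in this recipe:

    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://your-namenode:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>your-namenode,your-datanode01,your-datanode02</value>
    </property>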

Starting the cluster

For starting the HBase cluster, we will go to the bin directory and run the start script:

  cd $HBASE_HOME/bin
  ./start-hbase.sh

This will start the entire cluster and its region servers.

Tip

Please check the logs in the log folder just to make sure the cluster starts properly:

cd $HBASE_LOGS/
ls -ltr
-rw-rw-r--. 1 hadoop hadoop      0 Aug 29 19:22 SecurityAuth.audit
-rw-rw-r--. 1 hadoop hadoop  92590 Aug 30 15:04 hbase-hadoop-zookeeper-your-hbase-master.log
-rw-rw-r--. 1 hadoop hadoop 484092 Aug 30 16:31 hbase-hadoop-master-rchoudhry-your-hbase-master.log

tail -200 hbase-hadoop-zookeeper-your-hbase-master.log

You should see no binding errors or exceptions there.

tail -200 hbase-hadoop-master-rchoudhry-your-hbase-master.log

There should be no errors or exceptions.

Validating the cluster

Let's validate the entire HBase setup; on the master node, run jps and it will show the following:

[hadoop@rchoudhry-linux64 logs]$ jps
960 SecondaryNameNode  // secondary name node is up
8467 NameNode // Name node is up
11892 HQuorumPeer // zookeeper is running in Quorum mode
25318 Jps // please ignore this 
12008 HMaster // HBase Master is running successfully
8699 ResourceManager // Resource manager is running 
12171 HRegionServer  // HBase Region server is running
8974 JobHistoryServer // JobHistory Server is running

This ensures that all the services on the master are working properly. We also run a region server on the master node; hence, we see HRegionServer listed as well.

On the region server (your region server running on a different node), use the same command and you will see the following:

13026 NodeManager
12425 Jps
12778 DataNode
13567 HRegionServer

This confirms that all the region servers are working. Now let's perform some basic operations on the cluster.

On the HBase Master:

cd $HBASE_HOME/bin
[hadoop@rchoudhry-linux64 bin]$ hbase shell -d
hbase(main):001:0>

This is the HBase shell command line. We are using the -d option to run it in debug mode. In production, this should be avoided, and we should check the log files to make sure the logs do not show connection errors to any of the components:

hbase(main):001:0> list
City_Table
MyClickStream
t1
3 row(s) in 1.1270 seconds

["City_Table", "MyClickStream", "t1"]
hbase(main):002:0> status
hbase(main):002:0> status 'simple'
hbase(main):002:0> status 'summary'
hbase(main):002:0> status 'detailed'
hbase(main):002:0> describe 'MyClickStream'
hbase(main):002:0> scan 'yourtablename'
hbase(main):002:0> create 'yourtablename', 'cf01', 'cf02'
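
As a quick illustration of working with data from the shell, a put/get/scan sequence looks like the following; the row key, column qualifier, and value are made up for the example:

hbase(main):003:0> put 'MyClickStream', 'row1', 'cf01:url', 'http://example.com'
hbase(main):004:0> get 'MyClickStream', 'row1'
hbase(main):005:0> scan 'MyClickStream', {LIMIT => 10}
hbase(main):006:0> count 'MyClickStream'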

There are many such commands that we can run from the HBase shell command line, which we will discuss as we go through the different chapters.

The preceding tables are created in the following section; they are shown here just for reference.

The following is the Snapshot process:

  • We will consider snapshots first from a Hadoop and then from an HBase perspective; once a directory is marked as ready for snapshots, meaning it's not receiving any read/write operations at that particular time, a snapshot can be taken.

  • A snapshot can be taken on any directory within the Hadoop/HBase data ecosystem. A snapshottable directory has a limit of 65,536 simultaneous snapshots. There is no limit on the number of snapshottable directories (however, file descriptor or other OS limitations can come into the picture). Administrators may set any directory to be snapshottable.

    Note

    If a snapshottable directory contains snapshots, it cannot be deleted or renamed before all of its snapshots are deleted.

  • There is a system limitation that doesn't allow nested snapshottable directories.

Create a directory and enable it for snapshots:

hdfs dfs -mkdir /snapshot
(using the following command we enable the directory for snapshots)
hdfs dfsadmin -allowSnapshot /snapshot
hdfs dfs -createSnapshot /snapshot [<snapshotName>]
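
To verify, you can list the snapshottable directories and the snapshots they contain; this is a sketch and the output will vary with your cluster:

hdfs lsSnapshottableDir
hdfs dfs -ls /snapshot/.snapshot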

Deleting a snapshot:

Delete a snapshot from a snapshottable directory.

This can only be done with the owner's privilege on the snapshottable directory:

  hdfs dfs -deleteSnapshot <path> <snapshotName>

Snapshots in HBase:

By design, HBase snapshots give us the flexibility to clone a table without making data copies, which reduces the impact on the region servers. In addition to this, we can export the table to another cluster, which also avoids any impact on the region servers.

Configuring HBase snapshots (in hbase-site.xml):

<property>
    <name>hbase.snapshot.enabled</name>
    <value>true</value>
</property>

We are assuming that the table MyClickStream has been created in HBase. We can also create the table if it's not present:

./bin/hbase shell
hbase> create 'MyClickStream', 'cf01', 'cf02'

cf01 -> represents column family 01
cf02 -> represents column family 02

./bin/hbase shell -d
hbase> disable 'MyClickStream'
hbase> snapshot 'MyClickStream', 'MyClickStreamSnapshot-08302014'

Listing snapshots: List all the snapshots taken:

./bin/hbase shell
hbase> list_snapshots
  • Deleting a snapshot: We can remove an unwanted snapshot by running the following command:

    ./bin/hbase shell
    hbase> delete_snapshot 'MyClickStreamSnapshot-08212014'
    
  • Cloning a table from a snapshot: Cloning allows us to create a new table with the same dataset the table had when the snapshot was taken. Changes to the cloned table are isolated to itself, and changes in the original table are not going to impact the snapshot:

    ./bin/hbase shell
    hbase> clone_snapshot 'MyClickStreamSnapshot-08212014', 'MyClickStreamSnapshot01-08212014'
    
  • Restoring snapshots: This can only be performed when the table is disabled. The effect of this process is that the table comes back up in the same state it was in when we took the snapshot:

    ./bin/hbase shell
    hbase> disable 'MyClickStream'
    

    This will disable the table for active use; no operations like reads or writes are served at this point:

    hbase> restore_snapshot 'MyClickStreamSnapshot-08212014'
    

    Internally, there are differences in how replication and snapshots work.

    Replication is performed at the log level, whereas snapshots always work at the file system level. Thus it's essential to sync the states from the master, as once the restore operation is done the replica will be different from the master. If we perform a restore operation, it's pivotal to stop the replication process first and perform the bootstrapping operation again.

    In the scenario of limited data loss caused by a client, it's recommended to clone the table from the existing snapshot and run a MapReduce job that copies the data from the clone back to the main table; this way we don't have to go for a full restore, which requires disabling the table first:

    Specify the hbase.rootdir of the other cluster:

    ./bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'MyClickStreamSnapshot-08212014' -copy-to hdfs://mynamenodeserver02:8082/hbase -mappers 8
    

    In a heavily used production environment, it's advisable to restrict bandwidth consumption while exporting a snapshot.

    This can be achieved by invoking the preceding command with the bandwidth parameter, as shown next; the unit of measure is megabytes per second and the value is an integer:

    ./bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'MyClickStreamSnapshot-08212014' -copy-to hdfs://mynamenodeserver02:8082/hbase -mappers 8 -bandwidth 200
    

How it works…

To better understand the concepts, I have broken down the parameters as follows:

  • Web interface: This shows the details of the NameNode and DataNodes and displays basic information about the cluster. The URL will be http://your-namenode-name:50070/. Alternatively, you can use the same interface for navigating the file system from the NameNode.

  • Snapshots: Snapshots in HDFS are always read-only and represent the state of the file system at the time the snapshot was taken. A snapshot can be restricted to a limited scope of the file system, or it can span the entire file system.

  • HBase snapshots: A snapshot is a set of metadata information used by administrators to restore the previous state of the table on which it was taken. In technical terms, it's not a copy of the table; it's a set of operations that tracks the metadata (that is, the table and its regions) and the actual data (HFiles, MemStore, WALs).

  • Offline snapshots: The standard scenario is to take the snapshot when the table is disabled. This makes sure that all the data is flushed to disk, and no writes or reads are accepted on this dataset. This means taking a snapshot is just a matter of walking through the table metadata and the HFiles that reside on disk and keeping a reference to them. The master invokes this operation, and the time required is governed by the time taken by the HDFS NameNode to gather and provide the list of files.

  • Online snapshots: This type of snapshot works differently; the table is enabled and the regions are serving reads and writes (in other words, they're receiving puts and gets from live traffic). When the master receives a snapshot request, it coordinates the operation by asking every region server to take a snapshot of its regions. This works as a simple flush and does not provide causal consistency. This type of snapshot has minimal performance overhead.

DFS Administration commands:

  • bin/hadoop dfsadmin -help: provides a list of all the commands.

  • bin/hadoop dfsadmin -report: provides statistics and file information.

  • bin/hadoop dfsadmin -safemode enter | leave | get | wait.

  • Safe mode: This immediately blocks changes to the namespace and converts it to read-only. It also blocks replication and any delete operations on the data blocks (see the sketch after this list).

    Note

    An important point to note about safe mode is that during the startup process it is turned on automatically, and it is switched back to normal once the process detects that the minimum conditions are fulfilled. You can also trigger safe mode manually, but in that case you have to switch it off manually too.

  • bin/hadoop dfsadmin -saveNamespace: This command requires superuser permission; it saves the current namespace and resets the edit logs.

  • bin/hadoop dfsadmin -rollEdits: This rolls the edit logs. Note that this requires superuser permission.

  • bin/hadoop dfsadmin -restoreFailedStorage: This comes with three options (set/unset/check); it attempts to restore failed storage replicas only if they become available again.

    Note

    This can only be done by a superuser.

  • bin/hadoop dfsadmin -refreshNodes: This command updates the NameNode with the set of DataNodes allowed to connect to the NameNode.

  • bin/hadoop dfsadmin -finalizeUpgrade: This concludes the upgrade of HDFS. It invokes an internal process that instructs the DataNodes to delete their previous-version working directories, and then instructs the NameNode to do the same. This finishes the upgrade process.

  • bin/hadoop dfsadmin -deleteBlockPool: Arguments are datanodehost:port, a block pool ID, and an optional argument force. If force is passed, the block pool directory for the given block pool ID on the given DataNode is deleted along with its contents; otherwise, the directory is deleted only if it is empty. The command will fail if the DataNode is still serving the block pool. Refer to refreshNamenodes to shut down a block pool service on a DataNode.

  • bin/hadoop dfsadmin -help.
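
As an example of the safe mode workflow referenced above, a manual maintenance sequence might look like the following; this is a sketch, and superuser permissions are assumed:

bin/hadoop dfsadmin -safemode enter    # block changes to the namespace
bin/hadoop dfsadmin -safemode get      # confirm that safe mode is ON
bin/hadoop dfsadmin -saveNamespace     # checkpoint the namespace while it is read-only
bin/hadoop dfsadmin -safemode leave    # return to normal operation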

Let's discuss other important components:

  • SecondaryNameNode: The NameNode stores changes to HDFS in a native file system file (the edit log). During the startup process, the HDFS state is read from the image file, commonly known as fsimage, and the changes from the edit log are applied to it. The latest state of HDFS is then pushed to fsimage, and the normal process resumes with a freshly generated blank edit log file. In essence, the NameNode combines these two files (fsimage and the edit log) during startup; the secondary NameNode performs this merge periodically as a checkpoint, which makes the next restart faster.

  • Rebalancer: The HDFS cluster easily becomes unbalanced for the following reasons:

    When a new DataNode joins the cluster, any map task assigned to that machine most likely does not access local data, thus consuming more network bandwidth. When some DataNodes become full, new data blocks are placed only on the non-full DataNodes, thus reducing read parallelism.

  • Rack awareness: As the NameNode is designed for HA and fault tolerance, the system attempts to spread the replicas of a block across multiple racks. Using the dfs.network.script variable, the administrator can govern these settings.

  • Safemode: Makes HDFS read-only by blocking changes to the namespace.

  • fsck: It's designed to report problems such as missing blocks and under-replicated blocks; fsck ignores open files. Depending on the need, it can be run on a subsection of files or on the entire file system managed by the NameNode.

  • Snapshotting: We will consider it from a Hadoop and then from an HBase perspective.

    The snapshot process is very flexible and robust, and it allows snapshots at the directory level. A total of 65,536 simultaneous snapshots can be accommodated per snapshottable directory, and in essence there is no limit on the number of snapshottable directories.

    Tip

    Nested snapshottable directories are currently not possible.

    The export-to-another-cluster tool helps us duplicate data between clusters. The data copied consists of HFiles, logs, and snapshot metadata. This works at the file system (HDFS) level, so it's not necessary for the HBase cluster to be fully online; this is the reason it does not impact the RegionServer workload.

    In the preceding section, we discussed the core file system, which is the foundation of HBase. We discussed HDFS, how it relates to the Hadoop ecosystem, and how HBase relies on this foundation to work. In doing so, we discussed the internal structure of HDFS and its HBase integration points. In the numbered steps, we discussed the HDFS/Hadoop commands in a fully distributed mode; this is needed to make sure that HBase runs in a fully distributed environment. We cannot run HBase if we don't have the Hadoop setup; however, for development purposes we can run HBase in standalone mode, or alternatively in pseudo-distributed mode.

There is more…

The entire process helps us set up the Hadoop/HDFS file system, on top of which HBase can sit and get the benefits of the HDFS distributed architecture.

See also

Refer to the following chapter:

  • Working with Large Distributed Systems.