Skip to content

Commit

Permalink
Update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
plypaul committed Apr 8, 2016
1 parent 368d7f2 commit 876d7cb
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 64 deletions.
86 changes: 28 additions & 58 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
# ReAir

ReAir is a collection of tools for replicating tables and partitions between Hive data warehouses (aka Hive clusters).
ReAir is a collection of easy-to-use tools for replicating tables and partitions between Hive data warehouses. These tools are targeted at developers that already have some familiarity with operating warehouses based on Hadoop and Hive.

## Overview

The replication features in ReAir are useful for the following use cases:

* Migration of a Hive cluster
* Incremental replication between two clusters
* Migration of a Hive warehouse
* Incremental replication between two warehouses
* Disaster recovery

When migrating a Hive cluster, ReAir can be used to copy over existing data to the new cluster. Because ReAir copies both data and metadata, datasets are ready to query as soon as the copy completes.
When migrating a Hive warehouse, ReAir can be used to copy over existing data to the new warehouse. Because ReAir copies both data and metadata, datasets are ready to query as soon as the copy completes.

While many organizations start out with a single Hive cluster, they often want better isolation between production and adhoc workloads. Two isolated Hive clusters accommodate this need well, and with two clusters, there is a need to replicate evolving datasets. ReAir can be used to replicate data from one cluster to another and propagate updates incrementally as they occur.
While many organizations start out with a single Hive warehouse, they often want better isolation between production and ad hoc workloads. Two isolated Hive warehouses accommodate this need well, and with two warehouses, there is a need to replicate evolving datasets. ReAir can be used to replicate data from one warehouse to another and propagate updates incrementally as they occur.

Lastly, ReAir can be used to replicated datasets to a hot-standby cluster for fast failover in disaster recovery scenarios.
Lastly, ReAir can be used to replicated datasets to a hot-standby warehouse for fast failover in disaster recovery scenarios.

To accommodate these use cases, ReAir includes both batch and incremental replication tools. Batch replication executes a one-time copy of a list of tables. Incremental replication is run as a long-running process that copies objects as they are created or changed on the source cluster.
To accommodate these use cases, ReAir includes both batch and incremental replication tools. Batch replication executes a one-time copy of a list of tables. Incremental replication is a long-running process that copies objects as they are created or changed on the source warehouse.

## Additional Documentation

* Blog post
* FAQ
* Blog Post
* [FAQ](docs/faq.md)
* [Known Issues](docs/known_issues.md)
* [Large HDFS Directory Copy](docs/hdfs_copy.md)

## Batch Replication

Expand All @@ -47,47 +49,15 @@ my_db1 my_table1
my_db2 my_table2
```

* Launch the job using the `hadoop jar` command, specifying the config file and the list of tables to copy. A larger heap for the client may be needed for large batches.
* Launch the job using the `hadoop jar` command on the destination, specifying the config file and the list of tables to copy. A larger heap for the client may be needed for large batches.

```
export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
export HADOOP_HEAPSIZE=8096
hadoop jar airbnb-reair-main-1.0.0-all.jar --config-file my_config_file.xml --table-list my_tables_to_copy.txt
```

* Additional CLI Options: `--step`, `--override-input`. These arguments are useful if want to run one of the three MR job individually. `--step` indicates which step to run. `--override-input` provides the path for the input when running the second and third stage MR jobs. The input path will usually be the output for the first stage MR job.

### Run a Standalone HDFS Copy

* Switch to the repo directory and build the JAR.

```
cd reair
gradlew shadowjar -p main -x test
```

* If HDFS copy logging table does not exist, create using [these commands](main/src/main/resources/create_hdfs_copy_logging_tables.sql)

* CLI options:
```
-source: source directory
-destination: destination directory. source and destination must have same relative path.
-temp-path: copy temporary directory
-output-path: logging directory
-operation: checking options: comma separated option including a(add),d(delete),u(update)
-blacklist: directory name blacklist regex
-dry-run: dry run mode
```

* To starts HDFS replication

```
export HADOOP_HEAPSIZE=8096
timestamp="$(date +"%s")"
hadoop jar airbnb-reair-main-1.0.0-all.jar com.airbnb.di.hive.batchreplication.hdfscopy.ReplicationJob -Dmapreduce.job.reduces=500 -Dmapreduce.map.memory.mb=8000 -Dmapreduce.map.java.opts="-Djava.net.preferIPv4Stack=true -Xmx7000m" -source hdfs://airfs-src/ -destination hdfs://airfs-dest/ -temp-path hdfs://airfs-dest/tmp -output-path hdfs://airfs-dest/user/test/fullrepljob -blacklist "tmp.*" -operation a,u,d
hive -e "LOAD DATA INPATH '/user/test/fullrepljob' OVERWRITE INTO TABLE hdfscopy_result partition ( jobts = $timestamp);"
```
* Additional CLI Options: `--step`, `--override-input`. These arguments are useful if want to run one of the three MR job individually for faster failure recovery. `--step` indicates which step to run. `--override-input` provides the path for the input when running the second and third stage MR jobs. The input path will usually be the output for the first stage MR job.

## Incremental Replication

Expand All @@ -102,15 +72,14 @@ Build and deploy the JAR containing the audit log hook

* Switch to the repository directory and build the JAR.


```
cd reair
gradlew shadowjar -p hive-hooks -x test
```

* Once built, the JAR for the audit log hook can be found under `hive-hooks/build/libs/airbnb-reair-hive-hooks-1.0.0-all.jar`
* Once built, the JAR for the audit log hook can be found under `hive-hooks/build/libs/airbnb-reair-hive-hooks-1.0.0-all.jar`.

* Copy the JAR to the Hive auxiliary library path. The specifics of the path depending on your setup. Generally, the Hive client's auxiliary library path can be configured using the configuration parameter `hive.aux.jars.path` or through environment variables as defined in shell scripts that launch Hive.
* Copy the JAR to the Hive auxiliary library path. The specifics of the path depending on your setup. Generally, the auxiliary library path can be configured using the configuration parameter `hive.aux.jars.path` or through environment variables as defined in shell scripts that launch Hive.

* Create and setup the tables on MySQL required for the audit log. You can create the tables by running the create table commands in all of the .sql files [here](hive-hooks/src/main/resources/). If you're planning to use the same DB to store the tables for incremental replication, also run the create table commands [here](main/src/main/resources/create_tables.sql).

Expand All @@ -120,38 +89,40 @@ gradlew shadowjar -p hive-hooks -x test

### Process Setup

* If the tables for incremental replication were not set up while setting up the audit log, create the state tables for incremental replication on desired MySQL instance by running the create table commands listed [here](main/src/main/resources/create_tables.sql).
* If the MySQL tables for incremental replication were not set up while setting up the audit log, create the state tables for incremental replication on desired MySQL instance by running the create table commands listed [here](main/src/main/resources/create_tables.sql).

* Read through and fill out the configuration from the [template](main/src/main/resources/replication_configuration_template.xml). You might want to deploy the file to a widely accessible location.

* Switch to the repo directory and build the JAR. You can skip the unit tests if no changes have been made (via the '-x test' flag).
* Switch to the repo directory and build the JAR. You can skip the unit tests if no changes have been made (via the `'-x test'` flag).

```
cd reair
gradlew shadowjar -p main
```

Once it finishes, the JAR to run the incremental replication process can be found under `main/build/libs/airbnb-reair-main-1.0.0-all.jar`
Once the build finishes, the JAR to run the incremental replication process can be found under `main/build/libs/airbnb-reair-main-1.0.0-all.jar`

* To start replicating, add the directory containing the Hive libraries to the Hadoop classpath and then kick off the replication launcher by using the `hadoop jar` command. Be sure to specify the configuration file that was filled out in the prior step.
* To start replicating, set options to point to the appropriate logging configuration and kick off the replication launcher by using the `hadoop jar` command on the destination cluster. Be sure to specify the configuration file that was filled out in the prior step.

```
export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
hadoop jar airbnb-reair-main-1.0.0-all.jar com.airbnb.di.hive.replication.deploy.ReplicationLauncher --config-files my_config_file.xml
```

By default logging messages with the `INFO` level will be printed to stderr, but more detailed logging messages with >= `DEBUG` logging level will be recorded to a log file in the current working directory. To see how this is configured, take a look at the [logging configuration file]((main/src/main/resources/log4j.properties).
If you use the recommended [`log4j.properties`](main/src/main/resources/log4j.properties) file that is shipped with the tool, messages with the `INFO` level will be printed to `stderr`, but more detailed logging messages with >= `DEBUG` logging level will be recorded to a log file in the current working directory.

When the incremental replication process is launched for the first time, it will start replicating entries after the highest numbered ID in the audit log. Because the process periodically checkpoints progress to the DB, it can be killed and will resume from where it left off when restarted. To override this behavior, please see the additional options section.

* Verify that entries are replicated properly by creating a test table on the source cluster and checking to see if it appears on the destination cluster.
* Verify that entries are replicated properly by creating a test table on the source warehouse and checking to see if it appears on the destination warehouse.

* For production deployment, an external process should monitor and restart the replication process if it exits.
For production deployment, an external process should monitor and restart the replication process if it exits. The replication process will exit if the number of consecutive failures while making RPCs or DB queries exceed the configured number of retries.

### Additional CLI options:

To force the process to start replicating entries after a particular audit log ID, you can pass the `--start-after-id` parameter:

```
export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
hadoop jar airbnb-reair-main-1.0.0-all.jar com.airbnb.di.hive.replication.deploy.ReplicationLauncher --config-files my_config_file.xml --start-after-id 123456
```

Expand Down Expand Up @@ -180,8 +151,7 @@ web-server/build/libs/airbnb-reair-web-server-1.0.0-all.jar
java -jar airbnb-reair-web-server-1.0.0-all.jar --thrift-host localhost --thrift-port 9996 --http-port 8080
```

* Point your browser to the appropriate URL e.g. `http://localhost:8080` to view the active / retired replication jobs.
* Point your browser to the appropriate URL e.g. `http://localhost:8080` to view the active and retired replication jobs.

### Known Issues
* Due to https://issues.apache.org/jira/browse/HIVE-12865, exchange partition commands will be replicated under limited conditions. Resolution is pending.
* Since the audit log hook writes the changes in a separate transaction from the Hive metastore, it's possible to miss updates if the client fails after the metastore write, but before hook execution. In practice, this is not an issue if failed Hive queries are re-run.
# Discussion Group
A discussion group is available [here](https://groups.google.com/forum/#!forum/airbnb-reair).
37 changes: 37 additions & 0 deletions docs/faq.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Frequently Asked Questions

### How do I use these tools to migrate a warehouse?

One approach to migrating a warehouse is to use batch replication to get an initial copy of the tables in the original warehouse into the new warehouse. Once the initial copy is done, incremental replication can be used to replicate changes from a point shortly before batch replication was kicked off. Then, incremental replication keeps both clusters in sync until the cutover date.

### How do I run against different versions of Hadoop / Hive?

As shipped, the default configuration will work with most Hive and Hadoop v2 deployments because of the generally backward-compatible nature of the API calls used in this project. We have not been able to test against a variety of different versions, but it's possible to modify `build.gradle` and specify different Hadoop and Hive versions to produce more appropriate binaries if you encounter issues.

### For idempotent file copy operations, how is file equality determined?

To provide a fast check of equality, files are considered equal if the sizes and modified times match between the source and destination warehouses. Files that are considered equal will not be re-copied, and this behavior may not be suitable for all applications.

### How do I filter out specific tables from getting replicated?

Both batch and incremental replication provide a blacklist mechanism. For both, please see the example configuration templates for the appropriate configuration variable for blacklisting entries.

### How do tables on S3 get replicated?

For tables and partitions where the location is on S3, the metadata for the tables is copied over to the destination warehouse. Be aware that replicated S3-backed tables should be created as external tables, or there could be issues with Hive operations (e.g. `DROP TABLE`) causing inconsistencies.

## Batch Replication

### What kind of consistency guarantees does batch replication provide while it's running?

While batch replication is running, there are no consistency guarantees on the destination warehouse. Because files are copied directly to destination directories, it's possible to observe a partially complete data directory. Once batch replication finishes, it guarantees that if a table exists in the metastore, it is consistent with the state of the source table at some point after batch replication was kicked off. In general, please wait for batch replication to finish running before running any queries on the destination warehouse.

## Incremental Replication

### What kind of consistency guarantees does incremental replication provide while it's running?

Overall, incremental replication provides eventual consistency. Increment replication also guarantees that data directories never contain partial data. While incremental replication is running and there are updates to a table on the source warehouse, tables on the destination warehouse will either contain old data, new data, but never a partial result. These are the same semantics that Hive provides when overwriting a table with a query. In addition, incremental replication guarantees that data for a table will be copied before the before the metadata, so if a table is present in the metastore, the table can be queried.

### Are there any issues with restarting the incremental replication process from a previous point in the audit log?

Since incremental replication is idempotent, it is safe to restart from a previous point in the audit log. Files that have been copied will not be copied again, so recovery should be relatively fast.
58 changes: 58 additions & 0 deletions docs/hdfs_copy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Large HDFS directory copy

## Overview

This is a tool for migrating HDFS data when `distcp` has issues copying a directory with a large number of files.

Depending on the options specified, it can:

* Copy files that exist on the source but not on the destination (add option)
* Copy files that exist on the source and on the destination but differ in file size (update option)
* Delete files that exist on the destination, but not the source (delete option)

Directories can be excluded from the copy by configuring the blacklist regex option. Directory names (not full paths) matching the regex are not traversed.

The dry run mode only does a comparison between source and destination directories and outputs the operations that it would have done to the logging directory in text format. Please see the schema of the logging table below for details on the output.

## Usage

* Switch to the repo directory and build the JAR.

```
cd reair
gradlew shadowjar -p main -x test
```

* If Hive logging table does not exist, create it using [these commands](../main/src/main/resources/create_hdfs_copy_logging_tables.hql).

* CLI options:
```
-source: source directory
-destination: destination directory
-temp-path: temporary directory where files will be copied to first
-output-path: directory for logging data as produced by the MR job
-operation: comma separated options for the copy: a(add), d(delete), u(update)
-blacklist: skip directory names matching this regex
-dry-run: don't execute the copy, but populate logging directory with data about planned operations
```

The following is an example invocation. Please replace with appropriate values before trying this out. Typically, the job should be run on the destination cluster.

```
export HADOOP_HEAPSIZE=8096
JOB_START_TIME="$(date +"%s")"
hadoop jar airbnb-reair-main-1.0.0-all.jar \
com.airbnb.di.hive.batchreplication.hdfscopy.ReplicationJob \
-Dmapreduce.job.reduces=500 \
-Dmapreduce.map.memory.mb=8000 \
-Dmapreduce.map.java.opts="-Djava.net.preferIPv4Stack=true -Xmx7000m" \
-source hdfs://airfs-src/user/hive/warehouse \
-destination hdfs://airfs-dest/user/hive/warehouse \
-output-path hdfs://airfs-dest/user/replication/log/$JOB_START_TIME \
-temp-path hdfs://airfs-dest/tmp/replication/$JOB_START_TIME$
-blacklist "tmp.*" \
-operation a,u,d
hive -e "LOAD DATA INPATH 'hdfs://airfs-dest/user/replication/log/$JOB_START_TIME' OVERWRITE INTO TABLE hdfs_copy_results PARTITION (job_start_time = $JOB_START_TIME);"
```
4 changes: 4 additions & 0 deletions docs/known_issues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Known Issues
## Incremental Replication
* Due to https://issues.apache.org/jira/browse/HIVE-12865, exchange partition commands will be replicated under limited conditions. Resolution is pending.
* Since the audit log hook writes the changes in a separate transaction from the Hive metastore, it's possible to miss updates if the client fails after the metastore write, but before hook execution. In practice, this is not an issue as failed Hive queries are re-run.
6 changes: 3 additions & 3 deletions main/src/main/resources/create_hdfs_copy_logging_tables.hql
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
-- hdfs copy job output table.
CREATE TABLE IF NOT EXISTS hdfscopy_result(
-- HDFS copy job output table.
CREATE TABLE IF NOT EXISTS hdfs_copy_results(
dst_path string, -- destination path
action string, -- action, add, update, delete
src_path string, -- source path
size bigint, -- size
ts bigint) -- file timestamp
PARTITIONED BY (
jobts bigint)
job_start_time bigint)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
1 change: 0 additions & 1 deletion main/src/main/resources/create_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ CREATE TABLE `replication_jobs` (
)

-- Holds misc. key value pairs

CREATE TABLE `key_value` (
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`key_string` varchar(256) NOT NULL,
Expand Down
2 changes: 0 additions & 2 deletions thrift/src/main/resources/reair.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,4 @@ service TReplicationService {

// Get the lag for replication process in ms
i64 getLag();


}

0 comments on commit 876d7cb

Please sign in to comment.