
ApsaraDB for MongoDB:Migrate data from Amazon DynamoDB to ApsaraDB for MongoDB by using NimoShake

Last Updated: May 08, 2024

NimoShake, also known as DynamoShake, is a data synchronization tool developed by Alibaba Cloud. You can use this tool to migrate data from Amazon DynamoDB to ApsaraDB for MongoDB.

Prerequisites

An ApsaraDB for MongoDB instance is created. For more information, see Create a replica set instance or Create a sharded cluster instance.

Background information

This topic describes how to use NimoShake to migrate data from Amazon DynamoDB. The destination must be an ApsaraDB for MongoDB database. For more information, see NimoShake overview.

Usage notes

A full data migration consumes the resources of the source and destination databases, which may increase the load of the database servers. If you migrate a large amount of data or if the server specifications are limited, database services may become unavailable. Before you migrate data, evaluate the impact of data migration on the performance of the source and destination databases. We recommend that you migrate data during off-peak hours.

Terms

  • Resumable transmission: In a resumable transmission task, data is split into multiple chunks. When transmission is interrupted by a network failure or another cause, the task can be resumed from where it left off instead of being restarted from the beginning.

    Note

    Resumable transmission is supported for incremental migration, but not for full migration. If an incremental migration task is interrupted by a disconnection and the connection is restored within a short period of time, the task can be resumed. However, in some situations, such as a prolonged disconnection or the loss of a previous checkpoint, the full migration is restarted.

  • Checkpoint: Resumable transmission is based on checkpoints. By default, checkpoints are written to an ApsaraDB for MongoDB database named dynamo-shake-checkpoint. Each collection records a checkpoint list, and the status_table collection records whether the current task is a full or incremental migration.
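
    You can inspect the checkpoint database with the mongosh shell, as shown in the following minimal sketch. The connection string is a placeholder, and the database name assumes the default mentioned above.

      mongosh "mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717"
      // Switch to the default checkpoint database.
      use dynamo-shake-checkpoint
      // One checkpoint collection is kept per migrated collection, plus status_table.
      show collections
      // status_table records whether the task is in the full or incremental phase.
      db.status_table.find()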

NimoShake features

NimoShake first performs a full migration and then performs an incremental migration.

  • Full migration: consists of data migration and index migration. The following figure shows the basic architecture of a full migration.

    • Data migration: NimoShake uses multiple concurrent threads to pull source data, as shown in the following figure. The threads are described in the following list.

      • Fetcher: Calls the protocol conversion driver provided by Amazon to pull data from the source collection in batches and places the data into queues until all source data is pulled.

        Note

        Only one fetcher thread is started.

      • Parser: Reads data from the queues and parses it into the BSON structure. After the data is parsed, the parser thread writes it to the queues of the executor threads as entries. Multiple parser threads can be started. You can configure the full.document.parser parameter to specify the number of parser threads. The default value of this parameter is 2.

      • Executor: Pulls data from the queues, then aggregates the data and writes it to the destination ApsaraDB for MongoDB database. Up to 1,024 entries or 16 MB of data can be aggregated in a batch. Multiple executor threads can be started. You can configure the full.document.concurrency parameter to specify the number of executor threads. The default value of this parameter is 4. A configuration sketch that covers these thread-count parameters appears at the end of this section.

    • Index migration: NimoShake writes indexes after data migration is complete. Indexes include auto-generated indexes and user-created indexes.

      • Auto-generated indexes: If the source table has both a partition key and a sort key, NimoShake creates a unique composite index on the two keys and writes the index to the destination ApsaraDB for MongoDB database. NimoShake also creates a hash index on the partition key and writes the index to the destination database. If the source table has only a partition key, NimoShake writes a hash index and a unique index on that key to the destination database.

      • User-created indexes: If the source table has a user-created index, NimoShake creates a hash index based on the primary key and writes the index to the destination ApsaraDB for MongoDB database.

  • Incremental migration: NimoShake migrates data but not the generated indexes. The following figure shows the basic architecture of incremental migration.

    • Fetcher: Detects changes to the shards in a stream.

    • Manager: Sends messages or creates a dispatcher to process messages. One shard corresponds to one dispatcher.

    • Dispatcher: Pulls incremental data from the source. In resumable transmission, data is pulled from the last checkpoint instead of from the beginning.

    • Batcher: Parses, packages, and aggregates the incremental data pulled by the dispatcher thread.

    • Executor: Writes the aggregated data to the destination ApsaraDB for MongoDB database and updates the checkpoint.
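
The thread counts described in the preceding lists are controlled by parameters in the nimo-shake.conf configuration file, which is described in the next section. The following is a minimal tuning sketch; the values shown are the defaults from the parameter table below.

    # Number of parser threads per collection in full migration.
    full.document.parser = 2
    # Number of executor threads per collection in full migration.
    full.document.concurrency = 4
    # Maximum number of collections migrated concurrently in full migration.
    full.concurrency = 4
    # Maximum number of shards captured concurrently in incremental migration.
    increase.concurrency = 16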

Migrate data from Amazon DynamoDB to ApsaraDB for MongoDB

This section describes how to use NimoShake on Ubuntu to migrate data from Amazon DynamoDB to ApsaraDB for MongoDB.

  1. Run the following command to download the NimoShake package:

    wget https://github.com/alibaba/NimoShake/releases/download/release-v1.0.13-20220411/nimo-shake-v1.0.13.tar.gz
    Note

    We recommend that you download the latest NimoShake package. For more information about the download address, see NimoShake.

  2. Run the following command to decompress the NimoShake package:

    tar zxvf nimo-shake-v1.0.13.tar.gz
  3. After decompression, run the cd nimo command to access the nimo folder.

  4. Run the vi nimo-shake.conf command to open the NimoShake configuration file.

  5. Configure the parameters described in the following table for NimoShake.

    Parameter

    Description

    Example

    id

    The ID of the migration task, which you can customize. The ID is used in the names of output items, such as the pid file, the log file of the migration task, the database that stores checkpoint information, and the destination database.

    id = nimo-shake

    log.file

    The path of the log file. If you do not configure this parameter, logs are displayed in stdout.

    log.file = nimo-shake.log

    log.level

    The level of the logs to be generated. Valid values:

    • none: No logs are generated.

    • error: Logs that contain error messages are generated.

    • warn: Logs that contain warning information are generated.

    • info: Logs that indicate system status are generated.

    • debug: Logs that contain debugging information are generated.

    Default value: info.

    log.level = info

    log.buffer

    Specifies whether to enable log buffering. Valid values:

    • true: Log buffering is enabled. Log buffering ensures high performance. However, several latest log entries may be lost when the migration task ends or is suspended.

    • false: Log buffering is disabled. If log buffering is disabled, performance may be degraded. However, all log entries are displayed when the migration task ends or is suspended.

    Default value: true.

    log.buffer = true

    system_profile

    The pprof port, which is used for debugging and for displaying goroutine stack information.

    system_profile = 9330

    http_profile

    The HTTP port. After this port is enabled, you can query the current status of NimoShake over HTTP.

    http_profile = 9340

    sync_mode

    The type of data migration. Valid values:

    • all: Full migration and incremental migration are performed.

    • full: Only full migration is performed.

    • incr: Only incremental migration is performed.

    Default value: all.

    Note

    Full migration and incremental migration are performed by default. If you want to perform only full migration or only incremental migration, set this parameter to full or incr, respectively.

    sync_mode = all

    source.access_key_id

    The AccessKey ID used to access Amazon DynamoDB.

    source.access_key_id = xxxxxxxxxxx

    source.secret_access_key

    The AccessKey secret used to access Amazon DynamoDB.

    source.secret_access_key = xxxxxxxxxx

    source.session_token

    The temporary key used to access Amazon DynamoDB. If no temporary key is available, you can skip this parameter.

    source.session_token = xxxxxxxxxx

    source.region

    The region where Amazon DynamoDB is located. If no region is available, you can skip this parameter.

    source.region = us-east-2

    source.session.max_retries

    The maximum number of retries after a session failure.

    source.session.max_retries = 3

    source.session.timeout

    The session timeout period. Unit: milliseconds. The value 0 indicates that the session timeout is disabled.

    source.session.timeout = 3000

    filter.collection.white

    The names of collections to be migrated. For example, filter.collection.white = c1;c2 indicates that the c1 and c2 collections are migrated and other collections are filtered out.

    Note

    You cannot specify both the filter.collection.white and filter.collection.black parameters at the same time. If neither parameter is specified, all collections are migrated.

    filter.collection.white = c1;c2

    filter.collection.black

    The names of collections to be filtered out. For example, filter.collection.black = c1;c2 indicates that the c1 and c2 collections are filtered out and other collections are migrated.

    Note

    You cannot specify both the filter.collection.white and filter.collection.black parameters at the same time. If neither parameter is specified, all collections are migrated.

    filter.collection.black = c1;c2

    qps.full

    The maximum number of calls for the scan command per second in full migration. It is used to limit the execution frequency of the scan command. Default value: 1000.

    qps.full = 1000

    qps.full.batch_num

    The number of data entries pulled in each batch during full migration. Default value: 128.

    qps.full.batch_num = 128

    qps.incr

    The maximum number of calls for the GetRecords command per second in incremental migration. It is used to limit the execution frequency of the GetRecords command. Default value: 1000.

    qps.incr = 1000

    qps.incr.batch_num

    The number of data entries pulled in each batch during incremental migration. Default value: 128.

    qps.incr.batch_num = 128

    target.type

    The category of the destination database. Valid values:

    • mongodb: an ApsaraDB for MongoDB instance.

    • aliyun_dynamo_proxy: a DynamoDB-compatible ApsaraDB for MongoDB instance.

    Note

    The aliyun_dynamo_proxy option is only available on the China site (aliyun.com).

    target.type = mongodb

    target.mongodb.type

    The category of the destination ApsaraDB for MongoDB instance. Valid values:

    • replica: replica set instance.

    • sharding: sharded cluster instance.

    target.mongodb.type = sharding

    target.address

    The connection string of the destination ApsaraDB for MongoDB database. The destination must be an ApsaraDB for MongoDB database or a DynamoDB-compatible ApsaraDB for MongoDB database.

    For more information about how to obtain the connection string of an ApsaraDB for MongoDB instance, see Connect to a replica set instance or Connect to a sharded cluster instance.

    Note

    DynamoDB-compatible ApsaraDB for MongoDB instances are only available on the China site (aliyun.com).

    target.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717

    target.db.exist

    Specifies how to handle a destination collection whose name is the same as that of a source collection. Valid values:

    • rename: NimoShake renames the existing collection by adding a timestamp suffix to its name. For example, c1 is renamed c1.2019-07-01Z12:10:11.

      Warning

      This operation changes the names of destination collections and may interrupt your business. Make the required preparations before you migrate data.

    • drop: NimoShake deletes the existing collection.

    If this parameter is not specified and the destination already contains a collection whose name is the same as that of a source collection, the migration task is terminated and an error message is returned.

    target.db.exist = drop

    full.concurrency

    The maximum number of collections that can be migrated concurrently in full migration. Default value: 4.

    full.concurrency = 4

    full.document.concurrency

    A parameter for full migration. The maximum number of threads used concurrently to write documents in a collection to the destination. Default value: 4.

    full.document.concurrency = 4

    full.document.parser

    A parameter for full migration. The maximum number of parser threads used concurrently to convert the Dynamo protocol to the corresponding protocol on the destination. Default value: 2.

    full.document.parser = 2

    full.enable_index.user

    A parameter for full migration. Specifies whether to migrate user-created indexes. Valid values:

    • true

    • false

    full.enable_index.user = true

    full.executor.insert_on_dup_update

    A parameter for full migration. Specifies whether to change the INSERT statement to the UPDATE statement if the same keys exist on the destination. Valid values:

    • true

    • false

    full.executor.insert_on_dup_update = true

    increase.executor.insert_on_dup_update

    A parameter for incremental migration. Specifies whether to change the INSERT statement to the UPDATE statement if the same keys exist on the destination. Valid values:

    • true

    • false

    increase.executor.insert_on_dup_update = true

    increase.executor.upsert

    A parameter for incremental migration. Specifies whether to change the UPDATE statement to the UPSERT statement if no keys are provided on the destination. Valid values:

    • true

    • false

    Note

    The UPSERT statement checks whether the specified keys exist. If the keys exist, the UPDATE statement is executed. Otherwise, the INSERT statement is executed.

    increase.executor.upsert = true

    convert.type

    A parameter for incremental migration. Specifies whether to convert the Dynamo protocol. Valid values:

    • raw: writes data directly without conversion of the Dynamo protocol.

    • change: converts the Dynamo protocol. For example, NimoShake converts {"hello":"1"} to {"hello": 1}.

    convert.type = change

    increase.concurrency

    A parameter for incremental migration. The maximum number of shards that can be captured concurrently. Default value: 16.

    increase.concurrency = 16

    checkpoint.type

    The type of the storage that stores checkpoint information. Valid values:

    • mongodb: Checkpoint information is stored in the ApsaraDB for MongoDB database. This value is available only when the target.type parameter is set to mongodb.

    • file: Checkpoint information is stored in a local file on the server that runs NimoShake.

    checkpoint.type = mongodb

    checkpoint.address

    The address used to store checkpoint information.

    • If the checkpoint.type parameter is set to mongodb, enter the connection string of the ApsaraDB for MongoDB database. If you do not specify this parameter, checkpoint information is stored in the destination ApsaraDB for MongoDB database. For more information about how to view the connection string of an ApsaraDB for MongoDB instance, see Connect to a replica set instance or Connect to a sharded cluster instance.

    • If the checkpoint.type parameter is set to file, enter a relative path based on the path of the NimoShake file. Example: checkpoint. If this parameter is not specified, checkpoint information is stored in the checkpoint folder.

    checkpoint.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717

    checkpoint.db

    The name of the database in which checkpoint information is stored. If this parameter is not specified, the database name is in the <Task ID>-checkpoint format. Example: nimoshake-checkpoint.

    checkpoint.db = nimoshake-checkpoint
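
    The following example shows a minimal nimo-shake.conf sketch that combines the parameters above for a full plus incremental migration. The credentials and connection string are placeholders, and parameters that are not listed keep the default values described in the table.

    id = nimo-shake
    log.file = nimo-shake.log
    log.level = info
    # Perform full migration followed by incremental migration.
    sync_mode = all
    # Source Amazon DynamoDB credentials (placeholders).
    source.access_key_id = xxxxxxxxxxx
    source.secret_access_key = xxxxxxxxxx
    source.region = us-east-2
    # Destination ApsaraDB for MongoDB instance (placeholder connection string).
    target.type = mongodb
    target.mongodb.type = replica
    target.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
    # Store checkpoint information in the destination ApsaraDB for MongoDB database.
    checkpoint.type = mongodb
    checkpoint.db = nimoshake-checkpoint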

  6. Run the following command to start data migration by using the configured nimo-shake.conf file:

    ./nimo-shake.linux -conf=nimo-shake.conf
    Note

    After the full migration is complete, full sync done! is displayed on the screen. If the migration is terminated due to an error, NimoShake automatically stops and displays the corresponding error message so that you can troubleshoot the issue.
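
    After the migration, you can verify the results from the mongosh shell, as shown in the following minimal sketch. The connection string is a placeholder, <database> stands for the destination database name, and c1 is an assumed collection name.

    mongosh "mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717"
    // Switch to the destination database.
    use <database>
    // Compare the document count with the item count of the source DynamoDB table.
    db.c1.countDocuments()
    // Confirm that the migrated indexes were created.
    db.c1.getIndexes()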