
Introduce atomic slot migration #1591

Closed
murphyjacob4 wants to merge 18 commits into valkey-io:unstable from murphyjacob4:asm


@murphyjacob4

Contributor

Starting this PR in draft so we can review early and gather alignment.

Summary

The solution is mostly based on discussion and proposals in #23 :

  • Target driven: the user sends CLUSTER IMPORT to the target and it syncs down the slot
  • Uses AOF format for snapshot, proxying all commands directly from source primary through the target primary to all target replicas
  • Consensus-less epoch bump to broadcast the topology change

Import and Export Workflows

We utilize a new internal CLUSTER SYNCSLOTS command with additional sub-commands to transition through the slot migration state machine.

On either end of the migration, we track the ongoing migrations via two job queues: slot_import_jobs and slot_export_jobs. Right now, we only support one concurrent job in each queue. There is no design restriction here (outside of perhaps some protocol additions to CLUSTER SYNCSLOTS), but it helps with simplicity for the first iteration.

Workflow overview

  1. User sends CLUSTER IMPORT SLOTSRANGE <slot start> <slot end> ... to target node T
  2. T initiates a new connection to source node S
  3. If required, T runs AUTH based on replication configuration
  4. T initiates CLUSTER SYNCSLOTS START to S
  5. S begins tracking the client that sent the command as the slot export client. It spawns a child process at the next available time and runs AOF rewrite with just the specified slots. It then begins accumulating a backlog of writes in the slot export client output buffer, without installing the write handler.
  6. At the end of the AOF rewrite, S also appends CLUSTER SYNCSLOTS ENDSNAPSHOT
  7. T processes the AOF rewrite as it would any other client using readQueryFromClient. Once it gets the CLUSTER SYNCSLOTS ENDSNAPSHOT, T sends back a CLUSTER SYNCSLOTS PAUSE to pause S.
  8. Upon getting the command, S unblocks the slot export client to T which has been accumulating ongoing writes. S then pauses itself, and sends CLUSTER SYNCSLOTS PAUSEOFFSET <offset> back to T with the current offset. (Note that the offset is not the primary replication offset, it is actually a computed offset based on how much we have been accumulating on T's client.)
  9. T waits for its replication offset to catch up to the sent offset, and once caught up executes the consensus-less bump.
  10. S finds out about the bump via cluster gossip, unpauses itself, and cleans up dirty keys.

If at any point a client is disconnected on either end, or a timeout is reached on the target node, the migration is marked as failed. If a migration fails, we delete all keys in the slots we were migrating.
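Steps 8 and 9 of the workflow depend on an offset that is counted on the slot export client rather than taken from the primary replication offset. The following is a minimal sketch of that handoff, written in Python rather than the C of the PR; the class and method names (`ExportClient`, `ImportJob`, `try_takeover`) are hypothetical and only model the byte counting, not the real networking code.

```python
class ExportClient:
    """Source-side model of the link to the target (hypothetical)."""

    def __init__(self):
        self.bytes_queued = 0   # bytes accumulated for the target so far
        self.paused = False

    def queue(self, payload: bytes) -> None:
        if self.paused:
            raise RuntimeError("source is paused; no new writes are accepted")
        self.bytes_queued += len(payload)

    def pause(self) -> int:
        """Pause writes and return the offset the target must reach."""
        self.paused = True
        return self.bytes_queued


class ImportJob:
    """Target-side model: counts bytes applied from the source."""

    def __init__(self):
        self.bytes_applied = 0
        self.pause_offset = None   # unknown until PAUSEOFFSET arrives
        self.owns_slots = False

    def apply(self, payload: bytes) -> None:
        self.bytes_applied += len(payload)
        self.try_takeover()

    def on_pause_offset(self, offset: int) -> None:
        self.pause_offset = offset
        self.try_takeover()

    def try_takeover(self) -> None:
        # Take ownership only once everything sent before the pause is applied.
        if self.pause_offset is not None and self.bytes_applied >= self.pause_offset:
            self.owns_slots = True
```

Because the source is paused before it reports the offset, the count cannot grow afterwards, so reaching it means the target has seen every write for the migrating slots.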

Filtering traffic

We filter the traffic to the target node through a filtered AOF rewrite and a filtered replication stream. The filtered AOF rewrite requires some refactoring of the snapshot code for reuse, but utilizes the same overall procedure (piping through the parent process to the target node connection).

The filtered replication stream hooks into the existing replication code and appends the commands directly to the client output buffer. We don't use replication backlog as there is no easy way to filter it once added to the backlog without re-processing it to query for the slot number of each command.

We add a new check in putClientInPendingWriteQueue to prevent the two command streams from merging. The parent process will just accumulate the replication stream in the client output buffer until we get the notification that the target is done with the snapshot.
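The filtering and buffering described above can be modelled roughly as follows. This is a sketch in Python, not the PR's C code: `FilteredStream` and its methods are hypothetical names, and the only part taken from the cluster specification is the slot computation (CRC16/XMODEM of the key or its hash tag, modulo 16384).

```python
import binascii

NUM_SLOTS = 16384


def key_hash_slot(key: bytes) -> int:
    """Cluster hash slot: CRC16 (XMODEM) of the key or its hash tag, mod 16384."""
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        if end > start + 1:              # only a non-empty tag is used
            key = key[start + 1:end]
    return binascii.crc_hqx(key, 0) % NUM_SLOTS


class FilteredStream:
    """Hypothetical model of the per-migration output buffer on the source."""

    def __init__(self, migrating_slots):
        self.migrating_slots = set(migrating_slots)
        self.pending = []           # held back while the snapshot is in flight
        self.sent = []              # what has actually gone to the target
        self.snapshot_done = False

    def feed(self, key: bytes, command: bytes) -> None:
        # Drop commands for slots that are not part of this migration.
        if key_hash_slot(key) not in self.migrating_slots:
            return
        self.pending.append(command)
        self.flush()

    def mark_snapshot_done(self) -> None:
        self.snapshot_done = True
        self.flush()

    def flush(self) -> None:
        # Nothing is written until the snapshot ends, so the streams never mix.
        if self.snapshot_done:
            self.sent.extend(self.pending)
            self.pending.clear()
```

Filtering at append time is what avoids re-parsing a shared backlog later to recover the slot of each command.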

Other notes

  • Since many checks apply to both slot migration and replication, I added a new client flag "replicated", which simply means that a client is being replicated from another node. In many places, we now check the replicated flag instead of the primary flag.
  • To support non-contiguous slot ranges, the APIs and code use a slot bitmap to share information about what slots are being migrated.
  • We already delete unowned slots after loading RDB and AOF. Replicas that are promoted to primary now go through the same steps to delete all unowned keys. This prevents the keys from being leaked forever if the previous primary disappears.
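As an illustration of the slot bitmap mentioned in the notes above, here is a small Python sketch. It assumes one bit per slot over the 16384 cluster slots (2048 bytes); the `SlotBitmap` class and its bit ordering are hypothetical and not taken from the PR's C code.

```python
NUM_SLOTS = 16384


class SlotBitmap:
    """One bit per cluster slot; supports non-contiguous ranges."""

    def __init__(self):
        self.bits = bytearray(NUM_SLOTS // 8)   # 2048 bytes

    def set_range(self, start: int, end: int) -> None:
        if not (0 <= start <= end < NUM_SLOTS):
            raise ValueError("invalid slot range")
        for slot in range(start, end + 1):
            self.bits[slot >> 3] |= 1 << (slot & 7)

    def __contains__(self, slot: int) -> bool:
        return bool(self.bits[slot >> 3] & (1 << (slot & 7)))

    def ranges(self):
        """Yield (start, end) for each contiguous run of set slots."""
        start = None
        for slot in range(NUM_SLOTS):
            if slot in self:
                if start is None:
                    start = slot
            elif start is not None:
                yield (start, slot - 1)
                start = None
        if start is not None:
            yield (start, NUM_SLOTS - 1)
```

A fixed-size bitmap keeps membership checks constant time per command, and adjacent ranges merge naturally when read back.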

Remaining work items

  • Continue cleaning up code, especially around the state machines
  • TCL tests
  • Metrics
  • Improved logging
  • Improved pause logic - where we only pause the slots being moved
  • Hide the in-progress import better for commands like KEYS and RANDOMKEY
  • Other management APIs (i.e. CLUSTER IMPORT STATUS, CLUSTER IMPORT CANCEL)
  • Incorporate dual-channel-style replication

Signed-off-by: Jacob Murphy <jkmurphy@google.com>
@codecov

codecov Bot commented Jan 20, 2025


Codecov Report

Attention: Patch coverage is 27.00730% with 400 lines in your changes missing coverage. Please review.

Project coverage is 70.58%. Comparing base (2a1a65b) to head (f1d824f).
Report is 12 commits behind head on unstable.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/cluster_legacy.c | 10.73% | 374 Missing ⚠️ |
| src/kvstore.c | 43.75% | 9 Missing ⚠️ |
| src/rdb.c | 79.06% | 9 Missing ⚠️ |
| src/aof.c | 50.00% | 5 Missing ⚠️ |
| src/io_threads.c | 0.00% | 1 Missing ⚠️ |
| src/module.c | 0.00% | 1 Missing ⚠️ |
| src/networking.c | 96.77% | 1 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #1591      +/-   ##
============================================
- Coverage     70.78%   70.58%   -0.21%     
============================================
  Files           120      121       +1     
  Lines         65046    65595     +549     
============================================
+ Hits          46045    46302     +257     
- Misses        19001    19293     +292     

| Files with missing lines | Coverage Δ |
|---|---|
| src/blocked.c | 91.38% <100.00%> (ø) |
| src/cluster.c | 89.21% <100.00%> (ø) |
| src/commands.def | 100.00% <ø> (ø) |
| src/db.c | 89.57% <100.00%> (+<0.01%) ⬆️ |
| src/object.c | 82.05% <100.00%> (+0.01%) ⬆️ |
| src/replication.c | 87.63% <100.00%> (+0.18%) ⬆️ |
| src/server.c | 87.63% <100.00%> (+0.02%) ⬆️ |
| src/server.h | 100.00% <ø> (ø) |
| src/io_threads.c | 6.94% <0.00%> (ø) |
| src/module.c | 9.59% <0.00%> (-0.01%) ⬇️ |

... and 5 more

... and 14 files with indirect coverage changes

@hwware

hwware commented Jan 20, 2025

Contributor

Do you have a plan to implement the following two cases?

  1. CLUSTER IMPORT CANCEL
  2. Source Primary failover during slot migration

@murphyjacob4

Contributor Author
  1. CLUSTER IMPORT CANCEL

All this needs to do is call freeClient on the slot migration source client and delete all keys in the slot bitmap of that migration. The source is tracking the slot migration and when the client close notification comes in - it frees its local tracking information. This is the same process that already occurs if a migration times out.

  2. Source Primary failover during slot migration

The implementation plan would be:

  1. Where we currently check the timeout on the target side, also check that the source is still primary, and if not cancel the operation (as discussed above)
  2. On the source side, we would introduce a similar check: if we find out we have been demoted, cancel the operation and close the slot migration link. Right now the source just gives up if that happens. If we add retry in the future, subsequent attempts to run CLUSTER SYNCSLOTS will fail with an error because the node is no longer primary, which we would detect and use to fail the operation.

Note that because of the consensus-less epoch bump, if a failover races with a slot migration, both may succeed. One of them will later win deterministically through the epoch collision protocol, so we will lose some writes on the losing side, because for a period two nodes both claim to be primary. This is the same issue that exists in the current consensus-less slot migration implementation, and we are looking to address it as part of #1355 instead.

Signed-off-by: Jacob Murphy <jkmurphy@google.com>
@murphyjacob4

Contributor Author

Hi folks, so @enjoy-binbin, @PingXie, and I discussed offline. Tencent has offered a solution that they have developed internally. Moving forward, @enjoy-binbin and I will join efforts and work on bridging gaps in the Tencent solution to meet the requirements we outlined in #23. Hopefully we will have a shared PR for review soon.

Given this, I am going to go ahead and close this pull request.

murphyjacob4 added a commit to enjoy-binbin/valkey that referenced this pull request Jan 31, 2025
Signed-off-by: Jacob Murphy <jkmurphy@google.com>
madolson added a commit that referenced this pull request Aug 12, 2025
Introduces a new family of commands for migrating slots via replication.
The procedure is driven by the source node which pushes an AOF formatted
snapshot of the slots to the target, followed by a replication stream of
changes on that slot (a la manual failover).

This solution is an adaptation of the solution provided by
@enjoy-binbin, combined with the solution I previously posted at #1591,
modified to meet the designs we had outlined in #23.

## New commands

* `CLUSTER MIGRATESLOTS SLOTSRANGE start end [start end]... NODE
node-id`: Begin sending the slot via replication to the target. Multiple
targets can be specified by repeating `SLOTSRANGE ... NODE ...`
*  `CLUSTER CANCELMIGRATION ALL`: Cancel all slot migrations
* `CLUSTER GETSLOTMIGRATIONS`: See a recent log of migrations

This PR only implements "one shot" semantics with an asynchronous model.
Later, "two phase" (e.g. slot level replicate/failover commands) can be
added with the same core.
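
To make the `CLUSTER MIGRATESLOTS` grammar above concrete, here is a hedged sketch of how the arguments could be grouped into per-target jobs. It is written in Python rather than the server's C, `parse_migrateslots` is a hypothetical helper, and the real command may validate more (for example the node-id format) than this does.

```python
NUM_SLOTS = 16384


def parse_migrateslots(args):
    """Parse the tokens after CLUSTER MIGRATESLOTS.

    Returns a list of (node_id, [(start, end), ...]), one entry per
    repeated `SLOTSRANGE ... NODE node-id` group.
    """
    jobs = []
    i = 0
    while i < len(args):
        if args[i].upper() != "SLOTSRANGE":
            raise ValueError("expected SLOTSRANGE")
        i += 1
        ranges = []
        while i < len(args) and args[i].upper() != "NODE":
            if i + 1 >= len(args):
                raise ValueError("slot range needs both start and end")
            start, end = int(args[i]), int(args[i + 1])  # int() rejects non-numbers
            if not (0 <= start <= end < NUM_SLOTS):
                raise ValueError("invalid slot range")
            ranges.append((start, end))
            i += 2
        if not ranges:
            raise ValueError("SLOTSRANGE needs at least one range")
        if i + 1 >= len(args):
            raise ValueError("missing NODE node-id")
        jobs.append((args[i + 1], ranges))
        i += 2
    if not jobs:
        raise ValueError("no slot ranges given")
    return jobs
```

For example, `SLOTSRANGE 0 100 200 300 NODE <id-a> SLOTSRANGE 500 500 NODE <id-b>` would yield two jobs, one per target node.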

## Slot migration jobs

Introduces the concept of a slot migration job. While active, a job
tracks a connection created by the source to the target over which the
contents of the slots are sent. This connection is used for control
messages as well as replicated slot data. Each job is given a 40
character random name to help uniquely identify it.

All jobs, including those that finished recently, can be observed using
the `CLUSTER GETSLOTMIGRATIONS` command.

## Replication

* Since the snapshot uses AOF, the snapshot can be replayed verbatim to
any replicas of the target node.
* We use the same proxying mechanism used for chaining replication to
copy the content sent by the source node directly to the replica nodes.

## `CLUSTER SYNCSLOTS`

To coordinate the state machine transitions across the two nodes, a new
command is added, `CLUSTER SYNCSLOTS`, that performs this control flow.

Each end of the slot migration connection is expected to install a read
handler in order to handle `CLUSTER SYNCSLOTS` commands:

* `ESTABLISH`: Begins a slot migration. Provides slot migration
information to the target and authorizes the connection to write to
unowned slots.
* `SNAPSHOT-EOF`: appended to the end of the snapshot to signal that the
snapshot is done being written to the target.
* `PAUSE`: informs the source node to pause whenever it gets the
opportunity
* `PAUSED`: added to the end of the client output buffer when the pause
is performed. The pause is only performed after the buffer shrinks below
a configurable size
* `REQUEST-FAILOVER`: request the source to either grant or deny a
failover for the slot migration. The failover is only granted if the target
is still paused. Once a failover is granted, the pause is refreshed for
a short duration
* `FAILOVER-GRANTED`: sent to the target to inform that REQUEST-FAILOVER
is granted
* `ACK`: heartbeat command used to ensure liveness

## Interactions with other commands

* FLUSHDB on the source node (which flushes the migrating slot) will
result in the source dropping the connection, which will flush the slot
on the target and reset the state machine back to the beginning. The
subsequent retry should very quickly succeed (it is now empty)
* FLUSHDB on the target will fail the slot migration. We can iterate
with better handling, but for now it is expected that the operator would
retry.
* Generally, FLUSHDB is expected to be executed cluster wide, so
preserving partially migrated slots doesn't make much sense
* SCAN and KEYS are filtered to avoid exposing importing slot data

## Error handling

* For any transient connection drops, the migration will be failed and
require the user to retry.
* If there is an OOM while reading from the import connection, we will
fail the import, which will drop the importing slot data
* If there is a client output buffer limit reached on the source node,
it will drop the connection, which will cause the migration to fail
* If at any point the export loses ownership or either node is failed
over, a callback will be triggered on both ends of the migration to fail
the import. The import will not reattempt with a new owner
* The two ends of the migration are routinely pinging each other with
SYNCSLOTS ACK messages. If at any point there is no interaction on the
connection for longer than `repl-timeout`, the connection will be
dropped, resulting in migration failure
* If a failover happens, we will drop keys in all unowned slots. The
migration does not persist through failovers and would need to be
retried on the new source/target.
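
The liveness rule in the list above (no interaction for longer than `repl-timeout` fails the migration) can be sketched as below. This is a Python model with hypothetical names (`MigrationLink`, `on_traffic`, `cron`); timestamps are passed in explicitly so the logic is easy to test, and it does not reflect how the server actually schedules the check.

```python
class MigrationLink:
    """Tracks the last interaction on a slot migration connection."""

    def __init__(self, repl_timeout_s: float, now: float):
        self.repl_timeout_s = repl_timeout_s
        self.last_interaction = now
        self.failed = False

    def on_traffic(self, now: float) -> None:
        # Any data or SYNCSLOTS ACK from the peer counts as interaction.
        self.last_interaction = now

    def cron(self, now: float) -> bool:
        """Periodic check; returns True once the migration has failed."""
        if not self.failed and now - self.last_interaction > self.repl_timeout_s:
            self.failed = True   # the real server would drop the connection here
        return self.failed
```

With a 60 second timeout, a link that last saw traffic at t=50 is still healthy at t=110 and is failed by a check at t=111.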

## State machine

```
                                                                            
                Target/Importing Node State Machine                         
   ─────────────────────────────────────────────────────────────            
                                                                            
             ┌────────────────────┐
             │SLOT_IMPORT_WAIT_ACK┼──────┐
             └──────────┬─────────┘      │
                     ACK│                │
         ┌──────────────▼─────────────┐  │
         │SLOT_IMPORT_RECEIVE_SNAPSHOT┼──┤
         └──────────────┬─────────────┘  │
            SNAPSHOT-EOF│                │                                  
        ┌───────────────▼──────────────┐ │                                  
        │SLOT_IMPORT_WAITING_FOR_PAUSED┼─┤                                  
        └───────────────┬──────────────┘ │                                  
                  PAUSED│                │                                  
        ┌───────────────▼──────────────┐ │ Error Conditions:                
        │SLOT_IMPORT_FAILOVER_REQUESTED┼─┤  1. OOM                          
        └───────────────┬──────────────┘ │  2. Slot Ownership Change        
        FAILOVER-GRANTED│                │  3. Demotion to replica          
         ┌──────────────▼─────────────┐  │  4. FLUSHDB                      
         │SLOT_IMPORT_FAILOVER_GRANTED┼──┤  5. Connection Lost              
         └──────────────┬─────────────┘  │  6. No ACK from source (timeout) 
      Takeover Performed│                │                                  
         ┌──────────────▼───────────┐    │                                  
         │SLOT_MIGRATION_JOB_SUCCESS┼────┤                                  
         └──────────────────────────┘    │                                  
                                         │                                  
   ┌─────────────────────────────────────▼─┐                                
   │SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP│                                
   └────────────────────┬──────────────────┘                                
Unowned Slots Cleaned Up│                                                   
          ┌─────────────▼───────────┐                                      
          │SLOT_MIGRATION_JOB_FAILED│                                      
          └─────────────────────────┘                                      

                                                                                           
                                                                                           
                      Source/Exporting Node State Machine                                  
         ─────────────────────────────────────────────────────────────                     
                                                                                           
               ┌──────────────────────┐                                                    
               │SLOT_EXPORT_CONNECTING├─────────┐                                          
               └───────────┬──────────┘         │                                          
                  Connected│                    │                                          
             ┌─────────────▼────────────┐       │                                          
             │SLOT_EXPORT_AUTHENTICATING┼───────┤                                          
             └─────────────┬────────────┘       │                                          
              Authenticated│                    │                                          
             ┌─────────────▼────────────┐       │                                          
             │SLOT_EXPORT_SEND_ESTABLISH┼───────┤                                          
             └─────────────┬────────────┘       │                                          
  ESTABLISH command written│                    │                                          
     ┌─────────────────────▼─────────────┐      │                                          
     │SLOT_EXPORT_READ_ESTABLISH_RESPONSE┼──────┤                                          
     └─────────────────────┬─────────────┘      │                                          
   Full response read (+OK)│                    │                                          
          ┌────────────────▼──────────────┐     │ Error Conditions:                        
          │SLOT_EXPORT_WAITING_TO_SNAPSHOT┼─────┤  1. User sends CANCELMIGRATION           
          └────────────────┬──────────────┘     │  2. Slot ownership change                
     No other child process│                    │  3. Demotion to replica                  
              ┌────────────▼───────────┐        │  4. FLUSHDB                              
              │SLOT_EXPORT_SNAPSHOTTING┼────────┤  5. Connection Lost                      
              └────────────┬───────────┘        │  6. AUTH failed                          
              Snapshot done│                    │  7. ERR from ESTABLISH command           
               ┌───────────▼─────────┐          │  8. Unpaused before failover completed   
               │SLOT_EXPORT_STREAMING┼──────────┤  9. Snapshot failed (e.g. Child OOM)     
               └───────────┬─────────┘          │  10. No ack from target (timeout)        
                      PAUSE│                    │  11. Client output buffer overrun        
            ┌──────────────▼─────────────┐      │                                          
            │SLOT_EXPORT_WAITING_TO_PAUSE┼──────┤                                          
            └──────────────┬─────────────┘      │                                          
             Buffer drained│                    │                                          
            ┌──────────────▼────────────┐       │                                          
            │SLOT_EXPORT_FAILOVER_PAUSED┼───────┤                                          
            └──────────────┬────────────┘       │                                          
   Failover request granted│                    │                                          
           ┌───────────────▼────────────┐       │                                          
           │SLOT_EXPORT_FAILOVER_GRANTED┼───────┤                                          
           └───────────────┬────────────┘       │                                          
      New topology received│                    │                                          
            ┌──────────────▼───────────┐        │                                          
            │SLOT_MIGRATION_JOB_SUCCESS│        │                                          
            └──────────────────────────┘        │                                          
                                                │                                          
            ┌─────────────────────────┐         │                                          
            │SLOT_MIGRATION_JOB_FAILED│◄────────┤                                          
            └─────────────────────────┘         │                                          
                                                │                                          
           ┌────────────────────────────┐       │                                          
           │SLOT_MIGRATION_JOB_CANCELLED│◄──────┘                                          
           └────────────────────────────┘                                                 
```
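
The target/importing diagram above can also be read as a transition table. The sketch below encodes it in Python using the state and event names from the diagram; the `step` function, the generic `"ERROR"` event standing in for all six error conditions, and the choice to ignore events that do not apply (such as heartbeat ACKs in later states) are assumptions made for illustration.

```python
TRANSITIONS = {
    ("SLOT_IMPORT_WAIT_ACK", "ACK"): "SLOT_IMPORT_RECEIVE_SNAPSHOT",
    ("SLOT_IMPORT_RECEIVE_SNAPSHOT", "SNAPSHOT-EOF"): "SLOT_IMPORT_WAITING_FOR_PAUSED",
    ("SLOT_IMPORT_WAITING_FOR_PAUSED", "PAUSED"): "SLOT_IMPORT_FAILOVER_REQUESTED",
    ("SLOT_IMPORT_FAILOVER_REQUESTED", "FAILOVER-GRANTED"): "SLOT_IMPORT_FAILOVER_GRANTED",
    ("SLOT_IMPORT_FAILOVER_GRANTED", "TAKEOVER-PERFORMED"): "SLOT_MIGRATION_JOB_SUCCESS",
    ("SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP", "CLEANED-UP"): "SLOT_MIGRATION_JOB_FAILED",
}
CLEANUP = "SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP"
TERMINAL = {"SLOT_MIGRATION_JOB_SUCCESS", "SLOT_MIGRATION_JOB_FAILED"}


def step(state: str, event: str) -> str:
    """Advance the import state machine by one event."""
    if state in TERMINAL:
        return state                      # finished jobs never change state
    if event == "ERROR":
        return CLEANUP                    # any error leads to cleanup first
    # Events that do not apply in this state leave it unchanged.
    return TRANSITIONS.get((state, event), state)


def run(events, state="SLOT_IMPORT_WAIT_ACK"):
    for event in events:
        state = step(state, event)
    return state
```

Routing every error through the cleanup state mirrors the diagram: the job is only reported as failed after the unowned slots have been cleaned up.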

Co-authored-by: Binbin <binloveplay1314@qq.com>

---------

Signed-off-by: Binbin <binloveplay1314@qq.com>
Signed-off-by: Jacob Murphy <jkmurphy@google.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
allenss-amazon pushed a commit to allenss-amazon/valkey-core that referenced this pull request Aug 19, 2025
Introduces a new family of commands for migrating slots via replication.
The procedure is driven by the source node which pushes an AOF formatted
snapshot of the slots to the target, followed by a replication stream of
changes on that slot (a la manual failover).

This solution is an adaptation of the solution provided by
@enjoy-binbin, combined with the solution I previously posted at valkey-io#1591,
modified to meet the designs we had outlined in valkey-io#23.

## New commands

* `CLUSTER MIGRATESLOTS SLOTSRANGE start end [start end]... NODE
node-id`: Begin sending the slot via replication to the target. Multiple
targets can be specified by repeating `SLOTSRANGE ... NODE ...`
*  `CLUSTER CANCELMIGRATION ALL`: Cancel all slot migrations
* `CLUSTER GETSLOTMIGRATIONS`: See a recent log of migrations

This PR only implements "one shot" semantics with an asynchronous model.
Later, "two phase" (e.g. slot level replicate/failover commands) can be
added with the same core.

## Slot migration jobs

Introduces the concept of a slot migration job. While active, a job
tracks a connection created by the source to the target over which the
contents of the slots are sent. This connection is used for control
messages as well as replicated slot data. Each job is given a 40
character random name to help uniquely identify it.

All jobs, including those that finished recently, can be observed using
the `CLUSTER GETSLOTMIGRATIONS` command.

## Replication

* Since the snapshot uses AOF, the snapshot can be replayed verbatim to
any replicas of the target node.
* We use the same proxying mechanism used for chaining replication to
copy the content sent by the source node directly to the replica nodes.

## `CLUSTER SYNCSLOTS`

To coordinate the state machine transitions across the two nodes, a new
command is added, `CLUSTER SYNCSLOTS`, that performs this control flow.

Each end of the slot migration connection is expected to install a read
handler in order to handle `CLUSTER SYNCSLOTS` commands:

* `ESTABLISH`: Begins a slot migration. Provides slot migration
information to the target and authorizes the connection to write to
unowned slots.
* `SNAPSHOT-EOF`: appended to the end of the snapshot to signal that the
snapshot is done being written to the target.
* `PAUSE`: informs the source node to pause whenever it gets the
opportunity
* `PAUSED`: added to the end of the client output buffer when the pause
is performed. The pause is only performed after the buffer shrinks below
a configurable size
* `REQUEST-FAILOVER`: request the source to either grant or deny a
failover for the slot migration. The failover is only granted if the target
is still paused. Once a failover is granted, the pause is refreshed for
a short duration
* `FAILOVER-GRANTED`: sent to the target to inform that REQUEST-FAILOVER
is granted
* `ACK`: heartbeat command used to ensure liveness

## Interactions with other commands

* FLUSHDB on the source node (which flushes the migrating slot) will
result in the source dropping the connection, which will flush the slot
on the target and reset the state machine back to the beginning. The
subsequent retry should very quickly succeed (it is now empty)
* FLUSHDB on the target will fail the slot migration. We can iterate
with better handling, but for now it is expected that the operator would
retry.
* Generally, FLUSHDB is expected to be executed cluster wide, so
preserving partially migrated slots doesn't make much sense
* SCAN and KEYS are filtered to avoid exposing importing slot data

## Error handling

* For any transient connection drops, the migration will be failed and
require the user to retry.
* If there is an OOM while reading from the import connection, we will
fail the import, which will drop the importing slot data
* If there is a client output buffer limit reached on the source node,
it will drop the connection, which will cause the migration to fail
* If at any point the export loses ownership or either node is failed
over, a callback will be triggered on both ends of the migration to fail
the import. The import will not reattempt with a new owner
* The two ends of the migration are routinely pinging each other with
SYNCSLOTS ACK messages. If at any point there is no interaction on the
connection for longer than `repl-timeout`, the connection will be
dropped, resulting in migration failure
* If a failover happens, we will drop keys in all unowned slots. The
migration does not persist through failovers and would need to be
retried on the new source/target.

## State machine

```
                                                                            
                Target/Importing Node State Machine                         
   ─────────────────────────────────────────────────────────────            
                                                                            
             ┌────────────────────┐
             │SLOT_IMPORT_WAIT_ACK┼──────┐
             └──────────┬─────────┘      │
                     ACK│                │
         ┌──────────────▼─────────────┐  │
         │SLOT_IMPORT_RECEIVE_SNAPSHOT┼──┤
         └──────────────┬─────────────┘  │
            SNAPSHOT-EOF│                │                                  
        ┌───────────────▼──────────────┐ │                                  
        │SLOT_IMPORT_WAITING_FOR_PAUSED┼─┤                                  
        └───────────────┬──────────────┘ │                                  
                  PAUSED│                │                                  
        ┌───────────────▼──────────────┐ │ Error Conditions:                
        │SLOT_IMPORT_FAILOVER_REQUESTED┼─┤  1. OOM                          
        └───────────────┬──────────────┘ │  2. Slot Ownership Change        
        FAILOVER-GRANTED│                │  3. Demotion to replica          
         ┌──────────────▼─────────────┐  │  4. FLUSHDB                      
         │SLOT_IMPORT_FAILOVER_GRANTED┼──┤  5. Connection Lost              
         └──────────────┬─────────────┘  │  6. No ACK from source (timeout) 
      Takeover Performed│                │                                  
         ┌──────────────▼───────────┐    │                                  
         │SLOT_MIGRATION_JOB_SUCCESS┼────┤                                  
         └──────────────────────────┘    │                                  
                                         │                                  
   ┌─────────────────────────────────────▼─┐                                
   │SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP│                                
   └────────────────────┬──────────────────┘                                
Unowned Slots Cleaned Up│                                                   
          ┌─────────────▼───────────┐                                      
          │SLOT_MIGRATION_JOB_FAILED│                                      
          └─────────────────────────┘                                      

                                                                                           
                                                                                           
                      Source/Exporting Node State Machine                                  
         ─────────────────────────────────────────────────────────────                     
                                                                                           
               ┌──────────────────────┐                                                    
               │SLOT_EXPORT_CONNECTING├─────────┐                                          
               └───────────┬──────────┘         │                                          
                  Connected│                    │                                          
             ┌─────────────▼────────────┐       │                                          
             │SLOT_EXPORT_AUTHENTICATING┼───────┤                                          
             └─────────────┬────────────┘       │                                          
              Authenticated│                    │                                          
             ┌─────────────▼────────────┐       │                                          
             │SLOT_EXPORT_SEND_ESTABLISH┼───────┤                                          
             └─────────────┬────────────┘       │                                          
  ESTABLISH command written│                    │                                          
     ┌─────────────────────▼─────────────┐      │                                          
     │SLOT_EXPORT_READ_ESTABLISH_RESPONSE┼──────┤                                          
     └─────────────────────┬─────────────┘      │                                          
   Full response read (+OK)│                    │                                          
          ┌────────────────▼──────────────┐     │ Error Conditions:                        
          │SLOT_EXPORT_WAITING_TO_SNAPSHOT┼─────┤  1. User sends CANCELMIGRATION           
          └────────────────┬──────────────┘     │  2. Slot ownership change                
     No other child process│                    │  3. Demotion to replica                  
              ┌────────────▼───────────┐        │  4. FLUSHDB                              
              │SLOT_EXPORT_SNAPSHOTTING┼────────┤  5. Connection Lost                      
              └────────────┬───────────┘        │  6. AUTH failed                          
              Snapshot done│                    │  7. ERR from ESTABLISH command           
               ┌───────────▼─────────┐          │  8. Unpaused before failover completed   
               │SLOT_EXPORT_STREAMING┼──────────┤  9. Snapshot failed (e.g. Child OOM)     
               └───────────┬─────────┘          │  10. No ack from target (timeout)        
                      PAUSE│                    │  11. Client output buffer overrun        
            ┌──────────────▼─────────────┐      │                                          
            │SLOT_EXPORT_WAITING_TO_PAUSE┼──────┤                                          
            └──────────────┬─────────────┘      │                                          
             Buffer drained│                    │                                          
            ┌──────────────▼────────────┐       │                                          
            │SLOT_EXPORT_FAILOVER_PAUSED┼───────┤                                          
            └──────────────┬────────────┘       │                                          
   Failover request granted│                    │                                          
           ┌───────────────▼────────────┐       │                                          
           │SLOT_EXPORT_FAILOVER_GRANTED┼───────┤                                          
           └───────────────┬────────────┘       │                                          
      New topology received│                    │                                          
            ┌──────────────▼───────────┐        │                                          
            │SLOT_MIGRATION_JOB_SUCCESS│        │                                          
            └──────────────────────────┘        │                                          
                                                │                                          
            ┌─────────────────────────┐         │                                          
            │SLOT_MIGRATION_JOB_FAILED│◄────────┤                                          
            └─────────────────────────┘         │                                          
                                                │                                          
           ┌────────────────────────────┐       │                                          
           │SLOT_MIGRATION_JOB_CANCELLED│◄──────┘                                          
           └────────────────────────────┘                                                 
```
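The exporting-side diagram above can be read as a linear happy path with a shared failure rail. A minimal sketch of that reading follows; the state names are copied from the diagram, while `exportEvent`, `exportNextState`, and the choice to map `CANCELMIGRATION` to `SLOT_MIGRATION_JOB_CANCELLED` and every other error to `SLOT_MIGRATION_JOB_FAILED` are assumptions made for illustration, not the code in this patch.

```c
#include <assert.h>
#include <stdbool.h>

/* States in the order the diagram draws them (happy path top to bottom). */
typedef enum {
    SLOT_EXPORT_CONNECTING,
    SLOT_EXPORT_AUTHENTICATING,
    SLOT_EXPORT_SEND_ESTABLISH,
    SLOT_EXPORT_READ_ESTABLISH_RESPONSE,
    SLOT_EXPORT_WAITING_TO_SNAPSHOT,
    SLOT_EXPORT_SNAPSHOTTING,
    SLOT_EXPORT_STREAMING,
    SLOT_EXPORT_WAITING_TO_PAUSE,
    SLOT_EXPORT_FAILOVER_PAUSED,
    SLOT_EXPORT_FAILOVER_GRANTED,
    SLOT_MIGRATION_JOB_SUCCESS,
    SLOT_MIGRATION_JOB_FAILED,
    SLOT_MIGRATION_JOB_CANCELLED
} exportState;

/* Hypothetical event type: not part of the patch. */
typedef enum {
    EXPORT_EV_ADVANCE, /* the labelled edge out of the current state fired */
    EXPORT_EV_ERROR,   /* assumed: error conditions 2 to 11 */
    EXPORT_EV_CANCEL   /* assumed: error condition 1, CANCELMIGRATION */
} exportEvent;

static bool exportStateIsTerminal(exportState s) {
    return s >= SLOT_MIGRATION_JOB_SUCCESS;
}

/* Returns the state that follows `s` after `ev`. Terminal states never change. */
exportState exportNextState(exportState s, exportEvent ev) {
    assert(s <= SLOT_MIGRATION_JOB_CANCELLED);
    if (exportStateIsTerminal(s)) return s;
    if (ev == EXPORT_EV_CANCEL) return SLOT_MIGRATION_JOB_CANCELLED;
    if (ev == EXPORT_EV_ERROR) return SLOT_MIGRATION_JOB_FAILED;
    /* The happy path is linear, so advancing is just the next enum value. */
    return (exportState)(s + 1);
}
```

Keeping the enum in diagram order is what lets the advance step be a simple increment; a real implementation would more likely switch on the state and validate the specific trigger.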

Co-authored-by: Binbin <binloveplay1314@qq.com>

---------

Signed-off-by: Binbin <binloveplay1314@qq.com>
Signed-off-by: Jacob Murphy <jkmurphy@google.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
hpatro pushed a commit to hpatro/valkey that referenced this pull request Oct 3, 2025
Introduces a new family of commands for migrating slots via replication.
The procedure is driven by the source node, which pushes an AOF-formatted
snapshot of the slots to the target, followed by a replication stream of
changes to those slots (a la manual failover).

This solution is an adaptation of the solution provided by
@enjoy-binbin, combined with the solution I previously posted at valkey-io#1591,
modified to meet the designs we had outlined in valkey-io#23.

## New commands

* `CLUSTER MIGRATESLOTS SLOTSRANGE start end [start end]... NODE
node-id`: Begin sending the given slots via replication to the target
node. Multiple targets can be specified by repeating
`SLOTSRANGE ... NODE ...`
* `CLUSTER CANCELMIGRATION ALL`: Cancel all slot migrations
* `CLUSTER GETSLOTMIGRATIONS`: See a recent log of migrations

This PR only implements "one shot" semantics with an asynchronous model.
Later, "two phase" (e.g. slot level replicate/failover commands) can be
added with the same core.
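For illustration, here is a rough sketch of how a client-side tool might assemble the `CLUSTER MIGRATESLOTS` command line from a list of slot ranges, for a single target. Only the command syntax comes from the list above; `slotRange`, `formatMigrateSlots`, and the validation rules are hypothetical helpers. The upper bound of 16383 reflects the 16384 hash slots in a cluster.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MAX_SLOT 16383 /* cluster hash slots are numbered 0..16383 */

/* Hypothetical helper type: one inclusive slot range. */
typedef struct {
    int start;
    int end;
} slotRange;

/* Writes "CLUSTER MIGRATESLOTS SLOTSRANGE s e [s e]... NODE node-id" into buf.
 * Returns the number of characters written (excluding the NUL), or -1 if the
 * input is invalid or the buffer is too small. */
int formatMigrateSlots(char *buf, size_t buflen, const slotRange *ranges,
                       size_t nranges, const char *node_id) {
    assert(buf != NULL && ranges != NULL && node_id != NULL);
    if (nranges == 0 || strlen(node_id) == 0) return -1;

    int n = snprintf(buf, buflen, "CLUSTER MIGRATESLOTS SLOTSRANGE");
    if (n < 0 || (size_t)n >= buflen) return -1;
    size_t off = (size_t)n;

    for (size_t i = 0; i < nranges; i++) {
        if (ranges[i].start < 0 || ranges[i].end > MAX_SLOT ||
            ranges[i].start > ranges[i].end) {
            return -1;
        }
        n = snprintf(buf + off, buflen - off, " %d %d", ranges[i].start, ranges[i].end);
        if (n < 0 || (size_t)n >= buflen - off) return -1;
        off += (size_t)n;
    }

    n = snprintf(buf + off, buflen - off, " NODE %s", node_id);
    if (n < 0 || (size_t)n >= buflen - off) return -1;
    return (int)(off + (size_t)n);
}
```

With ranges `{0, 99}` and `{200, 299}` and a node id of `abc` (a placeholder, not a real node id), this produces `CLUSTER MIGRATESLOTS SLOTSRANGE 0 99 200 299 NODE abc`.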

## Slot migration jobs

Introduces the concept of a slot migration job. While active, a job
tracks a connection created by the source to the target over which the
contents of the slots are sent. This connection is used for control
messages as well as replicated slot data. Each job is given a 40
character random name to help uniquely identify it.

All jobs, including those that finished recently, can be observed using
the `CLUSTER GETSLOTMIGRATIONS` command.
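As a small illustration of the 40 character job name, the sketch below generates and validates such a name. The description only says the name is random and 40 characters long; the lowercase hex alphabet, the use of `rand()`, and the names `JOB_NAME_LEN`, `generateJobName`, and `jobNameLooksValid` are assumptions for this example, and a real server would likely draw from a stronger random source.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define JOB_NAME_LEN 40 /* length stated in the description above */

/* Fills `out` with JOB_NAME_LEN random characters plus a terminating NUL.
 * Assumption: names are lowercase hex. rand() is used only to keep the
 * sketch dependency-free; it is not suitable for unpredictable names. */
void generateJobName(char out[JOB_NAME_LEN + 1]) {
    static const char hex[] = "0123456789abcdef";
    assert(out != NULL);
    for (int i = 0; i < JOB_NAME_LEN; i++) {
        out[i] = hex[rand() % 16];
    }
    out[JOB_NAME_LEN] = '\0';
}

/* Returns true if `name` has the expected length and alphabet. */
bool jobNameLooksValid(const char *name) {
    return name != NULL && strlen(name) == JOB_NAME_LEN &&
           strspn(name, "0123456789abcdef") == JOB_NAME_LEN;
}
```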

## Replication

* Since the snapshot uses AOF, the snapshot can be replayed verbatim to
any replicas of the target node.
* We use the same proxying mechanism used for chaining replication to
copy the content sent by the source node directly to the replica nodes.

## `CLUSTER SYNCSLOTS`

To coordinate the state machine transitions across the two nodes, a new
command is added, `CLUSTER SYNCSLOTS`, that performs this control flow.

Each end of the slot migration connection is expected to install a read
handler in order to handle `CLUSTER SYNCSLOTS` commands:

* `ESTABLISH`: Begins a slot migration. Provides slot migration
information to the target and authorizes the connection to write to
unowned slots.
* `SNAPSHOT-EOF`: appended to the end of the snapshot to signal that the
snapshot is done being written to the target.
* `PAUSE`: informs the source node to pause whenever it gets the
opportunity.
* `PAUSED`: added to the end of the client output buffer when the pause
is performed. The pause is only performed after the buffer shrinks below
a configurable size.
* `REQUEST-FAILOVER`: requests that the source either grant or deny a
failover for the slot migration. The failover is only granted if the
pause is still in effect. Once a failover is granted, the pause is
refreshed for a short duration.
* `FAILOVER-GRANTED`: sent to the target to inform it that
`REQUEST-FAILOVER` is granted.
* `ACK`: heartbeat command used to ensure liveness
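The sub-command list above could be dispatched with a small lookup table. The sketch below maps a sub-command name to an enum and records which side is expected to receive it, as read from the descriptions in the list. The enum, the table, the function names, and the case-insensitive matching are all assumptions for illustration, not the parser in this patch.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

typedef enum {
    SYNCSLOTS_ESTABLISH,
    SYNCSLOTS_SNAPSHOT_EOF,
    SYNCSLOTS_PAUSE,
    SYNCSLOTS_PAUSED,
    SYNCSLOTS_REQUEST_FAILOVER,
    SYNCSLOTS_FAILOVER_GRANTED,
    SYNCSLOTS_ACK,
    SYNCSLOTS_UNKNOWN
} syncslotsSubcommand;

/* Which end of the migration connection is expected to receive the message. */
typedef enum { RECEIVER_TARGET, RECEIVER_SOURCE, RECEIVER_EITHER, RECEIVER_NONE } syncslotsReceiver;

static const struct {
    const char *name;
    syncslotsSubcommand sub;
    syncslotsReceiver receiver;
} syncslotsTable[] = {
    {"ESTABLISH", SYNCSLOTS_ESTABLISH, RECEIVER_TARGET},
    {"SNAPSHOT-EOF", SYNCSLOTS_SNAPSHOT_EOF, RECEIVER_TARGET},
    {"PAUSE", SYNCSLOTS_PAUSE, RECEIVER_SOURCE},
    {"PAUSED", SYNCSLOTS_PAUSED, RECEIVER_TARGET},
    {"REQUEST-FAILOVER", SYNCSLOTS_REQUEST_FAILOVER, RECEIVER_SOURCE},
    {"FAILOVER-GRANTED", SYNCSLOTS_FAILOVER_GRANTED, RECEIVER_TARGET},
    {"ACK", SYNCSLOTS_ACK, RECEIVER_EITHER},
};

#define SYNCSLOTS_TABLE_LEN (sizeof(syncslotsTable) / sizeof(syncslotsTable[0]))

/* Portable case-insensitive string equality. */
static int equalsIgnoreCase(const char *a, const char *b) {
    while (*a && *b) {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
        a++;
        b++;
    }
    return *a == *b;
}

/* Maps a sub-command name to its enum value, or SYNCSLOTS_UNKNOWN. */
syncslotsSubcommand syncslotsParse(const char *name) {
    assert(name != NULL);
    for (size_t i = 0; i < SYNCSLOTS_TABLE_LEN; i++) {
        if (equalsIgnoreCase(name, syncslotsTable[i].name)) return syncslotsTable[i].sub;
    }
    return SYNCSLOTS_UNKNOWN;
}

/* Returns which side should be reading this sub-command. */
syncslotsReceiver syncslotsExpectedReceiver(syncslotsSubcommand sub) {
    for (size_t i = 0; i < SYNCSLOTS_TABLE_LEN; i++) {
        if (syncslotsTable[i].sub == sub) return syncslotsTable[i].receiver;
    }
    return RECEIVER_NONE;
}
```

Recording the expected receiver next to each name gives a read handler a cheap way to reject a message that arrives on the wrong end of the connection.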

## Interactions with other commands

* FLUSHDB on the source node (which flushes the migrating slot) will
result in the source dropping the connection, which will flush the slot
on the target and reset the state machine back to the beginning. The
subsequent retry should succeed very quickly, since the slot is now
empty.
* FLUSHDB on the target will fail the slot migration. We can iterate
with better handling, but for now it is expected that the operator would
retry.
* Generally, FLUSHDB is expected to be executed cluster wide, so
preserving partially migrated slots doesn't make much sense.
* SCAN and KEYS are filtered to avoid exposing importing slot data
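To make the SCAN/KEYS filtering concrete, the sketch below hides any key whose hash slot is currently being imported. The slot computation (CRC16 of the key, or of its `{hash tag}` if present, modulo 16384) is the standard cluster key-to-slot mapping. The `importing` bitmap and `keyVisibleToScan` are hypothetical and only illustrate the idea; how the patch actually tracks importing slots is not shown here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define CLUSTER_SLOTS 16384

/* CRC16 (XMODEM variant: polynomial 0x1021, initial value 0), bitwise form. */
uint16_t crc16(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021) : (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* Maps a key to its hash slot, honouring a non-empty {hash tag}. */
unsigned int keyHashSlot(const char *key, size_t keylen) {
    size_t s, e;
    for (s = 0; s < keylen; s++) {
        if (key[s] == '{') break;
    }
    if (s == keylen) return crc16(key, keylen) & 0x3FFF; /* no '{' */
    for (e = s + 1; e < keylen; e++) {
        if (key[e] == '}') break;
    }
    /* No closing '}' or an empty tag "{}": hash the whole key. */
    if (e == keylen || e == s + 1) return crc16(key, keylen) & 0x3FFF;
    return crc16(key + s + 1, e - s - 1) & 0x3FFF;
}

/* Hypothetical filter: a key is visible unless its slot is mid-import. */
bool keyVisibleToScan(const char *key, size_t keylen, const bool importing[CLUSTER_SLOTS]) {
    assert(key != NULL && importing != NULL);
    return !importing[keyHashSlot(key, keylen)];
}
```

Filtering by slot rather than by key name means the check stays O(1) per key and automatically covers every key in a partially imported slot.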

## Error handling

* For any transient connection drops, the migration will fail and the
user must retry.
* If there is an OOM while reading from the import connection, we will
fail the import, which will drop the importing slot data.
* If a client output buffer limit is reached on the source node, it
will drop the connection, which will cause the migration to fail.
* If at any point the source loses ownership of the slots or either node
is failed over, a callback will be triggered on both ends of the
migration to fail the import. The import will not be reattempted with a
new owner.
* The two ends of the migration are routinely pinging each other with
SYNCSLOTS ACK messages. If at any point there is no interaction on the
connection for longer than `repl-timeout`, the connection will be
dropped, resulting in migration failure
* If a failover happens, we will drop keys in all unowned slots. The
migration does not persist through failovers and would need to be
retried on the new source/target.
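The liveness rule above (drop the connection when nothing has been seen for longer than `repl-timeout`) could be checked from a periodic timer along these lines. This is a sketch only: `migrationLink`, its fields, and the `ack_interval` parameter are hypothetical, and only the comparison against `repl-timeout` comes from the description.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical per-connection bookkeeping for one end of a migration. */
typedef struct {
    time_t last_interaction; /* last time anything was read from the peer */
    time_t last_ack_sent;    /* last time we wrote a SYNCSLOTS ACK */
} migrationLink;

/* True when the peer has been silent for longer than repl_timeout seconds,
 * at which point the connection would be dropped and the migration failed. */
bool migrationLinkTimedOut(const migrationLink *link, time_t now, time_t repl_timeout) {
    assert(link != NULL);
    return now - link->last_interaction > repl_timeout;
}

/* True when it is time to send another heartbeat. ack_interval is an
 * assumed tuning knob, not a documented configuration option. */
bool migrationLinkShouldSendAck(const migrationLink *link, time_t now, time_t ack_interval) {
    assert(link != NULL);
    return now - link->last_ack_sent >= ack_interval;
}
```

For the heartbeat to keep a healthy link alive, the send interval has to be comfortably shorter than the timeout.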

## State machine

```

                Target/Importing Node State Machine
   ─────────────────────────────────────────────────────────────

             ┌────────────────────┐
             │SLOT_IMPORT_WAIT_ACK┼──────┐
             └──────────┬─────────┘      │
                     ACK│                │
         ┌──────────────▼─────────────┐  │
         │SLOT_IMPORT_RECEIVE_SNAPSHOT┼──┤
         └──────────────┬─────────────┘  │
            SNAPSHOT-EOF│                │
        ┌───────────────▼──────────────┐ │
        │SLOT_IMPORT_WAITING_FOR_PAUSED┼─┤
        └───────────────┬──────────────┘ │
                  PAUSED│                │
        ┌───────────────▼──────────────┐ │ Error Conditions:
        │SLOT_IMPORT_FAILOVER_REQUESTED┼─┤  1. OOM
        └───────────────┬──────────────┘ │  2. Slot Ownership Change
        FAILOVER-GRANTED│                │  3. Demotion to replica
         ┌──────────────▼─────────────┐  │  4. FLUSHDB
         │SLOT_IMPORT_FAILOVER_GRANTED┼──┤  5. Connection Lost
         └──────────────┬─────────────┘  │  6. No ACK from source (timeout)
      Takeover Performed│                │
         ┌──────────────▼───────────┐    │
         │SLOT_MIGRATION_JOB_SUCCESS┼────┤
         └──────────────────────────┘    │
                                         │
   ┌─────────────────────────────────────▼─┐
   │SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP│
   └────────────────────┬──────────────────┘
Unowned Slots Cleaned Up│
          ┌─────────────▼───────────┐
          │SLOT_MIGRATION_JOB_FAILED│
          └─────────────────────────┘

                      Source/Exporting Node State Machine
         ─────────────────────────────────────────────────────────────

               ┌──────────────────────┐
               │SLOT_EXPORT_CONNECTING├─────────┐
               └───────────┬──────────┘         │
                  Connected│                    │
             ┌─────────────▼────────────┐       │
             │SLOT_EXPORT_AUTHENTICATING┼───────┤
             └─────────────┬────────────┘       │
              Authenticated│                    │
             ┌─────────────▼────────────┐       │
             │SLOT_EXPORT_SEND_ESTABLISH┼───────┤
             └─────────────┬────────────┘       │
  ESTABLISH command written│                    │
     ┌─────────────────────▼─────────────┐      │
     │SLOT_EXPORT_READ_ESTABLISH_RESPONSE┼──────┤
     └─────────────────────┬─────────────┘      │
   Full response read (+OK)│                    │
          ┌────────────────▼──────────────┐     │ Error Conditions:
          │SLOT_EXPORT_WAITING_TO_SNAPSHOT┼─────┤  1. User sends CANCELMIGRATION
          └────────────────┬──────────────┘     │  2. Slot ownership change
     No other child process│                    │  3. Demotion to replica
              ┌────────────▼───────────┐        │  4. FLUSHDB
              │SLOT_EXPORT_SNAPSHOTTING┼────────┤  5. Connection Lost
              └────────────┬───────────┘        │  6. AUTH failed
              Snapshot done│                    │  7. ERR from ESTABLISH command
               ┌───────────▼─────────┐          │  8. Unpaused before failover completed
               │SLOT_EXPORT_STREAMING┼──────────┤  9. Snapshot failed (e.g. Child OOM)
               └───────────┬─────────┘          │  10. No ack from target (timeout)
                      PAUSE│                    │  11. Client output buffer overrun
            ┌──────────────▼─────────────┐      │
            │SLOT_EXPORT_WAITING_TO_PAUSE┼──────┤
            └──────────────┬─────────────┘      │
             Buffer drained│                    │
            ┌──────────────▼────────────┐       │
            │SLOT_EXPORT_FAILOVER_PAUSED┼───────┤
            └──────────────┬────────────┘       │
   Failover request granted│                    │
           ┌───────────────▼────────────┐       │
           │SLOT_EXPORT_FAILOVER_GRANTED┼───────┤
           └───────────────┬────────────┘       │
      New topology received│                    │
            ┌──────────────▼───────────┐        │
            │SLOT_MIGRATION_JOB_SUCCESS│        │
            └──────────────────────────┘        │
                                                │
            ┌─────────────────────────┐         │
            │SLOT_MIGRATION_JOB_FAILED│◄────────┤
            └─────────────────────────┘         │
                                                │
           ┌────────────────────────────┐       │
           │SLOT_MIGRATION_JOB_CANCELLED│◄──────┘
           └────────────────────────────┘
```
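The target/importing diagram above differs from the source side in that failures pass through a cleanup state before the job is marked failed. A minimal sketch of that shape follows. State names are taken from the diagram; `importEvent`, `importNextState`, and the choice to ignore events that do not match the current state (for example a heartbeat `ACK` after the first one) are assumptions. The sketch treats `SLOT_MIGRATION_JOB_SUCCESS` as terminal and does not model the edge the diagram draws out of it.

```c
#include <assert.h>

typedef enum {
    SLOT_IMPORT_WAIT_ACK,
    SLOT_IMPORT_RECEIVE_SNAPSHOT,
    SLOT_IMPORT_WAITING_FOR_PAUSED,
    SLOT_IMPORT_FAILOVER_REQUESTED,
    SLOT_IMPORT_FAILOVER_GRANTED,
    SLOT_MIGRATION_JOB_SUCCESS,
    SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP,
    SLOT_MIGRATION_JOB_FAILED
} importState;

/* Hypothetical event type covering the edge labels in the diagram. */
typedef enum {
    IMPORT_EV_ACK,
    IMPORT_EV_SNAPSHOT_EOF,
    IMPORT_EV_PAUSED,
    IMPORT_EV_FAILOVER_GRANTED,
    IMPORT_EV_TAKEOVER_PERFORMED,
    IMPORT_EV_ERROR,       /* any of error conditions 1 to 6 */
    IMPORT_EV_CLEANUP_DONE /* unowned slots cleaned up */
} importEvent;

/* Returns the state that follows `s` after `ev`; unmatched events are ignored. */
importState importNextState(importState s, importEvent ev) {
    assert(s <= SLOT_MIGRATION_JOB_FAILED);
    if (s == SLOT_MIGRATION_JOB_SUCCESS || s == SLOT_MIGRATION_JOB_FAILED) return s;
    if (s == SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP) {
        return ev == IMPORT_EV_CLEANUP_DONE ? SLOT_MIGRATION_JOB_FAILED : s;
    }
    /* Every in-progress state shares the same error rail. */
    if (ev == IMPORT_EV_ERROR) return SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP;

    switch (s) {
    case SLOT_IMPORT_WAIT_ACK:
        return ev == IMPORT_EV_ACK ? SLOT_IMPORT_RECEIVE_SNAPSHOT : s;
    case SLOT_IMPORT_RECEIVE_SNAPSHOT:
        return ev == IMPORT_EV_SNAPSHOT_EOF ? SLOT_IMPORT_WAITING_FOR_PAUSED : s;
    case SLOT_IMPORT_WAITING_FOR_PAUSED:
        return ev == IMPORT_EV_PAUSED ? SLOT_IMPORT_FAILOVER_REQUESTED : s;
    case SLOT_IMPORT_FAILOVER_REQUESTED:
        return ev == IMPORT_EV_FAILOVER_GRANTED ? SLOT_IMPORT_FAILOVER_GRANTED : s;
    case SLOT_IMPORT_FAILOVER_GRANTED:
        return ev == IMPORT_EV_TAKEOVER_PERFORMED ? SLOT_MIGRATION_JOB_SUCCESS : s;
    default:
        return s;
    }
}
```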

Co-authored-by: Binbin <binloveplay1314@qq.com>

---------

Signed-off-by: Binbin <binloveplay1314@qq.com>
Signed-off-by: Jacob Murphy <jkmurphy@google.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>