From 45a155bd0f1fda38fe132498a9441feac56cb45f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 2 Jan 2022 08:50:15 +0100 Subject: [PATCH] Wait for replicas when shutting down (#9872) To avoid data loss, this commit adds a grace period for lagging replicas to catch up the replication offset. Done: * Wait for replicas when shutdown is triggered by SIGTERM and SIGINT. * Wait for replicas when shutdown is triggered by the SHUTDOWN command. A new blocked client type BLOCKED_SHUTDOWN is introduced, allowing multiple clients to call SHUTDOWN in parallel. Note that they don't expect a response unless an error happens and shutdown is aborted. * Log warning for each replica lagging behind when finishing shutdown. * CLIENT_PAUSE_WRITE while waiting for replicas. * Configurable grace period 'shutdown-timeout' in seconds (default 10). * New flags for the SHUTDOWN command: - NOW disables the grace period for lagging replicas. - FORCE ignores errors writing the RDB or AOF files which would normally prevent a shutdown. - ABORT cancels ongoing shutdown. Can't be combined with other flags. * New field in the output of the INFO command: 'shutdown_in_milliseconds'. The value is the remaining maximum time to wait for lagging replicas before finishing the shutdown. This field is present in the Server section **only** during shutdown. Not directly related: * When shutting down, if there is an AOF saving child, it is killed **even** if AOF is disabled. This can happen if BGREWRITEAOF is used when AOF is off. * Client pause now has end time and type (WRITE or ALL) per purpose. The different pause purposes are *CLIENT PAUSE command*, *failover* and *shutdown*. If clients are unpaused for one purpose, it doesn't affect client pause for other purposes. For example, the CLIENT UNPAUSE command doesn't affect client pause initiated by the failover or shutdown procedures. A completed failover or a failed shutdown doesn't unpause clients paused by the CLIENT PAUSE command. Notes: * DEBUG RESTART doesn't wait for replicas. * We already have a warning logged when a replica disconnects. This means that if any replica connection is lost during the shutdown, it is either logged as disconnected or as lagging at the time of exit. Co-authored-by: Oran Agra --- .github/.codespellrc | 2 +- redis.conf | 14 ++ src/blocked.c | 18 ++ src/cluster.c | 6 +- src/commands.c | 10 +- src/commands/shutdown.json | 25 ++ src/config.c | 1 + src/db.c | 54 ++++- src/networking.c | 85 +++++-- src/replication.c | 4 +- src/server.c | 249 ++++++++++++++++---- src/server.h | 32 ++- tests/integration/psync2-master-restart.tcl | 21 +- tests/integration/shutdown.tcl | 238 +++++++++++++++++++ tests/support/server.tcl | 6 +- tests/support/util.tcl | 7 +- tests/test_helper.tcl | 1 + tests/unit/shutdown.tcl | 11 + 18 files changed, 688 insertions(+), 96 deletions(-) create mode 100644 tests/integration/shutdown.tcl diff --git a/.github/.codespellrc b/.github/.codespellrc index 88146bef7..6cc43b26e 100644 --- a/.github/.codespellrc +++ b/.github/.codespellrc @@ -1,5 +1,5 @@ [codespell] quiet-level = 2 count = -skip = ./deps,./src/crc16_slottable.h +skip = ./deps,./src/crc16_slottable.h,tmp*,./.git,./lcov-html ignore-words = ./.github/wordlist.txt diff --git a/redis.conf b/redis.conf index 8bac2afb5..7289277cf 100644 --- a/redis.conf +++ b/redis.conf @@ -1442,6 +1442,20 @@ aof-use-rdb-preamble yes # the AOF format in a way that may not be compatible with existing AOF parsers. aof-timestamp-enabled no +################################ SHUTDOWN ##################################### + +# Maximum time to wait for replicas when shutting down, in seconds. +# +# During shut down, a grace period allows any lagging replicas to catch up with +# the latest replication offset before the master exists. This period can +# prevent data loss, especially for deployments without configured disk backups. +# +# The 'shutdown-timeout' value is the grace period's duration in seconds. It is +# only applicable when the instance has replicas. To disable the feature, set +# the value to 0. +# +# shutdown-timeout 10 + ################################ LUA SCRIPTING ############################### # Max execution time of a Lua script in milliseconds. diff --git a/src/blocked.c b/src/blocked.c index ccab0e0e1..6a553926b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -191,6 +191,8 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_PAUSE) { listDelNode(server.paused_clients,c->paused_list_node); c->paused_list_node = NULL; + } else if (c->btype == BLOCKED_SHUTDOWN) { + /* No special cleanup. */ } else { serverPanic("Unknown btype in unblockClient()."); } @@ -231,6 +233,22 @@ void replyToBlockedClientTimedOut(client *c) { } } +/* If one or more clients are blocked on the SHUTDOWN command, this function + * sends them an error reply and unblocks them. */ +void replyToClientsBlockedOnShutdown(void) { + if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return; + listNode *ln; + listIter li; + listRewind(server.clients, &li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_SHUTDOWN) { + addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); + unblockClient(c); + } + } +} + /* Mass-unblock clients because something changed in the instance that makes * blocking no longer safe. For example clients blocked in list operations * in an instance which turns from master to slave is unsafe, so this function diff --git a/src/cluster.c b/src/cluster.c index 3b35972d5..78e273f34 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2325,7 +2325,9 @@ int clusterProcessPacket(clusterLink *link) { resetManualFailover(); server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; - pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE); + pauseClients(PAUSE_DURING_FAILOVER, + now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT), + CLIENT_PAUSE_WRITE); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); /* We need to send a ping message to the replica, as it would carry @@ -3590,7 +3592,7 @@ void resetManualFailover(void) { if (server.cluster->mf_slave) { /* We were a master failing over, so we paused clients. Regardless * of the outcome we unpause now to allow traffic again. */ - unpauseClients(); + unpauseClients(PAUSE_DURING_FAILOVER); } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; diff --git a/src/commands.c b/src/commands.c index 8933fa247..bf0537b4f 100644 --- a/src/commands.c +++ b/src/commands.c @@ -4321,7 +4321,10 @@ struct redisCommandArg REPLICAOF_Args[] = { /********** SHUTDOWN ********************/ /* SHUTDOWN history */ -#define SHUTDOWN_History NULL +commandHistory SHUTDOWN_History[] = { +{"7.0","Added the `NOW`, `FORCE` and `ABORT` modifiers. Introduced waiting for lagging replicas before exiting."}, +{0} +}; /* SHUTDOWN hints */ #define SHUTDOWN_Hints NULL @@ -4336,6 +4339,9 @@ struct redisCommandArg SHUTDOWN_nosave_save_Subargs[] = { /* SHUTDOWN argument table */ struct redisCommandArg SHUTDOWN_Args[] = { {"nosave_save",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SHUTDOWN_nosave_save_Subargs}, +{"now",ARG_TYPE_PURE_TOKEN,-1,"NOW",NULL,NULL,CMD_ARG_OPTIONAL}, +{"force",ARG_TYPE_PURE_TOKEN,-1,"FORCE",NULL,NULL,CMD_ARG_OPTIONAL}, +{"abort",ARG_TYPE_PURE_TOKEN,-1,"ABORT",NULL,NULL,CMD_ARG_OPTIONAL}, {0} }; @@ -6542,7 +6548,7 @@ struct redisCommand redisCommandTable[] = { {"restore-asking","An internal command for migrating keys in a cluster","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,RESTORE_ASKING_History,RESTORE_ASKING_Hints,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,{{CMD_KEY_WRITE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}}}, {"role","Return the role of the instance in the context of replication","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,ROLE_History,ROLE_Hints,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS}, {"save","Synchronously save the dataset to disk","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SAVE_History,SAVE_Hints,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0}, -{"shutdown","Synchronously save the dataset to disk and then shut down the server",NULL,"1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SHUTDOWN_Args}, +{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SHUTDOWN_Args}, {"slaveof","Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLAVEOF_History,SLAVEOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=SLAVEOF_Args}, {"slowlog","A container for slow log commands","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLOWLOG_History,SLOWLOG_Hints,NULL,-2,0,0,.subcommands=SLOWLOG_Subcommands}, {"swapdb","Swaps two Redis databases","O(N) where N is the count of clients watching or blocking on keys from both databases.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SWAPDB_History,SWAPDB_Hints,swapdbCommand,3,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,.args=SWAPDB_Args}, diff --git a/src/commands/shutdown.json b/src/commands/shutdown.json index 1a594d74d..deb2e48ba 100644 --- a/src/commands/shutdown.json +++ b/src/commands/shutdown.json @@ -1,10 +1,17 @@ { "SHUTDOWN": { "summary": "Synchronously save the dataset to disk and then shut down the server", + "complexity": "O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)", "group": "server", "since": "1.0.0", "arity": -1, "function": "shutdownCommand", + "history": [ + [ + "7.0", + "Added the `NOW`, `FORCE` and `ABORT` modifiers. Introduced waiting for lagging replicas before exiting." + ] + ], "command_flags": [ "ADMIN", "NOSCRIPT", @@ -29,6 +36,24 @@ "token": "SAVE" } ] + }, + { + "name": "now", + "type": "pure-token", + "token": "NOW", + "optional": true + }, + { + "name": "force", + "type": "pure-token", + "token": "FORCE", + "optional": true + }, + { + "name": "abort", + "type": "pure-token", + "token": "ABORT", + "optional": true } ] } diff --git a/src/config.c b/src/config.c index 407cf7249..03b6af29a 100644 --- a/src/config.c +++ b/src/config.c @@ -2717,6 +2717,7 @@ standardConfig configs[] = { createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod), + createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/db.c b/src/db.c index d19c4d92a..fb08152e6 100644 --- a/src/db.c +++ b/src/db.c @@ -1033,23 +1033,59 @@ void typeCommand(client *c) { } void shutdownCommand(client *c) { - int flags = 0; - - if (c->argc > 2) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } else if (c->argc == 2) { - if (!strcasecmp(c->argv[1]->ptr,"nosave")) { + int flags = SHUTDOWN_NOFLAGS; + int abort = 0; + for (int i = 1; i < c->argc; i++) { + if (!strcasecmp(c->argv[i]->ptr,"nosave")) { flags |= SHUTDOWN_NOSAVE; - } else if (!strcasecmp(c->argv[1]->ptr,"save")) { + } else if (!strcasecmp(c->argv[i]->ptr,"save")) { flags |= SHUTDOWN_SAVE; + } else if (!strcasecmp(c->argv[i]->ptr, "now")) { + flags |= SHUTDOWN_NOW; + } else if (!strcasecmp(c->argv[i]->ptr, "force")) { + flags |= SHUTDOWN_FORCE; + } else if (!strcasecmp(c->argv[i]->ptr, "abort")) { + abort = 1; } else { addReplyErrorObject(c,shared.syntaxerr); return; } } + if ((abort && flags != SHUTDOWN_NOFLAGS) || + (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE)) + { + /* Illegal combo. */ + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + if (abort) { + if (abortShutdown() == C_OK) + addReply(c, shared.ok); + else + addReplyError(c, "No shutdown in progress."); + return; + } + + if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) { + addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client"); + return; + } + + if (!(flags & SHUTDOWN_NOSAVE) && scriptIsTimedout()) { + /* Script timed out. Shutdown allowed only with the NOSAVE flag. See + * also processCommand where these errors are returned. */ + if (scriptIsEval()) + addReplyErrorObject(c, shared.slowevalerr); + else + addReplyErrorObject(c, shared.slowscripterr); + return; + } + + blockClient(c, BLOCKED_SHUTDOWN); if (prepareForShutdown(flags) == C_OK) exit(0); - addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); + /* If we're here, then shutdown is ongoing (the client is still blocked) or + * failed (the client has received an error). */ } void renameGenericCommand(client *c, int nx) { diff --git a/src/networking.c b/src/networking.c index d0c74faba..2fa05f0d6 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2873,7 +2873,7 @@ NULL addReplyNull(c); } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) { /* CLIENT UNPAUSE */ - unpauseClients(); + unpauseClients(PAUSE_BY_CLIENT_COMMAND); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 || c->argc == 4)) @@ -2895,7 +2895,7 @@ NULL if (getTimeoutFromObjectOrReply(c,c->argv[2],&end, UNIT_MILLISECONDS) != C_OK) return; - pauseClients(end, type); + pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] @@ -3539,6 +3539,48 @@ void flushSlavesOutputBuffers(void) { } } +/* Compute current most restictive pause type and its end time, aggregated for + * all pause purposes. */ +static void updateClientPauseTypeAndEndTime(void) { + pause_type old_type = server.client_pause_type; + pause_type type = CLIENT_PAUSE_OFF; + mstime_t end = 0; + for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) { + pause_event *p = server.client_pause_per_purpose[i]; + if (p == NULL) { + /* Nothing to do. */ + } else if (p->end < server.mstime) { + /* This one expired. */ + zfree(p); + server.client_pause_per_purpose[i] = NULL; + } else if (p->type > type) { + /* This type is the most restrictive so far. */ + type = p->type; + } + } + + /* Find the furthest end time among the pause purposes of the most + * restrictive type */ + for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) { + pause_event *p = server.client_pause_per_purpose[i]; + if (p != NULL && p->type == type && p->end > end) end = p->end; + } + server.client_pause_type = type; + server.client_pause_end_time = end; + + /* If the pause type is less restrictive than before, we unblock all clients + * so they are reprocessed (may get re-paused). */ + if (type < old_type) { + listNode *ln; + listIter li; + listRewind(server.paused_clients, &li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + unblockClient(c); + } + } +} + /* Pause clients up to the specified unixtime (in ms) for a given type of * commands. * @@ -3552,14 +3594,18 @@ void flushSlavesOutputBuffers(void) { * The function always succeed, even if there is already a pause in progress. * In such a case, the duration is set to the maximum and new end time and the * type is set to the more restrictive type of pause. */ -void pauseClients(mstime_t end, pause_type type) { - if (type > server.client_pause_type) { - server.client_pause_type = type; - } - - if (end > server.client_pause_end_time) { - server.client_pause_end_time = end; +void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) { + /* Manage pause type and end time per pause purpose. */ + if (server.client_pause_per_purpose[purpose] == NULL) { + server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event)); + server.client_pause_per_purpose[purpose]->type = type; + server.client_pause_per_purpose[purpose]->end = end; + } else { + pause_event *p = server.client_pause_per_purpose[purpose]; + p->type = max(p->type, type); + p->end = max(p->end, end); } + updateClientPauseTypeAndEndTime(); /* We allow write commands that were queued * up before and after to execute. We need @@ -3571,20 +3617,11 @@ void pauseClients(mstime_t end, pause_type type) { } /* Unpause clients and queue them for reprocessing. */ -void unpauseClients(void) { - listNode *ln; - listIter li; - client *c; - - server.client_pause_type = CLIENT_PAUSE_OFF; - server.client_pause_end_time = 0; - - /* Unblock all of the clients so they are reprocessed. */ - listRewind(server.paused_clients,&li); - while ((ln = listNext(&li)) != NULL) { - c = listNodeValue(ln); - unblockClient(c); - } +void unpauseClients(pause_purpose purpose) { + if (server.client_pause_per_purpose[purpose] == NULL) return; + zfree(server.client_pause_per_purpose[purpose]); + server.client_pause_per_purpose[purpose] = NULL; + updateClientPauseTypeAndEndTime(); } /* Returns true if clients are paused and false otherwise. */ @@ -3599,7 +3636,7 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) { if (!areClientsPaused()) return 0; if (server.client_pause_end_time < server.mstime) { - unpauseClients(); + updateClientPauseTypeAndEndTime(); } return areClientsPaused(); } diff --git a/src/replication.c b/src/replication.c index e7a092645..80dc33ad4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3786,7 +3786,7 @@ void clearFailoverState() { server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; - unpauseClients(); + unpauseClients(PAUSE_DURING_FAILOVER); } /* Abort an ongoing failover if one is going on. */ @@ -3935,7 +3935,7 @@ void failoverCommand(client *c) { server.force_failover = force_flag; server.failover_state = FAILOVER_WAIT_FOR_SYNC; /* Cluster failover will unpause eventually */ - pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE); + pauseClients(PAUSE_DURING_FAILOVER, LLONG_MAX, CLIENT_PAUSE_WRITE); addReply(c,shared.ok); } diff --git a/src/server.c b/src/server.c index a5a95d1d8..47bf1628e 100644 --- a/src/server.c +++ b/src/server.c @@ -83,6 +83,13 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan; /* Global vars */ struct redisServer server; /* Server global state */ +/*============================ Internal prototypes ========================== */ + +static inline int isShutdownInitiated(void); +int isReadyToShutdown(void); +int finishShutdown(void); +const char *replstateToString(int replstate); + /*============================ Utility functions ============================ */ /* We use a private localtime implementation which is fork-safe. The logging @@ -1137,10 +1144,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ - if (server.shutdown_asap) { + if (server.shutdown_asap && !isShutdownInitiated()) { if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); - serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); - server.shutdown_asap = 0; + } else if (isShutdownInitiated()) { + if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) { + if (finishShutdown() == C_OK) exit(0); + /* Shutdown failed. Continue running. An error has been logged. */ + } } /* Show some info about non-empty databases */ @@ -1383,6 +1393,14 @@ void whileBlockedCron() { } } +static void sendGetackToReplicas(void) { + robj *argv[3]; + argv[0] = shared.replconf; + argv[1] = shared.getack; + argv[2] = shared.special_asterick; /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); +} + extern int ProcessingEventsWhileBlocked; /* This function gets called every time Redis is entering the @@ -1467,12 +1485,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * increment the replication backlog, they'll be sent after the pause * if we are still the master. */ if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) { - robj *argv[3]; - - argv[0] = shared.replconf; - argv[1] = shared.getack; - argv[2] = shared.special_asterick; /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + sendGetackToReplicas(); server.get_ack_from_slaves = 0; } @@ -1750,6 +1763,8 @@ void initServerConfig(void) { memset(server.blocked_clients_by_type,0, sizeof(server.blocked_clients_by_type)); server.shutdown_asap = 0; + server.shutdown_flags = 0; + server.shutdown_mstime = 0; server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE; server.migrate_cached_sockets = dictCreate(&migrateCacheDictType); server.next_client_id = 1; /* Client IDs, start from 1 .*/ @@ -1852,9 +1867,9 @@ int restartServer(int flags, mstime_t delay) { return C_ERR; } - /* Perform a proper shutdown. */ + /* Perform a proper shutdown. We don't wait for lagging replicas though. */ if (flags & RESTART_SERVER_GRACEFULLY && - prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) + prepareForShutdown(SHUTDOWN_NOW) != C_OK) { serverLog(LL_WARNING,"Can't restart: error preparing for shutdown"); return C_ERR; @@ -2266,6 +2281,8 @@ void initServer(void) { server.get_ack_from_slaves = 0; server.client_pause_type = CLIENT_PAUSE_OFF; server.client_pause_end_time = 0; + memset(server.client_pause_per_purpose, 0, + sizeof(server.client_pause_per_purpose)); server.paused_clients = listCreate(); server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); @@ -3532,9 +3549,7 @@ int processCommand(client *c) { c->cmd->proc != unwatchCommand && c->cmd->proc != quitCommand && c->cmd->proc != resetCommand && - !(c->cmd->proc == shutdownCommand && - c->argc == 2 && - tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && + c->cmd->proc != shutdownCommand && /* more checks in shutdownCommand */ !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k') && @@ -3620,7 +3635,32 @@ void closeListeningSockets(int unlink_unix_socket) { } } +/* Prepare for shutting down the server. Flags: + * + * - SHUTDOWN_SAVE: Save a database dump even if the server is configured not to + * save any dump. + * + * - SHUTDOWN_NOSAVE: Don't save any database dump even if the server is + * configured to save one. + * + * - SHUTDOWN_NOW: Don't wait for replicas to catch up before shutting down. + * + * - SHUTDOWN_FORCE: Ignore errors writing AOF and RDB files on disk, which + * would normally prevent a shutdown. + * + * Unless SHUTDOWN_NOW is set and if any replicas are lagging behind, C_ERR is + * returned and server.shutdown_mstime is set to a timestamp to allow a grace + * period for the replicas to catch up. This is checked and handled by + * serverCron() which completes the shutdown as soon as possible. + * + * If shutting down fails due to errors writing RDB or AOF files, C_ERR is + * returned and an error is logged. If the flag SHUTDOWN_FORCE is set, these + * errors are logged but ignored and C_OK is returned. + * + * On success, this function returns C_OK and then it's OK to call exit(0). */ int prepareForShutdown(int flags) { + if (isShutdownInitiated()) return C_ERR; + /* When SHUTDOWN is called while the server is loading a dataset in * memory we need to make sure no attempt is performed to save * the dataset on shutdown (otherwise it could overwrite the current DB @@ -3630,13 +3670,108 @@ int prepareForShutdown(int flags) { if (server.loading || server.sentinel_mode) flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE; - int save = flags & SHUTDOWN_SAVE; - int nosave = flags & SHUTDOWN_NOSAVE; + server.shutdown_flags = flags; serverLog(LL_WARNING,"User requested shutdown..."); if (server.supervised_mode == SUPERVISED_SYSTEMD) redisCommunicateSystemd("STOPPING=1\n"); + /* If we have any replicas, let them catch up the replication offset before + * we shut down, to avoid data loss. */ + if (!(flags & SHUTDOWN_NOW) && + server.shutdown_timeout != 0 && + !isReadyToShutdown()) + { + server.shutdown_mstime = server.mstime + server.shutdown_timeout * 1000; + if (!areClientsPaused()) sendGetackToReplicas(); + pauseClients(PAUSE_DURING_SHUTDOWN, LLONG_MAX, CLIENT_PAUSE_WRITE); + serverLog(LL_NOTICE, "Waiting for replicas before shutting down."); + return C_ERR; + } + + return finishShutdown(); +} + +static inline int isShutdownInitiated(void) { + return server.shutdown_mstime != 0; +} + +/* Returns 0 if there are any replicas which are lagging in replication which we + * need to wait for before shutting down. Returns 1 if we're ready to shut + * down now. */ +int isReadyToShutdown(void) { + if (listLength(server.slaves) == 0) return 1; /* No replicas. */ + + listIter li; + listNode *ln; + listRewind(server.slaves, &li); + while ((ln = listNext(&li)) != NULL) { + client *replica = listNodeValue(ln); + if (replica->repl_ack_off != server.master_repl_offset) return 0; + } + return 1; +} + +static void cancelShutdown(void) { + server.shutdown_asap = 0; + server.shutdown_flags = 0; + server.shutdown_mstime = 0; + replyToClientsBlockedOnShutdown(); + unpauseClients(PAUSE_DURING_SHUTDOWN); +} + +/* Returns C_OK if shutdown was aborted and C_ERR if shutdown wasn't ongoing. */ +int abortShutdown(void) { + if (isShutdownInitiated()) { + cancelShutdown(); + } else if (server.shutdown_asap) { + /* Signal handler has requested shutdown, but it hasn't been initiated + * yet. Just clear the flag. */ + server.shutdown_asap = 0; + } else { + /* Shutdown neither initiated nor requested. */ + return C_ERR; + } + serverLog(LL_NOTICE, "Shutdown manually aborted."); + return C_OK; +} + +/* The final step of the shutdown sequence. Returns C_OK if the shutdown + * sequence was successful and it's OK to call exit(). If C_ERR is returned, + * it's not safe to call exit(). */ +int finishShutdown(void) { + + int save = server.shutdown_flags & SHUTDOWN_SAVE; + int nosave = server.shutdown_flags & SHUTDOWN_NOSAVE; + int force = server.shutdown_flags & SHUTDOWN_FORCE; + + /* Log a warning for each replica that is lagging. */ + listIter replicas_iter; + listNode *replicas_list_node; + int num_replicas = 0, num_lagging_replicas = 0; + listRewind(server.slaves, &replicas_iter); + while ((replicas_list_node = listNext(&replicas_iter)) != NULL) { + client *replica = listNodeValue(replicas_list_node); + num_replicas++; + if (replica->repl_ack_off != server.master_repl_offset) { + num_lagging_replicas++; + long lag = replica->replstate == SLAVE_STATE_ONLINE ? + time(NULL) - replica->repl_ack_time : 0; + serverLog(LL_WARNING, + "Lagging replica %s reported offset %lld behind master, lag=%ld, state=%s.", + replicationGetSlaveName(replica), + server.master_repl_offset - replica->repl_ack_off, + lag, + replstateToString(replica->replstate)); + } + } + if (num_replicas > 0) { + serverLog(LL_NOTICE, + "%d of %d replicas are in sync when shutting down.", + num_replicas - num_lagging_replicas, + num_replicas); + } + /* Kill all the Lua debugger forked sessions. */ ldbKillForkedSessions(); @@ -3661,20 +3796,24 @@ int prepareForShutdown(int flags) { TerminateModuleForkChild(server.child_pid,0); } - if (server.aof_state != AOF_OFF) { - /* Kill the AOF saving child as the AOF we already have may be longer - * but contains the full dataset anyway. */ - if (server.child_type == CHILD_TYPE_AOF) { - /* If we have AOF enabled but haven't written the AOF yet, don't - * shutdown or else the dataset will be lost. */ - if (server.aof_state == AOF_WAIT_REWRITE) { + /* Kill the AOF saving child as the AOF we already have may be longer + * but contains the full dataset anyway. */ + if (server.child_type == CHILD_TYPE_AOF) { + /* If we have AOF enabled but haven't written the AOF yet, don't + * shutdown or else the dataset will be lost. */ + if (server.aof_state == AOF_WAIT_REWRITE) { + if (force) { + serverLog(LL_WARNING, "Writing initial AOF. Exit anyway."); + } else { serverLog(LL_WARNING, "Writing initial AOF, can't exit."); - return C_ERR; + goto error; } - serverLog(LL_WARNING, - "There is a child rewriting the AOF. Killing it!"); - killAppendOnlyChild(); } + serverLog(LL_WARNING, + "There is a child rewriting the AOF. Killing it!"); + killAppendOnlyChild(); + } + if (server.aof_state != AOF_OFF) { /* Append only file: flush buffers and fsync() the AOF at exit */ serverLog(LL_NOTICE,"Calling fsync() on the AOF file."); flushAppendOnlyFile(1); @@ -3698,10 +3837,14 @@ int prepareForShutdown(int flags) { * in the next cron() Redis will be notified that the background * saving aborted, handling special stuff like slaves pending for * synchronization... */ - serverLog(LL_WARNING,"Error trying to save the DB, can't exit."); - if (server.supervised_mode == SUPERVISED_SYSTEMD) - redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n"); - return C_ERR; + if (force) { + serverLog(LL_WARNING,"Error trying to save the DB. Exit anyway."); + } else { + serverLog(LL_WARNING,"Error trying to save the DB, can't exit."); + if (server.supervised_mode == SUPERVISED_SYSTEMD) + redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n"); + goto error; + } } } @@ -3723,6 +3866,11 @@ int prepareForShutdown(int flags) { serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", server.sentinel_mode ? "Sentinel" : "Redis"); return C_OK; + +error: + serverLog(LL_WARNING, "Errors trying to shut down the server. Check the logs for more information."); + cancelShutdown(); + return C_ERR; } /*================================== Commands =============================== */ @@ -4404,6 +4552,20 @@ sds getFullCommandName(struct redisCommand *cmd) { } } +const char *replstateToString(int replstate) { + switch (replstate) { + case SLAVE_STATE_WAIT_BGSAVE_START: + case SLAVE_STATE_WAIT_BGSAVE_END: + return "wait_bgsave"; + case SLAVE_STATE_SEND_BULK: + return "send_bulk"; + case SLAVE_STATE_ONLINE: + return "online"; + default: + return ""; + } +} + /* Characters we sanitize on INFO output to maintain expected format. */ static char unsafe_info_chars[] = "#:\n\r"; static char unsafe_info_chars_substs[] = "____"; /* Must be same length as above */ @@ -4550,6 +4712,13 @@ sds genRedisInfoString(const char *section) { server.executable ? server.executable : "", server.configfile ? server.configfile : "", server.io_threads_active); + + /* Conditional properties */ + if (isShutdownInitiated()) { + info = sdscatfmt(info, + "shutdown_in_milliseconds:%I\r\n", + (int64_t)(server.shutdown_mstime - server.mstime)); + } } /* Clients */ @@ -5047,7 +5216,6 @@ sds genRedisInfoString(const char *section) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = listNodeValue(ln); - char *state = NULL; char ip[NET_IP_STR_LEN], *slaveip = slave->slave_addr; int port; long lag = 0; @@ -5057,19 +5225,8 @@ sds genRedisInfoString(const char *section) { continue; slaveip = ip; } - switch(slave->replstate) { - case SLAVE_STATE_WAIT_BGSAVE_START: - case SLAVE_STATE_WAIT_BGSAVE_END: - state = "wait_bgsave"; - break; - case SLAVE_STATE_SEND_BULK: - state = "send_bulk"; - break; - case SLAVE_STATE_ONLINE: - state = "online"; - break; - } - if (state == NULL) continue; + const char *state = replstateToString(slave->replstate); + if (state[0] == '\0') continue; if (slave->replstate == SLAVE_STATE_ONLINE) lag = time(NULL) - slave->repl_ack_time; @@ -5589,7 +5746,7 @@ static void sigShutdownHandler(int sig) { /* SIGINT is often delivered via Ctrl+C in an interactive session. * If we receive the signal the second time, we interpret this as * the user really wanting to quit ASAP without waiting to persist - * on disk. */ + * on disk and without waiting for lagging replicas. */ if (server.shutdown_asap && sig == SIGINT) { serverLogFromHandler(LL_WARNING, "You insist... exiting now."); rdbRemoveTempFile(getpid(), 1); diff --git a/src/server.h b/src/server.h index e3cf50b65..c281ac30a 100644 --- a/src/server.h +++ b/src/server.h @@ -321,7 +321,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define BLOCKED_STREAM 4 /* XREAD. */ #define BLOCKED_ZSET 5 /* BZPOP et al. */ #define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */ -#define BLOCKED_NUM 7 /* Number of blocked states. */ +#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */ +#define BLOCKED_NUM 8 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -484,6 +485,8 @@ typedef enum { #define SHUTDOWN_SAVE 1 /* Force SAVE on SHUTDOWN even if no save points are configured. */ #define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */ +#define SHUTDOWN_NOW 4 /* Don't wait for replicas to catch up. */ +#define SHUTDOWN_FORCE 8 /* Don't let errors prevent shutdown. */ /* Command call flags, see call() function */ #define CMD_CALL_NONE 0 @@ -508,6 +511,19 @@ typedef enum { CLIENT_PAUSE_ALL /* Pause all commands */ } pause_type; +/* Client pause purposes. Each purpose has its own end time and pause type. */ +typedef enum { + PAUSE_BY_CLIENT_COMMAND = 0, + PAUSE_DURING_SHUTDOWN, + PAUSE_DURING_FAILOVER, + NUM_PAUSE_PURPOSES /* This value is the number of purposes above. */ +} pause_purpose; + +typedef struct { + pause_type type; + mstime_t end; +} pause_event; + /* RDB active child save type. */ #define RDB_CHILD_TYPE_NONE 0 #define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */ @@ -1353,7 +1369,9 @@ struct redisServer { aeEventLoop *el; rax *errors; /* Errors table */ redisAtomic unsigned int lruclock; /* Clock for LRU eviction */ - volatile sig_atomic_t shutdown_asap; /* SHUTDOWN needed ASAP */ + volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */ + mstime_t shutdown_mstime; /* Timestamp to limit graceful shutdown. */ + int shutdown_flags; /* Flags passed to prepareForShutdown(). */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ char *pidfile; /* PID file path */ @@ -1413,6 +1431,7 @@ struct redisServer { pause_type client_pause_type; /* True if clients are currently paused */ list *paused_clients; /* List of pause clients */ mstime_t client_pause_end_time; /* Time when we undo clients_paused */ + pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES]; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ @@ -1613,6 +1632,9 @@ struct redisServer { int memcheck_enabled; /* Enable memory check on crash. */ int use_exit_on_panic; /* Use exit() on panic and assert rather than * abort(). useful for Valgrind. */ + /* Shutdown */ + int shutdown_timeout; /* Graceful shutdown time limit in seconds. */ + /* Replication (master) */ char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ @@ -2344,8 +2366,8 @@ void flushSlavesOutputBuffers(void); void disconnectSlaves(void); void evictClients(void); int listenToPort(int port, socketFds *fds); -void pauseClients(mstime_t duration, pause_type type); -void unpauseClients(void); +void pauseClients(pause_purpose purpose, mstime_t end, pause_type type); +void unpauseClients(pause_purpose purpose); int areClientsPaused(void); int checkClientPauseTimeoutAndReturnIfPaused(void); void processEventsWhileBlocked(void); @@ -2708,6 +2730,8 @@ void preventCommandAOF(client *c); void preventCommandReplication(client *c); void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration); int prepareForShutdown(int flags); +void replyToClientsBlockedOnShutdown(void); +int abortShutdown(void); void afterCommand(client *c); int inNestedCall(void); #ifdef __GNUC__ diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl index 672143102..c0c8f471f 100644 --- a/tests/integration/psync2-master-restart.tcl +++ b/tests/integration/psync2-master-restart.tcl @@ -42,7 +42,9 @@ start_server {} { $replica config resetstat catch { - restart_server 0 true false + # SHUTDOWN NOW ensures master doesn't send GETACK to replicas before + # shutting down which would affect the replication offset. + restart_server 0 true false true now set master [srv 0 client] } wait_for_condition 50 1000 { @@ -77,9 +79,14 @@ start_server {} { after 20 + # Wait until master has received ACK from replica. If the master thinks + # that any replica is lagging when it shuts down, master would send + # GETACK to the replicas, affecting the replication offset. + set offset [status $master master_repl_offset] wait_for_condition 500 100 { - [status $master master_repl_offset] == [status $replica master_repl_offset] && - [status $master master_repl_offset] == [status $sub_replica master_repl_offset] + [string match "*slave0:*,offset=$offset,*" [$master info replication]] && + $offset == [status $replica master_repl_offset] && + $offset == [status $sub_replica master_repl_offset] } else { show_cluster_status fail "Replicas and master offsets were unable to match *exactly*." @@ -89,6 +96,11 @@ start_server {} { $replica config resetstat catch { + # Unlike the test above, here we use SIGTERM, which behaves + # differently compared to SHUTDOWN NOW if there are lagging + # replicas. This is just to increase coverage and let each test use + # a different shutdown approach. In this case there are no lagging + # replicas though. restart_server 0 true false set master [srv 0 client] } @@ -136,6 +148,9 @@ start_server {} { $replica config resetstat catch { + # Unlike the test above, here we use SIGTERM. This is just to + # increase coverage and let each test use a different shutdown + # approach. restart_server 0 true false set master [srv 0 client] } diff --git a/tests/integration/shutdown.tcl b/tests/integration/shutdown.tcl new file mode 100644 index 000000000..60afc5c7f --- /dev/null +++ b/tests/integration/shutdown.tcl @@ -0,0 +1,238 @@ +# This test suite tests shutdown when there are lagging replicas connected. + +# Fill up the OS socket send buffer for the replica connection 1M at a time. +# When the replication buffer memory increases beyond 2M (often after writing 4M +# or so), we assume it's because the OS socket send buffer can't swallow +# anymore. +proc fill_up_os_socket_send_buffer_for_repl {idx} { + set i 0 + while {1} { + incr i + populate 1024 junk$i: 1024 $idx + after 10 + set buf_size [s $idx mem_total_replication_buffers] + if {$buf_size > 2*1024*1024} { + break + } + } +} + +foreach how {sigterm shutdown} { + test "Shutting down master waits for replica to catch up ($how)" { + start_server {} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set master_pid [srv -1 pid] + set replica [srv 0 client] + set replica_pid [srv 0 pid] + + # Config master. + $master config set shutdown-timeout 300; # 5min for slow CI + $master config set repl-backlog-size 1; # small as possible + $master config set hz 100; # cron runs every 10ms + + # Config replica. + $replica replicaof $master_host $master_port + wait_for_sync $replica + + # Preparation: Set k to 1 on both master and replica. + $master set k 1 + wait_for_ofs_sync $master $replica + + # Pause the replica. + exec kill -SIGSTOP $replica_pid + after 10 + + # Fill up the OS socket send buffer for the replica connection + # to prevent the following INCR from reaching the replica via + # the OS. + fill_up_os_socket_send_buffer_for_repl -1 + + # Incr k and immediately shutdown master. + $master incr k + switch $how { + sigterm { + exec kill -SIGTERM $master_pid + } + shutdown { + set rd [redis_deferring_client -1] + $rd shutdown + } + } + wait_for_condition 50 100 { + [s -1 shutdown_in_milliseconds] > 0 + } else { + fail "Master not indicating ongoing shutdown." + } + + # Wake up replica and check if master has waited for it. + after 20; # 2 cron intervals + exec kill -SIGCONT $replica_pid + wait_for_condition 300 1000 { + [$replica get k] eq 2 + } else { + fail "Master exited before replica could catch up." + } + + # Check shutdown log messages on master + wait_for_log_messages -1 {"*ready to exit, bye bye*"} 0 100 500 + assert_equal 0 [count_log_message -1 "*Lagging replica*"] + verify_log_message -1 "*1 of 1 replicas are in sync*" 0 + } + } + } {} {repl external:skip} +} + +test {Shutting down master waits for replica timeout} { + start_server {} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set master_pid [srv -1 pid] + set replica [srv 0 client] + set replica_pid [srv 0 pid] + + # Config master. + $master config set shutdown-timeout 1; # second + + # Config replica. + $replica replicaof $master_host $master_port + wait_for_sync $replica + + # Preparation: Set k to 1 on both master and replica. + $master set k 1 + wait_for_ofs_sync $master $replica + + # Pause the replica. + exec kill -SIGSTOP $replica_pid + after 10 + + # Fill up the OS socket send buffer for the replica connection to + # prevent the following INCR k from reaching the replica via the OS. + fill_up_os_socket_send_buffer_for_repl -1 + + # Incr k and immediately shutdown master. + $master incr k + exec kill -SIGTERM $master_pid + wait_for_condition 50 100 { + [s -1 shutdown_in_milliseconds] > 0 + } else { + fail "Master not indicating ongoing shutdown." + } + + # Let master finish shutting down and check log. + wait_for_log_messages -1 {"*ready to exit, bye bye*"} 0 100 100 + verify_log_message -1 "*Lagging replica*" 0 + verify_log_message -1 "*0 of 1 replicas are in sync*" 0 + + # Wake up replica. + exec kill -SIGCONT $replica_pid + assert_equal 1 [$replica get k] + } + } +} {} {repl external:skip} + +test "Shutting down master waits for replica then fails" { + start_server {} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set master_pid [srv -1 pid] + set replica [srv 0 client] + set replica_pid [srv 0 pid] + + # Config master and replica. + $replica replicaof $master_host $master_port + wait_for_sync $replica + + # Pause the replica and write a key on master. + exec kill -SIGSTOP $replica_pid + after 10 + $master incr k + + # Two clients call blocking SHUTDOWN in parallel. + set rd1 [redis_deferring_client -1] + set rd2 [redis_deferring_client -1] + $rd1 shutdown + $rd2 shutdown + set info_clients [$master info clients] + assert_match "*connected_clients:3*" $info_clients + assert_match "*blocked_clients:2*" $info_clients + + # Start a very slow initial AOFRW, which will prevent shutdown. + $master config set rdb-key-save-delay 30000000; # 30 seconds + $master config set appendonly yes + + # Wake up replica, causing master to continue shutting down. + exec kill -SIGCONT $replica_pid + + # SHUTDOWN returns an error to both clients blocking on SHUTDOWN. + catch { $rd1 read } e1 + catch { $rd2 read } e2 + assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1 + assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2 + $rd1 close + $rd2 close + + # Check shutdown log messages on master. + verify_log_message -1 "*1 of 1 replicas are in sync*" 0 + verify_log_message -1 "*Writing initial AOF, can't exit*" 0 + verify_log_message -1 "*Errors trying to shut down*" 0 + + # Let master to exit fast, without waiting for the very slow AOFRW. + catch {$master shutdown nosave force} + } + } +} {} {repl external:skip} + +test "Shutting down master waits for replica then aborted" { + start_server {} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set master_pid [srv -1 pid] + set replica [srv 0 client] + set replica_pid [srv 0 pid] + + # Config master and replica. + $replica replicaof $master_host $master_port + wait_for_sync $replica + + # Pause the replica and write a key on master. + exec kill -SIGSTOP $replica_pid + after 10 + $master incr k + + # Two clients call blocking SHUTDOWN in parallel. + set rd1 [redis_deferring_client -1] + set rd2 [redis_deferring_client -1] + $rd1 shutdown + $rd2 shutdown + set info_clients [$master info clients] + assert_match "*connected_clients:3*" $info_clients + assert_match "*blocked_clients:2*" $info_clients + + # Abort the shutdown + $master shutdown abort + + # Wake up replica, causing master to continue shutting down. + exec kill -SIGCONT $replica_pid + + # SHUTDOWN returns an error to both clients blocking on SHUTDOWN. + catch { $rd1 read } e1 + catch { $rd2 read } e2 + assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1 + assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2 + $rd1 close + $rd2 close + + # Check shutdown log messages on master. + verify_log_message -1 "*Shutdown manually aborted*" 0 + } + } +} {} {repl external:skip} diff --git a/tests/support/server.tcl b/tests/support/server.tcl index ee39c8df9..51d3629a2 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -674,8 +674,12 @@ proc start_server {options {code undefined}} { } } -proc restart_server {level wait_ready rotate_logs {reconnect 1}} { +proc restart_server {level wait_ready rotate_logs {reconnect 1} {shutdown sigterm}} { set srv [lindex $::servers end+$level] + if {$shutdown ne {sigterm}} { + catch {[dict get $srv "client"] shutdown $shutdown} + } + # Kill server doesn't mind if the server is already dead kill_server $srv # Remove the default client from the server dict unset srv "client" diff --git a/tests/support/util.tcl b/tests/support/util.tcl index c4f90b29e..aa14fd3a0 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -591,8 +591,11 @@ proc stop_bg_complex_data {handle} { catch {exec /bin/kill -9 $handle} } -proc populate {num {prefix key:} {size 3}} { - set rd [redis_deferring_client] +# Write num keys with the given key prefix and value size (in bytes). If idx is +# given, it's the index (AKA level) used with the srv procedure and it specifies +# to which Redis instance to write the keys. +proc populate {num {prefix key:} {size 3} {idx 0}} { + set rd [redis_deferring_client $idx] for {set j 0} {$j < $num} {incr j} { $rd set $prefix$j [string repeat A $size] } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 570d9e85f..722b04ed0 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -44,6 +44,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-buffer + integration/shutdown integration/aof integration/rdb integration/corrupt-dump diff --git a/tests/unit/shutdown.tcl b/tests/unit/shutdown.tcl index 359f5bb63..5c618d285 100644 --- a/tests/unit/shutdown.tcl +++ b/tests/unit/shutdown.tcl @@ -26,6 +26,17 @@ start_server {tags {"shutdown external:skip"}} { } start_server {tags {"shutdown external:skip"}} { + test {SHUTDOWN ABORT can cancel SIGTERM} { + r debug pause-cron 1 + set pid [s process_id] + exec kill -SIGTERM $pid + after 10; # Give signal handler some time to run + r shutdown abort + verify_log_message 0 "*Shutdown manually aborted*" 0 + r debug pause-cron 0 + r ping + } {PONG} + test {Temp rdb will be deleted in signal handle} { for {set i 0} {$i < 20} {incr i} { r set $i $i