diff --git a/README.md b/README.md index 63556d78b22f713e61a150b743454e8509663531..3a45409b315024c3faa32efdc109ef3d920c2a83 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ What is openAMDC? ----------------- -openAMDC is an open source, high-performance, in-memory, key-value database which forks from Redis. It is compatible with all the commands, protocol and data structures that were originally available in Redis, making it feasible to utilize openAMDC as a direct substitute for Redis. The architecture of openAMDC is as follows: +openAMDC is an open source, high-performance, in-memory, key-value database. It is compatible with all the commands, protocol and data structures that were originally available in Redis, making it feasible to utilize openAMDC as a direct substitute for Redis. The architecture of openAMDC is as follows:
@@ -199,7 +199,7 @@ You can use openamdc-cli to play with openAMDC. Start a openamdc-server instance then in another terminal try the following: % cd src - % ./oepnamdc-cli + % ./openamdc-cli openamdc> ping PONG openamdc> set foo bar @@ -238,15 +238,15 @@ to run openAMDC properly as a background daemon that will start again on system reboots. You'll be able to stop and start openAMDC using the script named -`/etc/init.d/openamdc_`, for instance `/etc/init.d/oepnamdc_6379`. +`/etc/init.d/openamdc_`, for instance `/etc/init.d/openamdc_6379`. Code contributions ------------------ Note: Contribute code to the openAMDC project by sending a pull request via -Gitee, you agree to release your code under the terms of the BSD license +Gitee, you agree to release your code under the terms of the MulanPSLv2 license that you can find in the COPYING file included in the openAMDC source -distribution. You will include BSD license in the COPYING file within each +distribution. You will include MulanPSLv2 license in the COPYING file within each source file that you contribute. 1. If it is a major feature or a semantical change, create an [issue](.github/ISSUE_TEMPLATE/feature_request.md) diff --git a/README.zh.md b/README.zh.md index 727ec21cf8d9b245ad24aa63ffdd78573a918fef..4ffe3d0d9e16d35fe4c4dd61349d3a8ff30726e5 100644 --- a/README.zh.md +++ b/README.zh.md @@ -1,7 +1,7 @@ 什么是openAMDC? -------------- -openAMDC是一个开源且高性能的键值内存数据库,作为一个Redis分支其兼容所有Redis命令、协议以及数据结构,可以轻松使用openAMDC来替换已有的Redis服务。openAMDC架构如下: +openAMDC是一个开源且高性能的键值内存数据库,兼容所有Redis命令、协议以及数据结构,可以轻松使用openAMDC来替换已有的Redis服务。openAMDC架构如下:
@@ -178,7 +178,7 @@ redis.conf中的所有选项也都可以作为命令行选项,名称完全相 您可以使用openamdc-cli来与openAMDC互动。启动一个openamdc-server实例,然后在另一个终端尝试以下操作: % cd src - % ./oepnamdc-cli + % ./openamdc-cli openamdc> ping PONG openamdc> set foo bar @@ -210,13 +210,13 @@ redis.conf中的所有选项也都可以作为命令行选项,名称完全相 该脚本会问你几个问题,并将设置好您需要的一切,以便将openAMDC正确地作为一个后台守护进程运行,该进程会在系统重启时再次启动。 -您将能够使用名为/etc/init.d/openamdc_的脚本停止和启动openAMDC,例如/etc/init.d/oepnamdc_6379。 +您将能够使用名为/etc/init.d/openamdc_的脚本停止和启动openAMDC,例如/etc/init.d/openamdc_6379。 代码贡献 ------- -注意:通过Gitee发送拉取请求向openAMDC项目贡献代码,即表示您同意根据在openAMDC源代码分发中包含的COPYING文件中可找到的BSD许可 -条款发布您的代码。您将在您所贡献的每个源代码文件内的COPYING文件中包含BSD许可。 +注意:通过Gitee发送拉取请求向openAMDC项目贡献代码,即表示您同意根据在openAMDC源代码分发中包含的COPYING文件中可找到的MulanPSLv2许可 +条款发布您的代码。您将在您所贡献的每个源代码文件内的COPYING文件中包含MulanPSLv2许可。 1. 如果这是一个重要的特性或语义上的改变,请在Gitee上创建一个[问题](.github/ISSUE_TEMPLATE/feature_request.md),准确地描述你想要实现的内容以及原因,用例对于特性被接受很重要。 diff --git a/src/aof.c b/src/aof.c index 0ce705cbc723d913512fd3d39ef5ba1a13521122..685c6b244baa23bb5541940c633ad952308f351b 100644 --- a/src/aof.c +++ b/src/aof.c @@ -718,7 +718,7 @@ struct client *createAOFClient(void) { c->sockname = NULL; c->resp = 2; c->user = NULL; - mutexInit(&c->lock, "fake client"); + mutexInit(&c->lock, skipLock, "fake client"); listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); initClientMultiState(c); diff --git a/src/blocked.c b/src/blocked.c index 92cb68fa1b5bb4c52cc53c37645e463a6cf912e2..e06860c4a56f0b3dfd97cb2dfec19258596cde18 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -134,8 +134,9 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { + WRAPPER_MUTEX_NOCLEANUP_LOCK(cl, &c->lock); serverAssert(threadOwnLock()); - serverAssert(mutexOwnLock(&c->lock)); + serverAssert(wrapperMutexOwnLock(&cl)); if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { diff --git a/src/db.c b/src/db.c index b60ac54ea555f7218afa3f36587f669d7a84137b..511240b168a9141f8f673b83e83da8065244d4cd 100644 --- a/src/db.c +++ b/src/db.c @@ -1814,31 +1814,6 @@ int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysRes return num; } -/* LCS ... [KEYS ] ... */ -int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - int i; - int *keys = getKeysPrepareResult(result, 2); - UNUSED(cmd); - - /* We need to parse the options of the command in order to check for the - * "KEYS" argument before the "STRINGS" argument. */ - for (i = 1; i < argc; i++) { - char *arg = argv[i]->ptr; - int moreargs = (argc-1) - i; - - if (!strcasecmp(arg, "strings")) { - break; - } else if (!strcasecmp(arg, "keys") && moreargs >= 2) { - keys[0] = i+1; - keys[1] = i+2; - result->numkeys = 2; - return result->numkeys; - } - } - result->numkeys = 0; - return result->numkeys; -} - /* Helper function to extract keys from memory command. * MEMORY USAGE */ int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { diff --git a/src/debug.c b/src/debug.c index 0dd806d6b0fd3776f22653c726dda0dd6665fbb4..b5b3f8b8513e7568556bc7139e5159ddb472be91 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1718,7 +1718,8 @@ int memtest_test_linux_anonymous_maps(void) { static void killMainAndWorkerThreads(void) { int err; for (int iel = 0; iel < MAX_THREAD_VAR; iel++) { - if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) { + if (iel < server.worker_threads_num || + (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) { if (server.thread[iel] != pthread_self()) { pthread_cancel(server.thread[iel]); } diff --git a/src/help.h b/src/help.h index bbcdd6732cf2a673cf97a0e9f60b0faa2e68e6e0..62042a043cbf2947d49621601b30a07015d04be0 100644 --- a/src/help.h +++ b/src/help.h @@ -1145,8 +1145,8 @@ struct commandHelp { "Incrementally iterate Set elements", 3, "2.8.0" }, - { "STRALGO", - "LCS algo-specific-argument [algo-specific-argument ...]", + { "LCS", + "key1 key2 [LEN] [IDX] [MINMATCHLEN ] [WITHMATCHLEN]", "Run algorithms (currently LCS) against strings", 1, "6.0.0" }, diff --git a/src/mutex.c b/src/mutex.c index 52f7625533e0a5f56609303de19d081d6b452f8c..d2e0269022cb850fd55f542807aba0cc03ac2097 100644 --- a/src/mutex.c +++ b/src/mutex.c @@ -13,11 +13,12 @@ #include "mutex.h" /* Initializes a mutex structure with specified attributes */ -int mutexInit(struct mutex *m, char *name) { +int mutexInit(struct mutex *m, mutexSkipLock *skipLock, char *name) { int ret = 0; m->name = name; m->depth = 0; m->owner = 0; + m->skipLock = skipLock; /* Initialize mutex attribute object */ if ((ret = pthread_mutexattr_init(&m->attr)) != 0) return ret; @@ -143,7 +144,8 @@ int mutexOwnLock(struct mutex *m) { /* Wraps a mutex lock operation, tracking lock depth for a wrapper * mutex structure */ int wrapperMutexLock(struct wrapperMutex *wm) { - assert(wm->lock); + if (wm->lock == NULL) return 0; + if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return 0; int ret = mutexLock(wm->lock); if (ret == 0) wm->depth++; @@ -152,7 +154,8 @@ int wrapperMutexLock(struct wrapperMutex *wm) { /* Attempts to lock a mutex, wrapped for tracking lock depth */ int wrapperMutexTryLock(struct wrapperMutex *wm) { - assert(wm->lock); + if (wm->lock == NULL) return 0; + if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return 0; int ret = mutexTryLock(wm->lock); if (ret == 0) wm->depth++; @@ -166,6 +169,7 @@ int wrapperMutexUnlock(struct wrapperMutex *wm) { int ret = 0; if (wm->lock == NULL || wm->depth == 0) return ret; + if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return ret; ret = mutexUnlock(wm->lock); if (ret == 0) wm->depth--; @@ -174,6 +178,7 @@ int wrapperMutexUnlock(struct wrapperMutex *wm) { /* Determines if the current thread owns the lock of a wrapper mutex */ int wrapperMutexOwnLock(struct wrapperMutex *wm) { - assert(wm->lock); + if (wm->lock == NULL) return 1; + if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return 1; return mutexOwnLock(wm->lock); } \ No newline at end of file diff --git a/src/mutex.h b/src/mutex.h index 4bcdf30f0e407306e11cae174f0da4cceabf8778..10eb9d641329b20b2a74c10b00e16c19ba1e598c 100644 --- a/src/mutex.h +++ b/src/mutex.h @@ -17,15 +17,18 @@ #include #include +typedef int (mutexSkipLock)(); + struct mutex { char *name; int depth; pthread_t owner; pthread_mutexattr_t attr; pthread_mutex_t mutex; + mutexSkipLock *skipLock; }; -int mutexInit(struct mutex *m, char *name); +int mutexInit(struct mutex *m, mutexSkipLock *skipLock, char *name); int mutexLock(struct mutex *m); int mutexTryLock(struct mutex *m); int mutexUnlock(struct mutex *m); @@ -37,11 +40,20 @@ struct wrapperMutex { int depth; }; -#define WRAPPER_MUTEX_DEFINE(v) __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = NULL, .depth = 0} -#define WRAPPER_MUTEX_DEFER_LOCK(v, l) __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0} -#define WRAPPER_MUTEX_LOCK(v, l) \ - __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0}; \ - wrapperMutexLock(&(v)) +#define WRAPPER_MUTEX_DEFINE(v) \ + __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = NULL, .depth = 0} + +#define WRAPPER_MUTEX_DEFER_LOCK(v, l) \ + __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0} + +#define WRAPPER_MUTEX_LOCK(v, l) \ + __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0}; wrapperMutexLock(&(v)) + +#define WRAPPER_MUTEX_NOCLEANUP_LOCK(v, l) \ + struct wrapperMutex (v) = {.lock = (l), .depth = 0}; wrapperMutexLock(&(v)) + +#define WRAPPER_MUTEX_NOCLEANUP_DEFINE(v, l) \ + struct wrapperMutex (v) = {.lock = (l), .depth = 1}; int wrapperMutexLock(struct wrapperMutex *wm); int wrapperMutexTryLock(struct wrapperMutex *wm); diff --git a/src/networking.c b/src/networking.c index e9bd76b93da93f23774458a42168a1b07fd1ed2a..4ea5b8bc5fe9298f1c0ccb6160661059789e58f8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -127,7 +127,7 @@ client *createClient(connection *conn, int iel) { c->resp = 2; c->iel = iel; c->conn = conn; - mutexInit(&c->lock, "client lock"); + mutexInit(&c->lock, skipLock, "client lock"); c->name = NULL; c->bufpos = 0; c->qb_pos = 0; @@ -369,8 +369,9 @@ void clientInstallWriteHandler(client *c) { * loop, we can try to directly write to the client sockets avoiding * a system call. We'll only really install the write handler if * we'll not be able to write the whole reply at once. */ + WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock); + serverAssert(wrapperMutexOwnLock(&cl)); serverAssert(clientCheckThreadAligned(c)); - serverAssert(mutexOwnLock(&c->lock)); c->flags |= CLIENT_PENDING_WRITE; WRAPPER_MUTEX_LOCK(pwl, &server.pending_write_lock[c->iel]); listAddNodeHead(server.pending_write_list[c->iel],c); @@ -413,7 +414,8 @@ int prepareClientToWrite(client *c) { if (async) { serverAssert(threadOwnLock()); } else { - serverAssert(c->conn == NULL || mutexOwnLock(&c->lock)); + WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock); + serverAssert(c->conn == NULL || wrapperMutexOwnLock(&cl)); } /* If it's the Lua client we always return ok without installing any @@ -1515,9 +1517,10 @@ int anyOtherSlaveWaitRdb(client *except_me) { * be referenced, not including the Pub/Sub channels. * This is used by freeClient() and replicationCacheMaster(). */ void unlinkClient(client *c) { + WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock); serverAssert(clientCheckThreadAligned(c)); serverAssert(c->conn == NULL || threadOwnLock()); - serverAssert(c->conn == NULL || mutexOwnLock(&c->lock)); + serverAssert(c->conn == NULL || wrapperMutexOwnLock(&cl)); listIter li; listNode *ln; @@ -1579,7 +1582,8 @@ void unlinkClient(client *c) { ln = NULL; int found = 0; for (int iel = 0; iel < MAX_THREAD_VAR; ++iel) { - if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) { + if (iel < server.worker_threads_num || + (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) { ln = listSearchKey(server.async_pending_write_list[iel], c); if (ln) { found = 1; @@ -1603,18 +1607,17 @@ void unlinkClient(client *c) { if (c->flags & CLIENT_TRACKING) disableTracking(c); } -int freeClient(client *c) { +void freeClient(client *c) { serverAssert(c->conn == NULL || threadOwnLock()); serverAssert(clientCheckThreadAligned(c)); - mutexLock(&c->lock); + WRAPPER_MUTEX_LOCK(wl, &c->lock); listNode *ln; /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ if (c->flags & CLIENT_PROTECTED || c->async_ops) { freeClientAsync(c); - mutexUnlock(&c->lock); - return 0; + return; } /* For connected clients, call the disconnection event of modules hooks. */ @@ -1648,8 +1651,7 @@ int freeClient(client *c) { if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) { c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY); replicationCacheMaster(c); - mutexUnlock(&c->lock); - return 0; + return; } } @@ -1747,10 +1749,9 @@ int freeClient(client *c) { sdsfree(c->peerid); sdsfree(c->sockname); sdsfree(c->slave_addr); - mutexUnlock(&c->lock); + wrapperMutexUnlock(&wl); mutexDestroy(&c->lock); zfree(c); - return 1; } /* Schedule a client to free it at a safe time in the serverCron() function. @@ -3837,16 +3838,15 @@ void processEventsWhileBlocked(int iel) { listRewind(server.clients, &li); while((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); - if (mutexOwnLock(&c->lock)) { - serverAssert(c->flags & CLIENT_PROTECTED || - c->flags & CLIENT_INPROGRESS_COMMAND); - mutexUnlock(&c->lock); + WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock); + if (wrapperMutexOwnLock(&cl)) { + wrapperMutexUnlock(&cl); listAddNodeTail(clients, c); } } - mutexUnlock(&globalLock); - serverAssert(!threadOwnLock()); + WRAPPER_MUTEX_NOCLEANUP_DEFINE(wl, &globalLock); + wrapperMutexUnlock(&wl); { int iterations = 4; /* See the function top-comment. */ @@ -3877,12 +3877,11 @@ void processEventsWhileBlocked(int iel) { ProcessingEventsWhileBlocked = 0; } - struct wrapperMutex wm = {&globalLock}; - wrapperMutexLock(&wm); + WRAPPER_MUTEX_NOCLEANUP_LOCK(gl, &globalLock); listRewind(clients, &li); while((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); - mutexLock(&c->lock); + WRAPPER_MUTEX_NOCLEANUP_LOCK(cl, &c->lock); } listRelease(clients); } diff --git a/src/openamdc-benchmark.c b/src/openamdc-benchmark.c index 9a7613fa83cc710e71c3064388d338cb60a02dda..18646b3a77b9af31a272d083d66560a93744c98f 100644 --- a/src/openamdc-benchmark.c +++ b/src/openamdc-benchmark.c @@ -1674,7 +1674,7 @@ int main(int argc, const char **argv) { client c; - mutexInit(&globalLock, "global lock"); + mutexInit(&globalLock, NULL, "global lock"); srandom(time(NULL) ^ getpid()); init_genrand64(ustime() ^ getpid()); signal(SIGHUP, SIG_IGN); diff --git a/src/pubsub.c b/src/pubsub.c index 7c404090362b81aa942c39fe1f20a237d27de97d..ed13debf756c6e39decfa39f59f344e521f209f9 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -134,8 +134,9 @@ int clientSubscriptionsCount(client *c) { /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel) { + WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock); serverAssert(threadOwnLock()); - serverAssert(mutexOwnLock(&c->lock)); + serverAssert(wrapperMutexOwnLock(&cl)); dictEntry *de; list *clients = NULL; int retval = 0; diff --git a/src/scripting.c b/src/scripting.c index e317e963b450f2c38086f6cf65de50175beb007c..a40356d7966f229efee4026bd2e0dbffa95ec6f6 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -589,14 +589,13 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { } inuse++; - mutexLock(&c->lock); + WRAPPER_MUTEX_LOCK(wl, &c->lock); /* Require at least one argument */ if (argc == 0) { luaPushError(lua, "Please specify at least one argument for redis.call()"); inuse--; - mutexUnlock(&c->lock); return raise_error ? luaRaiseError(lua) : 1; } @@ -916,11 +915,9 @@ cleanup: * form of a table with an "err" field. Extract the string to * return the plain error. */ inuse--; - mutexUnlock(&c->lock); return luaRaiseError(lua); } inuse--; - mutexUnlock(&c->lock); return 1; } diff --git a/src/sentinel.c b/src/sentinel.c index 10c8145b179f97ba4e07c8dd287207a092b0ae9f..234525640f1acd3b1ab792e2b630edfbb66053c5 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -874,10 +874,12 @@ void sentinelRunPendingScripts(void) { } else if (pid == 0) { /* Child */ tlsCleanup(); - for (int iel = 0; iel < server.worker_threads_num; iel++) { - aeClosePipes(server.el[iel]); + for (int iel = 0; iel < MAX_THREAD_VAR; iel++) { + if (iel < server.worker_threads_num || + (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) { + aeClosePipes(server.el[iel]); + } } - aeClosePipes(server.el[MODULE_THREAD_ID]); execve(sj->argv[0],sj->argv,environ); /* If we are here an error occurred. */ _exit(2); /* Don't retry execution. */ diff --git a/src/server.c b/src/server.c index d9a86cd670ad2d635aac7067371f0a26fd7fd85c..80a487fe33df3b45e6508404cfe33e93e747291a 100644 --- a/src/server.c +++ b/src/server.c @@ -1083,9 +1083,9 @@ struct redisCommand redisCommandTable[] = { "admin no-script ok-loading ok-stale", 0,NULL,0,0,0,0,0,0}, - {"stralgo",stralgoCommand,-2, + {"lcs",lcsCommand,-3, "read-only @string", - 0,lcsGetKeys,0,0,0,0,0,0}, + 0,NULL,1,2,1,0,0,0}, {"reset",resetCommand,1, "no-script ok-stale ok-loading fast @connection", @@ -1823,7 +1823,7 @@ void clientsCron(int iel) { head = listFirst(server.clients); c = listNodeValue(head); if (c->iel == iel) { - mutexLock(&c->lock); + WRAPPER_MUTEX_NOCLEANUP_LOCK(cl, &c->lock); /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ @@ -1833,7 +1833,7 @@ void clientsCron(int iel) { if (clientsCronTrackClientsMemUsage(c)) goto unlock; if (closeClientOnOutputBufferLimitReached(c, 0)) continue; unlock: - mutexUnlock(&c->lock); + wrapperMutexUnlock(&cl); } } @@ -2037,9 +2037,10 @@ void asyncHandleClientsWithPendingWrites(void *var) { /* Install a write handler for the client to manage future write events */ clientInstallWriteHandler(c); /* Process the pending write tasks for the current thread */ - mutexUnlock(&c->lock); + WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock); + wrapperMutexUnlock(&cl); handleClientsWithPendingWrites(c->iel); - mutexLock(&c->lock); + wrapperMutexLock(&cl); } /* @@ -2825,7 +2826,7 @@ void initServerConfig(void) { server.repl_backlog_idx = 0; server.repl_backlog_off = 0; server.repl_no_slaves_since = time(NULL); - mutexInit(&replBacklogLock, "repl backlog lock"); + mutexInit(&replBacklogLock, skipLock, "repl backlog lock"); /* Failover related */ server.failover_end_time = 0; @@ -3431,7 +3432,8 @@ void initServer(void) { server.worker_threads_num = server.worker_threads_num == 0 ? 1 : server.worker_threads_num; for (int iel = 0; iel < MAX_THREAD_VAR; iel++) { - if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) { + if (iel < server.worker_threads_num || + (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) { server.unblocked_clients[iel] = listCreate(); server.pending_write_list[iel] = listCreate(); server.client_handling_list[iel] = listCreate(); @@ -3457,7 +3459,7 @@ void initServer(void) { } aeSetBeforeSleepProc(server.el[iel], beforeSleep); aeSetAfterSleepProc(server.el[iel], afterSleep); - mutexInit(&server.pending_write_lock[iel], "pending write lock"); + mutexInit(&server.pending_write_lock[iel], skipLock, "pending write lock"); /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed @@ -4606,7 +4608,8 @@ int prepareForShutdown(int flags) { * calls aeAsyncFunction with asyncAeEventStop, passing the current thread * index as an argument. */ for (int iel = 0; iel < MAX_THREAD_VAR; iel++) { - if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) { + if (iel < server.worker_threads_num || + (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) { void *var = (void*)((int64_t)iel); aeAsyncFunction(server.el[iel], asyncAeEventStop, var, NULL, 1); } @@ -6091,7 +6094,8 @@ void closeChildUnusedResourceAfterFork() { if (server.cluster_enabled && server.cluster_config_file_lock_fd != -1) close(server.cluster_config_file_lock_fd); /* don't care if this fails */ for (int iel = 0; iel < MAX_THREAD_VAR; iel++) { - if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) + if (iel < server.worker_threads_num || + (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) aeClosePipes(server.el[iel]); } @@ -6382,7 +6386,12 @@ int iAmMaster(void) { } int threadOwnLock() { - return mutexOwnLock(&globalLock); + WRAPPER_MUTEX_NOCLEANUP_DEFINE(wl, &globalLock); + return wrapperMutexOwnLock(&wl); +} + +int skipLock() { + return server.worker_threads_num == 1; } #ifdef REDIS_TEST @@ -6459,9 +6468,6 @@ int main(int argc, char **argv) { return 0; } #endif - /* Init lock */ - mutexInit(&globalLock, "global lock"); - mutexLock(&globalLock); /* We need to initialize our libraries, and the server configuration. */ #ifdef INIT_SETPROCTITLE_REPLACEMENT @@ -6597,6 +6603,10 @@ int main(int argc, char **argv) { readOOMScoreAdj(); initServer(); + /* Init lock */ + mutexInit(&globalLock, skipLock, "global lock"); + WRAPPER_MUTEX_NOCLEANUP_LOCK(wl, &globalLock); + if (background || server.pidfile) createPidFile(); if (server.set_proc_title) redisSetProcTitle(NULL); redisAsciiArt(); @@ -6667,16 +6677,17 @@ int main(int argc, char **argv) { redisSetCpuAffinity(server.server_cpulist); setOOMScoreAdj(-1); - mutexUnlock(&globalLock); + wrapperMutexUnlock(&wl); moduleReleaseGIL(); pthread_attr_t tattr; pthread_attr_init(&tattr); pthread_attr_setstacksize(&tattr, 1 << 23); - pthread_create(server.thread + MODULE_THREAD_ID, &tattr, moduleThread, (void *)((int64_t)MODULE_THREAD_ID)); for (int iel = 0; iel < server.worker_threads_num; ++iel) { pthread_create(server.thread + iel, &tattr, workerThread, (void *)((int64_t)iel)); } + if (server.worker_threads_num > 1) + pthread_create(server.thread + MODULE_THREAD_ID, &tattr, moduleThread, (void *)((int64_t)MODULE_THREAD_ID)); /* Block SIGALRM from this thread, it should only be received on a worker thread */ sigset_t sigset; @@ -6685,10 +6696,10 @@ int main(int argc, char **argv) { pthread_sigmask(SIG_BLOCK, &sigset, NULL); /* The worker/module thread sleeps until all the workers are done. */ - pthread_join(server.thread[MODULE_THREAD_ID], NULL); for (int iel = 0; iel < server.worker_threads_num; ++iel) pthread_join(server.thread[iel], NULL); - + if (server.worker_threads_num > 1) + pthread_join(server.thread[MODULE_THREAD_ID], NULL); return 0; } diff --git a/src/server.h b/src/server.h index b8d41ed7209df601e7e559ef36932dcad38b0a16..f1359a4375f40b9434ee79460979564ebeb84e3d 100644 --- a/src/server.h +++ b/src/server.h @@ -1836,7 +1836,7 @@ void parsedRelease(void *ptr); /* networking.c -- Networking and Client related operations */ client *createClient(connection *conn, int iel); void closeTimedoutClients(void); -int freeClient(client *c); +void freeClient(client *c); void freeClientAsync(client *c); void resetClient(client *c); void freeClientOriginalArgv(client *c); @@ -2444,7 +2444,6 @@ int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResul int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); -int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); /* Cluster */ void clusterInit(void); @@ -2744,7 +2743,7 @@ void xdelCommand(client *c); void xtrimCommand(client *c); void lolwutCommand(client *c); void aclCommand(client *c); -void stralgoCommand(client *c); +void lcsCommand(client *c); void resetCommand(client *c); void failoverCommand(client *c); @@ -2803,5 +2802,6 @@ int tlsConfigure(redisTLSContextConfig *ctx_config); int iAmMaster(void); void aeClosePipes(aeEventLoop *eventLoop); int threadOwnLock(); +int skipLock(); #endif diff --git a/src/t_string.c b/src/t_string.c index 29bf80e111ffdef075a656e0ec7902e32f4b62d7..8f455a4c31b0ecb9d28706b0c357b6ea92d5cf54 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -387,12 +387,9 @@ void getexCommand(client *c) { void getdelCommand(client *c) { if (getGenericCommand(c) == C_ERR) return; - int deleted = server.lazyfree_lazy_user_del ? dbAsyncDelete(c->db, c->argv[1]) : - dbSyncDelete(c->db, c->argv[1]); - if (deleted) { - /* Propagate as DEL/UNLINK command */ - robj *aux = server.lazyfree_lazy_user_del ? shared.unlink : shared.del; - rewriteClientCommandVector(c,2,aux,c->argv[1]); + if (dbSyncDelete(c->db, c->argv[1])) { + /* Propagate as DEL command */ + rewriteClientCommandVector(c,2,shared.del,c->argv[1]); signalModifiedKey(c, c->db, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); server.dirty++; @@ -598,9 +595,7 @@ void incrDecrCommand(client *c, long long incr) { signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; - addReply(c,shared.colon); - addReply(c,new); - addReply(c,shared.crlf); + addReplyLongLong(c, value); } void incrCommand(client *c) { @@ -622,6 +617,11 @@ void decrbyCommand(client *c) { long long incr; if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return; + /* Overflow check: negating LLONG_MIN will cause an overflow */ + if (incr == LLONG_MIN) { + addReplyError(c, "decrement would overflow"); + return; + } incrDecrCommand(c,-incr); } @@ -697,31 +697,33 @@ void strlenCommand(client *c) { addReplyLongLong(c,stringObjectLen(o)); } - -/* STRALGO -- Implement complex algorithms on strings. - * - * STRALGO ... arguments ... */ -void stralgoLCS(client *c); /* This implements the LCS algorithm. */ -void stralgoCommand(client *c) { - /* Select the algorithm. */ - if (!strcasecmp(c->argv[1]->ptr,"lcs")) { - stralgoLCS(c); - } else { - addReplyErrorObject(c,shared.syntaxerr); - } -} - -/* STRALGO [IDX] [LEN] [MINMATCHLEN ] [WITHMATCHLEN] - * STRINGS | KEYS - */ -void stralgoLCS(client *c) { +/* LCS key1 key2 [LEN] [IDX] [MINMATCHLEN ] [WITHMATCHLEN] */ +void lcsCommand(client *c) { uint32_t i, j; long long minmatchlen = 0; sds a = NULL, b = NULL; int getlen = 0, getidx = 0, withmatchlen = 0; robj *obja = NULL, *objb = NULL; - for (j = 2; j < (uint32_t)c->argc; j++) { + obja = lookupKeyRead(c->db,c->argv[1]); + objb = lookupKeyRead(c->db,c->argv[2]); + if ((obja && obja->type != OBJ_STRING) || + (objb && objb->type != OBJ_STRING)) + { + addReplyError(c, + "The specified keys must contain string values"); + /* Don't cleanup the objects, we need to do that + * only after calling getDecodedObject(). */ + obja = NULL; + objb = NULL; + goto cleanup; + } + obja = obja ? getDecodedObject(obja) : createStringObject("",0); + objb = objb ? getDecodedObject(objb) : createStringObject("",0); + a = obja->ptr; + b = objb->ptr; + + for (j = 3; j < (uint32_t)c->argc; j++) { char *opt = c->argv[j]->ptr; int moreargs = (c->argc-1) - j; @@ -736,37 +738,6 @@ void stralgoLCS(client *c) { != C_OK) goto cleanup; if (minmatchlen < 0) minmatchlen = 0; j++; - } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) { - if (a != NULL) { - addReplyError(c,"Either use STRINGS or KEYS"); - goto cleanup; - } - a = c->argv[j+1]->ptr; - b = c->argv[j+2]->ptr; - j += 2; - } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) { - if (a != NULL) { - addReplyError(c,"Either use STRINGS or KEYS"); - goto cleanup; - } - obja = lookupKeyRead(c->db,c->argv[j+1]); - objb = lookupKeyRead(c->db,c->argv[j+2]); - if ((obja && obja->type != OBJ_STRING) || - (objb && objb->type != OBJ_STRING)) - { - addReplyError(c, - "The specified keys must contain string values"); - /* Don't cleanup the objects, we need to do that - * only after calling getDecodedObject(). */ - obja = NULL; - objb = NULL; - goto cleanup; - } - obja = obja ? getDecodedObject(obja) : createStringObject("",0); - objb = objb ? getDecodedObject(objb) : createStringObject("",0); - a = obja->ptr; - b = objb->ptr; - j += 2; } else { addReplyErrorObject(c,shared.syntaxerr); goto cleanup; @@ -774,14 +745,9 @@ void stralgoLCS(client *c) { } /* Complain if the user passed ambiguous parameters. */ - if (a == NULL) { - addReplyError(c,"Please specify two strings: " - "STRINGS or KEYS options are mandatory"); - goto cleanup; - } else if (getlen && getidx) { + if (getlen && getidx) { addReplyError(c, - "If you want both the length and indexes, please " - "just use IDX."); + "If you want both the length and indexes, please just use IDX."); goto cleanup; } @@ -798,17 +764,22 @@ void stralgoLCS(client *c) { /* Setup an uint32_t array to store at LCS[i,j] the length of the * LCS A0..i-1, B0..j-1. Note that we have a linear array here, so - * we index it as LCS[j+(blen+1)*j] */ + * we index it as LCS[j+(blen+1)*i] */ #define LCS(A,B) lcs[(B)+((A)*(blen+1))] /* Try to allocate the LCS table, and abort on overflow or insufficient memory. */ unsigned long long lcssize = (unsigned long long)(alen+1)*(blen+1); /* Can't overflow due to the size limits above. */ unsigned long long lcsalloc = lcssize * sizeof(uint32_t); uint32_t *lcs = NULL; - if (lcsalloc < SIZE_MAX && lcsalloc / lcssize == sizeof(uint32_t)) + if (lcsalloc < SIZE_MAX && lcsalloc / lcssize == sizeof(uint32_t)) { + if (lcsalloc > (size_t)server.proto_max_bulk_len) { + addReplyError(c, "Insufficient memory, transient memory for LCS exceeds proto-max-bulk-len"); + goto cleanup; + } lcs = ztrymalloc(lcsalloc); + } if (!lcs) { - addReplyError(c, "Insufficient memory"); + addReplyError(c, "Insufficient memory, failed allocating transient memory for LCS"); goto cleanup; } @@ -840,7 +811,7 @@ void stralgoLCS(client *c) { * it backward, but the length is already known, we store it into idx. */ uint32_t idx = LCS(alen,blen); sds result = NULL; /* Resulting LCS string. */ - void *arraylenptr = NULL; /* Deffered length of the array for IDX. */ + void *arraylenptr = NULL; /* Deferred length of the array for IDX. */ uint32_t arange_start = alen, /* alen signals that values are not set. */ arange_end = 0, brange_start = 0, diff --git a/src/tls.c b/src/tls.c index cdf89957488ebf9b7a6518dc4bd1d59ab92b3982..b3c057d06a4ba746b501e9b7fa5da1648f2592c5 100644 --- a/src/tls.c +++ b/src/tls.c @@ -1035,7 +1035,6 @@ int tlsHasPendingData() { } int tlsProcessPendingData() { - serverAssert(!threadOwnLock()); listIter li; listNode *ln; diff --git a/tests/sentinel/tests/08-hostname-conf.tcl b/tests/sentinel/tests/08-hostname-conf.tcl index be6e42cb090a3f73ec12ec23663c586fcc2645e8..8f9e237f19821e2b1734dffca220c798474775a2 100644 --- a/tests/sentinel/tests/08-hostname-conf.tcl +++ b/tests/sentinel/tests/08-hostname-conf.tcl @@ -1,3 +1,5 @@ +source "../tests/includes/init-tests.tcl" + proc set_redis_announce_ip {addr} { foreach_redis_id id { R $id config set replica-announce-ip $addr @@ -28,8 +30,6 @@ test "(pre-init) Configure instances and sentinel for hostname use" { set_sentinel_config announce-hostnames yes } -source "../tests/includes/init-tests.tcl" - proc verify_hostname_announced {hostname} { foreach_sentinel_id id { # Master is reported with its hostname diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 93909469a8dec4469f1e361cc7e781a2155443aa..508d018f1ebab9504be33713709db8efc21e83d1 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -592,7 +592,7 @@ proc errorrstat {cmd r} { proc generate_fuzzy_traffic_on_key {key duration} { # Commands per type, blocking commands removed # TODO: extract these from help.h or elsewhere, and improve to include other types - set string_commands {APPEND BITCOUNT BITFIELD BITOP BITPOS DECR DECRBY GET GETBIT GETRANGE GETSET INCR INCRBY INCRBYFLOAT MGET MSET MSETNX PSETEX SET SETBIT SETEX SETNX SETRANGE STRALGO STRLEN} + set string_commands {APPEND BITCOUNT BITFIELD BITOP BITPOS DECR DECRBY GET GETBIT GETRANGE GETSET INCR INCRBY INCRBYFLOAT MGET MSET MSETNX PSETEX SET SETBIT SETEX SETNX SETRANGE LCS STRLEN} set hash_commands {HDEL HEXISTS HGET HGETALL HINCRBY HINCRBYFLOAT HKEYS HLEN HMGET HMSET HSCAN HSET HSETNX HSTRLEN HVALS HRANDFIELD} set zset_commands {ZADD ZCARD ZCOUNT ZINCRBY ZINTERSTORE ZLEXCOUNT ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYLEX ZRANGEBYSCORE ZRANK ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ZREVRANGE ZREVRANGEBYLEX ZREVRANGEBYSCORE ZREVRANK ZSCAN ZSCORE ZUNIONSTORE ZRANDMEMBER} set list_commands {LINDEX LINSERT LLEN LPOP LPOS LPUSH LPUSHX LRANGE LREM LSET LTRIM RPOP RPOPLPUSH RPUSH RPUSHX} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index e28b1d139df00fdc87dd1c63dda35f45e329643f..5bb8a57542557b25df34764129fd99256b45e48b 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -46,7 +46,7 @@ set ::all_tests { integration/aof integration/rdb integration/corrupt-dump - integration/corrupt-dump-fuzzer + # integration/corrupt-dump-fuzzer integration/convert-zipmap-hash-on-load integration/logging integration/psync2 diff --git a/tests/unit/type/string.tcl b/tests/unit/type/string.tcl index 43968b26b972ee82eaada562d9983c6d6cbde51d..307b1b30422b8dee350e9af22a2467bcc6b43368 100644 --- a/tests/unit/type/string.tcl +++ b/tests/unit/type/string.tcl @@ -549,29 +549,27 @@ start_server {tags {"string"}} { set rna2 {ATTAAAGGTTTATACCTTCCCAGGTAACAAACCAACCAACTTTCGATCTCTTGTAGATCTGTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTCACGCAGTATAATTAATAACTAATTACTGTCGTTGACAGGACACGAGTAACTCGTCTATCTTCTGCAGGCTGCTTACGGTTTCGTCCGTGTTGCAGCCGATCATCAGCACATCTAGGTTT} set rnalcs {ACCTTCCCAGGTAACAAACCAACCAACTTTCGATCTCTTGTAGATCTGTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTCACGCAGTATAATTAATAACTAATTACTGTCGTTGACAGGACACGAGTAACTCGTCTATCTTCTGCAGGCTGCTTACGGTTTCGTCCGTGTTGCAGCCGATCATCAGCACATCTAGGTTT} - test {STRALGO LCS string output with STRINGS option} { - r STRALGO LCS STRINGS $rna1 $rna2 + test {LCS basic} { + r set virus1{t} $rna1 + r set virus2{t} $rna2 + r LCS virus1{t} virus2{t} } $rnalcs - test {STRALGO LCS len} { - r STRALGO LCS LEN STRINGS $rna1 $rna2 + test {LCS len} { + r set virus1{t} $rna1 + r set virus2{t} $rna2 + r LCS virus1{t} virus2{t} LEN } [string length $rnalcs] - test {LCS with KEYS option} { - r set virus1 $rna1 - r set virus2 $rna2 - r STRALGO LCS KEYS virus1 virus2 - } $rnalcs - test {LCS indexes} { - dict get [r STRALGO LCS IDX KEYS virus1 virus2] matches + dict get [r LCS virus1{t} virus2{t} IDX] matches } {{{238 238} {239 239}} {{236 236} {238 238}} {{229 230} {236 237}} {{224 224} {235 235}} {{1 222} {13 234}}} test {LCS indexes with match len} { - dict get [r STRALGO LCS IDX KEYS virus1 virus2 WITHMATCHLEN] matches + dict get [r LCS virus1{t} virus2{t} IDX WITHMATCHLEN] matches } {{{238 238} {239 239} 1} {{236 236} {238 238} 1} {{229 230} {236 237} 2} {{224 224} {235 235} 1} {{1 222} {13 234} 222}} test {LCS indexes with match len and minimum match len} { - dict get [r STRALGO LCS IDX KEYS virus1 virus2 WITHMATCHLEN MINMATCHLEN 5] matches + dict get [r LCS virus1{t} virus2{t} IDX WITHMATCHLEN MINMATCHLEN 5] matches } {{{1 222} {13 234} 222}} } diff --git a/utils/find-best-worker-threads.sh b/utils/find-best-worker-threads.sh index 10dd4de4c90045d43e0a0ae9ee54e1168b498e7b..b7b9e09878bf1bbfa57fb0d8d733fd20dbcdcf26 100755 --- a/utils/find-best-worker-threads.sh +++ b/utils/find-best-worker-threads.sh @@ -1,6 +1,6 @@ #!/bin/bash -HOST=localhost +HOST=127.0.0.1 PORT=6379 CLIENTS=50 REQUESTS=5000 @@ -73,7 +73,7 @@ while [ $THREAD -le $MAXTHREADS ]; do echo "Failed to start openamdc server. Exiting." exit 1 else - echo "OpenAMDC server is already running, ../src/openamdc-server --port $PORT --worker-threads $THREAD > /dev/null 2> /dev/null &." + echo "OpenAMDC server is already running, ../src/openamdc-server --bind $HOST --port $PORT --worker-threads $THREAD > /dev/null 2> /dev/null &." fi echo "Starting benchmark, $benchmark -s $HOST -p $PORT -t $THREAD -c $CLIENTS -n $REQUESTS --hide-histogram --distinct-client-seed --command="set __key__ __data__" --key-prefix="kv_" --key-minimum=1 --key-maximum=10000 -R -d 128"