diff --git a/README.md b/README.md
index 63556d78b22f713e61a150b743454e8509663531..3a45409b315024c3faa32efdc109ef3d920c2a83 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
What is openAMDC?
-----------------
-openAMDC is an open source, high-performance, in-memory, key-value database which forks from Redis. It is compatible with all the commands, protocol and data structures that were originally available in Redis, making it feasible to utilize openAMDC as a direct substitute for Redis. The architecture of openAMDC is as follows:
+openAMDC is an open source, high-performance, in-memory, key-value database. It is compatible with all the commands, protocol and data structures that were originally available in Redis, making it feasible to utilize openAMDC as a direct substitute for Redis. The architecture of openAMDC is as follows:
@@ -199,7 +199,7 @@ You can use openamdc-cli to play with openAMDC. Start a openamdc-server instance
then in another terminal try the following:
% cd src
- % ./oepnamdc-cli
+ % ./openamdc-cli
openamdc> ping
PONG
openamdc> set foo bar
@@ -238,15 +238,15 @@ to run openAMDC properly as a background daemon that will start again on
system reboots.
You'll be able to stop and start openAMDC using the script named
-`/etc/init.d/openamdc_
`, for instance `/etc/init.d/oepnamdc_6379`.
+`/etc/init.d/openamdc_`, for instance `/etc/init.d/openamdc_6379`.
Code contributions
------------------
Note: Contribute code to the openAMDC project by sending a pull request via
-Gitee, you agree to release your code under the terms of the BSD license
+Gitee, you agree to release your code under the terms of the MulanPSLv2 license
that you can find in the COPYING file included in the openAMDC source
-distribution. You will include BSD license in the COPYING file within each
+distribution. You will include MulanPSLv2 license in the COPYING file within each
source file that you contribute.
1. If it is a major feature or a semantical change, create an [issue](.github/ISSUE_TEMPLATE/feature_request.md)
diff --git a/README.zh.md b/README.zh.md
index 727ec21cf8d9b245ad24aa63ffdd78573a918fef..4ffe3d0d9e16d35fe4c4dd61349d3a8ff30726e5 100644
--- a/README.zh.md
+++ b/README.zh.md
@@ -1,7 +1,7 @@
什么是openAMDC?
--------------
-openAMDC是一个开源且高性能的键值内存数据库,作为一个Redis分支其兼容所有Redis命令、协议以及数据结构,可以轻松使用openAMDC来替换已有的Redis服务。openAMDC架构如下:
+openAMDC是一个开源且高性能的键值内存数据库,兼容所有Redis命令、协议以及数据结构,可以轻松使用openAMDC来替换已有的Redis服务。openAMDC架构如下:
@@ -178,7 +178,7 @@ redis.conf中的所有选项也都可以作为命令行选项,名称完全相
您可以使用openamdc-cli来与openAMDC互动。启动一个openamdc-server实例,然后在另一个终端尝试以下操作:
% cd src
- % ./oepnamdc-cli
+ % ./openamdc-cli
openamdc> ping
PONG
openamdc> set foo bar
@@ -210,13 +210,13 @@ redis.conf中的所有选项也都可以作为命令行选项,名称完全相
该脚本会问你几个问题,并将设置好您需要的一切,以便将openAMDC正确地作为一个后台守护进程运行,该进程会在系统重启时再次启动。
-您将能够使用名为/etc/init.d/openamdc_
的脚本停止和启动openAMDC,例如/etc/init.d/oepnamdc_6379。
+您将能够使用名为/etc/init.d/openamdc_的脚本停止和启动openAMDC,例如/etc/init.d/openamdc_6379。
代码贡献
-------
-注意:通过Gitee发送拉取请求向openAMDC项目贡献代码,即表示您同意根据在openAMDC源代码分发中包含的COPYING文件中可找到的BSD许可
-条款发布您的代码。您将在您所贡献的每个源代码文件内的COPYING文件中包含BSD许可。
+注意:通过Gitee发送拉取请求向openAMDC项目贡献代码,即表示您同意根据在openAMDC源代码分发中包含的COPYING文件中可找到的MulanPSLv2许可
+条款发布您的代码。您将在您所贡献的每个源代码文件内的COPYING文件中包含MulanPSLv2许可。
1. 如果这是一个重要的特性或语义上的改变,请在Gitee上创建一个[问题](.github/ISSUE_TEMPLATE/feature_request.md),准确地描述你想要实现的内容以及原因,用例对于特性被接受很重要。
diff --git a/src/aof.c b/src/aof.c
index 0ce705cbc723d913512fd3d39ef5ba1a13521122..685c6b244baa23bb5541940c633ad952308f351b 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -718,7 +718,7 @@ struct client *createAOFClient(void) {
c->sockname = NULL;
c->resp = 2;
c->user = NULL;
- mutexInit(&c->lock, "fake client");
+ mutexInit(&c->lock, skipLock, "fake client");
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
initClientMultiState(c);
diff --git a/src/blocked.c b/src/blocked.c
index 92cb68fa1b5bb4c52cc53c37645e463a6cf912e2..e06860c4a56f0b3dfd97cb2dfec19258596cde18 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -134,8 +134,9 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
+ WRAPPER_MUTEX_NOCLEANUP_LOCK(cl, &c->lock);
serverAssert(threadOwnLock());
- serverAssert(mutexOwnLock(&c->lock));
+ serverAssert(wrapperMutexOwnLock(&cl));
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
diff --git a/src/db.c b/src/db.c
index b60ac54ea555f7218afa3f36587f669d7a84137b..511240b168a9141f8f673b83e83da8065244d4cd 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1814,31 +1814,6 @@ int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysRes
return num;
}
-/* LCS ... [KEYS ] ... */
-int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
- int i;
- int *keys = getKeysPrepareResult(result, 2);
- UNUSED(cmd);
-
- /* We need to parse the options of the command in order to check for the
- * "KEYS" argument before the "STRINGS" argument. */
- for (i = 1; i < argc; i++) {
- char *arg = argv[i]->ptr;
- int moreargs = (argc-1) - i;
-
- if (!strcasecmp(arg, "strings")) {
- break;
- } else if (!strcasecmp(arg, "keys") && moreargs >= 2) {
- keys[0] = i+1;
- keys[1] = i+2;
- result->numkeys = 2;
- return result->numkeys;
- }
- }
- result->numkeys = 0;
- return result->numkeys;
-}
-
/* Helper function to extract keys from memory command.
* MEMORY USAGE */
int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
diff --git a/src/debug.c b/src/debug.c
index 0dd806d6b0fd3776f22653c726dda0dd6665fbb4..b5b3f8b8513e7568556bc7139e5159ddb472be91 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -1718,7 +1718,8 @@ int memtest_test_linux_anonymous_maps(void) {
static void killMainAndWorkerThreads(void) {
int err;
for (int iel = 0; iel < MAX_THREAD_VAR; iel++) {
- if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) {
+ if (iel < server.worker_threads_num ||
+ (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) {
if (server.thread[iel] != pthread_self()) {
pthread_cancel(server.thread[iel]);
}
diff --git a/src/help.h b/src/help.h
index bbcdd6732cf2a673cf97a0e9f60b0faa2e68e6e0..62042a043cbf2947d49621601b30a07015d04be0 100644
--- a/src/help.h
+++ b/src/help.h
@@ -1145,8 +1145,8 @@ struct commandHelp {
"Incrementally iterate Set elements",
3,
"2.8.0" },
- { "STRALGO",
- "LCS algo-specific-argument [algo-specific-argument ...]",
+ { "LCS",
+ "key1 key2 [LEN] [IDX] [MINMATCHLEN ] [WITHMATCHLEN]",
"Run algorithms (currently LCS) against strings",
1,
"6.0.0" },
diff --git a/src/mutex.c b/src/mutex.c
index 52f7625533e0a5f56609303de19d081d6b452f8c..d2e0269022cb850fd55f542807aba0cc03ac2097 100644
--- a/src/mutex.c
+++ b/src/mutex.c
@@ -13,11 +13,12 @@
#include "mutex.h"
/* Initializes a mutex structure with specified attributes */
-int mutexInit(struct mutex *m, char *name) {
+int mutexInit(struct mutex *m, mutexSkipLock *skipLock, char *name) {
int ret = 0;
m->name = name;
m->depth = 0;
m->owner = 0;
+ m->skipLock = skipLock;
/* Initialize mutex attribute object */
if ((ret = pthread_mutexattr_init(&m->attr)) != 0) return ret;
@@ -143,7 +144,8 @@ int mutexOwnLock(struct mutex *m) {
/* Wraps a mutex lock operation, tracking lock depth for a wrapper
* mutex structure */
int wrapperMutexLock(struct wrapperMutex *wm) {
- assert(wm->lock);
+ if (wm->lock == NULL) return 0;
+ if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return 0;
int ret = mutexLock(wm->lock);
if (ret == 0)
wm->depth++;
@@ -152,7 +154,8 @@ int wrapperMutexLock(struct wrapperMutex *wm) {
/* Attempts to lock a mutex, wrapped for tracking lock depth */
int wrapperMutexTryLock(struct wrapperMutex *wm) {
- assert(wm->lock);
+ if (wm->lock == NULL) return 0;
+ if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return 0;
int ret = mutexTryLock(wm->lock);
if (ret == 0)
wm->depth++;
@@ -166,6 +169,7 @@ int wrapperMutexUnlock(struct wrapperMutex *wm) {
int ret = 0;
if (wm->lock == NULL || wm->depth == 0)
return ret;
+ if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return ret;
ret = mutexUnlock(wm->lock);
if (ret == 0)
wm->depth--;
@@ -174,6 +178,7 @@ int wrapperMutexUnlock(struct wrapperMutex *wm) {
/* Determines if the current thread owns the lock of a wrapper mutex */
int wrapperMutexOwnLock(struct wrapperMutex *wm) {
- assert(wm->lock);
+ if (wm->lock == NULL) return 1;
+ if (wm->lock->skipLock != NULL && wm->lock->skipLock()) return 1;
return mutexOwnLock(wm->lock);
}
\ No newline at end of file
diff --git a/src/mutex.h b/src/mutex.h
index 4bcdf30f0e407306e11cae174f0da4cceabf8778..10eb9d641329b20b2a74c10b00e16c19ba1e598c 100644
--- a/src/mutex.h
+++ b/src/mutex.h
@@ -17,15 +17,18 @@
#include
#include
+typedef int (mutexSkipLock)();
+
struct mutex {
char *name;
int depth;
pthread_t owner;
pthread_mutexattr_t attr;
pthread_mutex_t mutex;
+ mutexSkipLock *skipLock;
};
-int mutexInit(struct mutex *m, char *name);
+int mutexInit(struct mutex *m, mutexSkipLock *skipLock, char *name);
int mutexLock(struct mutex *m);
int mutexTryLock(struct mutex *m);
int mutexUnlock(struct mutex *m);
@@ -37,11 +40,20 @@ struct wrapperMutex {
int depth;
};
-#define WRAPPER_MUTEX_DEFINE(v) __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = NULL, .depth = 0}
-#define WRAPPER_MUTEX_DEFER_LOCK(v, l) __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0}
-#define WRAPPER_MUTEX_LOCK(v, l) \
- __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0}; \
- wrapperMutexLock(&(v))
+#define WRAPPER_MUTEX_DEFINE(v) \
+ __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = NULL, .depth = 0}
+
+#define WRAPPER_MUTEX_DEFER_LOCK(v, l) \
+ __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0}
+
+#define WRAPPER_MUTEX_LOCK(v, l) \
+ __attribute__((__cleanup__(wrapperMutexUnlock))) struct wrapperMutex (v) = {.lock = (l), .depth = 0}; wrapperMutexLock(&(v))
+
+#define WRAPPER_MUTEX_NOCLEANUP_LOCK(v, l) \
+ struct wrapperMutex (v) = {.lock = (l), .depth = 0}; wrapperMutexLock(&(v))
+
+#define WRAPPER_MUTEX_NOCLEANUP_DEFINE(v, l) \
+ struct wrapperMutex (v) = {.lock = (l), .depth = 1};
int wrapperMutexLock(struct wrapperMutex *wm);
int wrapperMutexTryLock(struct wrapperMutex *wm);
diff --git a/src/networking.c b/src/networking.c
index e9bd76b93da93f23774458a42168a1b07fd1ed2a..4ea5b8bc5fe9298f1c0ccb6160661059789e58f8 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -127,7 +127,7 @@ client *createClient(connection *conn, int iel) {
c->resp = 2;
c->iel = iel;
c->conn = conn;
- mutexInit(&c->lock, "client lock");
+ mutexInit(&c->lock, skipLock, "client lock");
c->name = NULL;
c->bufpos = 0;
c->qb_pos = 0;
@@ -369,8 +369,9 @@ void clientInstallWriteHandler(client *c) {
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock);
+ serverAssert(wrapperMutexOwnLock(&cl));
serverAssert(clientCheckThreadAligned(c));
- serverAssert(mutexOwnLock(&c->lock));
c->flags |= CLIENT_PENDING_WRITE;
WRAPPER_MUTEX_LOCK(pwl, &server.pending_write_lock[c->iel]);
listAddNodeHead(server.pending_write_list[c->iel],c);
@@ -413,7 +414,8 @@ int prepareClientToWrite(client *c) {
if (async) {
serverAssert(threadOwnLock());
} else {
- serverAssert(c->conn == NULL || mutexOwnLock(&c->lock));
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock);
+ serverAssert(c->conn == NULL || wrapperMutexOwnLock(&cl));
}
/* If it's the Lua client we always return ok without installing any
@@ -1515,9 +1517,10 @@ int anyOtherSlaveWaitRdb(client *except_me) {
* be referenced, not including the Pub/Sub channels.
* This is used by freeClient() and replicationCacheMaster(). */
void unlinkClient(client *c) {
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock);
serverAssert(clientCheckThreadAligned(c));
serverAssert(c->conn == NULL || threadOwnLock());
- serverAssert(c->conn == NULL || mutexOwnLock(&c->lock));
+ serverAssert(c->conn == NULL || wrapperMutexOwnLock(&cl));
listIter li;
listNode *ln;
@@ -1579,7 +1582,8 @@ void unlinkClient(client *c) {
ln = NULL;
int found = 0;
for (int iel = 0; iel < MAX_THREAD_VAR; ++iel) {
- if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) {
+ if (iel < server.worker_threads_num ||
+ (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) {
ln = listSearchKey(server.async_pending_write_list[iel], c);
if (ln) {
found = 1;
@@ -1603,18 +1607,17 @@ void unlinkClient(client *c) {
if (c->flags & CLIENT_TRACKING) disableTracking(c);
}
-int freeClient(client *c) {
+void freeClient(client *c) {
serverAssert(c->conn == NULL || threadOwnLock());
serverAssert(clientCheckThreadAligned(c));
- mutexLock(&c->lock);
+ WRAPPER_MUTEX_LOCK(wl, &c->lock);
listNode *ln;
/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flags & CLIENT_PROTECTED || c->async_ops) {
freeClientAsync(c);
- mutexUnlock(&c->lock);
- return 0;
+ return;
}
/* For connected clients, call the disconnection event of modules hooks. */
@@ -1648,8 +1651,7 @@ int freeClient(client *c) {
if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
replicationCacheMaster(c);
- mutexUnlock(&c->lock);
- return 0;
+ return;
}
}
@@ -1747,10 +1749,9 @@ int freeClient(client *c) {
sdsfree(c->peerid);
sdsfree(c->sockname);
sdsfree(c->slave_addr);
- mutexUnlock(&c->lock);
+ wrapperMutexUnlock(&wl);
mutexDestroy(&c->lock);
zfree(c);
- return 1;
}
/* Schedule a client to free it at a safe time in the serverCron() function.
@@ -3837,16 +3838,15 @@ void processEventsWhileBlocked(int iel) {
listRewind(server.clients, &li);
while((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
- if (mutexOwnLock(&c->lock)) {
- serverAssert(c->flags & CLIENT_PROTECTED ||
- c->flags & CLIENT_INPROGRESS_COMMAND);
- mutexUnlock(&c->lock);
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock);
+ if (wrapperMutexOwnLock(&cl)) {
+ wrapperMutexUnlock(&cl);
listAddNodeTail(clients, c);
}
}
- mutexUnlock(&globalLock);
- serverAssert(!threadOwnLock());
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(wl, &globalLock);
+ wrapperMutexUnlock(&wl);
{
int iterations = 4; /* See the function top-comment. */
@@ -3877,12 +3877,11 @@ void processEventsWhileBlocked(int iel) {
ProcessingEventsWhileBlocked = 0;
}
- struct wrapperMutex wm = {&globalLock};
- wrapperMutexLock(&wm);
+ WRAPPER_MUTEX_NOCLEANUP_LOCK(gl, &globalLock);
listRewind(clients, &li);
while((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
- mutexLock(&c->lock);
+ WRAPPER_MUTEX_NOCLEANUP_LOCK(cl, &c->lock);
}
listRelease(clients);
}
diff --git a/src/openamdc-benchmark.c b/src/openamdc-benchmark.c
index 9a7613fa83cc710e71c3064388d338cb60a02dda..18646b3a77b9af31a272d083d66560a93744c98f 100644
--- a/src/openamdc-benchmark.c
+++ b/src/openamdc-benchmark.c
@@ -1674,7 +1674,7 @@ int main(int argc, const char **argv) {
client c;
- mutexInit(&globalLock, "global lock");
+ mutexInit(&globalLock, NULL, "global lock");
srandom(time(NULL) ^ getpid());
init_genrand64(ustime() ^ getpid());
signal(SIGHUP, SIG_IGN);
diff --git a/src/pubsub.c b/src/pubsub.c
index 7c404090362b81aa942c39fe1f20a237d27de97d..ed13debf756c6e39decfa39f59f344e521f209f9 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -134,8 +134,9 @@ int clientSubscriptionsCount(client *c) {
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock);
serverAssert(threadOwnLock());
- serverAssert(mutexOwnLock(&c->lock));
+ serverAssert(wrapperMutexOwnLock(&cl));
dictEntry *de;
list *clients = NULL;
int retval = 0;
diff --git a/src/scripting.c b/src/scripting.c
index e317e963b450f2c38086f6cf65de50175beb007c..a40356d7966f229efee4026bd2e0dbffa95ec6f6 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -589,14 +589,13 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
}
inuse++;
- mutexLock(&c->lock);
+ WRAPPER_MUTEX_LOCK(wl, &c->lock);
/* Require at least one argument */
if (argc == 0) {
luaPushError(lua,
"Please specify at least one argument for redis.call()");
inuse--;
- mutexUnlock(&c->lock);
return raise_error ? luaRaiseError(lua) : 1;
}
@@ -916,11 +915,9 @@ cleanup:
* form of a table with an "err" field. Extract the string to
* return the plain error. */
inuse--;
- mutexUnlock(&c->lock);
return luaRaiseError(lua);
}
inuse--;
- mutexUnlock(&c->lock);
return 1;
}
diff --git a/src/sentinel.c b/src/sentinel.c
index 10c8145b179f97ba4e07c8dd287207a092b0ae9f..234525640f1acd3b1ab792e2b630edfbb66053c5 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -874,10 +874,12 @@ void sentinelRunPendingScripts(void) {
} else if (pid == 0) {
/* Child */
tlsCleanup();
- for (int iel = 0; iel < server.worker_threads_num; iel++) {
- aeClosePipes(server.el[iel]);
+ for (int iel = 0; iel < MAX_THREAD_VAR; iel++) {
+ if (iel < server.worker_threads_num ||
+ (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) {
+ aeClosePipes(server.el[iel]);
+ }
}
- aeClosePipes(server.el[MODULE_THREAD_ID]);
execve(sj->argv[0],sj->argv,environ);
/* If we are here an error occurred. */
_exit(2); /* Don't retry execution. */
diff --git a/src/server.c b/src/server.c
index d9a86cd670ad2d635aac7067371f0a26fd7fd85c..80a487fe33df3b45e6508404cfe33e93e747291a 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1083,9 +1083,9 @@ struct redisCommand redisCommandTable[] = {
"admin no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
- {"stralgo",stralgoCommand,-2,
+ {"lcs",lcsCommand,-3,
"read-only @string",
- 0,lcsGetKeys,0,0,0,0,0,0},
+ 0,NULL,1,2,1,0,0,0},
{"reset",resetCommand,1,
"no-script ok-stale ok-loading fast @connection",
@@ -1823,7 +1823,7 @@ void clientsCron(int iel) {
head = listFirst(server.clients);
c = listNodeValue(head);
if (c->iel == iel) {
- mutexLock(&c->lock);
+ WRAPPER_MUTEX_NOCLEANUP_LOCK(cl, &c->lock);
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
@@ -1833,7 +1833,7 @@ void clientsCron(int iel) {
if (clientsCronTrackClientsMemUsage(c)) goto unlock;
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
unlock:
- mutexUnlock(&c->lock);
+ wrapperMutexUnlock(&cl);
}
}
@@ -2037,9 +2037,10 @@ void asyncHandleClientsWithPendingWrites(void *var) {
/* Install a write handler for the client to manage future write events */
clientInstallWriteHandler(c);
/* Process the pending write tasks for the current thread */
- mutexUnlock(&c->lock);
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(cl, &c->lock);
+ wrapperMutexUnlock(&cl);
handleClientsWithPendingWrites(c->iel);
- mutexLock(&c->lock);
+ wrapperMutexLock(&cl);
}
/*
@@ -2825,7 +2826,7 @@ void initServerConfig(void) {
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_no_slaves_since = time(NULL);
- mutexInit(&replBacklogLock, "repl backlog lock");
+ mutexInit(&replBacklogLock, skipLock, "repl backlog lock");
/* Failover related */
server.failover_end_time = 0;
@@ -3431,7 +3432,8 @@ void initServer(void) {
server.worker_threads_num = server.worker_threads_num == 0 ? 1 : server.worker_threads_num;
for (int iel = 0; iel < MAX_THREAD_VAR; iel++) {
- if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) {
+ if (iel < server.worker_threads_num ||
+ (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) {
server.unblocked_clients[iel] = listCreate();
server.pending_write_list[iel] = listCreate();
server.client_handling_list[iel] = listCreate();
@@ -3457,7 +3459,7 @@ void initServer(void) {
}
aeSetBeforeSleepProc(server.el[iel], beforeSleep);
aeSetAfterSleepProc(server.el[iel], afterSleep);
- mutexInit(&server.pending_write_lock[iel], "pending write lock");
+ mutexInit(&server.pending_write_lock[iel], skipLock, "pending write lock");
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
@@ -4606,7 +4608,8 @@ int prepareForShutdown(int flags) {
* calls aeAsyncFunction with asyncAeEventStop, passing the current thread
* index as an argument. */
for (int iel = 0; iel < MAX_THREAD_VAR; iel++) {
- if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID) {
+ if (iel < server.worker_threads_num ||
+ (iel == MODULE_THREAD_ID && server.worker_threads_num > 1)) {
void *var = (void*)((int64_t)iel);
aeAsyncFunction(server.el[iel], asyncAeEventStop, var, NULL, 1);
}
@@ -6091,7 +6094,8 @@ void closeChildUnusedResourceAfterFork() {
if (server.cluster_enabled && server.cluster_config_file_lock_fd != -1)
close(server.cluster_config_file_lock_fd); /* don't care if this fails */
for (int iel = 0; iel < MAX_THREAD_VAR; iel++) {
- if (iel < server.worker_threads_num || iel == MODULE_THREAD_ID)
+ if (iel < server.worker_threads_num ||
+ (iel == MODULE_THREAD_ID && server.worker_threads_num > 1))
aeClosePipes(server.el[iel]);
}
@@ -6382,7 +6386,12 @@ int iAmMaster(void) {
}
int threadOwnLock() {
- return mutexOwnLock(&globalLock);
+ WRAPPER_MUTEX_NOCLEANUP_DEFINE(wl, &globalLock);
+ return wrapperMutexOwnLock(&wl);
+}
+
+int skipLock() {
+ return server.worker_threads_num == 1;
}
#ifdef REDIS_TEST
@@ -6459,9 +6468,6 @@ int main(int argc, char **argv) {
return 0;
}
#endif
- /* Init lock */
- mutexInit(&globalLock, "global lock");
- mutexLock(&globalLock);
/* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
@@ -6597,6 +6603,10 @@ int main(int argc, char **argv) {
readOOMScoreAdj();
initServer();
+ /* Init lock */
+ mutexInit(&globalLock, skipLock, "global lock");
+ WRAPPER_MUTEX_NOCLEANUP_LOCK(wl, &globalLock);
+
if (background || server.pidfile) createPidFile();
if (server.set_proc_title) redisSetProcTitle(NULL);
redisAsciiArt();
@@ -6667,16 +6677,17 @@ int main(int argc, char **argv) {
redisSetCpuAffinity(server.server_cpulist);
setOOMScoreAdj(-1);
- mutexUnlock(&globalLock);
+ wrapperMutexUnlock(&wl);
moduleReleaseGIL();
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23);
- pthread_create(server.thread + MODULE_THREAD_ID, &tattr, moduleThread, (void *)((int64_t)MODULE_THREAD_ID));
for (int iel = 0; iel < server.worker_threads_num; ++iel) {
pthread_create(server.thread + iel, &tattr, workerThread, (void *)((int64_t)iel));
}
+ if (server.worker_threads_num > 1)
+ pthread_create(server.thread + MODULE_THREAD_ID, &tattr, moduleThread, (void *)((int64_t)MODULE_THREAD_ID));
/* Block SIGALRM from this thread, it should only be received on a worker thread */
sigset_t sigset;
@@ -6685,10 +6696,10 @@ int main(int argc, char **argv) {
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
/* The worker/module thread sleeps until all the workers are done. */
- pthread_join(server.thread[MODULE_THREAD_ID], NULL);
for (int iel = 0; iel < server.worker_threads_num; ++iel)
pthread_join(server.thread[iel], NULL);
-
+ if (server.worker_threads_num > 1)
+ pthread_join(server.thread[MODULE_THREAD_ID], NULL);
return 0;
}
diff --git a/src/server.h b/src/server.h
index b8d41ed7209df601e7e559ef36932dcad38b0a16..f1359a4375f40b9434ee79460979564ebeb84e3d 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1836,7 +1836,7 @@ void parsedRelease(void *ptr);
/* networking.c -- Networking and Client related operations */
client *createClient(connection *conn, int iel);
void closeTimedoutClients(void);
-int freeClient(client *c);
+void freeClient(client *c);
void freeClientAsync(client *c);
void resetClient(client *c);
void freeClientOriginalArgv(client *c);
@@ -2444,7 +2444,6 @@ int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResul
int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
-int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
/* Cluster */
void clusterInit(void);
@@ -2744,7 +2743,7 @@ void xdelCommand(client *c);
void xtrimCommand(client *c);
void lolwutCommand(client *c);
void aclCommand(client *c);
-void stralgoCommand(client *c);
+void lcsCommand(client *c);
void resetCommand(client *c);
void failoverCommand(client *c);
@@ -2803,5 +2802,6 @@ int tlsConfigure(redisTLSContextConfig *ctx_config);
int iAmMaster(void);
void aeClosePipes(aeEventLoop *eventLoop);
int threadOwnLock();
+int skipLock();
#endif
diff --git a/src/t_string.c b/src/t_string.c
index 29bf80e111ffdef075a656e0ec7902e32f4b62d7..8f455a4c31b0ecb9d28706b0c357b6ea92d5cf54 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -387,12 +387,9 @@ void getexCommand(client *c) {
void getdelCommand(client *c) {
if (getGenericCommand(c) == C_ERR) return;
- int deleted = server.lazyfree_lazy_user_del ? dbAsyncDelete(c->db, c->argv[1]) :
- dbSyncDelete(c->db, c->argv[1]);
- if (deleted) {
- /* Propagate as DEL/UNLINK command */
- robj *aux = server.lazyfree_lazy_user_del ? shared.unlink : shared.del;
- rewriteClientCommandVector(c,2,aux,c->argv[1]);
+ if (dbSyncDelete(c->db, c->argv[1])) {
+ /* Propagate as DEL command */
+ rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
server.dirty++;
@@ -598,9 +595,7 @@ void incrDecrCommand(client *c, long long incr) {
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
server.dirty++;
- addReply(c,shared.colon);
- addReply(c,new);
- addReply(c,shared.crlf);
+ addReplyLongLong(c, value);
}
void incrCommand(client *c) {
@@ -622,6 +617,11 @@ void decrbyCommand(client *c) {
long long incr;
if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return;
+ /* Overflow check: negating LLONG_MIN will cause an overflow */
+ if (incr == LLONG_MIN) {
+ addReplyError(c, "decrement would overflow");
+ return;
+ }
incrDecrCommand(c,-incr);
}
@@ -697,31 +697,33 @@ void strlenCommand(client *c) {
addReplyLongLong(c,stringObjectLen(o));
}
-
-/* STRALGO -- Implement complex algorithms on strings.
- *
- * STRALGO ... arguments ... */
-void stralgoLCS(client *c); /* This implements the LCS algorithm. */
-void stralgoCommand(client *c) {
- /* Select the algorithm. */
- if (!strcasecmp(c->argv[1]->ptr,"lcs")) {
- stralgoLCS(c);
- } else {
- addReplyErrorObject(c,shared.syntaxerr);
- }
-}
-
-/* STRALGO [IDX] [LEN] [MINMATCHLEN ] [WITHMATCHLEN]
- * STRINGS | KEYS
- */
-void stralgoLCS(client *c) {
+/* LCS key1 key2 [LEN] [IDX] [MINMATCHLEN ] [WITHMATCHLEN] */
+void lcsCommand(client *c) {
uint32_t i, j;
long long minmatchlen = 0;
sds a = NULL, b = NULL;
int getlen = 0, getidx = 0, withmatchlen = 0;
robj *obja = NULL, *objb = NULL;
- for (j = 2; j < (uint32_t)c->argc; j++) {
+ obja = lookupKeyRead(c->db,c->argv[1]);
+ objb = lookupKeyRead(c->db,c->argv[2]);
+ if ((obja && obja->type != OBJ_STRING) ||
+ (objb && objb->type != OBJ_STRING))
+ {
+ addReplyError(c,
+ "The specified keys must contain string values");
+ /* Don't cleanup the objects, we need to do that
+ * only after calling getDecodedObject(). */
+ obja = NULL;
+ objb = NULL;
+ goto cleanup;
+ }
+ obja = obja ? getDecodedObject(obja) : createStringObject("",0);
+ objb = objb ? getDecodedObject(objb) : createStringObject("",0);
+ a = obja->ptr;
+ b = objb->ptr;
+
+ for (j = 3; j < (uint32_t)c->argc; j++) {
char *opt = c->argv[j]->ptr;
int moreargs = (c->argc-1) - j;
@@ -736,37 +738,6 @@ void stralgoLCS(client *c) {
!= C_OK) goto cleanup;
if (minmatchlen < 0) minmatchlen = 0;
j++;
- } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) {
- if (a != NULL) {
- addReplyError(c,"Either use STRINGS or KEYS");
- goto cleanup;
- }
- a = c->argv[j+1]->ptr;
- b = c->argv[j+2]->ptr;
- j += 2;
- } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) {
- if (a != NULL) {
- addReplyError(c,"Either use STRINGS or KEYS");
- goto cleanup;
- }
- obja = lookupKeyRead(c->db,c->argv[j+1]);
- objb = lookupKeyRead(c->db,c->argv[j+2]);
- if ((obja && obja->type != OBJ_STRING) ||
- (objb && objb->type != OBJ_STRING))
- {
- addReplyError(c,
- "The specified keys must contain string values");
- /* Don't cleanup the objects, we need to do that
- * only after calling getDecodedObject(). */
- obja = NULL;
- objb = NULL;
- goto cleanup;
- }
- obja = obja ? getDecodedObject(obja) : createStringObject("",0);
- objb = objb ? getDecodedObject(objb) : createStringObject("",0);
- a = obja->ptr;
- b = objb->ptr;
- j += 2;
} else {
addReplyErrorObject(c,shared.syntaxerr);
goto cleanup;
@@ -774,14 +745,9 @@ void stralgoLCS(client *c) {
}
/* Complain if the user passed ambiguous parameters. */
- if (a == NULL) {
- addReplyError(c,"Please specify two strings: "
- "STRINGS or KEYS options are mandatory");
- goto cleanup;
- } else if (getlen && getidx) {
+ if (getlen && getidx) {
addReplyError(c,
- "If you want both the length and indexes, please "
- "just use IDX.");
+ "If you want both the length and indexes, please just use IDX.");
goto cleanup;
}
@@ -798,17 +764,22 @@ void stralgoLCS(client *c) {
/* Setup an uint32_t array to store at LCS[i,j] the length of the
* LCS A0..i-1, B0..j-1. Note that we have a linear array here, so
- * we index it as LCS[j+(blen+1)*j] */
+ * we index it as LCS[j+(blen+1)*i] */
#define LCS(A,B) lcs[(B)+((A)*(blen+1))]
/* Try to allocate the LCS table, and abort on overflow or insufficient memory. */
unsigned long long lcssize = (unsigned long long)(alen+1)*(blen+1); /* Can't overflow due to the size limits above. */
unsigned long long lcsalloc = lcssize * sizeof(uint32_t);
uint32_t *lcs = NULL;
- if (lcsalloc < SIZE_MAX && lcsalloc / lcssize == sizeof(uint32_t))
+ if (lcsalloc < SIZE_MAX && lcsalloc / lcssize == sizeof(uint32_t)) {
+ if (lcsalloc > (size_t)server.proto_max_bulk_len) {
+ addReplyError(c, "Insufficient memory, transient memory for LCS exceeds proto-max-bulk-len");
+ goto cleanup;
+ }
lcs = ztrymalloc(lcsalloc);
+ }
if (!lcs) {
- addReplyError(c, "Insufficient memory");
+ addReplyError(c, "Insufficient memory, failed allocating transient memory for LCS");
goto cleanup;
}
@@ -840,7 +811,7 @@ void stralgoLCS(client *c) {
* it backward, but the length is already known, we store it into idx. */
uint32_t idx = LCS(alen,blen);
sds result = NULL; /* Resulting LCS string. */
- void *arraylenptr = NULL; /* Deffered length of the array for IDX. */
+ void *arraylenptr = NULL; /* Deferred length of the array for IDX. */
uint32_t arange_start = alen, /* alen signals that values are not set. */
arange_end = 0,
brange_start = 0,
diff --git a/src/tls.c b/src/tls.c
index cdf89957488ebf9b7a6518dc4bd1d59ab92b3982..b3c057d06a4ba746b501e9b7fa5da1648f2592c5 100644
--- a/src/tls.c
+++ b/src/tls.c
@@ -1035,7 +1035,6 @@ int tlsHasPendingData() {
}
int tlsProcessPendingData() {
- serverAssert(!threadOwnLock());
listIter li;
listNode *ln;
diff --git a/tests/sentinel/tests/08-hostname-conf.tcl b/tests/sentinel/tests/08-hostname-conf.tcl
index be6e42cb090a3f73ec12ec23663c586fcc2645e8..8f9e237f19821e2b1734dffca220c798474775a2 100644
--- a/tests/sentinel/tests/08-hostname-conf.tcl
+++ b/tests/sentinel/tests/08-hostname-conf.tcl
@@ -1,3 +1,5 @@
+source "../tests/includes/init-tests.tcl"
+
proc set_redis_announce_ip {addr} {
foreach_redis_id id {
R $id config set replica-announce-ip $addr
@@ -28,8 +30,6 @@ test "(pre-init) Configure instances and sentinel for hostname use" {
set_sentinel_config announce-hostnames yes
}
-source "../tests/includes/init-tests.tcl"
-
proc verify_hostname_announced {hostname} {
foreach_sentinel_id id {
# Master is reported with its hostname
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 93909469a8dec4469f1e361cc7e781a2155443aa..508d018f1ebab9504be33713709db8efc21e83d1 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -592,7 +592,7 @@ proc errorrstat {cmd r} {
proc generate_fuzzy_traffic_on_key {key duration} {
# Commands per type, blocking commands removed
# TODO: extract these from help.h or elsewhere, and improve to include other types
- set string_commands {APPEND BITCOUNT BITFIELD BITOP BITPOS DECR DECRBY GET GETBIT GETRANGE GETSET INCR INCRBY INCRBYFLOAT MGET MSET MSETNX PSETEX SET SETBIT SETEX SETNX SETRANGE STRALGO STRLEN}
+ set string_commands {APPEND BITCOUNT BITFIELD BITOP BITPOS DECR DECRBY GET GETBIT GETRANGE GETSET INCR INCRBY INCRBYFLOAT MGET MSET MSETNX PSETEX SET SETBIT SETEX SETNX SETRANGE LCS STRLEN}
set hash_commands {HDEL HEXISTS HGET HGETALL HINCRBY HINCRBYFLOAT HKEYS HLEN HMGET HMSET HSCAN HSET HSETNX HSTRLEN HVALS HRANDFIELD}
set zset_commands {ZADD ZCARD ZCOUNT ZINCRBY ZINTERSTORE ZLEXCOUNT ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYLEX ZRANGEBYSCORE ZRANK ZREM ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE ZREVRANGE ZREVRANGEBYLEX ZREVRANGEBYSCORE ZREVRANK ZSCAN ZSCORE ZUNIONSTORE ZRANDMEMBER}
set list_commands {LINDEX LINSERT LLEN LPOP LPOS LPUSH LPUSHX LRANGE LREM LSET LTRIM RPOP RPOPLPUSH RPUSH RPUSHX}
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index e28b1d139df00fdc87dd1c63dda35f45e329643f..5bb8a57542557b25df34764129fd99256b45e48b 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -46,7 +46,7 @@ set ::all_tests {
integration/aof
integration/rdb
integration/corrupt-dump
- integration/corrupt-dump-fuzzer
+ # integration/corrupt-dump-fuzzer
integration/convert-zipmap-hash-on-load
integration/logging
integration/psync2
diff --git a/tests/unit/type/string.tcl b/tests/unit/type/string.tcl
index 43968b26b972ee82eaada562d9983c6d6cbde51d..307b1b30422b8dee350e9af22a2467bcc6b43368 100644
--- a/tests/unit/type/string.tcl
+++ b/tests/unit/type/string.tcl
@@ -549,29 +549,27 @@ start_server {tags {"string"}} {
set rna2 {ATTAAAGGTTTATACCTTCCCAGGTAACAAACCAACCAACTTTCGATCTCTTGTAGATCTGTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTCACGCAGTATAATTAATAACTAATTACTGTCGTTGACAGGACACGAGTAACTCGTCTATCTTCTGCAGGCTGCTTACGGTTTCGTCCGTGTTGCAGCCGATCATCAGCACATCTAGGTTT}
set rnalcs {ACCTTCCCAGGTAACAAACCAACCAACTTTCGATCTCTTGTAGATCTGTTCTCTAAACGAACTTTAAAATCTGTGTGGCTGTCACTCGGCTGCATGCTTAGTGCACTCACGCAGTATAATTAATAACTAATTACTGTCGTTGACAGGACACGAGTAACTCGTCTATCTTCTGCAGGCTGCTTACGGTTTCGTCCGTGTTGCAGCCGATCATCAGCACATCTAGGTTT}
- test {STRALGO LCS string output with STRINGS option} {
- r STRALGO LCS STRINGS $rna1 $rna2
+ test {LCS basic} {
+ r set virus1{t} $rna1
+ r set virus2{t} $rna2
+ r LCS virus1{t} virus2{t}
} $rnalcs
- test {STRALGO LCS len} {
- r STRALGO LCS LEN STRINGS $rna1 $rna2
+ test {LCS len} {
+ r set virus1{t} $rna1
+ r set virus2{t} $rna2
+ r LCS virus1{t} virus2{t} LEN
} [string length $rnalcs]
- test {LCS with KEYS option} {
- r set virus1 $rna1
- r set virus2 $rna2
- r STRALGO LCS KEYS virus1 virus2
- } $rnalcs
-
test {LCS indexes} {
- dict get [r STRALGO LCS IDX KEYS virus1 virus2] matches
+ dict get [r LCS virus1{t} virus2{t} IDX] matches
} {{{238 238} {239 239}} {{236 236} {238 238}} {{229 230} {236 237}} {{224 224} {235 235}} {{1 222} {13 234}}}
test {LCS indexes with match len} {
- dict get [r STRALGO LCS IDX KEYS virus1 virus2 WITHMATCHLEN] matches
+ dict get [r LCS virus1{t} virus2{t} IDX WITHMATCHLEN] matches
} {{{238 238} {239 239} 1} {{236 236} {238 238} 1} {{229 230} {236 237} 2} {{224 224} {235 235} 1} {{1 222} {13 234} 222}}
test {LCS indexes with match len and minimum match len} {
- dict get [r STRALGO LCS IDX KEYS virus1 virus2 WITHMATCHLEN MINMATCHLEN 5] matches
+ dict get [r LCS virus1{t} virus2{t} IDX WITHMATCHLEN MINMATCHLEN 5] matches
} {{{1 222} {13 234} 222}}
}
diff --git a/utils/find-best-worker-threads.sh b/utils/find-best-worker-threads.sh
index 10dd4de4c90045d43e0a0ae9ee54e1168b498e7b..b7b9e09878bf1bbfa57fb0d8d733fd20dbcdcf26 100755
--- a/utils/find-best-worker-threads.sh
+++ b/utils/find-best-worker-threads.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-HOST=localhost
+HOST=127.0.0.1
PORT=6379
CLIENTS=50
REQUESTS=5000
@@ -73,7 +73,7 @@ while [ $THREAD -le $MAXTHREADS ]; do
echo "Failed to start openamdc server. Exiting."
exit 1
else
- echo "OpenAMDC server is already running, ../src/openamdc-server --port $PORT --worker-threads $THREAD > /dev/null 2> /dev/null &."
+ echo "OpenAMDC server is already running, ../src/openamdc-server --bind $HOST --port $PORT --worker-threads $THREAD > /dev/null 2> /dev/null &."
fi
echo "Starting benchmark, $benchmark -s $HOST -p $PORT -t $THREAD -c $CLIENTS -n $REQUESTS --hide-histogram --distinct-client-seed --command="set __key__ __data__" --key-prefix="kv_" --key-minimum=1 --key-maximum=10000 -R -d 128"