diff --git a/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp b/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp index 1223a2cb10902ddafc475fc971858c62ca3cf81c..a91e76e1ef3e3083f5c4e8aa677845bf3ce44105 100644 --- a/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp +++ b/functionsystem/src/function_proxy/local_scheduler/instance_control/instance_ctrl_actor.cpp @@ -792,7 +792,19 @@ litebus::Future InstanceCtrlActor::Exit(const std::shared_ptrTagStop(); - return KillResponse(); + // get client to check connection is already closed. + return clientManager_->GetControlInterfacePosixClient(instanceInfo.instanceid()) + .Then([instanceInfo, aid(GetAID())](const std::shared_ptr &client) { + // if connection of client is closed, the client should be removed immediately, avoid garbage collection + // deferred. + if (client == nullptr || client->IsDone()) { + YRLOG_INFO("connection of {} is already closed, immediately to remove driver info", + instanceInfo.instanceid()); + litebus::Async(aid, &InstanceCtrlActor::DeleteDriverClient, instanceInfo.instanceid(), + instanceInfo.jobid()); + } + return KillResponse(); + }); } if (auto iter = exiting_.find(instanceInfo.instanceid()); iter != exiting_.end()) { YRLOG_INFO("{}|instance({}) is exiting", instanceInfo.requestid(), instanceInfo.instanceid()); diff --git a/functionsystem/tests/unit/function_proxy/local_scheduler/instance_control/instance_ctrl_test.cpp b/functionsystem/tests/unit/function_proxy/local_scheduler/instance_control/instance_ctrl_test.cpp index 7190df16fab62ff67291fdec7bc1ee3e718529e5..170ca3a2752f6349ea203fd507fb451225496ac7 100644 --- a/functionsystem/tests/unit/function_proxy/local_scheduler/instance_control/instance_ctrl_test.cpp +++ b/functionsystem/tests/unit/function_proxy/local_scheduler/instance_control/instance_ctrl_test.cpp @@ -5943,4 +5943,38 @@ TEST_F(InstanceCtrlTest, KillInstanceWithCheckpoint) EXPECT_EQ(killRsp.code(), common::ErrorCode::ERR_NONE); } +TEST_F(InstanceCtrlTest, KillDriverInstance) +{ + const std::string instanceID = "driver-job_12345"; + const std::string function = "12345678901234561234567890123456/0-test-helloWorld/$latest"; + const std::string runtimeID = "runtimeA"; + const std::string functionProxyID = "nodeN"; + + auto stateMachine = std::make_shared("nodeN"); + auto &mockStateMachine = *stateMachine; + EXPECT_CALL(*instanceControlView_, GetInstance).WillRepeatedly(Return(stateMachine)); + resources::InstanceInfo instanceInfo; + instanceInfo.set_instanceid(instanceID); + instanceInfo.mutable_instancestatus()->set_code(int32_t(InstanceState::RUNNING)); + instanceInfo.set_function(function); + instanceInfo.set_runtimeid(runtimeID); + instanceInfo.set_functionproxyid(functionProxyID); + auto scheduleReq = std::make_shared(); + scheduleReq->mutable_instance()->CopyFrom(instanceInfo); + auto instanceContext = std::make_shared(scheduleReq); + EXPECT_CALL(mockStateMachine, GetInstanceContextCopy).WillRepeatedly(Return(instanceContext)); + EXPECT_CALL(mockStateMachine, TagStop).WillRepeatedly(Return()); + EXPECT_CALL(*funcAgentMgr_, IsFuncAgentRecovering(testing::_)).WillRepeatedly(Return(true)); + auto mockSharedClient = std::make_shared(); + EXPECT_CALL(*mockSharedClientManagerProxy_, GetControlInterfacePosixClient(_)) + .WillRepeatedly(Return(mockSharedClient)); + EXPECT_CALL(*mockSharedClient, IsDone).WillOnce(Return(false)); + auto killReq = GenKillRequest(instanceID, SHUT_DOWN_SIGNAL); + auto srcInstance = "instanceM"; + + auto killRsp = instanceCtrl_->Kill(srcInstance, killReq); + ASSERT_AWAIT_READY(killRsp); + EXPECT_EQ(killRsp.Get().code(), common::ErrorCode::ERR_NONE); +} + } // namespace functionsystem::test \ No newline at end of file diff --git a/functionsystem/tests/unit/mocks/mock_instance_state_machine.h b/functionsystem/tests/unit/mocks/mock_instance_state_machine.h index 14a6c1debc6cdecb55f48dfa42e87f40fa442ff6..94ea3ff680a6d8cd5fcdd83448d4cf3c585fe715 100644 --- a/functionsystem/tests/unit/mocks/mock_instance_state_machine.h +++ b/functionsystem/tests/unit/mocks/mock_instance_state_machine.h @@ -93,6 +93,7 @@ public: MOCK_METHOD(std::shared_ptr, GetInstanceContextCopy, (), (override)); MOCK_METHOD(litebus::Future, GetCancelFuture, (), (override)); MOCK_METHOD(int64_t, GetModRevision, (), (override)); + MOCK_METHOD(void, TagStop, (), (override)); }; } // namespace functionsystem::test diff --git a/scripts/deploy/function_system/install.sh b/scripts/deploy/function_system/install.sh index 66c1055cfeb1620c59cf93549e1b179b2554f080..a05e62636d7d4ad2263bad7d55562f50a8fd0f33 100644 --- a/scripts/deploy/function_system/install.sh +++ b/scripts/deploy/function_system/install.sh @@ -177,7 +177,7 @@ function install_faas_frontend() { POD_NAME="frontend-process" \ FUNCTION_LIB_PATH=${PATTERN_FAAS_HOME_DIR}/faasfrontend/faasfrontend.so \ INIT_ARGS_FILE_PATH=${install_init_frontend_config} \ - LD_LIBRARY_PATH=${FUNCTION_SYSTEM_DIR}/lib:${GO_RUNTIME_BIN}:${LD_LIBRARY_PATH} \ + LD_LIBRARY_PATH=${GO_RUNTIME_BIN}:${FUNCTION_SYSTEM_DIR}/lib:${LD_LIBRARY_PATH} \ ENABLE_SERVER_MODE="true" \ INIT_HANDLER="faasfrontend.InitHandler" \ CALL_HANDLER="faasfrontend.CallHandler" \ @@ -214,6 +214,70 @@ function install_faas_frontend() { log_info "succeed to start faas frontend, http_ip=${IP_ADDRESS}, http_port=${FAAS_FRONTEND_HTTP_PORT}, grpc_port=${FAAS_FRONTEND_GRPC_PORT}, pid=${FAAS_FRONTEND_PID}" } +function install_function_scheduler() { + log_info "start scheduler..." + init_scheduler_config=${FUNCTION_SYSTEM_DIR}/config/init_scheduler_args.json + install_init_scheduler_config=${config_install_dir}/init_scheduler_args_temp.json + cp ${init_scheduler_config} ${install_init_scheduler_config} + sed -i "s/{etcdAddr}/$(echo ${ETCD_CLUSTER_ADDRESS} | sed 's/,/","/g')/g" ${install_init_scheduler_config} + sed -i "s/{sslEnable}/${SSL_ENABLE}/g" ${install_init_scheduler_config} + sed -i "s/{sccEnable}/${SCC_ENABLE}/g" ${install_init_scheduler_config} + sed -i "s/{etcdAuthType}/${ETCD_AUTH_TYPE}/g" ${install_init_scheduler_config} + sed -i "s*{azPrefix}*${ETCD_TABLE_PREFIX}*g" ${install_init_scheduler_config} + sed -i "s*{sslBasePath}*${SSL_BASE_PATH}*g" ${install_init_scheduler_config} + sed -i "s*{sccBasePath}*${SCC_BASE_PATH}*g" ${install_init_scheduler_config} + if [ "X${SSL_ENABLE}" = "Xtrue" ] && [ -n "${ETCD_SSL_BASE_PATH}" ]; then + sed -i "s*{etcdCAFile}*${ETCD_SSL_BASE_PATH}/${ETCD_CA_FILE}*g" ${install_init_scheduler_config} + sed -i "s*{etcdCertFile}*${ETCD_SSL_BASE_PATH}/${ETCD_CLIENT_CERT_FILE}*g" ${install_init_scheduler_config} + sed -i "s*{etcdKeyFile}*${ETCD_SSL_BASE_PATH}/${ETCD_CLIENT_KEY_FILE}*g" ${install_init_scheduler_config} + sed -i "s*{passphraseFile}*${ETCD_SSL_BASE_PATH}/${ETCD_CLIENT_PWD_FILE}*g" ${install_init_scheduler_config} + else + sed -i "s*{etcdCAFile}**g" ${install_init_scheduler_config} + sed -i "s*{etcdCertFile}**g" ${install_init_scheduler_config} + sed -i "s*{etcdKeyFile}**g" ${install_init_scheduler_config} + sed -i "s*{passphraseFile}**g" ${install_init_scheduler_config} + fi + GO_RUNTIME_BIN=${RUNTIME_HOME_DIR}/service/go/bin + POD_NAME="scheduler-process" \ + FUNCTION_LIB_PATH=${PATTERN_FAAS_HOME_DIR}/faasscheduler/faasscheduler.so \ + INIT_ARGS_FILE_PATH=${install_init_scheduler_config} \ + LD_LIBRARY_PATH=${GO_RUNTIME_BIN}:${FUNCTION_SYSTEM_DIR}/lib:${LD_LIBRARY_PATH} \ + ENABLE_SERVER_MODE="true" \ + INIT_HANDLER="faasscheduler.InitHandler" \ + CALL_HANDLER="faasscheduler.CallHandler" \ + CHECKPOINT_HANDLER="faasscheduler.CheckpointHandler" \ + RECOVER_HANDLER="faasscheduler.RecoverHandler" \ + SHUTDOWN_HANDLER="faasscheduler.ShutdownHandler" \ + SIGNAL_HANDLER="faasscheduler.SignalHandler" \ + YR_FUNCTION_LIB_PATH=${PATTERN_FAAS_HOME_DIR}/faasscheduler/ \ + GLOG_log_dir="${FS_LOG_PATH}" \ + YR_LOG_LEVEL=${FS_LOG_LEVEL} \ + POD_IP=${IP_ADDRESS} \ + NODE_IP=${IP_ADDRESS} \ + DATASYSTEM_ADDR=${IP_ADDRESS}:${DS_WORKER_PORT} \ + INSTANCE_ID="driver-scheduler-${NODE_ID}" \ + FAAS_LOG_PATH=${FS_LOG_PATH} \ + ${GO_RUNTIME_BIN}/goruntime \ + -jobId=${NODE_ID} \ + -runtimeId='scheduler_libruntime' \ + -instanceId="driver-scheduler-${NODE_ID}" \ + -functionName='0/0-system-faasscheduler/$latest' \ + -logLevel=${FS_LOG_LEVEL} \ + -logPath=${FS_LOG_PATH} \ + -enableMTLS=${ENABLE_MTLS} \ + -privateKeyPath=${PRIVATE_KEY_PATH} \ + -certificateFilePath=${CERTIFICATE_FILE_PATH} \ + -verifyFilePath=${VERIFY_FILE_PATH} \ + -encryptPrivateKeyPasswd=${ENCRYPT_PRIVATE_KEY_PASSWD} \ + -primaryKeyStoreFile=${PRIMARY_KEY_STORE_FILE} \ + -standbyKeyStoreFile=${STANDBY_KEY_STORE_FILE} \ + -enableDsEncrypt=${RUNTIME_DS_ENCRYPT_ENABLE} \ + -functionSystemAddress="${IP_ADDRESS}:${FUNCTION_PROXY_GRPC_PORT}" \ + -driverMode true >> "${FS_LOG_PATH}/${NODE_ID}-scheduler${STD_LOG_SUFFIX}" 2>&1 & + SCHEDULER_PID="$!" + log_info "succeed to scheduler, pid=${SCHEDULER_PID}" +} + function install_function_agent() { install_function_agent_and_runtime_manager_in_the_same_process return $? @@ -448,6 +512,9 @@ function install_function_system() { faas_frontend) install_faas_frontend ;; + function_scheduler) + install_function_scheduler + ;; metaservice) install_metaservice ;;