From 62f21ad8e3a2c5c0356fb0358ac7124567d08885 Mon Sep 17 00:00:00 2001 From: caihongxu Date: Sat, 13 Dec 2025 15:39:12 +0800 Subject: [PATCH] umq: flush sqe before jetty destroy (cherry picked from commit 512986930835bcb43a7e64e30e4115c26aa4b4c7) --- src/urpc/umq/umq_ub/umq_ub_impl.c | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.c b/src/urpc/umq/umq_ub/umq_ub_impl.c index a21ab191..888d1198 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.c +++ b/src/urpc/umq/umq_ub/umq_ub_impl.c @@ -3392,6 +3392,30 @@ static int process_tx_msg(umq_buf_t *buf, ub_queue_t *queue) return UMQ_SUCCESS; } +static int umq_ub_flush_sqe(ub_queue_t *queue, umq_buf_t **buf, uint32_t buf_count) +{ + urma_cr_t cr[buf_count]; + int cnt = 0; + int cr_cnt = urma_flush_jetty(queue->jetty, buf_count, cr); + for (int i = 0; i < cr_cnt; i++) { + if (cr[i].status == URMA_CR_WR_SUSPEND_DONE || cr[i].status == URMA_CR_WR_FLUSH_ERR_DONE || + cr[i].user_ctx <= UINT16_MAX) { + continue; + } + + buf[cnt] = (umq_buf_t *)(uintptr_t)cr[i].user_ctx; + buf[cnt]->io_direction = UMQ_IO_TX; + buf[cnt]->status = (umq_buf_status_t)cr[i].status; + cnt++; + } + + if (cr_cnt > 0) { + UMQ_VLOG_INFO("jetty flush %d sqe, cr_cnt %d\n", cnt, cr_cnt); + } + + return cnt; +} + static int umq_ub_poll_tx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count) { if (buf_count == 0) { @@ -3462,6 +3486,13 @@ static int umq_ub_poll_tx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count) } } + if (queue->state == QUEUE_STATE_ERR && queue->tx_flush_done && (int)buf_count > qbuf_cnt) { + tx_cr_cnt = umq_ub_flush_sqe(queue, &buf[qbuf_cnt], buf_count - qbuf_cnt); + if (tx_cr_cnt > 0) { + qbuf_cnt += tx_cr_cnt; + } + } + umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1); return qbuf_cnt; } -- Gitee