Review fixes: add cv timeout in RipplePathFind, catch exceptions in coroutines

- Add 30-second timeout to condition_variable wait in RipplePathFind.
  If the path-finding continuation never fires (e.g. shutdown race),
  return rpcINTERNAL instead of blocking the RPC thread indefinitely.
- Add try-catch in fire-and-forget coroutine lambdas in ServerHandler
  (RPC-Client and WS-Client). Without this, exceptions thrown in
  processSession() are silently swallowed in the CoroTask promise
  since no one co_awaits the top-level task.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-03-08 20:32:23 +00:00
parent 6e90a7631f
commit fec8f4ff2d
2 changed files with 33 additions and 9 deletions

View File

@@ -287,7 +287,14 @@ ServerHandler::onRequest(Session& session)
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
processSession(detachedSession);
try
{
processSession(detachedSession);
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "RPC-Client coroutine exception: " << e.what();
}
co_return;
});
if (postResult == nullptr)
@@ -328,13 +335,21 @@ ServerHandler::onWSMessage(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
auto const jr = this->processSession(session, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
try
{
auto const jr = this->processSession(session, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(
boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "WS-Client coroutine exception: " << e.what();
}
co_return;
});
if (postResult == nullptr)

View File

@@ -45,6 +45,8 @@ doRipplePathFind(RPC::JsonContext& context)
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
// Replaces the old Coro yield/resume pattern with synchronous
// blocking, eliminating shutdown race conditions.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
@@ -63,8 +65,15 @@ doRipplePathFind(RPC::JsonContext& context)
context.params);
if (request)
{
using namespace std::chrono_literals;
std::unique_lock lk(mtx);
cv.wait(lk, [&] { return pathDone; });
if (!cv.wait_for(lk, 30s, [&] { return pathDone; }))
{
// Path-finding continuation never fired (e.g. shutdown
// race or unexpected failure). Return an internal error
// rather than blocking the RPC thread indefinitely.
return rpcError(rpcINTERNAL);
}
jvResult = request->doStatus(context.params);
}