mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Add coroutine thread specific storage
This commit is contained in:
committed by
Edward Hennis
parent
f73c55a922
commit
4fb6bf3e67
@@ -1954,6 +1954,8 @@
|
||||
</ClCompile>
|
||||
<ClInclude Include="..\..\src\ripple\basics\KeyCache.h">
|
||||
</ClInclude>
|
||||
<ClInclude Include="..\..\src\ripple\basics\LocalValue.h">
|
||||
</ClInclude>
|
||||
<ClInclude Include="..\..\src\ripple\basics\Log.h">
|
||||
</ClInclude>
|
||||
<ClInclude Include="..\..\src\ripple\basics\make_SSLContext.h">
|
||||
|
||||
@@ -2667,6 +2667,9 @@
|
||||
<ClInclude Include="..\..\src\ripple\basics\KeyCache.h">
|
||||
<Filter>ripple\basics</Filter>
|
||||
</ClInclude>
|
||||
<ClInclude Include="..\..\src\ripple\basics\LocalValue.h">
|
||||
<Filter>ripple\basics</Filter>
|
||||
</ClInclude>
|
||||
<ClInclude Include="..\..\src\ripple\basics\Log.h">
|
||||
<Filter>ripple\basics</Filter>
|
||||
</ClInclude>
|
||||
|
||||
125
src/ripple/basics/LocalValue.h
Normal file
125
src/ripple/basics/LocalValue.h
Normal file
@@ -0,0 +1,125 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
|
||||
#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
|
||||
|
||||
#include <boost/thread/tss.hpp>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace detail {
|
||||
|
||||
struct LocalValues
|
||||
{
|
||||
bool onCoro = true;
|
||||
|
||||
struct BasicValue
|
||||
{
|
||||
virtual ~BasicValue() = default;
|
||||
virtual void* get() = 0;
|
||||
};
|
||||
|
||||
template <class T>
|
||||
struct Value : BasicValue
|
||||
{
|
||||
T t_;
|
||||
|
||||
Value() = default;
|
||||
Value(T const& t) : t_(t) {}
|
||||
|
||||
void* get() override
|
||||
{
|
||||
return &t_;
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_map<void const*, std::unique_ptr<BasicValue>> values;
|
||||
|
||||
static
|
||||
inline
|
||||
void
|
||||
cleanup(LocalValues* lvs)
|
||||
{
|
||||
if (lvs && ! lvs->onCoro)
|
||||
delete lvs;
|
||||
}
|
||||
};
|
||||
|
||||
template<class = void>
|
||||
boost::thread_specific_ptr<detail::LocalValues>&
|
||||
getLocalValues()
|
||||
{
|
||||
static boost::thread_specific_ptr<
|
||||
detail::LocalValues> tsp(&detail::LocalValues::cleanup);
|
||||
return tsp;
|
||||
}
|
||||
|
||||
} // detail
|
||||
|
||||
template <class T>
|
||||
class LocalValue
|
||||
{
|
||||
public:
|
||||
template <class... Args>
|
||||
LocalValue(Args&&... args)
|
||||
: t_(std::forward<Args>(args)...)
|
||||
{
|
||||
}
|
||||
|
||||
/** Stores instance of T specific to the calling coroutine or thread. */
|
||||
T& operator*();
|
||||
|
||||
/** Stores instance of T specific to the calling coroutine or thread. */
|
||||
T* operator->()
|
||||
{
|
||||
return &**this;
|
||||
}
|
||||
|
||||
private:
|
||||
T t_;
|
||||
};
|
||||
|
||||
template <class T>
|
||||
T&
|
||||
LocalValue<T>::operator*()
|
||||
{
|
||||
auto lvs = detail::getLocalValues().get();
|
||||
if (! lvs)
|
||||
{
|
||||
lvs = new detail::LocalValues();
|
||||
lvs->onCoro = false;
|
||||
detail::getLocalValues().reset(lvs);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto const iter = lvs->values.find(this);
|
||||
if (iter != lvs->values.end())
|
||||
return *reinterpret_cast<T*>(iter->second->get());
|
||||
}
|
||||
|
||||
return *reinterpret_cast<T*>(lvs->values.emplace(this,
|
||||
std::make_unique<detail::LocalValues::Value<T>>(t_)).
|
||||
first->second->get());
|
||||
}
|
||||
} // ripple
|
||||
|
||||
#endif
|
||||
@@ -21,10 +21,12 @@
|
||||
#define RIPPLE_CORE_JOBCORO_H_INCLUDED
|
||||
|
||||
#include <ripple/core/Job.h>
|
||||
#include <ripple/basics/LocalValue.h>
|
||||
#include <beast/win32_workaround.h>
|
||||
#include <boost/coroutine/all.hpp>
|
||||
#include <condition_variable>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
namespace ripple {
|
||||
@@ -40,6 +42,7 @@ struct JobCoro_create_t { };
|
||||
class JobCoro : public std::enable_shared_from_this<JobCoro>
|
||||
{
|
||||
private:
|
||||
detail::LocalValues lvs_;
|
||||
JobQueue& jq_;
|
||||
JobType type_;
|
||||
std::string name_;
|
||||
|
||||
@@ -60,13 +60,15 @@ JobCoro::post()
|
||||
jq_.addJob(type_, name_,
|
||||
[this, sp = shared_from_this()](Job&)
|
||||
{
|
||||
auto saved = detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(&lvs_);
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
context_sp().reset(&ctx_);
|
||||
coro_();
|
||||
detail::getLocalValues().release();
|
||||
detail::getLocalValues().reset(saved);
|
||||
std::lock_guard<std::mutex> lk(mutex_run_);
|
||||
running_ = false;
|
||||
cv_.notify_all();
|
||||
context_sp().reset(nullptr);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -18,13 +18,11 @@
|
||||
//==============================================================================
|
||||
|
||||
#include <BeastConfig.h>
|
||||
#include <ripple/core/JobCoro.h>
|
||||
#include <ripple/core/JobQueue.h>
|
||||
#include <ripple/test/jtx.h>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
@@ -32,82 +30,152 @@ namespace test {
|
||||
class Coroutine_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
class gate
|
||||
{
|
||||
private:
|
||||
std::condition_variable cv_;
|
||||
std::mutex mutex_;
|
||||
bool signaled_ = false;
|
||||
|
||||
public:
|
||||
// Thread safe, blocks until signaled or period expires.
|
||||
// Returns `true` if signaled.
|
||||
template <class Rep, class Period>
|
||||
bool
|
||||
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mutex_);
|
||||
auto b = cv_.wait_for(lk, rel_time, [=]{ return signaled_; });
|
||||
signaled_ = false;
|
||||
return b;
|
||||
}
|
||||
|
||||
void
|
||||
test_coroutine()
|
||||
signal()
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
signaled_ = true;
|
||||
cv_.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
void
|
||||
correct_order()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
std::atomic<int> i{0};
|
||||
std::condition_variable cv;
|
||||
auto& jq = env.app().getJobQueue();
|
||||
jq.setThreadCount(0, false);
|
||||
gate g1, g2;
|
||||
std::shared_ptr<JobCoro> jc;
|
||||
jq.postCoro(jtCLIENT, "Coroutine-Test",
|
||||
[&](std::shared_ptr<JobCoro> jc)
|
||||
[&](auto const& jcr)
|
||||
{
|
||||
std::thread t(
|
||||
[&i, jc]()
|
||||
{
|
||||
std::this_thread::sleep_for(20ms);
|
||||
++i;
|
||||
jc->post();
|
||||
});
|
||||
jc = jcr;
|
||||
g1.signal();
|
||||
jc->yield();
|
||||
t.join();
|
||||
++i;
|
||||
cv.notify_one();
|
||||
g2.signal();
|
||||
});
|
||||
|
||||
{
|
||||
std::mutex m;
|
||||
std::unique_lock<std::mutex> lk(m);
|
||||
expect(cv.wait_for(lk, 1s,
|
||||
[&]()
|
||||
{
|
||||
return i == 2;
|
||||
}));
|
||||
}
|
||||
jq.shutdown();
|
||||
expect(i == 2);
|
||||
expect(g1.wait_for(5s));
|
||||
jc->join();
|
||||
jc->post();
|
||||
expect(g2.wait_for(5s));
|
||||
}
|
||||
|
||||
void
|
||||
test_incorrect_order()
|
||||
incorrect_order()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
std::atomic<int> i{0};
|
||||
std::condition_variable cv;
|
||||
auto& jq = env.app().getJobQueue();
|
||||
jq.setThreadCount(0, false);
|
||||
gate g;
|
||||
jq.postCoro(jtCLIENT, "Coroutine-Test",
|
||||
[&](std::shared_ptr<JobCoro> jc)
|
||||
[&](auto const& jc)
|
||||
{
|
||||
jc->post();
|
||||
jc->yield();
|
||||
++i;
|
||||
cv.notify_one();
|
||||
g.signal();
|
||||
});
|
||||
|
||||
{
|
||||
std::mutex m;
|
||||
std::unique_lock<std::mutex> lk(m);
|
||||
expect(cv.wait_for(lk, 1s,
|
||||
[&]()
|
||||
{
|
||||
return i == 1;
|
||||
}));
|
||||
expect(g.wait_for(5s));
|
||||
}
|
||||
jq.shutdown();
|
||||
expect(i == 1);
|
||||
|
||||
void
|
||||
thread_specific_storage()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
auto& jq = env.app().getJobQueue();
|
||||
jq.setThreadCount(0, true);
|
||||
static int const N = 4;
|
||||
std::array<std::shared_ptr<JobCoro>, N> a;
|
||||
|
||||
LocalValue<int> lv(-1);
|
||||
expect(*lv == -1);
|
||||
|
||||
gate g;
|
||||
jq.addJob(jtCLIENT, "LocalValue-Test",
|
||||
[&](auto const& job)
|
||||
{
|
||||
this->expect(*lv == -1);
|
||||
*lv = -2;
|
||||
this->expect(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
expect(g.wait_for(5s));
|
||||
expect(*lv == -1);
|
||||
|
||||
for(int i = 0; i < N; ++i)
|
||||
{
|
||||
jq.postCoro(jtCLIENT, "Coroutine-Test",
|
||||
[&, id = i](auto const& jc)
|
||||
{
|
||||
a[id] = jc;
|
||||
g.signal();
|
||||
jc->yield();
|
||||
|
||||
this->expect(*lv == -1);
|
||||
*lv = id;
|
||||
this->expect(*lv == id);
|
||||
g.signal();
|
||||
jc->yield();
|
||||
|
||||
this->expect(*lv == id);
|
||||
});
|
||||
expect(g.wait_for(5s));
|
||||
a[i]->join();
|
||||
}
|
||||
for(auto const& jc : a)
|
||||
{
|
||||
jc->post();
|
||||
expect(g.wait_for(5s));
|
||||
jc->join();
|
||||
}
|
||||
for(auto const& jc : a)
|
||||
{
|
||||
jc->post();
|
||||
jc->join();
|
||||
}
|
||||
|
||||
jq.addJob(jtCLIENT, "LocalValue-Test",
|
||||
[&](auto const& job)
|
||||
{
|
||||
this->expect(*lv == -2);
|
||||
g.signal();
|
||||
});
|
||||
expect(g.wait_for(5s));
|
||||
expect(*lv == -1);
|
||||
}
|
||||
|
||||
void
|
||||
run()
|
||||
{
|
||||
test_coroutine();
|
||||
test_incorrect_order();
|
||||
correct_order();
|
||||
incorrect_order();
|
||||
thread_specific_storage();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user