From 4fb6bf3e677e07db78e9314913f28310cebe8322 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Fri, 18 Dec 2015 19:17:56 -0500 Subject: [PATCH] Add coroutine thread specific storage --- Builds/VisualStudio2015/RippleD.vcxproj | 2 + .../VisualStudio2015/RippleD.vcxproj.filters | 3 + src/ripple/basics/LocalValue.h | 125 ++++++++++++++ src/ripple/core/JobCoro.h | 3 + src/ripple/core/JobCoro.ipp | 6 +- src/ripple/core/tests/Coroutine.test.cpp | 156 +++++++++++++----- 6 files changed, 249 insertions(+), 46 deletions(-) create mode 100644 src/ripple/basics/LocalValue.h diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index 6eafb27e52..af49afa6b8 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -1954,6 +1954,8 @@ + + diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index 0567e23efc..a8646aff5c 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -2667,6 +2667,9 @@ ripple\basics + + ripple\basics + ripple\basics diff --git a/src/ripple/basics/LocalValue.h b/src/ripple/basics/LocalValue.h new file mode 100644 index 0000000000..28e8a6b3a3 --- /dev/null +++ b/src/ripple/basics/LocalValue.h @@ -0,0 +1,125 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_BASICS_LOCALVALUE_H_INCLUDED +#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +namespace detail { + +struct LocalValues +{ + bool onCoro = true; + + struct BasicValue + { + virtual ~BasicValue() = default; + virtual void* get() = 0; + }; + + template + struct Value : BasicValue + { + T t_; + + Value() = default; + Value(T const& t) : t_(t) {} + + void* get() override + { + return &t_; + } + }; + + std::unordered_map> values; + + static + inline + void + cleanup(LocalValues* lvs) + { + if (lvs && ! lvs->onCoro) + delete lvs; + } +}; + +template +boost::thread_specific_ptr& +getLocalValues() +{ + static boost::thread_specific_ptr< + detail::LocalValues> tsp(&detail::LocalValues::cleanup); + return tsp; +} + +} // detail + +template +class LocalValue +{ +public: + template + LocalValue(Args&&... args) + : t_(std::forward(args)...) + { + } + + /** Stores instance of T specific to the calling coroutine or thread. */ + T& operator*(); + + /** Stores instance of T specific to the calling coroutine or thread. */ + T* operator->() + { + return &**this; + } + +private: + T t_; +}; + +template +T& +LocalValue::operator*() +{ + auto lvs = detail::getLocalValues().get(); + if (! lvs) + { + lvs = new detail::LocalValues(); + lvs->onCoro = false; + detail::getLocalValues().reset(lvs); + } + else + { + auto const iter = lvs->values.find(this); + if (iter != lvs->values.end()) + return *reinterpret_cast(iter->second->get()); + } + + return *reinterpret_cast(lvs->values.emplace(this, + std::make_unique>(t_)). + first->second->get()); +} +} // ripple + +#endif diff --git a/src/ripple/core/JobCoro.h b/src/ripple/core/JobCoro.h index bcaa2b57f0..fa6f02682b 100644 --- a/src/ripple/core/JobCoro.h +++ b/src/ripple/core/JobCoro.h @@ -21,10 +21,12 @@ #define RIPPLE_CORE_JOBCORO_H_INCLUDED #include +#include #include #include #include #include +#include #include namespace ripple { @@ -40,6 +42,7 @@ struct JobCoro_create_t { }; class JobCoro : public std::enable_shared_from_this { private: + detail::LocalValues lvs_; JobQueue& jq_; JobType type_; std::string name_; diff --git a/src/ripple/core/JobCoro.ipp b/src/ripple/core/JobCoro.ipp index 947000bca8..727b9a5937 100644 --- a/src/ripple/core/JobCoro.ipp +++ b/src/ripple/core/JobCoro.ipp @@ -60,13 +60,15 @@ JobCoro::post() jq_.addJob(type_, name_, [this, sp = shared_from_this()](Job&) { + auto saved = detail::getLocalValues().release(); + detail::getLocalValues().reset(&lvs_); std::lock_guard lock(mutex_); - context_sp().reset(&ctx_); coro_(); + detail::getLocalValues().release(); + detail::getLocalValues().reset(saved); std::lock_guard lk(mutex_run_); running_ = false; cv_.notify_all(); - context_sp().reset(nullptr); }); } diff --git a/src/ripple/core/tests/Coroutine.test.cpp b/src/ripple/core/tests/Coroutine.test.cpp index f970bad485..baa426cd25 100644 --- a/src/ripple/core/tests/Coroutine.test.cpp +++ b/src/ripple/core/tests/Coroutine.test.cpp @@ -18,13 +18,11 @@ //============================================================================== #include -#include #include #include #include #include #include -#include namespace ripple { namespace test { @@ -32,82 +30,152 @@ namespace test { class Coroutine_test : public beast::unit_test::suite { public: + class gate + { + private: + std::condition_variable cv_; + std::mutex mutex_; + bool signaled_ = false; + + public: + // Thread safe, blocks until signaled or period expires. + // Returns `true` if signaled. + template + bool + wait_for(std::chrono::duration const& rel_time) + { + std::unique_lock lk(mutex_); + auto b = cv_.wait_for(lk, rel_time, [=]{ return signaled_; }); + signaled_ = false; + return b; + } + + void + signal() + { + std::lock_guard lk(mutex_); + signaled_ = true; + cv_.notify_all(); + } + }; + void - test_coroutine() + correct_order() { using namespace std::chrono_literals; using namespace jtx; Env env(*this); - std::atomic i{0}; - std::condition_variable cv; auto& jq = env.app().getJobQueue(); jq.setThreadCount(0, false); + gate g1, g2; + std::shared_ptr jc; jq.postCoro(jtCLIENT, "Coroutine-Test", - [&](std::shared_ptr jc) + [&](auto const& jcr) { - std::thread t( - [&i, jc]() - { - std::this_thread::sleep_for(20ms); - ++i; - jc->post(); - }); + jc = jcr; + g1.signal(); jc->yield(); - t.join(); - ++i; - cv.notify_one(); + g2.signal(); }); - - { - std::mutex m; - std::unique_lock lk(m); - expect(cv.wait_for(lk, 1s, - [&]() - { - return i == 2; - })); - } - jq.shutdown(); - expect(i == 2); + expect(g1.wait_for(5s)); + jc->join(); + jc->post(); + expect(g2.wait_for(5s)); } void - test_incorrect_order() + incorrect_order() { using namespace std::chrono_literals; using namespace jtx; Env env(*this); - std::atomic i{0}; - std::condition_variable cv; auto& jq = env.app().getJobQueue(); jq.setThreadCount(0, false); + gate g; jq.postCoro(jtCLIENT, "Coroutine-Test", - [&](std::shared_ptr jc) + [&](auto const& jc) { jc->post(); jc->yield(); - ++i; - cv.notify_one(); + g.signal(); }); + expect(g.wait_for(5s)); + } + void + thread_specific_storage() + { + using namespace std::chrono_literals; + using namespace jtx; + Env env(*this); + auto& jq = env.app().getJobQueue(); + jq.setThreadCount(0, true); + static int const N = 4; + std::array, N> a; + + LocalValue lv(-1); + expect(*lv == -1); + + gate g; + jq.addJob(jtCLIENT, "LocalValue-Test", + [&](auto const& job) + { + this->expect(*lv == -1); + *lv = -2; + this->expect(*lv == -2); + g.signal(); + }); + expect(g.wait_for(5s)); + expect(*lv == -1); + + for(int i = 0; i < N; ++i) { - std::mutex m; - std::unique_lock lk(m); - expect(cv.wait_for(lk, 1s, - [&]() + jq.postCoro(jtCLIENT, "Coroutine-Test", + [&, id = i](auto const& jc) { - return i == 1; - })); + a[id] = jc; + g.signal(); + jc->yield(); + + this->expect(*lv == -1); + *lv = id; + this->expect(*lv == id); + g.signal(); + jc->yield(); + + this->expect(*lv == id); + }); + expect(g.wait_for(5s)); + a[i]->join(); } - jq.shutdown(); - expect(i == 1); + for(auto const& jc : a) + { + jc->post(); + expect(g.wait_for(5s)); + jc->join(); + } + for(auto const& jc : a) + { + jc->post(); + jc->join(); + } + + jq.addJob(jtCLIENT, "LocalValue-Test", + [&](auto const& job) + { + this->expect(*lv == -2); + g.signal(); + }); + expect(g.wait_for(5s)); + expect(*lv == -1); } void run() { - test_coroutine(); - test_incorrect_order(); + correct_order(); + incorrect_order(); + thread_specific_storage(); } };