diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index 6eafb27e5..af49afa6b 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -1954,6 +1954,8 @@
+
+
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index 0567e23ef..a8646aff5 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -2667,6 +2667,9 @@
ripple\basics
+
+ ripple\basics
+
ripple\basics
diff --git a/src/ripple/basics/LocalValue.h b/src/ripple/basics/LocalValue.h
new file mode 100644
index 000000000..28e8a6b3a
--- /dev/null
+++ b/src/ripple/basics/LocalValue.h
@@ -0,0 +1,125 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
+#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
+
+#include
+#include
+#include
+
+namespace ripple {
+
+namespace detail {
+
+struct LocalValues
+{
+ bool onCoro = true;
+
+ struct BasicValue
+ {
+ virtual ~BasicValue() = default;
+ virtual void* get() = 0;
+ };
+
+ template
+ struct Value : BasicValue
+ {
+ T t_;
+
+ Value() = default;
+ Value(T const& t) : t_(t) {}
+
+ void* get() override
+ {
+ return &t_;
+ }
+ };
+
+ std::unordered_map> values;
+
+ static
+ inline
+ void
+ cleanup(LocalValues* lvs)
+ {
+ if (lvs && ! lvs->onCoro)
+ delete lvs;
+ }
+};
+
+template
+boost::thread_specific_ptr&
+getLocalValues()
+{
+ static boost::thread_specific_ptr<
+ detail::LocalValues> tsp(&detail::LocalValues::cleanup);
+ return tsp;
+}
+
+} // detail
+
+template
+class LocalValue
+{
+public:
+ template
+ LocalValue(Args&&... args)
+ : t_(std::forward(args)...)
+ {
+ }
+
+ /** Stores instance of T specific to the calling coroutine or thread. */
+ T& operator*();
+
+ /** Stores instance of T specific to the calling coroutine or thread. */
+ T* operator->()
+ {
+ return &**this;
+ }
+
+private:
+ T t_;
+};
+
+template
+T&
+LocalValue::operator*()
+{
+ auto lvs = detail::getLocalValues().get();
+ if (! lvs)
+ {
+ lvs = new detail::LocalValues();
+ lvs->onCoro = false;
+ detail::getLocalValues().reset(lvs);
+ }
+ else
+ {
+ auto const iter = lvs->values.find(this);
+ if (iter != lvs->values.end())
+ return *reinterpret_cast(iter->second->get());
+ }
+
+ return *reinterpret_cast(lvs->values.emplace(this,
+ std::make_unique>(t_)).
+ first->second->get());
+}
+} // ripple
+
+#endif
diff --git a/src/ripple/core/JobCoro.h b/src/ripple/core/JobCoro.h
index bcaa2b57f..fa6f02682 100644
--- a/src/ripple/core/JobCoro.h
+++ b/src/ripple/core/JobCoro.h
@@ -21,10 +21,12 @@
#define RIPPLE_CORE_JOBCORO_H_INCLUDED
#include
+#include
#include
#include
#include
#include
+#include
#include
namespace ripple {
@@ -40,6 +42,7 @@ struct JobCoro_create_t { };
class JobCoro : public std::enable_shared_from_this
{
private:
+ detail::LocalValues lvs_;
JobQueue& jq_;
JobType type_;
std::string name_;
diff --git a/src/ripple/core/JobCoro.ipp b/src/ripple/core/JobCoro.ipp
index 947000bca..727b9a593 100644
--- a/src/ripple/core/JobCoro.ipp
+++ b/src/ripple/core/JobCoro.ipp
@@ -60,13 +60,15 @@ JobCoro::post()
jq_.addJob(type_, name_,
[this, sp = shared_from_this()](Job&)
{
+ auto saved = detail::getLocalValues().release();
+ detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
- context_sp().reset(&ctx_);
coro_();
+ detail::getLocalValues().release();
+ detail::getLocalValues().reset(saved);
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
- context_sp().reset(nullptr);
});
}
diff --git a/src/ripple/core/tests/Coroutine.test.cpp b/src/ripple/core/tests/Coroutine.test.cpp
index f970bad48..baa426cd2 100644
--- a/src/ripple/core/tests/Coroutine.test.cpp
+++ b/src/ripple/core/tests/Coroutine.test.cpp
@@ -18,13 +18,11 @@
//==============================================================================
#include
-#include
#include
#include
#include
#include
#include
-#include
namespace ripple {
namespace test {
@@ -32,82 +30,152 @@ namespace test {
class Coroutine_test : public beast::unit_test::suite
{
public:
+ class gate
+ {
+ private:
+ std::condition_variable cv_;
+ std::mutex mutex_;
+ bool signaled_ = false;
+
+ public:
+ // Thread safe, blocks until signaled or period expires.
+ // Returns `true` if signaled.
+ template
+ bool
+ wait_for(std::chrono::duration const& rel_time)
+ {
+ std::unique_lock lk(mutex_);
+ auto b = cv_.wait_for(lk, rel_time, [=]{ return signaled_; });
+ signaled_ = false;
+ return b;
+ }
+
+ void
+ signal()
+ {
+ std::lock_guard lk(mutex_);
+ signaled_ = true;
+ cv_.notify_all();
+ }
+ };
+
void
- test_coroutine()
+ correct_order()
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
- std::atomic i{0};
- std::condition_variable cv;
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
+ gate g1, g2;
+ std::shared_ptr jc;
jq.postCoro(jtCLIENT, "Coroutine-Test",
- [&](std::shared_ptr jc)
+ [&](auto const& jcr)
{
- std::thread t(
- [&i, jc]()
- {
- std::this_thread::sleep_for(20ms);
- ++i;
- jc->post();
- });
+ jc = jcr;
+ g1.signal();
jc->yield();
- t.join();
- ++i;
- cv.notify_one();
+ g2.signal();
});
-
- {
- std::mutex m;
- std::unique_lock lk(m);
- expect(cv.wait_for(lk, 1s,
- [&]()
- {
- return i == 2;
- }));
- }
- jq.shutdown();
- expect(i == 2);
+ expect(g1.wait_for(5s));
+ jc->join();
+ jc->post();
+ expect(g2.wait_for(5s));
}
void
- test_incorrect_order()
+ incorrect_order()
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
- std::atomic i{0};
- std::condition_variable cv;
auto& jq = env.app().getJobQueue();
jq.setThreadCount(0, false);
+ gate g;
jq.postCoro(jtCLIENT, "Coroutine-Test",
- [&](std::shared_ptr jc)
+ [&](auto const& jc)
{
jc->post();
jc->yield();
- ++i;
- cv.notify_one();
+ g.signal();
});
+ expect(g.wait_for(5s));
+ }
+ void
+ thread_specific_storage()
+ {
+ using namespace std::chrono_literals;
+ using namespace jtx;
+ Env env(*this);
+ auto& jq = env.app().getJobQueue();
+ jq.setThreadCount(0, true);
+ static int const N = 4;
+ std::array, N> a;
+
+ LocalValue lv(-1);
+ expect(*lv == -1);
+
+ gate g;
+ jq.addJob(jtCLIENT, "LocalValue-Test",
+ [&](auto const& job)
+ {
+ this->expect(*lv == -1);
+ *lv = -2;
+ this->expect(*lv == -2);
+ g.signal();
+ });
+ expect(g.wait_for(5s));
+ expect(*lv == -1);
+
+ for(int i = 0; i < N; ++i)
{
- std::mutex m;
- std::unique_lock lk(m);
- expect(cv.wait_for(lk, 1s,
- [&]()
+ jq.postCoro(jtCLIENT, "Coroutine-Test",
+ [&, id = i](auto const& jc)
{
- return i == 1;
- }));
+ a[id] = jc;
+ g.signal();
+ jc->yield();
+
+ this->expect(*lv == -1);
+ *lv = id;
+ this->expect(*lv == id);
+ g.signal();
+ jc->yield();
+
+ this->expect(*lv == id);
+ });
+ expect(g.wait_for(5s));
+ a[i]->join();
}
- jq.shutdown();
- expect(i == 1);
+ for(auto const& jc : a)
+ {
+ jc->post();
+ expect(g.wait_for(5s));
+ jc->join();
+ }
+ for(auto const& jc : a)
+ {
+ jc->post();
+ jc->join();
+ }
+
+ jq.addJob(jtCLIENT, "LocalValue-Test",
+ [&](auto const& job)
+ {
+ this->expect(*lv == -2);
+ g.signal();
+ });
+ expect(g.wait_for(5s));
+ expect(*lv == -1);
}
void
run()
{
- test_coroutine();
- test_incorrect_order();
+ correct_order();
+ incorrect_order();
+ thread_specific_storage();
}
};