From 7cfac1a91a4b95a83704ffbddcd45c278b3d1f09 Mon Sep 17 00:00:00 2001 From: Tom Ritchford Date: Mon, 10 Nov 2014 12:47:08 -0500 Subject: [PATCH] Wrap Output in a coroutine that periodically yields. --- Builds/VisualStudio2013/RippleD.vcxproj | 8 ++ .../VisualStudio2013/RippleD.vcxproj.filters | 9 ++ src/ripple/rpc/Coroutine.h | 54 +++++++++ src/ripple/rpc/Yield.h | 100 +++++++++++++++++ src/ripple/rpc/impl/Coroutine.cpp | 59 ++++++++++ src/ripple/rpc/impl/Coroutine.test.cpp | 74 ++++++++++++ src/ripple/rpc/impl/TestOutputSuite.h | 5 +- src/ripple/rpc/impl/Yield.cpp | 76 +++++++++++++ src/ripple/rpc/impl/Yield.test.cpp | 105 ++++++++++++++++++ src/ripple/unity/rpcx.cpp | 10 +- 10 files changed, 495 insertions(+), 5 deletions(-) create mode 100644 src/ripple/rpc/Coroutine.h create mode 100644 src/ripple/rpc/Yield.h create mode 100644 src/ripple/rpc/impl/Coroutine.cpp create mode 100644 src/ripple/rpc/impl/Coroutine.test.cpp create mode 100644 src/ripple/rpc/impl/Yield.cpp create mode 100644 src/ripple/rpc/impl/Yield.test.cpp diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 3e08997d20..fdfb01ef77 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -3075,6 +3075,12 @@ + + True + + + True + @@ -3087,6 +3093,8 @@ + + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 042774126b..855f0e1484 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -4185,6 +4185,12 @@ ripple\rpc\impl + + ripple\rpc\impl + + + ripple\rpc\impl + ripple\rpc @@ -4203,6 +4209,9 @@ ripple\rpc + + ripple\rpc + ripple\server diff --git a/src/ripple/rpc/Coroutine.h b/src/ripple/rpc/Coroutine.h new file mode 100644 index 0000000000..b0c0136a9f --- /dev/null +++ b/src/ripple/rpc/Coroutine.h @@ -0,0 +1,54 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_RIPPLE_RPC_COROUTINE_H +#define RIPPLED_RIPPLE_RPC_COROUTINE_H + +#include + +namespace ripple { +namespace RPC { + +/** Runs a function that takes a yield as a coroutine. */ +class Coroutine +{ +public: + using YieldFunction = std::function ; + + explicit Coroutine (YieldFunction const&); + ~Coroutine(); + + /** Is the coroutine finished? */ + operator bool() const; + + /** Run one more step of the coroutine. */ + void operator()() const; + +private: + struct Impl; + + std::shared_ptr impl_; + // We'd prefer to use std::unique_ptr here, but unfortunately, in C++11 + // move semantics don't work well with `std::bind` or lambdas. +}; + +} // RPC +} // ripple + +#endif diff --git a/src/ripple/rpc/Yield.h b/src/ripple/rpc/Yield.h new file mode 100644 index 0000000000..40f99cc400 --- /dev/null +++ b/src/ripple/rpc/Yield.h @@ -0,0 +1,100 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_RIPPLE_RPC_YIELD_H +#define RIPPLED_RIPPLE_RPC_YIELD_H + +#include +#include +#include + +namespace ripple { + +class Section; + +namespace RPC { + +/** Yield is a generic placeholder for a function that yields control of + execution - perhaps to another coroutine. + + When code calls Yield, it might block for an indeterminate period of time. + + By convention you must not be holding any locks or any resource that would + prevent any other task from making forward progress when you call Yield. +*/ +using Yield = std::function ; + +/** Wrap an Output so it yields after approximately `chunkSize` bytes. + + chunkedYieldingOutput() only yields after a call to output(), so there might + more than chunkSize bytes sent between calls to yield(). + + chunkedYieldingOutput() also only yields before it's about to output more + data. This is to avoid the case where you yield after outputting data, but + then never send more data. + */ +Output chunkedYieldingOutput ( + Output const&, Yield const&, std::size_t chunkSize); + +/** Yield every yieldCount calls. If yieldCount is 0, never yield. */ +class CountedYield +{ +public: + CountedYield (std::size_t yieldCount, Yield const& yield); + void yield(); + +private: + std::size_t count_ = 0; + std::size_t const yieldCount_; + Yield const yield_; +}; + +/** When do we yield when performing a ledger computation? */ +struct YieldStrategy +{ + enum class Streaming {no, yes}; + enum class UseCoroutines {no, yes}; + + /** Is the data streamed, or generated monolithically? */ + Streaming streaming = Streaming::no; + + /** Are results generated in a coroutine? If this is no, then the code can + never yield. */ + UseCoroutines useCoroutines = UseCoroutines::no; + + /** How many bytes do we emit before yielding? 0 means "never yield due to + number of bytes sent". */ + std::size_t byteYieldCount = 0; + + /** How many accounts do we process before yielding? 0 means "never yield + due to number of accounts processed." */ + std::size_t accountYieldCount = 0; + + /** How many transactions do we process before yielding? 0 means "never + yield due to number of transactions processed." */ + std::size_t transactionYieldCount = 0; +}; + +/** Create a yield strategy from a configuration Section. */ +YieldStrategy makeYieldStrategy (Section const&); + +} // RPC +} // ripple + +#endif diff --git a/src/ripple/rpc/impl/Coroutine.cpp b/src/ripple/rpc/impl/Coroutine.cpp new file mode 100644 index 0000000000..fc3c007ab5 --- /dev/null +++ b/src/ripple/rpc/impl/Coroutine.cpp @@ -0,0 +1,59 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +namespace ripple { +namespace RPC { + +using CoroutinePull = boost::coroutines::coroutine ::pull_type; + +struct Coroutine::Impl : CoroutinePull +{ + Impl (CoroutinePull&& p) : CoroutinePull (std::move(p)) {} +}; + +Coroutine::Coroutine (YieldFunction const& yieldFunction) +{ + CoroutinePull pull ([yieldFunction] ( + boost::coroutines::coroutine ::push_type& push) + { + Yield yield = [&push] () { push(); }; + yield (); + yieldFunction (yield); + }); + + impl_ = std::make_shared (std::move (pull)); +} + +Coroutine::~Coroutine() = default; + +Coroutine::operator bool() const +{ + return bool (*impl_); +} + +void Coroutine::operator()() const +{ + (*impl_)(); +} + +} // RPC +} // ripple diff --git a/src/ripple/rpc/impl/Coroutine.test.cpp b/src/ripple/rpc/impl/Coroutine.test.cpp new file mode 100644 index 0000000000..d97706f8cb --- /dev/null +++ b/src/ripple/rpc/impl/Coroutine.test.cpp @@ -0,0 +1,74 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +namespace ripple { +namespace RPC { + +class Coroutine_test : public TestOutputSuite +{ +public: + using Strings = std::vector ; + + void test (std::string const& name, int chunkSize, Strings const& expected) + { + setup (name); + + std::string buffer; + Output output = stringOutput (buffer); + + auto coroutine = Coroutine ([=] (Yield yield) + { + auto out = chunkedYieldingOutput (output, yield, chunkSize); + out ("hello "); + out ("there "); + out ("world."); + }); + + Strings result; + while (coroutine) + { + coroutine(); + result.push_back (buffer); + } + + auto r = strJoin (result.begin(), result.end(), ", "); + auto e = strJoin (expected.begin(), expected.end(), ", "); + expectEquals (r, e); + } + + void run() override + { + test ("zero", 0, {"hello ", "hello there ", "hello there world."}); + test ("three", 3, {"hello ", "hello there ", "hello there world."}); + test ("five", 5, {"hello ", "hello there ", "hello there world."}); + test ("seven", 7, {"hello there ", "hello there world."}); + test ("ten", 10, {"hello there ", "hello there world."}); + test ("thirteen", 13, {"hello there world."}); + test ("fifteen", 15, {"hello there world."}); + } +}; + +BEAST_DEFINE_TESTSUITE(Coroutine, RPC, ripple); + +} // RPC +} // ripple diff --git a/src/ripple/rpc/impl/TestOutputSuite.h b/src/ripple/rpc/impl/TestOutputSuite.h index 86e7561b9a..6eb12a8d9f 100644 --- a/src/ripple/rpc/impl/TestOutputSuite.h +++ b/src/ripple/rpc/impl/TestOutputSuite.h @@ -45,11 +45,12 @@ protected: // Test the result and report values. void expectResult (std::string const& expected) { - expectResult (output_, expected); + writer_.reset (); + expectEquals (output_, expected); } // Test the result and report values. - void expectResult (std::string const& result, std::string const& expected) + void expectEquals (std::string const& result, std::string const& expected) { expect (result == expected, "\n" "result: '" + result + "'" + diff --git a/src/ripple/rpc/impl/Yield.cpp b/src/ripple/rpc/impl/Yield.cpp new file mode 100644 index 0000000000..c9101f5319 --- /dev/null +++ b/src/ripple/rpc/impl/Yield.cpp @@ -0,0 +1,76 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +namespace ripple { +namespace RPC { + +Output chunkedYieldingOutput ( + Output const& output, Yield const& yield, std::size_t chunkSize) +{ + auto count = std::make_shared (0); + return [chunkSize, count, output, yield] (boost::string_ref const& bytes) + { + if (*count > chunkSize) + { + yield(); + *count = 0; + } + output (bytes); + *count += bytes.size(); + }; +} + + +CountedYield::CountedYield (std::size_t yieldCount, Yield const& yield) + : yieldCount_ (yieldCount), yield_ (yield) +{ +} + +void CountedYield::yield() +{ + if (yieldCount_) { + if (++count_ >= yieldCount_) + { + yield_(); + count_ = 0; + } + } +} + +YieldStrategy makeYieldStrategy (Section const& s) +{ + YieldStrategy ys; + ys.streaming = get (s, "streaming") ? + YieldStrategy::Streaming::yes : + YieldStrategy::Streaming::no; + ys.useCoroutines = get (s, "use_coroutines") ? + YieldStrategy::UseCoroutines::yes : + YieldStrategy::UseCoroutines::no; + ys.byteYieldCount = get (s, "byte_yield_count"); + ys.accountYieldCount = get (s, "account_yield_count"); + ys.transactionYieldCount = get (s, "transaction_yield_count"); + + return ys; +} + +} // RPC +} // ripple diff --git a/src/ripple/rpc/impl/Yield.test.cpp b/src/ripple/rpc/impl/Yield.test.cpp new file mode 100644 index 0000000000..695f17460e --- /dev/null +++ b/src/ripple/rpc/impl/Yield.test.cpp @@ -0,0 +1,105 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +namespace ripple { +namespace RPC { + +struct Yield_test : TestOutputSuite +{ + void chunkedYieldingTest () + { + setup ("chunkedYieldingTest"); + std::string lastYield; + + auto yield = [&]() { lastYield = output_; }; + auto output = chunkedYieldingOutput (stringOutput (output_), yield, 5); + output ("hello"); + expectResult ("hello"); + expectEquals (lastYield, ""); + + output (", th"); // Goes over the boundary. + expectResult ("hello, th"); + expectEquals (lastYield, ""); + + output ("ere!"); // Forces a yield. + expectResult ("hello, there!"); + expectEquals (lastYield, "hello, th"); + + output ("!!"); + expectResult ("hello, there!!!"); + expectEquals (lastYield, "hello, th"); + + output (""); // Forces a yield. + expectResult ("hello, there!!!"); + expectEquals (lastYield, "hello, there!!!"); + } + + void trivialCountedYieldTest() + { + setup ("trivialCountedYield"); + + auto didYield = false; + auto yield = [&]() { didYield = true; }; + + CountedYield cy (0, yield); + + for (auto i = 0; i < 4; ++i) + { + cy.yield(); + expect (!didYield, "We yielded when we shouldn't have."); + } + } + + void countedYieldTest() + { + setup ("countedYield"); + + auto didYield = false; + auto yield = [&]() { didYield = true; }; + + CountedYield cy (5, yield); + + for (auto j = 0; j < 3; ++j) + { + for (auto i = 0; i < 4; ++i) + { + cy.yield(); + expect (!didYield, "We yielded when we shouldn't have."); + } + cy.yield(); + expect (didYield, "We didn't yield"); + didYield = false; + } + } + + void run () override + { + chunkedYieldingTest(); + trivialCountedYieldTest(); + countedYieldTest(); + } +}; + +BEAST_DEFINE_TESTSUITE(Yield, ripple_basics, ripple); + +} // RPC +} // ripple diff --git a/src/ripple/unity/rpcx.cpp b/src/ripple/unity/rpcx.cpp index 0167b6e578..cfa0664ff1 100644 --- a/src/ripple/unity/rpcx.cpp +++ b/src/ripple/unity/rpcx.cpp @@ -30,13 +30,13 @@ #include #include -#include #include #include #include #include #include #include +#include #include #include @@ -110,5 +110,9 @@ #include #include -#include -#include +#include +#include +#include +#include +#include +#include