postgres support

This commit is contained in:
CJ Cobb
2020-12-17 11:43:48 -05:00
parent 4ac6d61b05
commit d6e54c398a
7 changed files with 2059 additions and 47 deletions

View File

@@ -29,7 +29,6 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/deps")
include(ExternalProject)
message(${CMAKE_CURRENT_BINARY_DIR})
message(${CMAKE_MODULE_PATH})
include(cassandra)
#include(rippled/Builds/CMake/RippledCore.cmake)
add_subdirectory(rippled)
target_link_libraries(reporting PUBLIC xrpl_core grpc_pbufs)
@@ -48,6 +47,11 @@ INCLUDE_DIRECTORIES(${grpc_includes})
INCLUDE_DIRECTORIES(${SOURCE_DIR}/src)
ExternalProject_Get_Property(grpc_src SOURCE_DIR)
INCLUDE_DIRECTORIES(${SOURCE_DIR}/include)
get_target_property(xrpl_core_includes xrpl_core INCLUDE_DIRECTORIES)
message("${xrpl_core_includes}")
INCLUDE_DIRECTORIES(${xrpl_core_includes})
include(cassandra)
include(Postgres)
#include (FetchContent)
#FetchContent_Declare(
# rippled
@@ -118,7 +122,7 @@ INCLUDE_DIRECTORIES(${SOURCE_DIR}/include)
#target_link_libraries (grpc_protobufs ${_REFLECTION} ${_PROTOBUF_LIBPROTOBUF} ${_GRPC_GRPCPP})
target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp)
target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp reporting/Pg.cpp)
message(${Boost_LIBRARIES})

66
deps/Postgres.cmake vendored Normal file
View File

@@ -0,0 +1,66 @@
find_package(PostgreSQL)
if(NOT PostgreSQL_FOUND)
message("find_package did not find postgres")
find_library(postgres NAMES pq libpq libpq-dev pq-dev postgresql-devel)
find_path(libpq-fe NAMES libpq-fe.h PATH_SUFFIXES postgresql pgsql include)
if(NOT libpq-fe_FOUND OR NOT postgres_FOUND)
message("No system installed Postgres found. Will build")
add_library(postgres SHARED IMPORTED GLOBAL)
ExternalProject_Add(postgres_src
PREFIX ${nih_cache_path}
GIT_REPOSITORY https://github.com/postgres/postgres.git
GIT_TAG master
CONFIGURE_COMMAND ./configure --without-readline
BUILD_COMMAND ${CMAKE_COMMAND} -E env --unset=MAKELEVEL make
UPDATE_COMMAND ""
BUILD_IN_SOURCE 1
INSTALL_COMMAND ""
BUILD_BYPRODUCTS <BINARY_DIR>/src/interfaces/libpq/${ep_lib_prefix}pq.so
)
ExternalProject_Get_Property (postgres_src SOURCE_DIR)
ExternalProject_Get_Property (postgres_src BINARY_DIR)
set (postgres_src_SOURCE_DIR "${SOURCE_DIR}")
file (MAKE_DIRECTORY ${postgres_src_SOURCE_DIR})
list(APPEND INCLUDE_DIRS ${SOURCE_DIR}/src/include)
list(APPEND INCLUDE_DIRS ${SOURCE_DIR}/src/interfaces/libpq)
set_target_properties (postgres PROPERTIES
IMPORTED_LOCATION
${BINARY_DIR}/src/interfaces/libpq/${ep_lib_prefix}pq.so
INTERFACE_INCLUDE_DIRECTORIES
"${INCLUDE_DIRS}")
add_dependencies(postgres postgres_src)
file(TO_CMAKE_PATH "${postgres_src_SOURCE_DIR}" postgres_src_SOURCE_DIR)
target_link_libraries(reporting INTERFACE postgres)
else()
message("Found system installed Postgres via find_libary")
target_include_directories(reporting INTERFACE ${libpq-fe})
target_link_libraries(reporting INTERFACE ${postgres})
endif()
else()
message("Found system installed Postgres via find_package")
message("${PostgreSQL_INCLUDE_DIRS}")
include_directories(reporting INTERFACE "${PostgreSQL_INCLUDE_DIRS}")
target_link_libraries(reporting PUBLIC ${PostgreSQL_LIBRARIES})
endif()

View File

@@ -30,12 +30,14 @@
// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource::ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend)
CassandraFlatMapBackend& backend,
NetworkValidatedLedgers& networkValidatedLedgers)
: ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc_)))
, resolver_(boost::asio::make_strand(ioc_))
, timer_(ioc_)
, networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend)
{
if (config.contains("ip"))
@@ -352,7 +354,7 @@ ETLSource::handleMessage()
<< __func__ << " : "
<< "Pushing ledger sequence = " << ledgerIndex << " - "
<< toString();
// networkValidatedLedgers_.push(ledgerIndex);
networkValidatedLedgers_.push(ledgerIndex);
}
return true;
}
@@ -554,12 +556,13 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
}
ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend)
CassandraFlatMapBackend& backend,
NetworkValidatedLedgers& nwvl)
{
for (auto& entry : config)
{
std::unique_ptr<ETLSource> source =
std::make_unique<ETLSource>(entry.as_object(), backend);
std::make_unique<ETLSource>(entry.as_object(), backend, nwvl);
sources_.push_back(std::move(source));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
<< sources_.back()->toString();

View File

@@ -29,6 +29,7 @@
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <reporting/ETLHelpers.h>
/// This class manages a connection to a single ETL source. This is almost
/// always a p2p node, but really could be another reporting node. This class
@@ -60,7 +61,7 @@ class ETLSource
std::string validatedLedgersRaw_;
// NetworkValidatedLedgers& networkValidatedLedgers_;
NetworkValidatedLedgers& networkValidatedLedgers_;
// beast::Journal journal_;
@@ -113,7 +114,8 @@ public:
/// Primarly used in read-only mode, to monitor when ledgers are validated
ETLSource(
boost::json::object const& config,
CassandraFlatMapBackend& backend);
CassandraFlatMapBackend& backend,
NetworkValidatedLedgers& networkValidatedLedgers);
/// @param sequence ledger sequence to check for
/// @return true if this source has the desired ledger
@@ -283,7 +285,8 @@ private:
public:
ETLLoadBalancer(
boost::json::array const& config,
CassandraFlatMapBackend& backend);
CassandraFlatMapBackend& backend,
NetworkValidatedLedgers& nwvl);
/// Load the initial ledger, writing data to the queue
/// @param sequence sequence of ledger to download

1418
reporting/Pg.cpp Normal file

File diff suppressed because it is too large Load Diff

514
reporting/Pg.h Normal file
View File

@@ -0,0 +1,514 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_CORE_PG_H_INCLUDED
#define RIPPLE_CORE_PG_H_INCLUDED
#include <boost/json.hpp>
#include <boost/lexical_cast.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <libpq-fe.h>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>
// These postgres structs must be freed only by the postgres API.
using pg_result_type = std::unique_ptr<PGresult, void (*)(PGresult*)>;
using pg_connection_type = std::unique_ptr<PGconn, void (*)(PGconn*)>;
/** first: command
* second: parameter values
*
* The 2nd member takes an optional string to
* distinguish between NULL parameters and empty strings. An empty
* item corresponds to a NULL parameter.
*
* Postgres reads each parameter as a c-string, regardless of actual type.
* Binary types (bytea) need to be converted to hex and prepended with
* \x ("\\x").
*/
using pg_params =
std::pair<char const*, std::vector<std::optional<std::string>>>;
/** Parameter values for pg API. */
using pg_formatted_params = std::vector<char const*>;
/** Parameters for managing postgres connections. */
struct PgConfig
{
/** Maximum connections allowed to db. */
std::size_t max_connections{std::numeric_limits<std::size_t>::max()};
/** Close idle connections past this duration. */
std::chrono::seconds timeout{600};
/** Index of DB connection parameter names. */
std::vector<char const*> keywordsIdx;
/** DB connection parameter names. */
std::vector<std::string> keywords;
/** Index of DB connection parameter values. */
std::vector<char const*> valuesIdx;
/** DB connection parameter values. */
std::vector<std::string> values;
};
//-----------------------------------------------------------------------------
/** Class that operates on postgres query results.
*
* The functions that return results do not check first whether the
* expected results are actually there. Therefore, the caller first needs
* to check whether or not a valid response was returned using the operator
* bool() overload. If number of tuples or fields are unknown, then check
* those. Each result field should be checked for null before attempting
* to return results. Finally, the caller must know the type of the field
* before calling the corresponding function to return a field. Postgres
* internally stores each result field as null-terminated strings.
*/
class PgResult
{
// The result object must be freed using the libpq API PQclear() call.
pg_result_type result_{nullptr, [](PGresult* result) { PQclear(result); }};
std::optional<std::pair<ExecStatusType, std::string>> error_;
public:
/** Constructor for when the process is stopping.
*
*/
PgResult()
{
}
/** Constructor for successful query results.
*
* @param result Query result.
*/
explicit PgResult(pg_result_type&& result) : result_(std::move(result))
{
}
/** Constructor for failed query results.
*
* @param result Query result that contains error information.
* @param conn Postgres connection that contains error information.
*/
PgResult(PGresult* result, PGconn* conn)
: error_({PQresultStatus(result), PQerrorMessage(conn)})
{
}
/** Return field as a null-terminated string pointer.
*
* Note that this function does not guarantee that the result struct
* exists, or that the row and fields exist, or that the field is
* not null.
*
* @param ntuple Row number.
* @param nfield Field number.
* @return Field contents.
*/
char const*
c_str(int ntuple = 0, int nfield = 0) const
{
return PQgetvalue(result_.get(), ntuple, nfield);
}
/** Return field as equivalent to Postgres' INT type (32 bit signed).
*
* Note that this function does not guarantee that the result struct
* exists, or that the row and fields exist, or that the field is
* not null, or that the type is that requested.
* @param ntuple Row number.
* @param nfield Field number.
* @return Field contents.
*/
std::int32_t
asInt(int ntuple = 0, int nfield = 0) const
{
return boost::lexical_cast<std::int32_t>(
PQgetvalue(result_.get(), ntuple, nfield));
}
/** Return field as equivalent to Postgres' BIGINT type (64 bit signed).
*
* Note that this function does not guarantee that the result struct
* exists, or that the row and fields exist, or that the field is
* not null, or that the type is that requested.
* @param ntuple Row number.
* @param nfield Field number.
* @return Field contents.
*/
std::int64_t
asBigInt(int ntuple = 0, int nfield = 0) const
{
return boost::lexical_cast<std::int64_t>(
PQgetvalue(result_.get(), ntuple, nfield));
}
/** Returns whether the field is NULL or not.
*
* Note that this function does not guarantee that the result struct
* exists, or that the row and fields exist.
*
* @param ntuple Row number.
* @param nfield Field number.
* @return Whether field is NULL.
*/
bool
isNull(int ntuple = 0, int nfield = 0) const
{
return PQgetisnull(result_.get(), ntuple, nfield);
}
/** Check whether a valid response occurred.
*
* @return Whether or not the query returned a valid response.
*/
operator bool() const
{
return result_ != nullptr;
}
/** Message describing the query results suitable for diagnostics.
*
* If error, then the postgres error type and message are returned.
* Otherwise, "ok"
*
* @return Query result message.
*/
std::string
msg() const;
/** Get number of rows in result.
*
* Note that this function does not guarantee that the result struct
* exists.
*
* @return Number of result rows.
*/
int
ntuples() const
{
return PQntuples(result_.get());
}
/** Get number of fields in result.
*
* Note that this function does not guarantee that the result struct
* exists.
*
* @return Number of result fields.
*/
int
nfields() const
{
return PQnfields(result_.get());
}
/** Return result status of the command.
*
* Note that this function does not guarantee that the result struct
* exists.
*
* @return
*/
ExecStatusType
status() const
{
return PQresultStatus(result_.get());
}
};
/* Class that contains and operates upon a postgres connection. */
class Pg
{
friend class PgPool;
friend class PgQuery;
PgConfig const& config_;
bool& stop_;
std::mutex& mutex_;
// The connection object must be freed using the libpq API PQfinish() call.
pg_connection_type conn_{nullptr, [](PGconn* conn) { PQfinish(conn); }};
/** Clear results from the connection.
*
* Results from previous commands must be cleared before new commands
* can be processed. This function should be called on connections
* that weren't processed completely before being reused, such as
* when being checked-in.
*
* @return whether or not connection still exists.
*/
bool
clear();
/** Connect to postgres.
*
* Idempotently connects to postgres by first checking whether an
* existing connection is already present. If connection is not present
* or in an errored state, reconnects to the database.
*/
void
connect();
/** Disconnect from postgres. */
void
disconnect()
{
conn_.reset();
}
/** Execute postgres query.
*
* If parameters are included, then the command should contain only a
* single SQL statement. If no parameters, then multiple SQL statements
* delimited by semi-colons can be processed. The response is from
* the last command executed.
*
* @param command postgres API command string.
* @param nParams postgres API number of parameters.
* @param values postgres API array of parameter.
* @return Query result object.
*/
PgResult
query(char const* command, std::size_t nParams, char const* const* values);
/** Execute postgres query with no parameters.
*
* @param command Query string.
* @return Query result object;
*/
PgResult
query(char const* command)
{
return query(command, 0, nullptr);
}
/** Execute postgres query with parameters.
*
* @param dbParams Database command and parameter values.
* @return Query result object.
*/
PgResult
query(pg_params const& dbParams);
/** Insert multiple records into a table using Postgres' bulk COPY.
*
* Throws upon error.
*
* @param table Name of table for import.
* @param records Records in the COPY IN format.
*/
void
bulkInsert(char const* table, std::string const& records);
public:
/** Constructor for Pg class.
*
* @param config Config parameters.
* @param j Logger object.
* @param stop Reference to connection pool's stop flag.
* @param mutex Reference to connection pool's mutex.
*/
Pg(PgConfig const& config, bool& stop, std::mutex& mutex)
: config_(config), stop_(stop), mutex_(mutex)
{
}
};
//-----------------------------------------------------------------------------
/** Database connection pool.
*
* Allow re-use of postgres connections. Postgres connections are created
* as needed until configurable limit is reached. After use, each connection
* is placed in a container ordered by time of use. Each request for
* a connection grabs the most recently used connection from the container.
* If none are available, a new connection is used (up to configured limit).
* Idle connections are destroyed periodically after configurable
* timeout duration.
*
* This should be stored as a shared pointer so PgQuery objects can safely
* outlive it.
*/
class PgPool
{
friend class PgQuery;
using clock_type = std::chrono::steady_clock;
PgConfig config_;
std::mutex mutex_;
std::condition_variable cond_;
std::size_t connections_{};
bool stop_{false};
/** Idle database connections ordered by timestamp to allow timing out. */
std::multimap<std::chrono::time_point<clock_type>, std::unique_ptr<Pg>>
idle_;
/** Get a postgres connection object.
*
* Return the most recent idle connection in the pool, if available.
* Otherwise, return a new connection unless we're at the threshold.
* If so, then wait until a connection becomes available.
*
* @return Postgres object.
*/
std::unique_ptr<Pg>
checkout();
/** Return a postgres object to the pool for reuse.
*
* If connection is healthy, place in pool for reuse. After calling this,
* the container no longer have a connection unless checkout() is called.
*
* @param pg Pg object.
*/
void
checkin(std::unique_ptr<Pg>& pg);
public:
/** Connection pool constructor.
*
* @param pgConfig Postgres config.
* @param j Logger object.
* @param parent Stoppable parent.
*/
PgPool(boost::json::object const& config);
~PgPool()
{
onStop();
}
/** Initiate idle connection timer.
*
* The PgPool object needs to be fully constructed to support asynchronous
* operations.
*/
void
setup();
/** Prepare for process shutdown. (Stoppable) */
void
onStop();
/** Disconnect idle postgres connections. */
void
idleSweeper();
};
//-----------------------------------------------------------------------------
/** Class to query postgres.
*
* This class should be used by functions outside of this
* compilation unit for querying postgres. It automatically acquires and
* relinquishes a database connection to handle each query.
*/
class PgQuery
{
private:
std::shared_ptr<PgPool> pool_;
std::unique_ptr<Pg> pg_;
public:
PgQuery() = delete;
PgQuery(std::shared_ptr<PgPool> const& pool)
: pool_(pool), pg_(pool->checkout())
{
}
~PgQuery()
{
pool_->checkin(pg_);
}
/** Execute postgres query with parameters.
*
* @param dbParams Database command with parameters.
* @return Result of query, including errors.
*/
PgResult
operator()(pg_params const& dbParams)
{
if (!pg_) // It means we're stopping. Return empty result.
return PgResult();
return pg_->query(dbParams);
}
/** Execute postgres query with only command statement.
*
* @param command Command statement.
* @return Result of query, including errors.
*/
PgResult
operator()(char const* command)
{
return operator()(pg_params{command, {}});
}
/** Insert multiple records into a table using Postgres' bulk COPY.
*
* Throws upon error.
*
* @param table Name of table for import.
* @param records Records in the COPY IN format.
*/
void
bulkInsert(char const* table, std::string const& records)
{
pg_->bulkInsert(table, records);
}
};
//-----------------------------------------------------------------------------
/** Create Postgres connection pool manager.
*
* @param pgConfig Configuration for Postgres.
* @param j Logger object.
* @param parent Stoppable parent object.
* @return Postgres connection pool manager
*/
std::shared_ptr<PgPool>
make_PgPool(boost::json::object const& pgConfig);
/** Initialize the Postgres schema.
*
* This function ensures that the database is running the latest version
* of the schema.
*
* @param pool Postgres connection pool manager.
*/
void
initSchema(std::shared_ptr<PgPool> const& pool);
#endif // RIPPLE_CORE_PG_H_INCLUDED

View File

@@ -31,17 +31,11 @@
#include <thread>
#include <vector>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
//------------------------------------------------------------------------------
// Report a failure
void
fail(beast::error_code ec, char const* what)
fail(boost::beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
@@ -49,12 +43,13 @@ fail(beast::error_code ec, char const* what)
// Echoes back all received WebSocket messages
class session : public std::enable_shared_from_this<session>
{
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
public:
// Take ownership of the socket
explicit session(tcp::socket&& socket) : ws_(std::move(socket))
explicit session(boost::asio::ip::tcp::socket&& socket)
: ws_(std::move(socket))
{
}
@@ -66,9 +61,10 @@ public:
// on the I/O objects in this session. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(
boost::asio::dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
boost::beast::bind_front_handler(
&session::on_run, shared_from_this()));
}
// Start the asynchronous operation
@@ -76,24 +72,24 @@ public:
on_run()
{
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));
ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
ws_.set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::response_type& res) {
res.set(
http::field::server,
boost::beast::http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
ws_.async_accept(boost::beast::bind_front_handler(
&session::on_accept, shared_from_this()));
}
void
on_accept(beast::error_code ec)
on_accept(boost::beast::error_code ec)
{
if (ec)
return fail(ec, "accept");
@@ -108,16 +104,17 @@ public:
// Read a message into our buffer
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
boost::beast::bind_front_handler(
&session::on_read, shared_from_this()));
}
void
on_read(beast::error_code ec, std::size_t bytes_transferred)
on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed
if (ec == websocket::error::closed)
if (ec == boost::beast::websocket::error::closed)
return;
if (ec)
@@ -127,11 +124,12 @@ public:
ws_.text(ws_.got_text());
ws_.async_write(
buffer_.data(),
beast::bind_front_handler(&session::on_write, shared_from_this()));
boost::beast::bind_front_handler(
&session::on_write, shared_from_this()));
}
void
on_write(beast::error_code ec, std::size_t bytes_transferred)
on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
@@ -151,14 +149,16 @@ public:
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
{
net::io_context& ioc_;
tcp::acceptor acceptor_;
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
public:
listener(net::io_context& ioc, tcp::endpoint endpoint)
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint)
: ioc_(ioc), acceptor_(ioc)
{
beast::error_code ec;
boost::beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
@@ -169,7 +169,7 @@ public:
}
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec)
{
fail(ec, "set_option");
@@ -185,7 +185,7 @@ public:
}
// Start listening for connections
acceptor_.listen(net::socket_base::max_listen_connections, ec);
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec)
{
fail(ec, "listen");
@@ -206,13 +206,13 @@ private:
{
// The new connection gets its own strand
acceptor_.async_accept(
net::make_strand(ioc_),
beast::bind_front_handler(
boost::asio::make_strand(ioc_),
boost::beast::bind_front_handler(
&listener::on_accept, shared_from_this()));
}
void
on_accept(beast::error_code ec, tcp::socket socket)
on_accept(boost::beast::error_code ec, boost::asio::ip::tcp::socket socket)
{
if (ec)
{
@@ -265,7 +265,7 @@ main(int argc, char* argv[])
<< " websocket-server-async 0.0.0.0 8080 1\n";
return EXIT_FAILURE;
}
auto const address = net::ip::make_address(argv[1]);
auto const address = boost::asio::ip::make_address(argv[1]);
auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
auto const config = parse_config(argv[4]);
@@ -286,15 +286,19 @@ main(int argc, char* argv[])
std::cerr << "no etl sources listed in config. exiting..." << std::endl;
return EXIT_FAILURE;
}
ETLSource source{sources[0].as_object(), backend};
NetworkValidatedLedgers nwvl;
ETLSource source{sources[0].as_object(), backend, nwvl};
source.start();
// source.loadInitialLedger(60000000);
// The io_context is required for all I/O
net::io_context ioc{threads};
boost::asio::io_context ioc{threads};
// Create and launch a listening port
std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
std::make_shared<listener>(
ioc, boost::asio::ip::tcp::endpoint{address, port})
->run();
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;