mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-22 20:55:52 +00:00
create postgres database if it doesn't exist
This commit is contained in:
@@ -1527,11 +1527,6 @@ CassandraBackend::open(bool readOnly)
|
|||||||
if (!executeSimpleStatement(query.str()))
|
if (!executeSimpleStatement(query.str()))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
query.str("");
|
|
||||||
query << "CREATE INDEX ON " << tablePrefix << "objects(sequence)";
|
|
||||||
if (!executeSimpleStatement(query.str()))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
query.str("");
|
query.str("");
|
||||||
query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1"
|
query << "SELECT * FROM " << tablePrefix << "objects WHERE sequence=1"
|
||||||
<< " LIMIT 1";
|
<< " LIMIT 1";
|
||||||
|
|||||||
@@ -40,6 +40,7 @@
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#include <iterator>
|
#include <iterator>
|
||||||
#include <reporting/Pg.h>
|
#include <reporting/Pg.h>
|
||||||
|
#include <signal.h>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <string>
|
#include <string>
|
||||||
@@ -47,7 +48,6 @@
|
|||||||
#include <thread>
|
#include <thread>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <signal.h>
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
noticeReceiver(void* arg, PGresult const* res)
|
noticeReceiver(void* arg, PGresult const* res)
|
||||||
@@ -350,11 +350,27 @@ PgPool::PgPool(boost::json::object const& config)
|
|||||||
*/
|
*/
|
||||||
constexpr std::size_t maxFieldSize = 1024;
|
constexpr std::size_t maxFieldSize = 1024;
|
||||||
constexpr std::size_t maxFields = 1000;
|
constexpr std::size_t maxFields = 1000;
|
||||||
|
std::string conninfo = "postgres://";
|
||||||
|
auto getFieldAsString = [&config](auto field) {
|
||||||
|
if (!config.contains(field))
|
||||||
|
throw std::runtime_error(
|
||||||
|
field + std::string{" missing from postgres config"});
|
||||||
|
if (!config.at(field).is_string())
|
||||||
|
throw std::runtime_error(
|
||||||
|
field + std::string{" in postgres config is not a string"});
|
||||||
|
return std::string{config.at(field).as_string().c_str()};
|
||||||
|
};
|
||||||
|
conninfo += getFieldAsString("username");
|
||||||
|
conninfo += ":";
|
||||||
|
conninfo += getFieldAsString("password");
|
||||||
|
conninfo += "@";
|
||||||
|
conninfo += getFieldAsString("contact_point");
|
||||||
|
conninfo += "/";
|
||||||
|
conninfo += getFieldAsString("database");
|
||||||
|
|
||||||
// The connection object must be freed using the libpq API PQfinish() call.
|
// The connection object must be freed using the libpq API PQfinish() call.
|
||||||
pg_connection_type conn(
|
pg_connection_type conn(
|
||||||
PQconnectdb(config.at("conninfo").as_string().c_str()),
|
PQconnectdb(conninfo.c_str()), [](PGconn* conn) { PQfinish(conn); });
|
||||||
[](PGconn* conn) { PQfinish(conn); });
|
|
||||||
if (!conn)
|
if (!conn)
|
||||||
throw std::runtime_error("Can't create DB connection.");
|
throw std::runtime_error("Can't create DB connection.");
|
||||||
if (PQstatus(conn.get()) != CONNECTION_OK)
|
if (PQstatus(conn.get()) != CONNECTION_OK)
|
||||||
@@ -605,9 +621,26 @@ PgPool::checkin(std::unique_ptr<Pg>& pg)
|
|||||||
std::shared_ptr<PgPool>
|
std::shared_ptr<PgPool>
|
||||||
make_PgPool(boost::json::object const& config)
|
make_PgPool(boost::json::object const& config)
|
||||||
{
|
{
|
||||||
auto ret = std::make_shared<PgPool>(config);
|
try
|
||||||
ret->setup();
|
{
|
||||||
return ret;
|
auto ret = std::make_shared<PgPool>(config);
|
||||||
|
ret->setup();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
catch (std::runtime_error& e)
|
||||||
|
{
|
||||||
|
boost::json::object configCopy = config;
|
||||||
|
configCopy["database"] = "postgres";
|
||||||
|
auto ret = std::make_shared<PgPool>(configCopy);
|
||||||
|
ret->setup();
|
||||||
|
PgQuery pgQuery{ret};
|
||||||
|
std::string query = "CREATE DATABASE " +
|
||||||
|
std::string{config.at("database").as_string().c_str()};
|
||||||
|
pgQuery(query.c_str());
|
||||||
|
ret = std::make_shared<PgPool>(config);
|
||||||
|
ret->setup();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -816,14 +816,16 @@ PostgresBackend::writeKeys(
|
|||||||
keysBuffer << std::to_string(index.keyIndex) << '\t' << "\\\\x"
|
keysBuffer << std::to_string(index.keyIndex) << '\t' << "\\\\x"
|
||||||
<< ripple::strHex(key) << '\n';
|
<< ripple::strHex(key) << '\n';
|
||||||
numRows++;
|
numRows++;
|
||||||
// If the buffer gets too large, the insert fails. Not sure why. So we
|
// If the buffer gets too large, the insert fails. Not sure why.
|
||||||
// insert after 1 million records
|
// When writing in the background, we insert after every 10000 rows
|
||||||
if (numRows == 100000)
|
if ((isAsync && numRows == 10000) || numRows == 100000)
|
||||||
{
|
{
|
||||||
pgQuery.bulkInsert("keys", keysBuffer.str());
|
pgQuery.bulkInsert("keys", keysBuffer.str());
|
||||||
std::stringstream temp;
|
std::stringstream temp;
|
||||||
keysBuffer.swap(temp);
|
keysBuffer.swap(temp);
|
||||||
numRows = 0;
|
numRows = 0;
|
||||||
|
if (isAsync)
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (numRows > 0)
|
if (numRows > 0)
|
||||||
@@ -855,14 +857,16 @@ PostgresBackend::writeBooks(
|
|||||||
<< ripple::strHex(book.first) << '\t' << "\\\\x"
|
<< ripple::strHex(book.first) << '\t' << "\\\\x"
|
||||||
<< ripple::strHex(offer) << '\n';
|
<< ripple::strHex(offer) << '\n';
|
||||||
numRows++;
|
numRows++;
|
||||||
// If the buffer gets too large, the insert fails. Not sure why. So
|
// If the buffer gets too large, the insert fails. Not sure why.
|
||||||
// we insert after 1 million records
|
// When writing in the background, we insert after every 10 rows
|
||||||
if (numRows == 1000000)
|
if ((isAsync && numRows == 1000) || numRows == 100000)
|
||||||
{
|
{
|
||||||
pgQuery.bulkInsert("books", booksBuffer.str());
|
pgQuery.bulkInsert("books", booksBuffer.str());
|
||||||
std::stringstream temp;
|
std::stringstream temp;
|
||||||
booksBuffer.swap(temp);
|
booksBuffer.swap(temp);
|
||||||
numRows = 0;
|
numRows = 0;
|
||||||
|
if (isAsync)
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user