This commit is contained in:
Richard Holland
2024-08-18 16:46:46 +10:00
parent fbfd8c1e0a
commit 224e78ac81
2 changed files with 720 additions and 674 deletions

View File

@@ -24,13 +24,13 @@
#include <ripple/nodestore/impl/EncodedBlob.h> #include <ripple/nodestore/impl/EncodedBlob.h>
#include <ripple/nodestore/impl/codec.h> #include <ripple/nodestore/impl/codec.h>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include "snug.hpp"
#include <cassert> #include <cassert>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include <exception> #include <exception>
#include <memory> #include <memory>
#include "snug.hpp"
namespace ripple { namespace ripple {
namespace NodeStore { namespace NodeStore {
@@ -38,7 +38,8 @@ namespace NodeStore {
class SnugDBBackend : public Backend class SnugDBBackend : public Backend
{ {
private: private:
static constexpr uint64_t BUFFER_SIZE = 256ULL*1024ULL*1024ULL; // 256 Mib read buffer per thread static constexpr uint64_t BUFFER_SIZE =
256ULL * 1024ULL * 1024ULL; // 256 Mib read buffer per thread
public: public:
beast::Journal const j_; beast::Journal const j_;
std::string const name_; std::string const name_;
@@ -49,14 +50,11 @@ public:
Section const& keyValues, Section const& keyValues,
Scheduler& scheduler, Scheduler& scheduler,
beast::Journal journal) beast::Journal journal)
: j_(journal) : j_(journal), name_(get(keyValues, "path")), scheduler_(scheduler)
, name_(get(keyValues, "path"))
, scheduler_(scheduler)
{ {
if (name_.empty()) if (name_.empty())
throw std::runtime_error( throw std::runtime_error(
"nodestore: Missing path in SnugDB backend"); "nodestore: Missing path in SnugDB backend");
} }
~SnugDBBackend() override ~SnugDBBackend() override
@@ -68,8 +66,7 @@ public:
} }
catch (std::exception const& e) catch (std::exception const& e)
{ {
JLOG(j_.warn()) JLOG(j_.warn()) << "SnugDB threw on destruction: " << e.what();
<< "SnugDB threw on destruction: " << e.what();
// Don't allow exceptions to propagate out of destructors. // Don't allow exceptions to propagate out of destructors.
} }
} }
@@ -91,10 +88,8 @@ public:
return; return;
} }
std::string path = name_ + "/" + std::string path = name_ + "/" + std::to_string(uid) + "-" +
std::to_string(uid) + "-" + std::to_string(appType) + "-" + std::to_string(salt);
std::to_string(appType) + "-" +
std::to_string(salt);
boost::filesystem::create_directories(path); boost::filesystem::create_directories(path);
db_ = std::make_unique<snug::SnugDB>(path); db_ = std::make_unique<snug::SnugDB>(path);
@@ -126,32 +121,34 @@ public:
pno->reset(); pno->reset();
static thread_local std::unique_ptr<uint8_t[]> static thread_local std::unique_ptr<uint8_t[]> thread_buffer =
thread_buffer = std::make_unique<uint8_t[]>(BUFFER_SIZE); std::make_unique<uint8_t[]>(BUFFER_SIZE);
uint8_t* ptr = &(thread_buffer[0]); uint8_t* ptr = &(thread_buffer[0]);
uint64_t len = BUFFER_SIZE; uint64_t len = BUFFER_SIZE;
int result = db_->read_entry( int result = db_->read_entry(
static_cast<uint8_t*>(const_cast<void*>(key)), static_cast<uint8_t*>(const_cast<void*>(key)), ptr, &len);
ptr,
&len);
if (0) if (0)
{ {
std::stringstream ss; std::stringstream ss;
const unsigned char* bytes = static_cast<const unsigned char*>(key); const unsigned char* bytes = static_cast<const unsigned char*>(key);
for (int i = 0; i < 32; ++i) { for (int i = 0; i < 32; ++i)
ss << std::setfill('0') << std::setw(2) << std::hex << static_cast<int>(bytes[i]); {
ss << std::setfill('0') << std::setw(2) << std::hex
<< static_cast<int>(bytes[i]);
} }
std::string key_hex = ss.str(); std::string key_hex = ss.str();
// Print the result using printf // Print the result using printf
printf("snug fetch: len=%zu result=%zu key=%s\n", len, result, key_hex.c_str()); printf(
"snug fetch: len=%zu result=%zu key=%s\n",
len,
result,
key_hex.c_str());
} }
if (result == 1) if (result == 1)
return notFound; return notFound;
@@ -194,20 +191,24 @@ public:
if (0) if (0)
{ {
std::stringstream ss; std::stringstream ss;
const unsigned char* bytes = static_cast<const unsigned char*>(const_cast<void*>(e.getKey())); const unsigned char* bytes = static_cast<const unsigned char*>(
const_cast<void*>(e.getKey()));
for (int i = 0; i < 32; ++i) for (int i = 0; i < 32; ++i)
ss << std::setfill('0') << std::setw(2) << std::hex << static_cast<int>(bytes[i]); ss << std::setfill('0') << std::setw(2) << std::hex
<< static_cast<int>(bytes[i]);
std::string key_hex = ss.str(); std::string key_hex = ss.str();
std::cout << "snugdb write: len=" << e.getSize()
std::cout << "snugdb write: len=" << e.getSize() << ", key=" << key_hex << "\n"; << ", key=" << key_hex << "\n";
} }
int out = db_->write_entry( int out = db_->write_entry(
static_cast<uint8_t*>(const_cast<void*>(e.getKey())), static_cast<uint8_t*>(const_cast<void*>(e.getKey())),
static_cast<uint8_t*>(const_cast<void*>(e.getData())), static_cast<uint8_t*>(const_cast<void*>(e.getData())),
e.getSize()); e.getSize());
if (out != 0) if (out != 0)
throw std::runtime_error("SnugDB could not write entry. Disk full? error" + std::to_string(out)); throw std::runtime_error(
"SnugDB could not write entry. Disk full? error" +
std::to_string(out));
} }
void void
@@ -243,19 +244,22 @@ public:
void void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
{ {
db_->visit_all([](uint8_t* key, uint8_t* data, uint64_t len, void* fp) -> void db_->visit_all(
{ [](uint8_t* key, uint8_t* data, uint64_t len, void* fp) -> void {
DecodedBlob decoded(key, data, len); DecodedBlob decoded(key, data, len);
if (!decoded.wasOk()) if (!decoded.wasOk())
{ {
throw std::runtime_error("Missing or corrupted data in snugdb"); throw std::runtime_error(
"Missing or corrupted data in snugdb");
return; return;
} }
std::function<void(std::shared_ptr<NodeObject>)> f = std::function<void(std::shared_ptr<NodeObject>)> f =
*(reinterpret_cast<std::function<void(std::shared_ptr<NodeObject>)>*>(fp)); *(reinterpret_cast<
std::function<void(std::shared_ptr<NodeObject>)>*>(fp));
f(decoded.createObject()); f(decoded.createObject());
}, reinterpret_cast<void*>(&f)); },
reinterpret_cast<void*>(&f));
} }
int int

View File

@@ -1,64 +1,71 @@
#include <stdint.h> #include <algorithm>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <array> #include <array>
#include <fcntl.h>
#include <memory>
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector> #include <vector>
#include <memory> namespace snug {
#include <algorithm>
namespace snug
{
int compare_entries_reverse(const void* a, const void* b) int
{ compare_entries_reverse(const void* a, const void* b)
{
const uint64_t* a_key = static_cast<const uint64_t*>(a); const uint64_t* a_key = static_cast<const uint64_t*>(a);
const uint64_t* b_key = static_cast<const uint64_t*>(b); const uint64_t* b_key = static_cast<const uint64_t*>(b);
// Unrolled comparison of 4 uint64_t values (4 * 8 = 32 bytes) // Unrolled comparison of 4 uint64_t values (4 * 8 = 32 bytes)
if (b_key[0] > a_key[0]) return 1; if (b_key[0] > a_key[0])
if (b_key[0] < a_key[0]) return -1; return 1;
if (b_key[0] < a_key[0])
return -1;
if (b_key[1] > a_key[1]) return 1; if (b_key[1] > a_key[1])
if (b_key[1] < a_key[1]) return -1; return 1;
if (b_key[1] < a_key[1])
return -1;
if (b_key[2] > a_key[2]) return 1; if (b_key[2] > a_key[2])
if (b_key[2] < a_key[2]) return -1; return 1;
if (b_key[2] < a_key[2])
return -1;
if (b_key[3] > a_key[3]) return 1; if (b_key[3] > a_key[3])
if (b_key[3] < a_key[3]) return -1; return 1;
if (b_key[3] < a_key[3])
return -1;
return 0; // Keys are equal return 0; // Keys are equal
} }
class SnugDB
{
private:
static constexpr uint64_t SNUGSIZE = 256ull*1024ull*1024ull*1024ull; // 256 GiB
static constexpr uint64_t BIGSIZE = 10ull*1024ull*1024ull*1024ull*1024ull; // 10 TiB
class SnugDB
{
private:
static constexpr uint64_t SNUGSIZE =
256ull * 1024ull * 1024ull * 1024ull; // 256 GiB
static constexpr uint64_t BIGSIZE =
10ull * 1024ull * 1024ull * 1024ull * 1024ull; // 10 TiB
static constexpr size_t BUCKET_COUNT = 1048576; static constexpr size_t BUCKET_COUNT = 1048576;
std::unique_ptr<std::shared_mutex[]> mutexes = std::unique_ptr<std::shared_mutex[]> mutexes =
std::make_unique<std::shared_mutex[]>(BUCKET_COUNT); std::make_unique<std::shared_mutex[]>(BUCKET_COUNT);
// each file snug.0 snug.1 ... is mmaped and the pointer // each file snug.0 snug.1 ... is mmaped and the pointer
uint8_t* mapped_files[1024]; uint8_t* mapped_files[1024];
uint64_t mapped_files_count { 0 }; uint64_t mapped_files_count{0};
uint8_t* big_file; // this file has 64kib blocks in it which are used uint8_t* big_file; // this file has 64kib blocks in it which are used
// as an overflow for large blobs // as an overflow for large blobs
std::mutex big_file_mutex; // locked when incrementing the "next new block" pointer std::mutex big_file_mutex; // locked when incrementing the "next new block"
// pointer
// only used when adding a new file // only used when adding a new file
std::mutex mapped_files_count_mutex; std::mutex mapped_files_count_mutex;
@@ -69,7 +76,8 @@ namespace snug
// 1 = could not open // 1 = could not open
// 2 = could not seek // 2 = could not seek
// 3 = could not write at end of file // 3 = could not write at end of file
int alloc_file(char const* fn, uint64_t size) int
alloc_file(char const* fn, uint64_t size)
{ {
int fd = open(fn, O_WRONLY | O_CREAT | O_TRUNC, 0644); int fd = open(fn, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) if (fd < 0)
@@ -96,7 +104,8 @@ namespace snug
} }
// 0 = file exists and is right size // 0 = file exists and is right size
int check_file(char const* fn, uint64_t size) int
check_file(char const* fn, uint64_t size)
{ {
struct stat st; struct stat st;
int file_exists = (stat(fn, &st) == 0); int file_exists = (stat(fn, &st) == 0);
@@ -110,39 +119,37 @@ namespace snug
return 0; return 0;
} }
#define OFFSET(byte0, byte1, byte2)\ #define OFFSET(byte0, byte1, byte2) \
(((((uint64_t)(byte0 & 0xFFU)) << 12) +\ (((((uint64_t)(byte0 & 0xFFU)) << 12) + \
(((uint64_t)(byte1 & 0xFFU)) << 4) +\ (((uint64_t)(byte1 & 0xFFU)) << 4) + ((uint64_t)(byte2 & 0xFU))) \
((uint64_t)(byte2 & 0xFU))) << 18) << 18)
// check if 32 bytes are 0, which they will be for a zero entry // check if 32 bytes are 0, which they will be for a zero entry
#define IS_ZERO_ENTRY(x)\ #define IS_ZERO_ENTRY(x) \
(*((uint64_t*)((x)+ 0)) == 0 && \ (*((uint64_t*)((x) + 0)) == 0 && *((uint64_t*)((x) + 8)) == 0 && \
*((uint64_t*)((x)+ 8)) == 0 && \ *((uint64_t*)((x) + 16)) == 0 && *((uint64_t*)((x) + 24)) == 0)
*((uint64_t*)((x)+16)) == 0 && \
*((uint64_t*)((x)+24)) == 0)
#define IS_ENTRY(x,y)\ #define IS_ENTRY(x, y) \
(*((uint64_t*)((x)+ 0)) == *((uint64_t*)((y)+ 0)) && \ (*((uint64_t*)((x) + 0)) == *((uint64_t*)((y) + 0)) && \
*((uint64_t*)((x)+ 8)) == *((uint64_t*)((y)+ 8)) && \ *((uint64_t*)((x) + 8)) == *((uint64_t*)((y) + 8)) && \
*((uint64_t*)((x)+16)) == *((uint64_t*)((y)+16)) && \ *((uint64_t*)((x) + 16)) == *((uint64_t*)((y) + 16)) && \
*((uint64_t*)((x)+24)) == *((uint64_t*)((y)+24))) *((uint64_t*)((x) + 24)) == *((uint64_t*)((y) + 24)))
#define WRITE_KEY(x /* dst */, y /* src */, flags)\ #define WRITE_KEY(x /* dst */, y /* src */, flags) \
{\ { \
*((uint64_t*)((x)+ 0)) = *((uint64_t*)((y)+ 0)); \ *((uint64_t*)((x) + 0)) = *((uint64_t*)((y) + 0)); \
*((uint64_t*)((x)+ 8)) = *((uint64_t*)((y)+ 8)); \ *((uint64_t*)((x) + 8)) = *((uint64_t*)((y) + 8)); \
*((uint64_t*)((x)+16)) = *((uint64_t*)((y)+16)); \ *((uint64_t*)((x) + 16)) = *((uint64_t*)((y) + 16)); \
*((uint64_t*)((x)+24)) = *((uint64_t*)((y)+24)); \ *((uint64_t*)((x) + 24)) = *((uint64_t*)((y) + 24)); \
*((uint64_t*)((x)+32)) = flags;\ *((uint64_t*)((x) + 32)) = flags; \
} }
// if an entry exceeds 984 bytes then the overflow is written // if an entry exceeds 984 bytes then the overflow is written
// into the snug.big file in a linked list of 32kib blocks // into the snug.big file in a linked list of 32kib blocks
// the first of those blocks is a control block // the first of those blocks is a control block
uint64_t get_big_block() uint64_t
get_big_block()
{ {
std::unique_lock<std::mutex> lock(big_file_mutex); std::unique_lock<std::mutex> lock(big_file_mutex);
@@ -160,8 +167,7 @@ namespace snug
} }
// grab the nth one // grab the nth one
uint8_t* offset = big_file + 16 uint8_t* offset = big_file + 16 + 8 * (free_blocks - 1);
+ 8 * (free_blocks - 1);
// decrement free block counter // decrement free block counter
*(uint64_t*)(big_file + 8) -= 1; *(uint64_t*)(big_file + 8) -= 1;
@@ -169,7 +175,8 @@ namespace snug
return *((uint64_t*)offset); return *((uint64_t*)offset);
} }
void unalloc_blocks(uint64_t next_block) void
unalloc_blocks(uint64_t next_block)
{ {
if (next_block != 0) if (next_block != 0)
{ {
@@ -182,11 +189,9 @@ namespace snug
if (free_blocks >= 4095) if (free_blocks >= 4095)
break; break;
uint8_t* offset = big_file + 16 + 8 * free_blocks;
uint8_t* offset = big_file + 16 *((uint64_t*)offset) = next_block;
+ 8 * free_blocks;
*((uint64_t*) offset) = next_block;
*((uint64_t*)(big_file + 8)) += 1; *((uint64_t*)(big_file + 8)) += 1;
@@ -196,8 +201,7 @@ namespace snug
// clear the pointer on the old block // clear the pointer on the old block
*((uint64_t*)(big_file + previous)) = 0; *((uint64_t*)(big_file + previous)) = 0;
} } while (next_block != 0);
while (next_block != 0);
} }
} }
@@ -215,9 +219,9 @@ namespace snug
// return 0 = failure // return 0 = failure
// > 0 = first block in the chain // > 0 = first block in the chain
uint64_t write_big_entry_internal(uint8_t* data, ssize_t len, uint64_t next_block) uint64_t
write_big_entry_internal(uint8_t* data, ssize_t len, uint64_t next_block)
{ {
uint64_t first_block = 0; uint64_t first_block = 0;
uint64_t* last_block_ptr = 0; uint64_t* last_block_ptr = 0;
@@ -249,8 +253,7 @@ namespace snug
next_block = *((uint64_t*)big_ptr); next_block = *((uint64_t*)big_ptr);
last_block_ptr = (uint64_t*)big_ptr; last_block_ptr = (uint64_t*)big_ptr;
} } while (len > 0);
while (len > 0);
// if there's a dangling chain we'll unallocate it // if there's a dangling chain we'll unallocate it
if (next_block != 0) if (next_block != 0)
@@ -268,34 +271,38 @@ namespace snug
// 0 = success // 0 = success
// 1 = bucket full // 1 = bucket full
// 2 = big blocks full // 2 = big blocks full
int write_entry_internal(uint8_t* data, uint8_t* key, uint8_t* val, uint32_t len) int
write_entry_internal(
uint8_t* data,
uint8_t* key,
uint8_t* val,
uint32_t len)
{ {
// find the entry // find the entry
uint64_t offset = OFFSET(key[0], key[1], (key[2]>>4)); uint64_t offset = OFFSET(key[0], key[1], (key[2] >> 4));
// lock the bucket for writing // lock the bucket for writing
std::unique_lock<std::shared_mutex> lock(mutexes[offset >> 18]); std::unique_lock<std::shared_mutex> lock(mutexes[offset >> 18]);
uint8_t* start = data + offset; uint8_t* start = data + offset;
for (int i = 0; i < 256*1024; i+=1024) for (int i = 0; i < 256 * 1024; i += 1024)
{ {
if (!IS_ENTRY(start + i, key) && !IS_ZERO_ENTRY(start + i)) if (!IS_ENTRY(start + i, key) && !IS_ZERO_ENTRY(start + i))
continue; continue;
// read flags // read flags
uint64_t flags = *((uint64_t*)(start + i + 32)); uint64_t flags = *((uint64_t*)(start + i + 32));
// big entries are tricky // big entries are tricky
bool const old_big = (flags >> 32) != 0; bool const old_big = (flags >> 32) != 0;
bool const new_big = len > 984; bool const new_big = len > 984;
if (new_big) if (new_big)
{ {
//write_big_entry_internal(uint8_t* data, ssize_t len, uint64_t next_block) // write_big_entry_internal(uint8_t* data, ssize_t len, uint64_t
uint64_t first_block = // next_block)
write_big_entry_internal(val + 984, len - 984, (old_big ? (flags >> 32) : 0)); uint64_t first_block = write_big_entry_internal(
val + 984, len - 984, (old_big ? (flags >> 32) : 0));
if (first_block == 0) // error state if (first_block == 0) // error state
{ {
@@ -330,20 +337,25 @@ namespace snug
return 1; return 1;
} }
// out_len carries the length of the output buffer when calling and is replaced // out_len carries the length of the output buffer when calling and is
// with the length of the data found when returning // replaced with the length of the data found when returning
int read_entry_internal(uint8_t* data, uint8_t* key, uint8_t* val_out, uint64_t* out_len) int
read_entry_internal(
uint8_t* data,
uint8_t* key,
uint8_t* val_out,
uint64_t* out_len)
{ {
uint64_t buf_len = *out_len; uint64_t buf_len = *out_len;
// find the entry // find the entry
uint64_t offset = OFFSET(key[0], key[1], (key[2]>>4)); uint64_t offset = OFFSET(key[0], key[1], (key[2] >> 4));
uint8_t* start = data + offset; uint8_t* start = data + offset;
// lock the bucket for reading // lock the bucket for reading
std::shared_lock<std::shared_mutex> lock(mutexes[offset >> 18]); std::shared_lock<std::shared_mutex> lock(mutexes[offset >> 18]);
for (int i = 0; i < 256*1024; i+=1024) for (int i = 0; i < 256 * 1024; i += 1024)
{ {
if (IS_ZERO_ENTRY(start + i)) if (IS_ZERO_ENTRY(start + i))
return 1; return 1;
@@ -363,7 +375,7 @@ namespace snug
*out_len = size; *out_len = size;
size_t to_read = size > 984 ? 984: size; size_t to_read = size > 984 ? 984 : size;
memcpy(val_out, start + i + 40, to_read); memcpy(val_out, start + i + 40, to_read);
val_out += to_read; val_out += to_read;
@@ -395,18 +407,22 @@ namespace snug
return 1; return 1;
} }
void setup() void
setup()
{ {
struct stat path_stat; struct stat path_stat;
if (stat(path.c_str(), &path_stat) != 0) if (stat(path.c_str(), &path_stat) != 0)
throw std::runtime_error("Error checking path: " + path + " - " + std::string(strerror(errno))); throw std::runtime_error(
"Error checking path: " + path + " - " +
std::string(strerror(errno)));
if (!S_ISDIR(path_stat.st_mode)) if (!S_ISDIR(path_stat.st_mode))
throw std::runtime_error("Path is not a directory: " + path); throw std::runtime_error("Path is not a directory: " + path);
if (access(path.c_str(), R_OK | W_OK | X_OK) != 0) if (access(path.c_str(), R_OK | W_OK | X_OK) != 0)
throw std::runtime_error("Insufficient permissions for path: " + path); throw std::runtime_error(
"Insufficient permissions for path: " + path);
// Search for existing snug files sequentially // Search for existing snug files sequentially
std::vector<std::string> snug_files; std::vector<std::string> snug_files;
@@ -427,7 +443,8 @@ namespace snug
std::string new_file = path + "/snug.0"; std::string new_file = path + "/snug.0";
int result = alloc_file(new_file.c_str(), SNUGSIZE); int result = alloc_file(new_file.c_str(), SNUGSIZE);
if (result != 0) if (result != 0)
throw std::runtime_error("Failed to create initial file: " + new_file); throw std::runtime_error(
"Failed to create initial file: " + new_file);
snug_files.push_back("snug.0"); snug_files.push_back("snug.0");
} }
@@ -446,10 +463,17 @@ namespace snug
if (fstat(fd, &file_stat) == -1) if (fstat(fd, &file_stat) == -1)
{ {
close(fd); close(fd);
throw std::runtime_error("Unable to get file stats: " + full_path); throw std::runtime_error(
"Unable to get file stats: " + full_path);
} }
void* mapped = mmap(nullptr, file_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); void* mapped = mmap(
nullptr,
file_stat.st_size,
PROT_READ | PROT_WRITE,
MAP_SHARED,
fd,
0);
close(fd); // Can close fd after mmap close(fd); // Can close fd after mmap
if (mapped == MAP_FAILED) if (mapped == MAP_FAILED)
@@ -465,7 +489,8 @@ namespace snug
{ {
int result = alloc_file(new_file.c_str(), BIGSIZE); int result = alloc_file(new_file.c_str(), BIGSIZE);
if (result != 0) if (result != 0)
throw std::runtime_error("Failed to create initial file: " + new_file); throw std::runtime_error(
"Failed to create initial file: " + new_file);
} }
int fd = open(new_file.c_str(), O_RDWR); int fd = open(new_file.c_str(), O_RDWR);
@@ -476,10 +501,17 @@ namespace snug
if (fstat(fd, &file_stat) == -1) if (fstat(fd, &file_stat) == -1)
{ {
close(fd); close(fd);
throw std::runtime_error("Unable to get file stats: " + new_file); throw std::runtime_error(
"Unable to get file stats: " + new_file);
} }
void* mapped = mmap(nullptr, file_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); void* mapped = mmap(
nullptr,
file_stat.st_size,
PROT_READ | PROT_WRITE,
MAP_SHARED,
fd,
0);
close(fd); // Can close fd after mmap close(fd); // Can close fd after mmap
if (mapped == MAP_FAILED) if (mapped == MAP_FAILED)
@@ -488,8 +520,8 @@ namespace snug
big_file = static_cast<uint8_t*>(mapped); big_file = static_cast<uint8_t*>(mapped);
} }
} }
public:
public:
SnugDB(std::string path_) : path(path_) SnugDB(std::string path_) : path(path_)
{ {
setup(); setup();
@@ -506,7 +538,8 @@ namespace snug
munmap(big_file, BIGSIZE); munmap(big_file, BIGSIZE);
} }
int write_entry(uint8_t* key, uint8_t* val, ssize_t len) int
write_entry(uint8_t* key, uint8_t* val, ssize_t len)
{ {
for (size_t i = 0; i < mapped_files_count; ++i) for (size_t i = 0; i < mapped_files_count; ++i)
{ {
@@ -523,10 +556,12 @@ namespace snug
// acquire the mutex // acquire the mutex
const std::lock_guard<std::mutex> lock(mapped_files_count_mutex); const std::lock_guard<std::mutex> lock(mapped_files_count_mutex);
std::string new_file = path + "/snug." + std::to_string(mapped_files_count); std::string new_file =
path + "/snug." + std::to_string(mapped_files_count);
int alloc_result = alloc_file(new_file.c_str(), SNUGSIZE); int alloc_result = alloc_file(new_file.c_str(), SNUGSIZE);
if (alloc_result != 0) if (alloc_result != 0)
return alloc_result + 10; // Return error code from alloc_file if it fails (+10) return alloc_result +
10; // Return error code from alloc_file if it fails (+10)
int fd = open(new_file.c_str(), O_RDWR); int fd = open(new_file.c_str(), O_RDWR);
if (fd == -1) if (fd == -1)
@@ -539,7 +574,13 @@ namespace snug
return 2; // Return 2 for fstat failure return 2; // Return 2 for fstat failure
} }
void* mapped = mmap(nullptr, file_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); void* mapped = mmap(
nullptr,
file_stat.st_size,
PROT_READ | PROT_WRITE,
MAP_SHARED,
fd,
0);
close(fd); // Can close fd after mmap close(fd); // Can close fd after mmap
if (mapped == MAP_FAILED) if (mapped == MAP_FAILED)
@@ -557,9 +598,9 @@ namespace snug
return write_entry(key, val, len); return write_entry(key, val, len);
} }
int read_entry(uint8_t* key, uint8_t* val_out, uint64_t* out_len_orig) int
read_entry(uint8_t* key, uint8_t* val_out, uint64_t* out_len_orig)
{ {
for (size_t i = 0; i < mapped_files_count; ++i) for (size_t i = 0; i < mapped_files_count; ++i)
{ {
uint64_t out_len = *out_len_orig; uint64_t out_len = *out_len_orig;
@@ -581,16 +622,17 @@ namespace snug
return 1; return 1;
} }
void
void visit_all(void (*f)(uint8_t*, uint8_t*, uint64_t, void* /*opaque caller val*/), visit_all(
void (*f)(uint8_t*, uint8_t*, uint64_t, void* /*opaque caller val*/),
void* opaque) void* opaque)
{ {
// to visit all we only need to check snug.0 to begin with // to visit all we only need to check snug.0 to begin with
// we go to the first bucket // we go to the first bucket
// if we find no entries there we go to the next bucket // if we find no entries there we go to the next bucket
// if we find entries there then we need to count them, // if we find entries there then we need to count them,
// if we find 256 entries there then we go to snug.1 and so on until we run out // if we find 256 entries there then we go to snug.1 and so on until we
// we merge sort the entries into a list for the visit // run out we merge sort the entries into a list for the visit
for (uint64_t bucket = 0; bucket < BUCKET_COUNT; ++bucket) for (uint64_t bucket = 0; bucket < BUCKET_COUNT; ++bucket)
{ {
@@ -603,7 +645,7 @@ namespace snug
if (*((uint64_t*)(ptr + 32)) == 0) if (*((uint64_t*)(ptr + 32)) == 0)
continue; continue;
//if (IS_ZERO_ENTRY(ptr)) // if (IS_ZERO_ENTRY(ptr))
// continue; // continue;
// live bucket, collect entries // live bucket, collect entries
@@ -611,7 +653,8 @@ namespace snug
{ {
// need to acquire the mutex to prevent a race condition // need to acquire the mutex to prevent a race condition
// where a new file is being added while we're searching // where a new file is being added while we're searching
const std::lock_guard<std::mutex> lock(mapped_files_count_mutex); const std::lock_guard<std::mutex> lock(
mapped_files_count_mutex);
// preallocate worst case scenario, RIP memory // preallocate worst case scenario, RIP memory
entries.reserve(mapped_files_count * 256); entries.reserve(mapped_files_count * 256);
@@ -630,9 +673,10 @@ namespace snug
continue; continue;
// sort the entries // sort the entries
std::sort(entries.begin(), entries.end(), std::sort(
[](const uint8_t* a, const uint8_t* b) entries.begin(),
{ entries.end(),
[](const uint8_t* a, const uint8_t* b) {
return memcmp(a, b, 32) < 0; return memcmp(a, b, 32) < 0;
}); });
@@ -651,7 +695,8 @@ namespace snug
} }
// copy big entry to a buffer // copy big entry to a buffer
std::unique_ptr<uint8_t[]> copybuf = std::make_unique<uint8_t[]>(size); std::unique_ptr<uint8_t[]> copybuf =
std::make_unique<uint8_t[]>(size);
uint8_t* data = &(copybuf[0]); uint8_t* data = &(copybuf[0]);
memcpy(data, entry + 40, 984); memcpy(data, entry + 40, 984);
@@ -659,7 +704,6 @@ namespace snug
data += 984; data += 984;
size -= 984; size -= 984;
// big block read logic // big block read logic
while (size > 0) while (size > 0)
{ {
@@ -683,9 +727,7 @@ namespace snug
f(entry, data, (flags & 0xFFFFFFFFULL), opaque); f(entry, data, (flags & 0xFFFFFFFFULL), opaque);
} }
} }
} }
};
}; } // namespace snug
}