Use RandomAccessFile in KeyvaDB

This commit is contained in:
Vinnie Falco
2013-07-17 07:02:58 -07:00
parent 74510a95be
commit b81fc4e0c8

View File

@@ -31,12 +31,15 @@ private:
// Accessed by multiple threads // Accessed by multiple threads
struct State struct State
{ {
ScopedPointer <FileInputStream> keyIn; State ()
ScopedPointer <FileOutputStream> keyOut; : keyFile (16384) // buffer size
KeyIndex newKeyIndex; , valFile (16384) // buffer size
{
}
ScopedPointer <FileInputStream> valIn; RandomAccessFile keyFile;
ScopedPointer <FileOutputStream> valOut; RandomAccessFile valFile;
KeyIndex newKeyIndex;
FileOffset valFileSize; FileOffset valFileSize;
bool hasKeys () const noexcept bool hasKeys () const noexcept
@@ -83,26 +86,24 @@ public:
{ {
SharedState::WriteAccess state (m_state); SharedState::WriteAccess state (m_state);
// Output must be opened first, in case it has openFile (&state->keyFile, keyPath);
// to created, or else opening for input will fail.
state->keyOut = openForWrite (keyPath);
state->keyIn = openForRead (keyPath);
int64 const fileSize = state->keyIn->getFile ().getSize (); int64 const fileSize = state->keyFile.getFile ().getSize ();
if (fileSize == 0) if (fileSize == 0)
{ {
// initialize the key file // initialize the key file
state->keyOut->setPosition (keyFileHeaderBytes - 1); RandomAccessFileOutputStream stream (state->keyFile);
state->keyOut->writeByte (0); stream.setPosition (keyFileHeaderBytes - 1);
state->keyOut->flush (); stream.writeByte (0);
stream.flush ();
} }
state->newKeyIndex = 1 + (state->keyIn->getFile ().getSize () - keyFileHeaderBytes) / m_keyRecordBytes; state->newKeyIndex = 1 + (state->keyFile.getFile ().getSize () - keyFileHeaderBytes) / m_keyRecordBytes;
state->valOut = openForWrite (valPath); openFile (&state->valFile, valPath);
state->valIn = openForRead (valPath);
state->valFileSize = state->valIn->getFile ().getSize (); state->valFileSize = state->valFile.getFile ().getSize ();
} }
~KeyvaDBImp () ~KeyvaDBImp ()
@@ -111,22 +112,19 @@ public:
flushInternal (state); flushInternal (state);
state->keyOut = nullptr;
state->valOut = nullptr;
// Delete the database files if requested. // Delete the database files if requested.
// //
if (m_filesAreTemporary) if (m_filesAreTemporary)
{ {
{ {
File const path = state->keyIn->getFile (); File const path = state->keyFile.getFile ();
state->keyIn = nullptr; state->keyFile.close ();
path.deleteFile (); path.deleteFile ();
} }
{ {
File const path = state->valIn->getFile (); File const path = state->valFile.getFile ();
state->valIn = nullptr; state->valFile.close ();
path.deleteFile (); path.deleteFile ();
} }
} }
@@ -161,29 +159,33 @@ public:
} }
// Read a key record into memory. // Read a key record into memory.
// VFALCO TODO Return a Result and do validity checking on all inputs
//
void readKeyRecord (KeyRecord* const keyRecord, void readKeyRecord (KeyRecord* const keyRecord,
KeyIndex const keyIndex, KeyIndex const keyIndex,
SharedState::WriteAccess& state) SharedState::WriteAccess& state)
{ {
FileOffset const byteOffset = calcKeyRecordOffset (keyIndex); FileOffset const byteOffset = calcKeyRecordOffset (keyIndex);
bool const success = state->keyIn->setPosition (byteOffset); RandomAccessFileInputStream stream (state->keyFile);
bool const success = stream.setPosition (byteOffset);
if (success) if (success)
{ {
// This defines the file format! // This defines the file format!
keyRecord->valFileOffset = state->keyIn->readInt64BigEndian (); keyRecord->valFileOffset = stream.readInt64BigEndian ();
keyRecord->valSize = state->keyIn->readIntBigEndian (); keyRecord->valSize = stream.readIntBigEndian ();
keyRecord->leftIndex = state->keyIn->readIntBigEndian (); keyRecord->leftIndex = stream.readIntBigEndian ();
keyRecord->rightIndex = state->keyIn->readIntBigEndian (); keyRecord->rightIndex = stream.readIntBigEndian ();
// Grab the key // Grab the key
state->keyIn->read (keyRecord->key, m_keyBytes); stream.read (keyRecord->key, m_keyBytes);
} }
else else
{ {
String s; String s;
s << "KeyvaDB: Seek failed in " << state->valOut->getFile ().getFileName (); s << "KeyvaDB: Seek failed in " << state->keyFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
} }
@@ -196,65 +198,70 @@ public:
{ {
FileOffset const byteOffset = calcKeyRecordOffset (keyIndex); FileOffset const byteOffset = calcKeyRecordOffset (keyIndex);
bool const success = state->keyOut->setPosition (byteOffset); RandomAccessFileOutputStream stream (state->keyFile);
bool const success = stream.setPosition (byteOffset);
if (success) if (success)
{ {
// This defines the file format! // This defines the file format!
// VFALCO TODO Make OutputStream return the bool errors here // VFALCO TODO Make OutputStream return the bool errors here
// //
state->keyOut->writeInt64BigEndian (keyRecord.valFileOffset); stream.writeInt64BigEndian (keyRecord.valFileOffset);
state->keyOut->writeIntBigEndian (keyRecord.valSize); stream.writeIntBigEndian (keyRecord.valSize);
state->keyOut->writeIntBigEndian (keyRecord.leftIndex); stream.writeIntBigEndian (keyRecord.leftIndex);
state->keyOut->writeIntBigEndian (keyRecord.rightIndex); stream.writeIntBigEndian (keyRecord.rightIndex);
// Write the key // Write the key
if (includingKey) if (includingKey)
{ {
bool const success = state->keyOut->write (keyRecord.key, m_keyBytes); bool const success = stream.write (keyRecord.key, m_keyBytes);
if (! success) if (! success)
{ {
String s; String s;
s << "KeyvaDB: Write failed in " << state->valOut->getFile ().getFileName (); s << "KeyvaDB: Write failed in " << state->keyFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
} }
state->keyOut->flush (); //stream.flush ();
} }
else else
{ {
String s; String s;
s << "KeyvaDB: Seek failed in " << state->valOut->getFile ().getFileName (); s << "KeyvaDB: Seek failed in " << state->keyFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
} }
// Append a value to the value file. // Append a value to the value file.
// VFALCO TODO return a Result
void writeValue (void const* const value, ByteSize valueBytes, SharedState::WriteAccess& state) void writeValue (void const* const value, ByteSize valueBytes, SharedState::WriteAccess& state)
{ {
bool const success = state->valOut->setPosition (state->valFileSize); RandomAccessFileOutputStream stream (state->valFile);
bool const success = stream.setPosition (state->valFileSize);
if (success) if (success)
{ {
bool const success = state->valOut->write (value, static_cast <size_t> (valueBytes)); bool const success = stream.write (value, static_cast <size_t> (valueBytes));
if (! success) if (! success)
{ {
String s; String s;
s << "KeyvaDB: Write failed in " << state->valOut->getFile ().getFileName (); s << "KeyvaDB: Write failed in " << state->valFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
state->valFileSize += valueBytes; state->valFileSize += valueBytes;
state->valOut->flush (); //stream.flush ();
} }
else else
{ {
String s; String s;
s << "KeyvaDB: Seek failed in " << state->valOut->getFile ().getFileName (); s << "KeyvaDB: Seek failed in " << state->valFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
} }
@@ -329,9 +336,9 @@ public:
bool get (void const* key, GetCallback* callback) bool get (void const* key, GetCallback* callback)
{ {
FindResult findResult (m_keyStorage.getData ()); // VFALCO TODD Swap these two lines
SharedState::WriteAccess state (m_state); SharedState::WriteAccess state (m_state);
FindResult findResult (m_keyStorage.getData ());
bool found = false; bool found = false;
@@ -343,21 +350,23 @@ public:
{ {
void* const destStorage = callback->createStorageForValue (findResult.keyRecord.valSize); void* const destStorage = callback->createStorageForValue (findResult.keyRecord.valSize);
bool const success = state->valIn->setPosition (findResult.keyRecord.valFileOffset); RandomAccessFileInputStream stream (state->valFile);
bool const success = stream.setPosition (findResult.keyRecord.valFileOffset);
if (! success) if (! success)
{ {
String s; String s;
s << "KeyvaDB: Seek failed in " << state->valOut->getFile ().getFileName (); s << "KeyvaDB: Seek failed in " << state->valFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
int const bytesRead = state->valIn->read (destStorage, findResult.keyRecord.valSize); int const bytesRead = stream.read (destStorage, findResult.keyRecord.valSize);
if (bytesRead != findResult.keyRecord.valSize) if (bytesRead != findResult.keyRecord.valSize)
{ {
String s; String s;
s << "KeyvaDB: Couldn't read a value from " << state->valIn->getFile ().getFileName (); s << "KeyvaDB: Couldn't read a value from " << state->valFile.getFile ().getFileName ();
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
} }
@@ -421,9 +430,12 @@ public:
} }
else else
{ {
// Do nothing
/*
String s; String s;
s << "KeyvaDB: Attempt to write a duplicate key!"; s << "KeyvaDB: Attempt to write a duplicate key!";
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
*/
} }
} }
else else
@@ -467,41 +479,25 @@ public:
void flushInternal (SharedState::WriteAccess& state) void flushInternal (SharedState::WriteAccess& state)
{ {
state->keyOut->flush (); state->keyFile.flush ();
state->valOut->flush (); state->valFile.flush ();
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
private: private:
// Open a file for reading. // Open a file for reading and writing.
static FileInputStream* openForRead (File path) // Creates the file if it doesn't exist.
static void openFile (RandomAccessFile* file, File path)
{ {
FileInputStream* stream = path.createInputStream (); Result const result = file->open (path, RandomAccessFile::readWrite);
if (stream == nullptr) if (! result)
{
String s;
s << "KeyvaDB: Couldn't open " << path.getFileName () << " for reading.";
Throw (std::runtime_error (s.toStdString ()));
}
return stream;
}
// Open a file for writing.
static FileOutputStream* openForWrite (File path)
{
FileOutputStream* stream = path.createOutputStream ();
if (stream == nullptr)
{ {
String s; String s;
s << "KeyvaDB: Couldn't open " << path.getFileName () << " for writing."; s << "KeyvaDB: Couldn't open " << path.getFileName () << " for writing.";
Throw (std::runtime_error (s.toStdString ())); Throw (std::runtime_error (s.toStdString ()));
} }
return stream;
} }
private: private:
@@ -521,11 +517,70 @@ KeyvaDB* KeyvaDB::New (int keyBytes, File keyPath, File valPath, bool filesAreTe
class KeyvaDBTests : public UnitTest class KeyvaDBTests : public UnitTest
{ {
public:
enum
{
maxPayloadBytes = 8 * 1024
};
// The payload is used as the value to store
struct Payload
{
Payload (int maxBytes)
: bufferSize (maxBytes)
, data (maxBytes)
{
}
// Create a pseudo-random payload
void generate (int64 seedValue) noexcept
{
Random r (seedValue);
bytes = 1 + r.nextInt (bufferSize);
bassert (bytes >= 1 && bytes <= bufferSize);
for (int i = 0; i < bytes; ++i)
data [i] = static_cast <unsigned char> (r.nextInt ());
}
bool operator== (Payload const& other) const noexcept
{
if (bytes == other.bytes)
{
return memcmp (data.getData (), other.data.getData (), bytes) == 0;
}
else
{
return false;
}
}
int const bufferSize;
int bytes;
HeapBlock <char> data;
};
public: public:
KeyvaDBTests () : UnitTest ("KevyaDB") KeyvaDBTests () : UnitTest ("KevyaDB")
{ {
} }
// Make sure the Payload object works first!
void testPayload ()
{
beginTest ("Payload");
Payload p1 (maxPayloadBytes);
Payload p2 (maxPayloadBytes);
for (int i = 0; i < 256; ++i)
{
p1.generate (i);
p2.generate (i);
expect (p1 == p2, "Should be equal");
}
}
template <class T> template <class T>
void repeatableShuffle (int const numberOfItems, HeapBlock <T>& items) void repeatableShuffle (int const numberOfItems, HeapBlock <T>& items)
{ {
@@ -539,6 +594,25 @@ public:
} }
} }
// Retrieval callback stores the value in a Payload object for comparison
struct PayloadGetCallback : KeyvaDB::GetCallback
{
Payload payload;
PayloadGetCallback () : payload (maxPayloadBytes)
{
}
void* createStorageForValue (int valueBytes)
{
bassert (valueBytes <= maxPayloadBytes);
payload.bytes = valueBytes;
return payload.data.getData ();
}
};
template <unsigned int KeyBytes> template <unsigned int KeyBytes>
void testSize (unsigned int const maxItems) void testSize (unsigned int const maxItems)
{ {
@@ -553,6 +627,8 @@ public:
File const valPath = File::createTempFile ("").withFileExtension (".val"); File const valPath = File::createTempFile ("").withFileExtension (".val");
ScopedPointer <KeyvaDB> db (KeyvaDB::New (KeyBytes, keyPath, valPath, true)); ScopedPointer <KeyvaDB> db (KeyvaDB::New (KeyBytes, keyPath, valPath, true));
Payload payload (maxPayloadBytes);
{ {
// Create an array of ascending integers. // Create an array of ascending integers.
HeapBlock <unsigned int> items (maxItems); HeapBlock <unsigned int> items (maxItems);
@@ -565,51 +641,44 @@ public:
// Write all the keys of integers. // Write all the keys of integers.
for (unsigned int i = 0; i < maxItems; ++i) for (unsigned int i = 0; i < maxItems; ++i)
{ {
unsigned int const num = items [i]; unsigned int num = items [i];
KeyType const v = KeyType::createFromInteger (num);
// The value is the same as the key, for ease of comparison. KeyType const key = KeyType::createFromInteger (num);
db->put (v.cbegin (), v.cbegin (), KeyBytes);
payload.generate (num);
db->put (key.cbegin (), payload.data.getData (), payload.bytes);
} }
} }
{ {
// This callback creates storage for the value.
struct MyGetCallback : KeyvaDB::GetCallback
{
KeyType v;
void* createStorageForValue (int valueBytes)
{
bassert (valueBytes == KeyBytes);
return v.begin ();
}
};
// Go through all of our keys and try to retrieve them. // Go through all of our keys and try to retrieve them.
// since this is done in ascending order, we should get // since this is done in ascending order, we should get
// random seeks at this point. // random seeks at this point.
// //
PayloadGetCallback cb;
for (unsigned int i = 0; i < maxItems; ++i) for (unsigned int i = 0; i < maxItems; ++i)
{ {
KeyType const v = KeyType::createFromInteger (i); KeyType const v = KeyType::createFromInteger (i);
MyGetCallback cb;
bool const found = db->get (v.cbegin (), &cb); bool const found = db->get (v.cbegin (), &cb);
expect (found, "Should be found"); expect (found, "Should be found");
expect (v == cb.v, "Should be equal"); payload.generate (i);
expect (payload == cb.payload, "Should be equal");
} }
} }
} }
void runTest () void runTest ()
{ {
testSize <4> (512); testPayload ();
testSize <32> (4096);
//testSize <4> (512);
//testSize <32> (4096);
testSize <4> (4);
} }
}; };