[JNI] Add java api and java tests for WriteBatch and WriteOptions, add put() and remove() to RocksDB.

Summary:
* Add java api for rocksdb::WriteBatch and rocksdb::WriteOptions, which are necessary components
  for running benchmark.
* Add java test for org.rocksdb.WriteBatch and org.rocksdb.WriteOptions.
* Add remove() to org.rocksdb.RocksDB, and add put() and remove() to RocksDB which take
  org.rocksdb.WriteOptions.

Test Plan: make jtest

Reviewers: haobo, sdong, dhruba

Reviewed By: sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17373
This commit is contained in:
Yueh-Hsuan Chiang
2014-04-02 13:14:55 -07:00
parent 8c4a3bfa5b
commit da0887a3dc
11 changed files with 906 additions and 24 deletions

View File

@@ -1,4 +1,4 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
@@ -64,6 +64,17 @@ public class RocksDB {
put(key, key.length, value, value.length);
}
/**
* Set the database entry for "key" to "value".
*
* @param key the specified key to be inserted.
* @param value the value associated with the specified key.
*/
public void put(WriteOptions writeOpts, byte[] key, byte[] value)
throws RocksDBException {
put(writeOpts.nativeHandle_, key, key.length, value, value.length);
}
/**
* Get the value associated with the specified key.
*
@@ -95,6 +106,25 @@ public class RocksDB {
return get(key, key.length);
}
/**
* Remove the database entry (if any) for "key". Returns OK on
* success, and a non-OK status on error. It is not an error if "key"
* did not exist in the database.
*/
public void remove(byte[] key) throws RocksDBException {
remove(key, key.length);
}
/**
* Remove the database entry (if any) for "key". Returns OK on
* success, and a non-OK status on error. It is not an error if "key"
* did not exist in the database.
*/
public void remove(WriteOptions writeOpt, byte[] key)
throws RocksDBException {
remove(writeOpt.nativeHandle_, key, key.length);
}
@Override protected void finalize() {
close();
}
@@ -108,15 +138,23 @@ public class RocksDB {
// native methods
private native void open0(String path) throws RocksDBException;
private native void open(long optionsHandle, String path) throws RocksDBException;
private native void open(
long optionsHandle, String path) throws RocksDBException;
private native void put(
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
private native void put(
long writeOptHandle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
private native int get(
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
private native byte[] get(
byte[] key, int keyLen) throws RocksDBException;
private native void remove(
byte[] key, int keyLen) throws RocksDBException;
private native void remove(
long writeOptHandle, byte[] key, int keyLen) throws RocksDBException;
private native void close0();
private long nativeHandle_;

View File

@@ -0,0 +1,121 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.lang.*;
import java.util.*;
/**
* WriteBatch holds a collection of updates to apply atomically to a DB.
*
* The updates are applied in the order in which they are added
* to the WriteBatch. For example, the value of "key" will be "v3"
* after the following batch is written:
*
* batch.put("key", "v1");
* batch.remove("key");
* batch.put("key", "v2");
* batch.put("key", "v3");
*
* Multiple threads can invoke const methods on a WriteBatch without
* external synchronization, but if any of the threads may call a
* non-const method, all threads accessing the same WriteBatch must use
* external synchronization.
*/
public class WriteBatch {
public WriteBatch() {
nativeHandle_ = 0;
newWriteBatch(0);
}
public WriteBatch(int reserved_bytes) {
nativeHandle_ = 0;
newWriteBatch(reserved_bytes);
}
/**
* Returns the number of updates in the batch.
*/
public native int count();
/**
* Store the mapping "key->value" in the database.
*/
public void put(byte[] key, byte[] value) {
put(key, key.length, value, value.length);
}
/**
* Merge "value" with the existing value of "key" in the database.
* "key->merge(existing, value)"
*/
public void merge(byte[] key, byte[] value) {
merge(key, key.length, value, value.length);
}
/**
* If the database contains a mapping for "key", erase it. Else do nothing.
*/
public void remove(byte[] key) {
remove(key, key.length);
}
/**
* Append a blob of arbitrary size to the records in this batch. The blob will
* be stored in the transaction log but not in any other file. In particular,
* it will not be persisted to the SST files. When iterating over this
* WriteBatch, WriteBatch::Handler::LogData will be called with the contents
* of the blob as it is encountered. Blobs, puts, deletes, and merges will be
* encountered in the same order in thich they were inserted. The blob will
* NOT consume sequence number(s) and will NOT increase the count of the batch
*
* Example application: add timestamps to the transaction log for use in
* replication.
*/
public void putLogData(byte[] blob) {
putLogData(blob, blob.length);
}
/**
* Clear all updates buffered in this batch
*/
public native void clear();
/**
* Delete the c++ side pointer.
*/
public synchronized void dispose() {
if (nativeHandle_ != 0) {
dispose0();
}
}
@Override protected void finalize() {
dispose();
}
private native void newWriteBatch(int reserved_bytes);
private native void put(byte[] key, int keyLen,
byte[] value, int valueLen);
private native void merge(byte[] key, int keyLen,
byte[] value, int valueLen);
private native void remove(byte[] key, int keyLen);
private native void putLogData(byte[] blob, int blobLen);
private native void dispose0();
private long nativeHandle_;
}
/**
* Package-private class which provides java api to access
* c++ WriteBatchInternal.
*/
class WriteBatchInternal {
static native void setSequence(WriteBatch batch, long sn);
static native long sequence(WriteBatch batch);
static native void append(WriteBatch b1, WriteBatch b2);
}

View File

@@ -0,0 +1,125 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
package org.rocksdb;
import java.util.*;
import java.lang.*;
import java.io.UnsupportedEncodingException;
/**
* This class mimics the db/write_batch_test.cc in the c++ rocksdb library.
*/
public class WriteBatchTest {
static {
System.loadLibrary("rocksdbjni");
}
public static void main(String args[]) {
System.out.println("Testing WriteBatchTest.Empty ===");
Empty();
System.out.println("Testing WriteBatchTest.Multiple ===");
Multiple();
System.out.println("Testing WriteBatchTest.Append ===");
Append();
System.out.println("Testing WriteBatchTest.Blob ===");
Blob();
// The following tests have not yet ported.
// Continue();
// PutGatherSlices();
System.out.println("Passed all WriteBatchTest!");
}
static void Empty() {
WriteBatch batch = new WriteBatch();
assert(batch.count() == 0);
}
static void Multiple() {
try {
WriteBatch batch = new WriteBatch();
batch.put("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII"));
batch.remove("box".getBytes("US-ASCII"));
batch.put("baz".getBytes("US-ASCII"), "boo".getBytes("US-ASCII"));
WriteBatchInternal.setSequence(batch, 100);
assert(100 == WriteBatchInternal.sequence(batch));
assert(3 == batch.count());
assert(new String("Put(baz, boo)@102" +
"Delete(box)@101" +
"Put(foo, bar)@100")
.equals(new String(getContents(batch), "US-ASCII")));
} catch (UnsupportedEncodingException e) {
System.err.println(e);
assert(false);
}
}
static void Append() {
WriteBatch b1 = new WriteBatch();
WriteBatch b2 = new WriteBatch();
WriteBatchInternal.setSequence(b1, 200);
WriteBatchInternal.setSequence(b2, 300);
WriteBatchInternal.append(b1, b2);
assert(getContents(b1).length == 0);
assert(b1.count() == 0);
try {
b2.put("a".getBytes("US-ASCII"), "va".getBytes("US-ASCII"));
WriteBatchInternal.append(b1, b2);
assert("Put(a, va)@200".equals(new String(getContents(b1), "US-ASCII")));
assert(1 == b1.count());
b2.clear();
b2.put("b".getBytes("US-ASCII"), "vb".getBytes("US-ASCII"));
WriteBatchInternal.append(b1, b2);
assert(new String("Put(a, va)@200" +
"Put(b, vb)@201")
.equals(new String(getContents(b1), "US-ASCII")));
assert(2 == b1.count());
b2.remove("foo".getBytes("US-ASCII"));
WriteBatchInternal.append(b1, b2);
assert(new String("Put(a, va)@200" +
"Put(b, vb)@202" +
"Put(b, vb)@201" +
"Delete(foo)@203")
.equals(new String(getContents(b1), "US-ASCII")));
assert(4 == b1.count());
} catch (UnsupportedEncodingException e) {
System.err.println(e);
assert(false);
}
}
static void Blob() {
WriteBatch batch = new WriteBatch();
try {
batch.put("k1".getBytes("US-ASCII"), "v1".getBytes("US-ASCII"));
batch.put("k2".getBytes("US-ASCII"), "v2".getBytes("US-ASCII"));
batch.put("k3".getBytes("US-ASCII"), "v3".getBytes("US-ASCII"));
batch.putLogData("blob1".getBytes("US-ASCII"));
batch.remove("k2".getBytes("US-ASCII"));
batch.putLogData("blob2".getBytes("US-ASCII"));
batch.merge("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII"));
assert(5 == batch.count());
assert(new String("Merge(foo, bar)@4" +
"Put(k1, v1)@0" +
"Delete(k2)@3" +
"Put(k2, v2)@1" +
"Put(k3, v3)@2")
.equals(new String(getContents(batch), "US-ASCII")));
} catch (UnsupportedEncodingException e) {
System.err.println(e);
assert(false);
}
}
static native byte[] getContents(WriteBatch batch);
}

View File

@@ -0,0 +1,96 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* Options that control write operations.
*
* Note that developers should call WriteOptions.dispose() to release the
* c++ side memory before a WriteOptions instance runs out of scope.
*/
public class WriteOptions {
public WriteOptions() {
nativeHandle_ = 0;
newWriteOptions();
}
public synchronized void dispose() {
if (nativeHandle_ != 0) {
dispose0(nativeHandle_);
}
}
/**
* If true, the write will be flushed from the operating system
* buffer cache (by calling WritableFile::Sync()) before the write
* is considered complete. If this flag is true, writes will be
* slower.
*
* If this flag is false, and the machine crashes, some recent
* writes may be lost. Note that if it is just the process that
* crashes (i.e., the machine does not reboot), no writes will be
* lost even if sync==false.
*
* In other words, a DB write with sync==false has similar
* crash semantics as the "write()" system call. A DB write
* with sync==true has similar crash semantics to a "write()"
* system call followed by "fdatasync()".
*
* Default: false
*/
public void setSync(boolean flag) {
setSync(nativeHandle_, flag);
}
/**
* If true, the write will be flushed from the operating system
* buffer cache (by calling WritableFile::Sync()) before the write
* is considered complete. If this flag is true, writes will be
* slower.
*
* If this flag is false, and the machine crashes, some recent
* writes may be lost. Note that if it is just the process that
* crashes (i.e., the machine does not reboot), no writes will be
* lost even if sync==false.
*
* In other words, a DB write with sync==false has similar
* crash semantics as the "write()" system call. A DB write
* with sync==true has similar crash semantics to a "write()"
* system call followed by "fdatasync()".
*/
public boolean sync() {
return sync(nativeHandle_);
}
/**
* If true, writes will not first go to the write ahead log,
* and the write may got lost after a crash.
*/
public void setDisableWAL(boolean flag) {
setDisableWAL(nativeHandle_, flag);
}
/**
* If true, writes will not first go to the write ahead log,
* and the write may got lost after a crash.
*/
public boolean disableWAL() {
return disableWAL(nativeHandle_);
}
@Override protected void finalize() {
dispose();
}
private native void newWriteOptions();
private native void setSync(long handle, boolean flag);
private native boolean sync(long handle);
private native void setDisableWAL(long handle, boolean flag);
private native boolean disableWAL(long handle);
private native void dispose0(long handle);
protected long nativeHandle_;
}