Compare commits

..

3 Commits

Author SHA1 Message Date
Jingchen
d5a3923228 Merge branch 'develop' into dangell/relay 2025-07-23 14:16:25 +01:00
Denis Angell
d4bfb4feec [temp] Optional Open Ledger 2025-07-19 21:58:59 +02:00
Denis Angell
7cffafb8ce relay transactions at earliest moment
- update TxQ::apply to minimize preflight/preclaim calls
- add read only open view for NetworkOPs::apply preflight & preclaim
- NetworkOPs::apply call preflight and preclaim, then relay, then apply
2025-07-19 10:12:25 +02:00
36 changed files with 4159 additions and 764 deletions

View File

@@ -10,6 +10,7 @@ runs:
shell: bash
run: |
conan export --version 1.1.10 external/snappy
conan export --version 9.7.3 external/rocksdb
conan export --version 4.0.3 external/soci
- name: add Ripple Conan remote
if: env.CONAN_URL != ''
@@ -21,6 +22,7 @@ runs:
fi
conan remote add --index 0 ripple "${CONAN_URL}"
echo "Added conan remote ripple at ${CONAN_URL}"
- name: try to authenticate to Ripple Conan remote
if: env.CONAN_LOGIN_USERNAME_RIPPLE != '' && env.CONAN_PASSWORD_RIPPLE != ''
id: remote
@@ -29,6 +31,7 @@ runs:
echo "Authenticating to ripple remote..."
conan remote auth ripple --force
conan remote list-users
- name: list missing binaries
id: binaries
shell: bash

View File

@@ -8,7 +8,7 @@ on:
jobs:
check:
if: ${{ github.event_name == 'push' || github.event.pull_request.draft != true || contains(github.event.pull_request.labels.*.name, 'DraftRunCI') }}
runs-on: [self-hosted, Linux, X64, devbox]
runs-on: ubuntu-24.04
container: ghcr.io/xrplf/ci/tools-rippled-clang-format
steps:
# For jobs running in containers, $GITHUB_WORKSPACE and ${{ github.workspace }} might not be the

View File

@@ -24,6 +24,8 @@ env:
CONAN_GLOBAL_CONF: |
core.download:parallel={{os.cpu_count()}}
core.upload:parallel={{os.cpu_count()}}
core:default_build_profile=libxrpl
core:default_profile=libxrpl
tools.build:jobs={{ (os.cpu_count() * 4/5) | int }}
tools.build:verbosity=verbose
tools.compilation:verbosity=verbose
@@ -40,7 +42,7 @@ jobs:
- Ninja
configuration:
- Release
runs-on: [self-hosted, macOS, devbox]
runs-on: [self-hosted, macOS, mac-runner-m1]
env:
# The `build` action requires these variables.
build_dir: .build
@@ -94,6 +96,7 @@ jobs:
shell: bash
run: |
conan export --version 1.1.10 external/snappy
conan export --version 9.7.3 external/rocksdb
conan export --version 4.0.3 external/soci
- name: add Ripple Conan remote
if: env.CONAN_URL != ''

View File

@@ -25,6 +25,8 @@ env:
CONAN_GLOBAL_CONF: |
core.download:parallel={{ os.cpu_count() }}
core.upload:parallel={{ os.cpu_count() }}
core:default_build_profile=libxrpl
core:default_profile=libxrpl
tools.build:jobs={{ (os.cpu_count() * 4/5) | int }}
tools.build:verbosity=verbose
tools.compilation:verbosity=verbose
@@ -74,7 +76,7 @@ jobs:
compiler_version: 16
distro: debian
codename: bookworm
runs-on: [self-hosted, Linux, X64, devbox]
runs-on: [self-hosted, heavy]
container: ghcr.io/xrplf/ci/${{ matrix.distro }}-${{ matrix.codename }}:${{ matrix.compiler }}-${{ matrix.compiler_version }}
env:
build_dir: .build
@@ -99,6 +101,7 @@ jobs:
run: tar -czf conan.tar.gz -C ${CONAN_HOME} .
- name: build dependencies
uses: ./.github/actions/dependencies
with:
configuration: ${{ matrix.configuration }}
- name: upload archive
@@ -133,7 +136,7 @@ jobs:
-
- "-Dunity=ON"
needs: dependencies
runs-on: [self-hosted, Linux, X64, devbox]
runs-on: [self-hosted, heavy]
container: ghcr.io/xrplf/ci/${{ matrix.distro }}-${{ matrix.codename }}:${{ matrix.compiler }}-${{ matrix.compiler_version }}
env:
build_dir: .build
@@ -194,7 +197,7 @@ jobs:
- "-DUNIT_TEST_REFERENCE_FEE=200"
- "-DUNIT_TEST_REFERENCE_FEE=1000"
needs: dependencies
runs-on: [self-hosted, Linux, X64, devbox]
runs-on: [self-hosted, heavy]
container: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
env:
build_dir: .build
@@ -242,7 +245,7 @@ jobs:
configuration:
- Debug
needs: dependencies
runs-on: [self-hosted, Linux, X64, devbox]
runs-on: [self-hosted, heavy]
container: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
env:
build_dir: .build
@@ -309,7 +312,7 @@ jobs:
conan:
needs: dependencies
runs-on: [self-hosted, Linux, X64, devbox]
runs-on: [self-hosted, heavy]
container:
image: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
env:
@@ -356,44 +359,39 @@ jobs:
cmake --build .
./example | grep '^[[:digit:]]\+\.[[:digit:]]\+\.[[:digit:]]\+'
# NOTE we are not using dependencies built above because it lags with
# compiler versions. Instrumentation requires clang version 16 or
# later
instrumentation-build:
needs: dependencies
runs-on: [self-hosted, Linux, X64, devbox]
container: ghcr.io/xrplf/ci/debian-bookworm:clang-16
if: ${{ github.event_name == 'push' || github.event.pull_request.draft != true || contains(github.event.pull_request.labels.*.name, 'DraftRunCI') }}
env:
build_dir: .build
CLANG_RELEASE: 16
runs-on: [self-hosted, heavy]
container: ghcr.io/xrplf/ci/debian-bookworm:clang-16
steps:
- name: download cache
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093
with:
name: linux-clang-Debug
- name: extract cache
run: |
mkdir -p ${CONAN_HOME}
tar -xzf conan.tar.gz -C ${CONAN_HOME}
- name: check environment
run: |
echo ${PATH} | tr ':' '\n'
conan --version
cmake --version
env | sort
ls ${CONAN_HOME}
- name: checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- name: dependencies
uses: ./.github/actions/dependencies
with:
configuration: Debug
- name: prepare environment
run: |
mkdir -p ${build_dir}
echo "SOURCE_DIR=$(pwd)" >> $GITHUB_ENV
echo "BUILD_DIR=$(pwd)/${build_dir}" >> $GITHUB_ENV
mkdir ${GITHUB_WORKSPACE}/.build
echo "SOURCE_DIR=$GITHUB_WORKSPACE" >> $GITHUB_ENV
echo "BUILD_DIR=$GITHUB_WORKSPACE/.build" >> $GITHUB_ENV
- name: configure Conan
run: |
echo "${CONAN_GLOBAL_CONF}" >> $(conan config home)/global.conf
conan config install conan/profiles/ -tf $(conan config home)/profiles/
conan profile show
- name: build dependencies
run: |
cd ${BUILD_DIR}
conan install ${SOURCE_DIR} \
--output-folder ${BUILD_DIR} \
--build missing \
--settings:all build_type=Debug
- name: build with instrumentation
run: |

View File

@@ -27,6 +27,8 @@ env:
CONAN_GLOBAL_CONF: |
core.download:parallel={{os.cpu_count()}}
core.upload:parallel={{os.cpu_count()}}
core:default_build_profile=libxrpl
core:default_profile=libxrpl
tools.build:jobs=24
tools.build:verbosity=verbose
tools.compilation:verbosity=verbose
@@ -40,7 +42,7 @@ jobs:
matrix:
version:
- generator: Visual Studio 17 2022
runs-on: [self-hosted, Windows, devbox]
runs-on: windows-2022
configuration:
- type: Release
tests: true
@@ -89,6 +91,7 @@ jobs:
shell: bash
run: |
conan export --version 1.1.10 external/snappy
conan export --version 9.7.3 external/rocksdb
conan export --version 4.0.3 external/soci
- name: add Ripple Conan remote
if: env.CONAN_URL != ''

View File

@@ -171,6 +171,14 @@ which allows you to statically link it with GCC, if you want.
conan export --version 1.1.10 external/snappy
```
Export our [Conan recipe for RocksDB](./external/rocksdb).
It does not override paths to dependencies when building with Visual Studio.
```
# Conan 2.x
conan export --version 9.7.3 external/rocksdb
```
Export our [Conan recipe for SOCI](./external/soci).
It patches their CMake to correctly import its dependencies.

View File

@@ -9,7 +9,6 @@
[settings]
os={{ os }}
arch={{ arch }}
build_type=Debug
compiler={{compiler}}
compiler.version={{ compiler_version }}
compiler.cppstd=20
@@ -18,20 +17,3 @@ compiler.runtime=static
{% else %}
compiler.libcxx={{detect_api.detect_libcxx(compiler, version, compiler_exe)}}
{% endif %}
[conf]
{% if compiler == "clang" and compiler_version >= 19 %}
tools.build:cxxflags=['-Wno-missing-template-arg-list-after-template-kw']
{% endif %}
{% if compiler == "apple-clang" and compiler_version >= 17 %}
tools.build:cxxflags=['-Wno-missing-template-arg-list-after-template-kw']
{% endif %}
{% if compiler == "clang" and compiler_version == 16 %}
tools.build:cxxflags=['-DBOOST_ASIO_DISABLE_CONCEPTS']
{% endif %}
{% if compiler == "gcc" and compiler_version < 13 %}
tools.build:cxxflags=['-Wno-restrict']
{% endif %}
[tool_requires]
!cmake/*: cmake/[>=3 <4]

View File

@@ -112,7 +112,7 @@ class Xrpl(ConanFile):
if self.options.jemalloc:
self.requires('jemalloc/5.3.0')
if self.options.rocksdb:
self.requires('rocksdb/10.0.1')
self.requires('rocksdb/9.7.3')
self.requires('xxhash/0.8.3', **transitive_headers_opt)
exports_sources = (
@@ -143,6 +143,8 @@ class Xrpl(ConanFile):
tc.variables['static'] = self.options.static
tc.variables['unity'] = self.options.unity
tc.variables['xrpld'] = self.options.xrpld
if self.settings.compiler == 'clang' and self.settings.compiler.version == 16:
tc.extra_cxxflags = ["-DBOOST_ASIO_DISABLE_CONCEPTS"]
tc.generate()
def build(self):

12
external/rocksdb/conandata.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
sources:
"9.7.3":
url: "https://github.com/facebook/rocksdb/archive/refs/tags/v9.7.3.tar.gz"
sha256: "acfabb989cbfb5b5c4d23214819b059638193ec33dad2d88373c46448d16d38b"
patches:
"9.7.3":
- patch_file: "patches/9.x.x-0001-exclude-thirdparty.patch"
patch_description: "Do not include thirdparty.inc"
patch_type: "portability"
- patch_file: "patches/9.7.3-0001-memory-leak.patch"
patch_description: "Fix a leak of obsolete blob files left open until DB::Close()"
patch_type: "portability"

235
external/rocksdb/conanfile.py vendored Normal file
View File

@@ -0,0 +1,235 @@
import os
import glob
import shutil
from conan import ConanFile
from conan.errors import ConanInvalidConfiguration
from conan.tools.build import check_min_cppstd
from conan.tools.cmake import CMake, CMakeDeps, CMakeToolchain, cmake_layout
from conan.tools.files import apply_conandata_patches, collect_libs, copy, export_conandata_patches, get, rm, rmdir
from conan.tools.microsoft import check_min_vs, is_msvc, is_msvc_static_runtime
from conan.tools.scm import Version
required_conan_version = ">=1.53.0"
class RocksDBConan(ConanFile):
name = "rocksdb"
description = "A library that provides an embeddable, persistent key-value store for fast storage"
license = ("GPL-2.0-only", "Apache-2.0")
url = "https://github.com/conan-io/conan-center-index"
homepage = "https://github.com/facebook/rocksdb"
topics = ("database", "leveldb", "facebook", "key-value")
package_type = "library"
settings = "os", "arch", "compiler", "build_type"
options = {
"shared": [True, False],
"fPIC": [True, False],
"lite": [True, False],
"with_gflags": [True, False],
"with_snappy": [True, False],
"with_lz4": [True, False],
"with_zlib": [True, False],
"with_zstd": [True, False],
"with_tbb": [True, False],
"with_jemalloc": [True, False],
"enable_sse": [False, "sse42", "avx2"],
"use_rtti": [True, False],
}
default_options = {
"shared": False,
"fPIC": True,
"lite": False,
"with_snappy": False,
"with_lz4": False,
"with_zlib": False,
"with_zstd": False,
"with_gflags": False,
"with_tbb": False,
"with_jemalloc": False,
"enable_sse": False,
"use_rtti": False,
}
@property
def _min_cppstd(self):
return "11" if Version(self.version) < "8.8.1" else "17"
@property
def _compilers_minimum_version(self):
return {} if self._min_cppstd == "11" else {
"apple-clang": "10",
"clang": "7",
"gcc": "7",
"msvc": "191",
"Visual Studio": "15",
}
def export_sources(self):
export_conandata_patches(self)
def config_options(self):
if self.settings.os == "Windows":
del self.options.fPIC
if self.settings.arch != "x86_64":
del self.options.with_tbb
if self.settings.build_type == "Debug":
self.options.use_rtti = True # Rtti are used in asserts for debug mode...
def configure(self):
if self.options.shared:
self.options.rm_safe("fPIC")
def layout(self):
cmake_layout(self, src_folder="src")
def requirements(self):
if self.options.with_gflags:
self.requires("gflags/2.2.2")
if self.options.with_snappy:
self.requires("snappy/1.1.10")
if self.options.with_lz4:
self.requires("lz4/1.10.0")
if self.options.with_zlib:
self.requires("zlib/[>=1.2.11 <2]")
if self.options.with_zstd:
self.requires("zstd/1.5.6")
if self.options.get_safe("with_tbb"):
self.requires("onetbb/2021.12.0")
if self.options.with_jemalloc:
self.requires("jemalloc/5.3.0")
def validate(self):
if self.settings.compiler.get_safe("cppstd"):
check_min_cppstd(self, self._min_cppstd)
minimum_version = self._compilers_minimum_version.get(str(self.settings.compiler), False)
if minimum_version and Version(self.settings.compiler.version) < minimum_version:
raise ConanInvalidConfiguration(
f"{self.ref} requires C++{self._min_cppstd}, which your compiler does not support."
)
if self.settings.arch not in ["x86_64", "ppc64le", "ppc64", "mips64", "armv8"]:
raise ConanInvalidConfiguration("Rocksdb requires 64 bits")
check_min_vs(self, "191")
if self.version == "6.20.3" and \
self.settings.os == "Linux" and \
self.settings.compiler == "gcc" and \
Version(self.settings.compiler.version) < "5":
raise ConanInvalidConfiguration("Rocksdb 6.20.3 is not compilable with gcc <5.") # See https://github.com/facebook/rocksdb/issues/3522
def source(self):
get(self, **self.conan_data["sources"][self.version], strip_root=True)
def generate(self):
tc = CMakeToolchain(self)
tc.variables["FAIL_ON_WARNINGS"] = False
tc.variables["WITH_TESTS"] = False
tc.variables["WITH_TOOLS"] = False
tc.variables["WITH_CORE_TOOLS"] = False
tc.variables["WITH_BENCHMARK_TOOLS"] = False
tc.variables["WITH_FOLLY_DISTRIBUTED_MUTEX"] = False
if is_msvc(self):
tc.variables["WITH_MD_LIBRARY"] = not is_msvc_static_runtime(self)
tc.variables["ROCKSDB_INSTALL_ON_WINDOWS"] = self.settings.os == "Windows"
tc.variables["ROCKSDB_LITE"] = self.options.lite
tc.variables["WITH_GFLAGS"] = self.options.with_gflags
tc.variables["WITH_SNAPPY"] = self.options.with_snappy
tc.variables["WITH_LZ4"] = self.options.with_lz4
tc.variables["WITH_ZLIB"] = self.options.with_zlib
tc.variables["WITH_ZSTD"] = self.options.with_zstd
tc.variables["WITH_TBB"] = self.options.get_safe("with_tbb", False)
tc.variables["WITH_JEMALLOC"] = self.options.with_jemalloc
tc.variables["ROCKSDB_BUILD_SHARED"] = self.options.shared
tc.variables["ROCKSDB_LIBRARY_EXPORTS"] = self.settings.os == "Windows" and self.options.shared
tc.variables["ROCKSDB_DLL" ] = self.settings.os == "Windows" and self.options.shared
tc.variables["USE_RTTI"] = self.options.use_rtti
if not bool(self.options.enable_sse):
tc.variables["PORTABLE"] = True
tc.variables["FORCE_SSE42"] = False
elif self.options.enable_sse == "sse42":
tc.variables["PORTABLE"] = True
tc.variables["FORCE_SSE42"] = True
elif self.options.enable_sse == "avx2":
tc.variables["PORTABLE"] = False
tc.variables["FORCE_SSE42"] = False
# not available yet in CCI
tc.variables["WITH_NUMA"] = False
tc.generate()
deps = CMakeDeps(self)
if self.options.with_jemalloc:
deps.set_property("jemalloc", "cmake_file_name", "JeMalloc")
deps.set_property("jemalloc", "cmake_target_name", "JeMalloc::JeMalloc")
if self.options.with_zstd:
deps.set_property("zstd", "cmake_target_name", "zstd::zstd")
deps.generate()
def build(self):
apply_conandata_patches(self)
cmake = CMake(self)
cmake.configure()
cmake.build()
def _remove_static_libraries(self):
rm(self, "rocksdb.lib", os.path.join(self.package_folder, "lib"))
for lib in glob.glob(os.path.join(self.package_folder, "lib", "*.a")):
if not lib.endswith(".dll.a"):
os.remove(lib)
def _remove_cpp_headers(self):
for path in glob.glob(os.path.join(self.package_folder, "include", "rocksdb", "*")):
if path != os.path.join(self.package_folder, "include", "rocksdb", "c.h"):
if os.path.isfile(path):
os.remove(path)
else:
shutil.rmtree(path)
def package(self):
copy(self, "COPYING", src=self.source_folder, dst=os.path.join(self.package_folder, "licenses"))
copy(self, "LICENSE*", src=self.source_folder, dst=os.path.join(self.package_folder, "licenses"))
cmake = CMake(self)
cmake.install()
if self.options.shared:
self._remove_static_libraries()
self._remove_cpp_headers() # Force stable ABI for shared libraries
rmdir(self, os.path.join(self.package_folder, "lib", "cmake"))
rmdir(self, os.path.join(self.package_folder, "lib", "pkgconfig"))
def package_info(self):
cmake_target = "rocksdb-shared" if self.options.shared else "rocksdb"
self.cpp_info.set_property("cmake_file_name", "RocksDB")
self.cpp_info.set_property("cmake_target_name", f"RocksDB::{cmake_target}")
# TODO: back to global scope in conan v2 once cmake_find_package* generators removed
self.cpp_info.components["librocksdb"].libs = collect_libs(self)
if self.settings.os == "Windows":
self.cpp_info.components["librocksdb"].system_libs = ["shlwapi", "rpcrt4"]
if self.options.shared:
self.cpp_info.components["librocksdb"].defines = ["ROCKSDB_DLL"]
elif self.settings.os in ["Linux", "FreeBSD"]:
self.cpp_info.components["librocksdb"].system_libs = ["pthread", "m"]
if self.options.lite:
self.cpp_info.components["librocksdb"].defines.append("ROCKSDB_LITE")
# TODO: to remove in conan v2 once cmake_find_package* generators removed
self.cpp_info.names["cmake_find_package"] = "RocksDB"
self.cpp_info.names["cmake_find_package_multi"] = "RocksDB"
self.cpp_info.components["librocksdb"].names["cmake_find_package"] = cmake_target
self.cpp_info.components["librocksdb"].names["cmake_find_package_multi"] = cmake_target
self.cpp_info.components["librocksdb"].set_property("cmake_target_name", f"RocksDB::{cmake_target}")
if self.options.with_gflags:
self.cpp_info.components["librocksdb"].requires.append("gflags::gflags")
if self.options.with_snappy:
self.cpp_info.components["librocksdb"].requires.append("snappy::snappy")
if self.options.with_lz4:
self.cpp_info.components["librocksdb"].requires.append("lz4::lz4")
if self.options.with_zlib:
self.cpp_info.components["librocksdb"].requires.append("zlib::zlib")
if self.options.with_zstd:
self.cpp_info.components["librocksdb"].requires.append("zstd::zstd")
if self.options.get_safe("with_tbb"):
self.cpp_info.components["librocksdb"].requires.append("onetbb::onetbb")
if self.options.with_jemalloc:
self.cpp_info.components["librocksdb"].requires.append("jemalloc::jemalloc")

View File

@@ -0,0 +1,319 @@
diff --git a/HISTORY.md b/HISTORY.md
index 36d472229..05ad1a202 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,6 +1,10 @@
# Rocksdb Change Log
> NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt`
+## 9.7.4 (10/31/2024)
+### Bug Fixes
+* Fix a leak of obsolete blob files left open until DB::Close(). This bug was introduced in version 9.4.0.
+
## 9.7.3 (10/16/2024)
### Behavior Changes
* OPTIONS file to be loaded by remote worker is now preserved so that it does not get purged by the primary host. A similar technique as how we are preserving new SST files from getting purged is used for this. min_options_file_numbers_ is tracked like pending_outputs_ is tracked.
diff --git a/db/blob/blob_file_cache.cc b/db/blob/blob_file_cache.cc
index 5f340aadf..1b9faa238 100644
--- a/db/blob/blob_file_cache.cc
+++ b/db/blob/blob_file_cache.cc
@@ -42,6 +42,7 @@ Status BlobFileCache::GetBlobFileReader(
assert(blob_file_reader);
assert(blob_file_reader->IsEmpty());
+ // NOTE: sharing same Cache with table_cache
const Slice key = GetSliceForKey(&blob_file_number);
assert(cache_);
@@ -98,4 +99,13 @@ Status BlobFileCache::GetBlobFileReader(
return Status::OK();
}
+void BlobFileCache::Evict(uint64_t blob_file_number) {
+ // NOTE: sharing same Cache with table_cache
+ const Slice key = GetSliceForKey(&blob_file_number);
+
+ assert(cache_);
+
+ cache_.get()->Erase(key);
+}
+
} // namespace ROCKSDB_NAMESPACE
diff --git a/db/blob/blob_file_cache.h b/db/blob/blob_file_cache.h
index 740e67ada..6858d012b 100644
--- a/db/blob/blob_file_cache.h
+++ b/db/blob/blob_file_cache.h
@@ -36,6 +36,15 @@ class BlobFileCache {
uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader);
+ // Called when a blob file is obsolete to ensure it is removed from the cache
+ // to avoid effectively leaking the open file and assicated memory
+ void Evict(uint64_t blob_file_number);
+
+ // Used to identify cache entries for blob files (not normally useful)
+ static const Cache::CacheItemHelper* GetHelper() {
+ return CacheInterface::GetBasicHelper();
+ }
+
private:
using CacheInterface =
BasicTypedCacheInterface<BlobFileReader, CacheEntryRole::kMisc>;
diff --git a/db/column_family.h b/db/column_family.h
index e4b7adde8..86637736a 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -401,6 +401,7 @@ class ColumnFamilyData {
SequenceNumber earliest_seq);
TableCache* table_cache() const { return table_cache_.get(); }
+ BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }
BlobSource* blob_source() const { return blob_source_.get(); }
// See documentation in compaction_picker.h
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 261593423..06573ac2e 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -659,8 +659,9 @@ Status DBImpl::CloseHelper() {
// We need to release them before the block cache is destroyed. The block
// cache may be destroyed inside versions_.reset(), when column family data
// list is destroyed, so leaving handles in table cache after
- // versions_.reset() may cause issues.
- // Here we clean all unreferenced handles in table cache.
+ // versions_.reset() may cause issues. Here we clean all unreferenced handles
+ // in table cache, and (for certain builds/conditions) assert that no obsolete
+ // files are hanging around unreferenced (leak) in the table/blob file cache.
// Now we assume all user queries have finished, so only version set itself
// can possibly hold the blocks from block cache. After releasing unreferenced
// handles here, only handles held by version set left and inside
@@ -668,6 +669,9 @@ Status DBImpl::CloseHelper() {
// time a handle is released, we erase it from the cache too. By doing that,
// we can guarantee that after versions_.reset(), table cache is empty
// so the cache can be safely destroyed.
+#ifndef NDEBUG
+ TEST_VerifyNoObsoleteFilesCached(/*db_mutex_already_held=*/true);
+#endif // !NDEBUG
table_cache_->EraseUnRefEntries();
for (auto& txn_entry : recovered_transactions_) {
@@ -3227,6 +3231,8 @@ Status DBImpl::MultiGetImpl(
s = Status::Aborted();
break;
}
+ // This could be a long-running operation
+ ROCKSDB_THREAD_YIELD_HOOK();
}
// Post processing (decrement reference counts and record statistics)
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 5e4fa310b..ccc0abfa7 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1241,9 +1241,14 @@ class DBImpl : public DB {
static Status TEST_ValidateOptions(const DBOptions& db_options) {
return ValidateOptions(db_options);
}
-
#endif // NDEBUG
+ // In certain configurations, verify that the table/blob file cache only
+ // contains entries for live files, to check for effective leaks of open
+ // files. This can only be called when purging of obsolete files has
+ // "settled," such as during parts of DB Close().
+ void TEST_VerifyNoObsoleteFilesCached(bool db_mutex_already_held) const;
+
// persist stats to column family "_persistent_stats"
void PersistStats();
diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc
index 790a50d7a..67f5b4aaf 100644
--- a/db/db_impl/db_impl_debug.cc
+++ b/db/db_impl/db_impl_debug.cc
@@ -9,6 +9,7 @@
#ifndef NDEBUG
+#include "db/blob/blob_file_cache.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
@@ -328,5 +329,49 @@ size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
return EstimateInMemoryStatsHistorySize();
}
+
+void DBImpl::TEST_VerifyNoObsoleteFilesCached(
+ bool db_mutex_already_held) const {
+ // This check is somewhat expensive and obscure to make a part of every
+ // unit test in every build variety. Thus, we only enable it for ASAN builds.
+ if (!kMustFreeHeapAllocations) {
+ return;
+ }
+
+ std::optional<InstrumentedMutexLock> l;
+ if (db_mutex_already_held) {
+ mutex_.AssertHeld();
+ } else {
+ l.emplace(&mutex_);
+ }
+
+ std::vector<uint64_t> live_files;
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ if (cfd->IsDropped()) {
+ continue;
+ }
+ // Sneakily add both SST and blob files to the same list
+ cfd->current()->AddLiveFiles(&live_files, &live_files);
+ }
+ std::sort(live_files.begin(), live_files.end());
+
+ auto fn = [&live_files](const Slice& key, Cache::ObjectPtr, size_t,
+ const Cache::CacheItemHelper* helper) {
+ if (helper != BlobFileCache::GetHelper()) {
+ // Skip non-blob files for now
+ // FIXME: diagnose and fix the leaks of obsolete SST files revealed in
+ // unit tests.
+ return;
+ }
+ // See TableCache and BlobFileCache
+ assert(key.size() == sizeof(uint64_t));
+ uint64_t file_number;
+ GetUnaligned(reinterpret_cast<const uint64_t*>(key.data()), &file_number);
+ // Assert file is in sorted live_files
+ assert(
+ std::binary_search(live_files.begin(), live_files.end(), file_number));
+ };
+ table_cache_->ApplyToAllEntries(fn, {});
+}
} // namespace ROCKSDB_NAMESPACE
#endif // NDEBUG
diff --git a/db/db_iter.cc b/db/db_iter.cc
index e02586377..bf4749eb9 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -540,6 +540,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
} else {
iter_.Next();
}
+ // This could be a long-running operation due to tombstones, etc.
+ ROCKSDB_THREAD_YIELD_HOOK();
} while (iter_.Valid());
valid_ = false;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 71fc29c32..8a5be75e8 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -164,6 +164,7 @@ Status TableCache::GetTableReader(
}
Cache::Handle* TableCache::Lookup(Cache* cache, uint64_t file_number) {
+ // NOTE: sharing same Cache with BlobFileCache
Slice key = GetSliceForFileNumber(&file_number);
return cache->Lookup(key);
}
@@ -179,6 +180,7 @@ Status TableCache::FindTable(
size_t max_file_size_for_l0_meta_pin, Temperature file_temperature) {
PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos, ioptions_.clock);
uint64_t number = file_meta.fd.GetNumber();
+ // NOTE: sharing same Cache with BlobFileCache
Slice key = GetSliceForFileNumber(&number);
*handle = cache_.Lookup(key);
TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
diff --git a/db/version_builder.cc b/db/version_builder.cc
index ed8ab8214..c98f53f42 100644
--- a/db/version_builder.cc
+++ b/db/version_builder.cc
@@ -24,6 +24,7 @@
#include <vector>
#include "cache/cache_reservation_manager.h"
+#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_meta.h"
#include "db/dbformat.h"
#include "db/internal_stats.h"
@@ -744,12 +745,9 @@ class VersionBuilder::Rep {
return Status::Corruption("VersionBuilder", oss.str());
}
- // Note: we use C++11 for now but in C++14, this could be done in a more
- // elegant way using generalized lambda capture.
- VersionSet* const vs = version_set_;
- const ImmutableCFOptions* const ioptions = ioptions_;
-
- auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
+ auto deleter = [vs = version_set_, ioptions = ioptions_,
+ bc = cfd_ ? cfd_->blob_file_cache()
+ : nullptr](SharedBlobFileMetaData* shared_meta) {
if (vs) {
assert(ioptions);
assert(!ioptions->cf_paths.empty());
@@ -758,6 +756,9 @@ class VersionBuilder::Rep {
vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
ioptions->cf_paths.front().path);
}
+ if (bc) {
+ bc->Evict(shared_meta->GetBlobFileNumber());
+ }
delete shared_meta;
};
@@ -766,7 +767,7 @@ class VersionBuilder::Rep {
blob_file_number, blob_file_addition.GetTotalBlobCount(),
blob_file_addition.GetTotalBlobBytes(),
blob_file_addition.GetChecksumMethod(),
- blob_file_addition.GetChecksumValue(), deleter);
+ blob_file_addition.GetChecksumValue(), std::move(deleter));
mutable_blob_file_metas_.emplace(
blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
diff --git a/db/version_set.h b/db/version_set.h
index 9336782b1..024f869e7 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -1514,7 +1514,6 @@ class VersionSet {
void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
void AddObsoleteBlobFile(uint64_t blob_file_number, std::string path) {
- // TODO: Erase file from BlobFileCache?
obsolete_blob_files_.emplace_back(blob_file_number, std::move(path));
}
diff --git a/include/rocksdb/version.h b/include/rocksdb/version.h
index 2a19796b8..0afa2cab1 100644
--- a/include/rocksdb/version.h
+++ b/include/rocksdb/version.h
@@ -13,7 +13,7 @@
// minor or major version number planned for release.
#define ROCKSDB_MAJOR 9
#define ROCKSDB_MINOR 7
-#define ROCKSDB_PATCH 3
+#define ROCKSDB_PATCH 4
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these
diff --git a/port/port.h b/port/port.h
index 13aa56d47..141716e5b 100644
--- a/port/port.h
+++ b/port/port.h
@@ -19,3 +19,19 @@
#elif defined(OS_WIN)
#include "port/win/port_win.h"
#endif
+
+#ifdef OS_LINUX
+// A temporary hook into long-running RocksDB threads to support modifying their
+// priority etc. This should become a public API hook once the requirements
+// are better understood.
+extern "C" void RocksDbThreadYield() __attribute__((__weak__));
+#define ROCKSDB_THREAD_YIELD_HOOK() \
+ { \
+ if (RocksDbThreadYield) { \
+ RocksDbThreadYield(); \
+ } \
+ }
+#else
+#define ROCKSDB_THREAD_YIELD_HOOK() \
+ {}
+#endif

View File

@@ -0,0 +1,30 @@
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 93b884d..b715cb6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -106,14 +106,9 @@ endif()
include(CMakeDependentOption)
if(MSVC)
- option(WITH_GFLAGS "build with GFlags" OFF)
option(WITH_XPRESS "build with windows built in compression" OFF)
- option(ROCKSDB_SKIP_THIRDPARTY "skip thirdparty.inc" OFF)
-
- if(NOT ROCKSDB_SKIP_THIRDPARTY)
- include(${CMAKE_CURRENT_SOURCE_DIR}/thirdparty.inc)
- endif()
-else()
+endif()
+if(TRUE)
if(CMAKE_SYSTEM_NAME MATCHES "FreeBSD" AND NOT CMAKE_SYSTEM_NAME MATCHES "kFreeBSD")
# FreeBSD has jemalloc as default malloc
# but it does not have all the jemalloc files in include/...
@@ -126,7 +121,7 @@ else()
endif()
endif()
- if(MINGW)
+ if(MSVC OR MINGW)
option(WITH_GFLAGS "build with GFlags" OFF)
else()
option(WITH_GFLAGS "build with GFlags" ON)

View File

@@ -53,19 +53,19 @@
* then change the macro parameter in features.macro to
* `VoteBehavior::DefaultYes`. The communication process is beyond
* the scope of these instructions.
* 5) If a supported feature (`Supported::yes`) was _ever_ in a released
* version, it can never be changed back to `Supported::no`, because
* it _may_ still become enabled at any time. This would cause newer
* versions of `rippled` to become amendment blocked.
* Instead, to prevent newer versions from voting on the feature, use
* `VoteBehavior::Obsolete`. Obsolete features can not be voted for
* by any versions of `rippled` built with that setting, but will still
* work correctly if they get enabled. If a feature remains obsolete
* for long enough that _all_ clients that could vote for it are
* amendment blocked, the feature can be removed from the code
* as if it was unsupported.
*
* 5) A feature marked as Obsolete can mean either:
* 1) It is in the ledger (marked as Supported::yes) and it is on its way to
* become Retired
* 2) The feature is not in the ledger (has always been marked as
* Supported::no) and the code to support it has been removed
*
* If we want to discontinue a feature that we've never fully supported and
* the feature has never been enabled, we should remove all the related
* code, and mark the feature as "abandoned". To do this:
*
* 1) Open features.macro, move the feature to the abandoned section and
* change the macro to XRPL_ABANDON
*
* When a feature has been enabled for several years, the conditional code
* may be removed, and the feature "retired". To retire a feature:
@@ -99,10 +99,13 @@ namespace detail {
#undef XRPL_FIX
#pragma push_macro("XRPL_RETIRE")
#undef XRPL_RETIRE
#pragma push_macro("XRPL_ABANDON")
#undef XRPL_ABANDON
#define XRPL_FEATURE(name, supported, vote) +1
#define XRPL_FIX(name, supported, vote) +1
#define XRPL_RETIRE(name) +1
#define XRPL_ABANDON(name) +1
// This value SHOULD be equal to the number of amendments registered in
// Feature.cpp. Because it's only used to reserve storage, and determine how
@@ -119,6 +122,8 @@ static constexpr std::size_t numFeatures =
#pragma pop_macro("XRPL_FIX")
#undef XRPL_FEATURE
#pragma pop_macro("XRPL_FEATURE")
#undef XRPL_ABANDON
#pragma pop_macro("XRPL_ABANDON")
/** Amendments that this server supports and the default voting behavior.
Whether they are enabled depends on the Rules defined in the validated
@@ -360,10 +365,13 @@ foreachFeature(FeatureBitset bs, F&& f)
#undef XRPL_FIX
#pragma push_macro("XRPL_RETIRE")
#undef XRPL_RETIRE
#pragma push_macro("XRPL_ABANDON")
#undef XRPL_ABANDON
#define XRPL_FEATURE(name, supported, vote) extern uint256 const feature##name;
#define XRPL_FIX(name, supported, vote) extern uint256 const fix##name;
#define XRPL_RETIRE(name)
#define XRPL_ABANDON(name)
#include <xrpl/protocol/detail/features.macro>
@@ -373,6 +381,8 @@ foreachFeature(FeatureBitset bs, F&& f)
#pragma pop_macro("XRPL_FIX")
#undef XRPL_FEATURE
#pragma pop_macro("XRPL_FEATURE")
#undef XRPL_ABANDON
#pragma pop_macro("XRPL_ABANDON")
} // namespace ripple

View File

@@ -26,6 +26,9 @@
#if !defined(XRPL_RETIRE)
#error "undefined macro: XRPL_RETIRE"
#endif
#if !defined(XRPL_ABANDON)
#error "undefined macro: XRPL_ABANDON"
#endif
// Add new amendments to the top of this list.
// Keep it sorted in reverse chronological order.
@@ -130,6 +133,11 @@ XRPL_FIX (NFTokenDirV1, Supported::yes, VoteBehavior::Obsolete)
XRPL_FEATURE(NonFungibleTokensV1, Supported::yes, VoteBehavior::Obsolete)
XRPL_FEATURE(CryptoConditionsSuite, Supported::yes, VoteBehavior::Obsolete)
// The following amendments were never supported, never enabled, and
// we've abanded them. These features should never be in the ledger,
// and we've removed all the related code.
XRPL_ABANDON(OwnerPaysFee)
// The following amendments have been active for at least two years. Their
// pre-amendment code has been removed and the identifiers are deprecated.
// All known amendments and amendments that may appear in a validated

View File

@@ -482,7 +482,8 @@ LEDGER_ENTRY(ltDELEGATE, 0x0083, Delegate, delegate, ({
}))
/** A ledger object representing a single asset vault.
\sa keylet::vault
\sa keylet::mptoken
*/
LEDGER_ENTRY(ltVAULT, 0x0084, Vault, vault, ({
{sfPreviousTxnID, soeREQUIRED},

View File

@@ -409,7 +409,6 @@ TRANSACTION(ttMPTOKEN_ISSUANCE_CREATE, 54, MPTokenIssuanceCreate, Delegation::de
{sfTransferFee, soeOPTIONAL},
{sfMaximumAmount, soeOPTIONAL},
{sfMPTokenMetadata, soeOPTIONAL},
{sfDomainID, soeOPTIONAL},
}))
/** This transaction type destroys a MPTokensIssuance instance */
@@ -421,7 +420,6 @@ TRANSACTION(ttMPTOKEN_ISSUANCE_DESTROY, 55, MPTokenIssuanceDestroy, Delegation::
TRANSACTION(ttMPTOKEN_ISSUANCE_SET, 56, MPTokenIssuanceSet, Delegation::delegatable, ({
{sfMPTokenIssuanceID, soeREQUIRED},
{sfHolder, soeOPTIONAL},
{sfDomainID, soeOPTIONAL},
}))
/** This transaction type authorizes a MPToken instance */
@@ -480,7 +478,7 @@ TRANSACTION(ttVAULT_CREATE, 65, VaultCreate, Delegation::delegatable, ({
{sfAsset, soeREQUIRED, soeMPTSupported},
{sfAssetsMaximum, soeOPTIONAL},
{sfMPTokenMetadata, soeOPTIONAL},
{sfDomainID, soeOPTIONAL},
{sfDomainID, soeOPTIONAL}, // PermissionedDomainID
{sfWithdrawalPolicy, soeOPTIONAL},
{sfData, soeOPTIONAL},
}))
@@ -489,7 +487,7 @@ TRANSACTION(ttVAULT_CREATE, 65, VaultCreate, Delegation::delegatable, ({
TRANSACTION(ttVAULT_SET, 66, VaultSet, Delegation::delegatable, ({
{sfVaultID, soeREQUIRED},
{sfAssetsMaximum, soeOPTIONAL},
{sfDomainID, soeOPTIONAL},
{sfDomainID, soeOPTIONAL}, // PermissionedDomainID
{sfData, soeOPTIONAL},
}))

View File

@@ -254,7 +254,7 @@ FeatureCollections::registerFeature(
{
check(!readOnly, "Attempting to register a feature after startup.");
check(
support == Supported::yes || vote == VoteBehavior::DefaultNo,
support == Supported::yes || vote != VoteBehavior::DefaultYes,
"Invalid feature parameters. Must be supported to be up-voted.");
Feature const* i = getByName(name);
if (!i)
@@ -268,7 +268,7 @@ FeatureCollections::registerFeature(
features.emplace_back(name, f);
auto const getAmendmentSupport = [=]() {
if (vote == VoteBehavior::Obsolete)
if (vote == VoteBehavior::Obsolete && support == Supported::yes)
return AmendmentSupport::Retired;
return support == Supported::yes ? AmendmentSupport::Supported
: AmendmentSupport::Unsupported;
@@ -398,6 +398,14 @@ retireFeature(std::string const& name)
return registerFeature(name, Supported::yes, VoteBehavior::Obsolete);
}
// Abandoned features are not in the ledger and have no code controlled by the
// feature. They were never supported, and cannot be voted on.
uint256
abandonFeature(std::string const& name)
{
return registerFeature(name, Supported::no, VoteBehavior::Obsolete);
}
/** Tell FeatureCollections when registration is complete. */
bool
registrationIsDone()
@@ -432,6 +440,8 @@ featureToName(uint256 const& f)
#undef XRPL_FIX
#pragma push_macro("XRPL_RETIRE")
#undef XRPL_RETIRE
#pragma push_macro("XRPL_ABANDON")
#undef XRPL_ABANDON
#define XRPL_FEATURE(name, supported, vote) \
uint256 const feature##name = registerFeature(#name, supported, vote);
@@ -443,6 +453,11 @@ featureToName(uint256 const& f)
[[deprecated("The referenced amendment has been retired")]] \
[[maybe_unused]] \
uint256 const retired##name = retireFeature(#name);
#define XRPL_ABANDON(name) \
[[deprecated("The referenced amendment has been abandoned")]] \
[[maybe_unused]] \
uint256 const abandoned##name = abandonFeature(#name);
// clang-format on
#include <xrpl/protocol/detail/features.macro>
@@ -453,6 +468,8 @@ featureToName(uint256 const& f)
#pragma pop_macro("XRPL_FIX")
#undef XRPL_FEATURE
#pragma pop_macro("XRPL_FEATURE")
#undef XRPL_ABANDON
#pragma pop_macro("XRPL_ABANDON")
// All of the features should now be registered, since variables in a cpp file
// are initialized from top to bottom.

View File

@@ -18,16 +18,10 @@
//==============================================================================
#include <test/jtx.h>
#include <test/jtx/credentials.h>
#include <test/jtx/permissioned_domains.h>
#include <test/jtx/trust.h>
#include <test/jtx/xchain_bridge.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/Zero.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/jss.h>
namespace ripple {
@@ -67,48 +61,6 @@ class MPToken_test : public beast::unit_test::suite
.metadata = "test",
.err = temMALFORMED});
if (!features[featureSingleAssetVault])
{
// tries to set DomainID when SAV is disabled
mptAlice.create(
{.maxAmt = 100,
.assetScale = 0,
.metadata = "test",
.flags = tfMPTRequireAuth,
.domainID = uint256(42),
.err = temDISABLED});
}
else if (!features[featurePermissionedDomains])
{
// tries to set DomainID when PD is disabled
mptAlice.create(
{.maxAmt = 100,
.assetScale = 0,
.metadata = "test",
.flags = tfMPTRequireAuth,
.domainID = uint256(42),
.err = temDISABLED});
}
else
{
// tries to set DomainID when RequireAuth is not set
mptAlice.create(
{.maxAmt = 100,
.assetScale = 0,
.metadata = "test",
.domainID = uint256(42),
.err = temMALFORMED});
// tries to set zero DomainID
mptAlice.create(
{.maxAmt = 100,
.assetScale = 0,
.metadata = "test",
.flags = tfMPTRequireAuth,
.domainID = beast::zero,
.err = temMALFORMED});
}
// tries to set a txfee greater than max
mptAlice.create(
{.maxAmt = 100,
@@ -188,48 +140,6 @@ class MPToken_test : public beast::unit_test::suite
BEAST_EXPECT(
result[sfMaximumAmount.getJsonName()] == "9223372036854775807");
}
if (features[featureSingleAssetVault])
{
// Add permissioned domain
Account const credIssuer1{"credIssuer1"};
std::string const credType = "credential";
pdomain::Credentials const credentials1{
{.issuer = credIssuer1, .credType = credType}};
{
Env env{*this, features};
env.fund(XRP(1000), credIssuer1);
env(pdomain::setTx(credIssuer1, credentials1));
auto const domainId1 = [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
MPTTester mptAlice(env, alice);
mptAlice.create({
.maxAmt = maxMPTokenAmount, // 9'223'372'036'854'775'807
.assetScale = 1,
.transferFee = 10,
.metadata = "123",
.ownerCount = 1,
.flags = tfMPTCanLock | tfMPTRequireAuth | tfMPTCanEscrow |
tfMPTCanTrade | tfMPTCanTransfer | tfMPTCanClawback,
.domainID = domainId1,
});
// Get the hash for the most recent transaction.
std::string const txHash{
env.tx()->getJson(JsonOptions::none)[jss::hash].asString()};
Json::Value const result = env.rpc("tx", txHash)[jss::result];
BEAST_EXPECT(
result[sfMaximumAmount.getJsonName()] ==
"9223372036854775807");
}
}
}
void
@@ -589,59 +499,6 @@ class MPToken_test : public beast::unit_test::suite
.flags = 0x00000008,
.err = temINVALID_FLAG});
if (!features[featureSingleAssetVault])
{
// test invalid flags - nothing is being changed
mptAlice.set(
{.account = alice,
.flags = 0x00000000,
.err = tecNO_PERMISSION});
mptAlice.set(
{.account = alice,
.holder = bob,
.flags = 0x00000000,
.err = tecNO_PERMISSION});
// cannot set DomainID since SAV is not enabled
mptAlice.set(
{.account = alice,
.domainID = uint256(42),
.err = temDISABLED});
}
else
{
// test invalid flags - nothing is being changed
mptAlice.set(
{.account = alice,
.flags = 0x00000000,
.err = temMALFORMED});
mptAlice.set(
{.account = alice,
.holder = bob,
.flags = 0x00000000,
.err = temMALFORMED});
if (!features[featurePermissionedDomains])
{
// cannot set DomainID since PD is not enabled
mptAlice.set(
{.account = alice,
.domainID = uint256(42),
.err = temDISABLED});
}
else
{
// cannot set DomainID since Holder is set
mptAlice.set(
{.account = alice,
.holder = bob,
.domainID = uint256(42),
.err = temMALFORMED});
}
}
// set both lock and unlock flags at the same time will fail
mptAlice.set(
{.account = alice,
@@ -725,53 +582,6 @@ class MPToken_test : public beast::unit_test::suite
mptAlice.set(
{.holder = cindy, .flags = tfMPTLock, .err = tecNO_DST});
}
if (features[featureSingleAssetVault] &&
features[featurePermissionedDomains])
{
// Add permissioned domain
Account const credIssuer1{"credIssuer1"};
std::string const credType = "credential";
pdomain::Credentials const credentials1{
{.issuer = credIssuer1, .credType = credType}};
{
Env env{*this, features};
MPTTester mptAlice(env, alice);
mptAlice.create({});
// Trying to set DomainID on a public MPTokenIssuance
mptAlice.set(
{.domainID = uint256(42), .err = tecNO_PERMISSION});
mptAlice.set(
{.domainID = beast::zero, .err = tecNO_PERMISSION});
}
{
Env env{*this, features};
MPTTester mptAlice(env, alice);
mptAlice.create({.flags = tfMPTRequireAuth});
// Trying to set non-existing DomainID
mptAlice.set(
{.domainID = uint256(42), .err = tecOBJECT_NOT_FOUND});
// Trying to lock but locking is disabled
mptAlice.set(
{.flags = tfMPTUnlock,
.domainID = uint256(42),
.err = tecNO_PERMISSION});
mptAlice.set(
{.flags = tfMPTUnlock,
.domainID = beast::zero,
.err = tecNO_PERMISSION});
}
}
}
void
@@ -780,136 +590,71 @@ class MPToken_test : public beast::unit_test::suite
testcase("Enabled set transaction");
using namespace test::jtx;
// Test locking and unlocking
Env env{*this, features};
Account const alice("alice"); // issuer
Account const bob("bob"); // holder
MPTTester mptAlice(env, alice, {.holders = {bob}});
// create a mptokenissuance with locking
mptAlice.create(
{.ownerCount = 1, .holderCount = 0, .flags = tfMPTCanLock});
mptAlice.authorize({.account = bob, .holderCount = 1});
// locks bob's mptoken
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
// trying to lock bob's mptoken again will still succeed
// but no changes to the objects
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
// alice locks the mptissuance
mptAlice.set({.account = alice, .flags = tfMPTLock});
// alice tries to lock up both mptissuance and mptoken again
// it will not change the flags and both will remain locked.
mptAlice.set({.account = alice, .flags = tfMPTLock});
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
// alice unlocks bob's mptoken
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTUnlock});
// locks up bob's mptoken again
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
if (!features[featureSingleAssetVault])
{
// Test locking and unlocking
Env env{*this, features};
// Delete bobs' mptoken even though it is locked
mptAlice.authorize({.account = bob, .flags = tfMPTUnauthorize});
MPTTester mptAlice(env, alice, {.holders = {bob}});
// create a mptokenissuance with locking
mptAlice.create(
{.ownerCount = 1, .holderCount = 0, .flags = tfMPTCanLock});
mptAlice.authorize({.account = bob, .holderCount = 1});
// locks bob's mptoken
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
// trying to lock bob's mptoken again will still succeed
// but no changes to the objects
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
// alice locks the mptissuance
mptAlice.set({.account = alice, .flags = tfMPTLock});
// alice tries to lock up both mptissuance and mptoken again
// it will not change the flags and both will remain locked.
mptAlice.set({.account = alice, .flags = tfMPTLock});
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
// alice unlocks bob's mptoken
mptAlice.set(
{.account = alice, .holder = bob, .flags = tfMPTUnlock});
{.account = alice,
.holder = bob,
.flags = tfMPTUnlock,
.err = tecOBJECT_NOT_FOUND});
// locks up bob's mptoken again
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTLock});
if (!features[featureSingleAssetVault])
{
// Delete bobs' mptoken even though it is locked
mptAlice.authorize({.account = bob, .flags = tfMPTUnauthorize});
mptAlice.set(
{.account = alice,
.holder = bob,
.flags = tfMPTUnlock,
.err = tecOBJECT_NOT_FOUND});
return;
}
// Cannot delete locked MPToken
mptAlice.authorize(
{.account = bob,
.flags = tfMPTUnauthorize,
.err = tecNO_PERMISSION});
// alice unlocks mptissuance
mptAlice.set({.account = alice, .flags = tfMPTUnlock});
// alice unlocks bob's mptoken
mptAlice.set(
{.account = alice, .holder = bob, .flags = tfMPTUnlock});
// alice unlocks mptissuance and bob's mptoken again despite that
// they are already unlocked. Make sure this will not change the
// flags
mptAlice.set(
{.account = alice, .holder = bob, .flags = tfMPTUnlock});
mptAlice.set({.account = alice, .flags = tfMPTUnlock});
return;
}
if (features[featureSingleAssetVault])
{
// Add permissioned domain
std::string const credType = "credential";
// Cannot delete locked MPToken
mptAlice.authorize(
{.account = bob,
.flags = tfMPTUnauthorize,
.err = tecNO_PERMISSION});
// Test setting and resetting domain ID
Env env{*this, features};
// alice unlocks mptissuance
mptAlice.set({.account = alice, .flags = tfMPTUnlock});
auto const domainId1 = [&]() {
Account const credIssuer1{"credIssuer1"};
env.fund(XRP(1000), credIssuer1);
// alice unlocks bob's mptoken
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTUnlock});
pdomain::Credentials const credentials1{
{.issuer = credIssuer1, .credType = credType}};
env(pdomain::setTx(credIssuer1, credentials1));
return [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
}();
auto const domainId2 = [&]() {
Account const credIssuer2{"credIssuer2"};
env.fund(XRP(1000), credIssuer2);
pdomain::Credentials const credentials2{
{.issuer = credIssuer2, .credType = credType}};
env(pdomain::setTx(credIssuer2, credentials2));
return [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
}();
MPTTester mptAlice(env, alice, {.holders = {bob}});
// create a mptokenissuance with auth.
mptAlice.create(
{.ownerCount = 1, .holderCount = 0, .flags = tfMPTRequireAuth});
BEAST_EXPECT(mptAlice.checkDomainID(std::nullopt));
// reset "domain not set" to "domain not set", i.e. no change
mptAlice.set({.domainID = beast::zero});
BEAST_EXPECT(mptAlice.checkDomainID(std::nullopt));
// reset "domain not set" to domain1
mptAlice.set({.domainID = domainId1});
BEAST_EXPECT(mptAlice.checkDomainID(domainId1));
// reset domain1 to domain2
mptAlice.set({.domainID = domainId2});
BEAST_EXPECT(mptAlice.checkDomainID(domainId2));
// reset domain to "domain not set"
mptAlice.set({.domainID = beast::zero});
BEAST_EXPECT(mptAlice.checkDomainID(std::nullopt));
}
// alice unlocks mptissuance and bob's mptoken again despite that
// they are already unlocked. Make sure this will not change the
// flags
mptAlice.set({.account = alice, .holder = bob, .flags = tfMPTUnlock});
mptAlice.set({.account = alice, .flags = tfMPTUnlock});
}
void
@@ -1144,200 +889,6 @@ class MPToken_test : public beast::unit_test::suite
mptAlice.pay(bob, alice, 100, tecNO_AUTH);
}
if (features[featureSingleAssetVault] &&
features[featurePermissionedDomains])
{
// If RequireAuth is enabled and domain is a match, payment succeeds
{
Env env{*this, features};
std::string const credType = "credential";
Account const credIssuer1{"credIssuer1"};
env.fund(XRP(1000), credIssuer1, bob);
auto const domainId1 = [&]() {
pdomain::Credentials const credentials1{
{.issuer = credIssuer1, .credType = credType}};
env(pdomain::setTx(credIssuer1, credentials1));
return [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
}();
// bob is authorized via domain
env(credentials::create(bob, credIssuer1, credType));
env(credentials::accept(bob, credIssuer1, credType));
env.close();
MPTTester mptAlice(env, alice, {});
env.close();
mptAlice.create({
.ownerCount = 1,
.holderCount = 0,
.flags = tfMPTRequireAuth | tfMPTCanTransfer,
.domainID = domainId1,
});
mptAlice.authorize({.account = bob});
env.close();
// bob is authorized via domain
mptAlice.pay(alice, bob, 100);
mptAlice.set({.domainID = beast::zero});
// bob is no longer authorized
mptAlice.pay(alice, bob, 100, tecNO_AUTH);
}
{
Env env{*this, features};
std::string const credType = "credential";
Account const credIssuer1{"credIssuer1"};
env.fund(XRP(1000), credIssuer1, bob);
auto const domainId1 = [&]() {
pdomain::Credentials const credentials1{
{.issuer = credIssuer1, .credType = credType}};
env(pdomain::setTx(credIssuer1, credentials1));
return [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
}();
// bob is authorized via domain
env(credentials::create(bob, credIssuer1, credType));
env(credentials::accept(bob, credIssuer1, credType));
env.close();
MPTTester mptAlice(env, alice, {});
env.close();
mptAlice.create({
.ownerCount = 1,
.holderCount = 0,
.flags = tfMPTRequireAuth | tfMPTCanTransfer,
.domainID = domainId1,
});
// bob creates an empty MPToken
mptAlice.authorize({.account = bob});
// alice authorizes bob to hold funds
mptAlice.authorize({.account = alice, .holder = bob});
// alice sends 100 MPT to bob
mptAlice.pay(alice, bob, 100);
// alice UNAUTHORIZES bob
mptAlice.authorize(
{.account = alice,
.holder = bob,
.flags = tfMPTUnauthorize});
// bob is still authorized, via domain
mptAlice.pay(bob, alice, 10);
mptAlice.set({.domainID = beast::zero});
// bob fails to send back to alice because he is no longer
// authorize to move his funds!
mptAlice.pay(bob, alice, 10, tecNO_AUTH);
}
{
Env env{*this, features};
std::string const credType = "credential";
// credIssuer1 is the owner of domainId1 and a credential issuer
Account const credIssuer1{"credIssuer1"};
// credIssuer2 is the owner of domainId2 and a credential issuer
// Note, domainId2 also lists credentials issued by credIssuer1
Account const credIssuer2{"credIssuer2"};
env.fund(XRP(1000), credIssuer1, credIssuer2, bob, carol);
auto const domainId1 = [&]() {
pdomain::Credentials const credentials{
{.issuer = credIssuer1, .credType = credType}};
env(pdomain::setTx(credIssuer1, credentials));
return [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
}();
auto const domainId2 = [&]() {
pdomain::Credentials const credentials{
{.issuer = credIssuer1, .credType = credType},
{.issuer = credIssuer2, .credType = credType}};
env(pdomain::setTx(credIssuer2, credentials));
return [&]() {
auto tx = env.tx()->getJson(JsonOptions::none);
return pdomain::getNewDomain(env.meta());
}();
}();
// bob is authorized via credIssuer1 which is recognized by both
// domainId1 and domainId2
env(credentials::create(bob, credIssuer1, credType));
env(credentials::accept(bob, credIssuer1, credType));
env.close();
// carol is authorized via credIssuer2, only recognized by
// domainId2
env(credentials::create(carol, credIssuer2, credType));
env(credentials::accept(carol, credIssuer2, credType));
env.close();
MPTTester mptAlice(env, alice, {});
env.close();
mptAlice.create({
.ownerCount = 1,
.holderCount = 0,
.flags = tfMPTRequireAuth | tfMPTCanTransfer,
.domainID = domainId1,
});
// bob and carol create an empty MPToken
mptAlice.authorize({.account = bob});
mptAlice.authorize({.account = carol});
env.close();
// alice sends 50 MPT to bob but cannot send to carol
mptAlice.pay(alice, bob, 50);
mptAlice.pay(alice, carol, 50, tecNO_AUTH);
env.close();
// bob cannot send to carol because they are not on the same
// domain (since credIssuer2 is not recognized by domainId1)
mptAlice.pay(bob, carol, 10, tecNO_AUTH);
env.close();
// alice updates domainID to domainId2 which recognizes both
// credIssuer1 and credIssuer2
mptAlice.set({.domainID = domainId2});
// alice can now send to carol
mptAlice.pay(alice, carol, 10);
env.close();
// bob can now send to carol because both are in the same
// domain
mptAlice.pay(bob, carol, 10);
env.close();
// bob loses his authorization and can no longer send MPT
env(credentials::deleteCred(
credIssuer1, bob, credIssuer1, credType));
env.close();
mptAlice.pay(bob, carol, 10, tecNO_AUTH);
mptAlice.pay(bob, alice, 10, tecNO_AUTH);
}
}
// Non-issuer cannot send to each other if MPTCanTransfer isn't set
{
Env env(*this, features);
@@ -1789,8 +1340,10 @@ class MPToken_test : public beast::unit_test::suite
}
void
testDepositPreauth(FeatureBitset features)
testDepositPreauth()
{
testcase("DepositPreauth");
using namespace test::jtx;
Account const alice("alice"); // issuer
Account const bob("bob"); // holder
@@ -1799,11 +1352,8 @@ class MPToken_test : public beast::unit_test::suite
char const credType[] = "abcde";
if (features[featureCredentials])
{
testcase("DepositPreauth");
Env env(*this, features);
Env env(*this);
env.fund(XRP(50000), diana, dpIssuer);
env.close();
@@ -2747,8 +2297,6 @@ public:
// MPTokenIssuanceCreate
testCreateValidation(all - featureSingleAssetVault);
testCreateValidation(
(all | featureSingleAssetVault) - featurePermissionedDomains);
testCreateValidation(all | featureSingleAssetVault);
testCreateEnabled(all - featureSingleAssetVault);
testCreateEnabled(all | featureSingleAssetVault);
@@ -2766,11 +2314,7 @@ public:
testAuthorizeEnabled(all | featureSingleAssetVault);
// MPTokenIssuanceSet
testSetValidation(all - featureSingleAssetVault);
testSetValidation(
(all | featureSingleAssetVault) - featurePermissionedDomains);
testSetValidation(all | featureSingleAssetVault);
testSetValidation(all);
testSetEnabled(all - featureSingleAssetVault);
testSetEnabled(all | featureSingleAssetVault);
@@ -2779,9 +2323,8 @@ public:
testClawback(all);
// Test Direct Payment
testPayment(all | featureSingleAssetVault);
testDepositPreauth(all);
testDepositPreauth(all - featureCredentials);
testPayment(all);
testDepositPreauth();
// Test MPT Amount is invalid in Tx, which don't support MPT
testMPTInvalidInTx(all);

View File

@@ -0,0 +1,70 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
namespace ripple {
namespace test {
struct Simple_test : public beast::unit_test::suite
{
void
testSimple(FeatureBitset features)
{
testcase("Simple");
using namespace test::jtx;
using namespace std::literals;
Env env{*this, features};
auto const alice = Account("alice");
auto const bob = Account("bob");
// env.fund(XRP(100'000), alice, bob);
env(pay(env.master, alice, XRP(1000)));
env.close();
// create open ledger with 1000 transactions
// for (int i = 0; i < 2500; ++i)
// env(pay(alice, bob, XRP(1)), fee(XRP(1)));
// env.close();
// {
// Json::Value params;
// params[jss::ledger_index] = env.current()->seq() - 1;
// params[jss::transactions] = true;
// params[jss::expand] = true;
// auto const jrr = env.rpc("json", "ledger", to_string(params));
// std::cout << jrr << std::endl;
// }
}
public:
void
run() override
{
using namespace test::jtx;
FeatureBitset const all{testable_amendments()};
testSimple(all);
}
};
BEAST_DEFINE_TESTSUITE(Simple, app, ripple);
} // namespace test
} // namespace ripple

1394
src/test/app/Taker_test.cpp Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -19,7 +19,6 @@
#include <test/jtx.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/jss.h>
namespace ripple {
@@ -100,8 +99,6 @@ MPTTester::create(MPTCreate const& arg)
jv[sfMPTokenMetadata] = strHex(*arg.metadata);
if (arg.maxAmt)
jv[sfMaximumAmount] = std::to_string(*arg.maxAmt);
if (arg.domainID)
jv[sfDomainID] = to_string(*arg.domainID);
if (submit(arg, jv) != tesSUCCESS)
{
// Verify issuance doesn't exist
@@ -238,8 +235,6 @@ MPTTester::set(MPTSet const& arg)
jv[sfHolder] = arg.holder->human();
if (arg.delegate)
jv[sfDelegate] = arg.delegate->human();
if (arg.domainID)
jv[sfDomainID] = to_string(*arg.domainID);
if (submit(arg, jv) == tesSUCCESS && arg.flags.value_or(0))
{
auto require = [&](std::optional<Account> const& holder,
@@ -277,16 +272,6 @@ MPTTester::forObject(
return false;
}
[[nodiscard]] bool
MPTTester::checkDomainID(std::optional<uint256> expected) const
{
return forObject([&](SLEP const& sle) -> bool {
if (sle->isFieldPresent(sfDomainID))
return expected == sle->getFieldH256(sfDomainID);
return (!expected.has_value());
});
}
[[nodiscard]] bool
MPTTester::checkMPTokenAmount(
Account const& holder_,

View File

@@ -106,7 +106,6 @@ struct MPTCreate
std::optional<std::uint32_t> holderCount = std::nullopt;
bool fund = true;
std::optional<std::uint32_t> flags = {0};
std::optional<uint256> domainID = std::nullopt;
std::optional<TER> err = std::nullopt;
};
@@ -140,7 +139,6 @@ struct MPTSet
std::optional<std::uint32_t> holderCount = std::nullopt;
std::optional<std::uint32_t> flags = std::nullopt;
std::optional<Account> delegate = std::nullopt;
std::optional<uint256> domainID = std::nullopt;
std::optional<TER> err = std::nullopt;
};
@@ -167,9 +165,6 @@ public:
void
set(MPTSet const& set = {});
[[nodiscard]] bool
checkDomainID(std::optional<uint256> expected) const;
[[nodiscard]] bool
checkMPTokenAmount(Account const& holder, std::int64_t expectedAmount)
const;

View File

@@ -131,9 +131,6 @@ public:
BEAST_EXPECT(jv.isMember(jss::id) && jv[jss::id] == 5);
}
BEAST_EXPECT(jv[jss::result][jss::ledger_index] == 2);
BEAST_EXPECT(
jv[jss::result][jss::network_id] ==
env.app().config().NETWORK_ID);
}
{
@@ -142,8 +139,7 @@ public:
// Check stream update
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
return jv[jss::ledger_index] == 3 &&
jv[jss::network_id] == env.app().config().NETWORK_ID;
return jv[jss::ledger_index] == 3;
}));
}
@@ -153,8 +149,7 @@ public:
// Check stream update
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
return jv[jss::ledger_index] == 4 &&
jv[jss::network_id] == env.app().config().NETWORK_ID;
return jv[jss::ledger_index] == 4;
}));
}
@@ -514,11 +509,6 @@ public:
if (!jv.isMember(jss::validated_hash))
return false;
uint32_t netID = env.app().config().NETWORK_ID;
if (!jv.isMember(jss::network_id) ||
jv[jss::network_id] != netID)
return false;
// Certain fields are only added on a flag ledger.
bool const isFlagLedger =
(env.closed()->info().seq + 1) % 256 == 0;
@@ -577,7 +567,6 @@ public:
jv[jss::streams][0u] = "ledger";
jr = env.rpc("json", "subscribe", to_string(jv))[jss::result];
BEAST_EXPECT(jr[jss::status] == "success");
BEAST_EXPECT(jr[jss::network_id] == env.app().config().NETWORK_ID);
jr = env.rpc("json", "unsubscribe", to_string(jv))[jss::result];
BEAST_EXPECT(jr[jss::status] == "success");

View File

@@ -114,6 +114,9 @@ public:
std::shared_ptr<OpenView const>
current() const;
std::shared_ptr<OpenView const>
read() const;
/** Modify the open ledger
Thread safety:
@@ -217,6 +220,9 @@ OpenLedger::apply(
ApplyFlags flags,
beast::Journal j)
{
if (view.isMock())
return;
for (auto iter = txs.begin(); iter != txs.end(); ++iter)
{
try

View File

@@ -54,6 +54,15 @@ OpenLedger::current() const
return current_;
}
std::shared_ptr<OpenView const>
OpenLedger::read() const
{
std::lock_guard lock(current_mutex_);
// Create a copy of the current view for read-only access
// This snapshot won't change even if current_ is updated
return std::make_shared<OpenView const>(*current_);
}
bool
OpenLedger::modify(modify_type const& f)
{
@@ -88,10 +97,60 @@ OpenLedger::accept(
using empty = std::vector<std::shared_ptr<STTx const>>;
apply(app, *next, *ledger, empty{}, retries, flags, j_);
}
// Pre-apply local transactions and broadcast early if beneficial
std::vector<PreApplyResult> localPreApplyResults;
// Track which transactions we've already relayed
std::set<uint256> earlyRelayedTxs;
if (!locals.empty())
{
localPreApplyResults.reserve(locals.size());
// Use the next view as read-only for preApply (it's not being modified
// yet)
for (auto const& item : locals)
{
auto const result =
app.getTxQ().preApply(app, *next, item.second, flags, j_);
localPreApplyResults.push_back(result);
// Skip transactions that are not likely to claim fees
if (!result.pcresult.likelyToClaimFee)
continue;
auto const txId = item.second->getTransactionID();
// Skip batch transactions from relaying
if (!(item.second->isFlag(tfInnerBatchTxn) &&
rules.enabled(featureBatch)))
{
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
{
JLOG(j_.debug()) << "Early relaying local tx " << txId;
protocol::TMTransaction msg;
Serializer s;
item.second->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsCURRENT);
msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count());
msg.set_deferred(result.pcresult.ter == terQUEUED);
app.overlay().relay(txId, msg, *toSkip);
// Track that we've already relayed this transaction
earlyRelayedTxs.insert(txId);
}
}
}
}
// Block calls to modify, otherwise
// new tx going into the open ledger
// would get lost.
std::lock_guard lock1(modify_mutex_);
// Apply tx from the current open view
if (!current_->txs.empty())
{
@@ -110,19 +169,36 @@ OpenLedger::accept(
flags,
j_);
}
// Call the modifier
if (f)
f(*next, j_);
// Apply local tx
for (auto const& item : locals)
app.getTxQ().apply(app, *next, item.second, flags, j_);
// If we didn't relay this transaction recently, relay it to all peers
// Apply local tx using pre-computed results
auto localIter = locals.begin();
for (size_t i = 0; i < localPreApplyResults.size(); ++i, ++localIter)
{
app.getTxQ().queueApply(
app,
*next,
localIter->second,
flags,
localPreApplyResults[i].pfresult,
j_);
}
// Relay transactions that weren't already broadcast early
// (This handles transactions that weren't likely to claim fees initially
// but succeeded, plus any transactions from current_->txs and retries)
for (auto const& txpair : next->txs)
{
auto const& tx = txpair.first;
auto const txId = tx->getTransactionID();
// Skip if we already relayed this transaction early
if (earlyRelayedTxs.find(txId) != earlyRelayedTxs.end())
continue;
// skip batch txns
// LCOV_EXCL_START
if (tx->isFlag(tfInnerBatchTxn) && rules.enabled(featureBatch))

View File

@@ -1494,15 +1494,75 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
bool changed = false;
// Structure to hold preclaim results
std::vector<PreApplyResult> preapplyResults;
preapplyResults.reserve(transactions.size());
{
std::unique_lock ledgerLock{
m_ledgerMaster.peekMutex(), std::defer_lock};
std::lock(masterLock, ledgerLock);
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
for (TransactionStatus& e : transactions)
// Stage 1: Pre-apply and broadcast in single loop
auto newOL = app_.openLedger().read();
for (TransactionStatus& e : transactions)
{
// Use read-only view for preApply
auto const result = app_.getTxQ().preApply(
app_,
*newOL,
e.transaction->getSTransaction(),
e.failType == FailHard::yes ? tapFAIL_HARD : tapNONE,
m_journal);
preapplyResults.push_back(result);
// Immediately broadcast if transaction is likely to claim a fee
bool shouldBroadcast = result.pcresult.likelyToClaimFee;
// Check for hard failure
bool enforceFailHard =
(e.failType == FailHard::yes &&
!isTesSuccess(result.pcresult.ter));
if (shouldBroadcast && !enforceFailHard)
{
// we check before adding to the batch
auto const toSkip = app_.getHashRouter().shouldRelay(
e.transaction->getID());
if (auto const sttx = *(e.transaction->getSTransaction());
toSkip &&
// Skip relaying if it's an inner batch txn and batch
// feature is enabled
!(sttx.isFlag(tfInnerBatchTxn) &&
newOL->rules().enabled(featureBatch)))
{
protocol::TMTransaction tx;
Serializer s;
sttx.add(s);
tx.set_rawtransaction(s.data(), s.size());
tx.set_status(protocol::tsCURRENT);
tx.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(result.pcresult.ter == terQUEUED);
app_.overlay().relay(
e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
}
}
// Stage 2: Actually apply the transactions using pre-computed
// results
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
for (size_t i = 0; i < transactions.size(); ++i)
{
auto& e = transactions[i];
auto const& preResult = preapplyResults[i];
ApplyFlags flags = tapNONE;
if (e.admin)
flags |= tapUNLIMITED;
@@ -1510,8 +1570,15 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.failType == FailHard::yes)
flags |= tapFAIL_HARD;
auto const result = app_.getTxQ().apply(
app_, view, e.transaction->getSTransaction(), flags, j);
// Use the pre-computed results from Stage 1
auto const result = app_.getTxQ().queueApply(
app_,
view,
e.transaction->getSTransaction(),
flags,
preResult.pfresult,
j);
e.result = result.ter;
e.applied = result.applied;
changed = changed || result.applied;
@@ -1519,6 +1586,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
return changed;
});
}
if (changed)
reportFeeChange();
@@ -1527,6 +1595,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
validatedLedgerIndex = l->info().seq;
auto newOL = app_.openLedger().current();
// Process results (rest of the method remains the same)
for (TransactionStatus& e : transactions)
{
e.transaction->clearSubmitResult();
@@ -1677,36 +1747,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
e.transaction->setKept();
}
if ((e.applied ||
((mMode != OperatingMode::FULL) &&
(e.failType != FailHard::yes) && e.local) ||
(e.result == terQUEUED)) &&
!enforceFailHard)
{
auto const toSkip =
app_.getHashRouter().shouldRelay(e.transaction->getID());
if (auto const sttx = *(e.transaction->getSTransaction());
toSkip &&
// Skip relaying if it's an inner batch txn and batch
// feature is enabled
!(sttx.isFlag(tfInnerBatchTxn) &&
newOL->rules().enabled(featureBatch)))
{
protocol::TMTransaction tx;
Serializer s;
sttx.add(s);
tx.set_rawtransaction(s.data(), s.size());
tx.set_status(protocol::tsCURRENT);
tx.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(e.result == terQUEUED);
// FIXME: This should be when we received it
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
}
if (validatedLedgerIndex)
{
auto [fee, accountSeq, availableSeq] =
@@ -2415,7 +2455,6 @@ NetworkOPsImp::pubValidation(std::shared_ptr<STValidation> const& val)
jvObj[jss::flags] = val->getFlags();
jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
jvObj[jss::data] = strHex(val->getSerializer().slice());
jvObj[jss::network_id] = app_.config().NETWORK_ID;
if (auto version = (*val)[~sfServerVersion])
jvObj[jss::server_version] = std::to_string(*version);
@@ -3120,8 +3159,6 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
jvObj[jss::ledger_time] = Json::Value::UInt(
lpAccepted->info().closeTime.time_since_epoch().count());
jvObj[jss::network_id] = app_.config().NETWORK_ID;
if (!lpAccepted->rules().enabled(featureXRPFees))
jvObj[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
@@ -4180,7 +4217,6 @@ NetworkOPsImp::subLedger(InfoSub::ref isrListener, Json::Value& jvResult)
jvResult[jss::reserve_base] =
lpClosed->fees().accountReserve(0).jsonClipped();
jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
jvResult[jss::network_id] = app_.config().NETWORK_ID;
}
if ((mMode >= OperatingMode::SYNCING) && !isNeedNetworkLedger())

View File

@@ -36,6 +36,12 @@
namespace ripple {
struct PreApplyResult
{
PreflightResult pfresult;
PreclaimResult pcresult;
};
class Application;
class Config;
@@ -261,6 +267,30 @@ public:
/// Destructor
virtual ~TxQ();
/**
Prepares the transaction for application to the open ledger.
This is a preflight step that checks the transaction and
prepares it for application.
@return A `PreApplyResult` with the result of the preflight.
*/
PreApplyResult
preApply(
Application& app,
OpenView const& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j);
ApplyResult
queueApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
beast::Journal j);
/**
Add a new transaction to the open ledger, hold it in the queue,
or reject it.

View File

@@ -726,12 +726,27 @@ TxQ::tryClearAccountQueueUpThruTx(
// b. The entire queue also has a (dynamic) maximum size. Transactions
// beyond that limit are rejected.
//
PreApplyResult
TxQ::preApply(
Application& app,
OpenView const& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j)
{
PreflightResult const pfresult =
preflight(app, view.rules(), *tx, flags, j);
PreclaimResult const& pcresult = preclaim(pfresult, app, view);
return {pfresult, pcresult};
}
ApplyResult
TxQ::apply(
TxQ::queueApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
beast::Journal j)
{
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
@@ -740,9 +755,6 @@ TxQ::apply(
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
@@ -1350,6 +1362,24 @@ TxQ::apply(
return {terQUEUED, false};
}
ApplyResult
TxQ::apply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j)
{
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
return queueApply(app, view, tx, flags, pfresult, j);
}
/*
1. Update the fee metrics based on the fee levels of the
txs in the validated ledger and whether consensus is

View File

@@ -26,9 +26,9 @@
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Quality.h>
#include <xrpl/protocol/STAmount.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/st.h>
namespace ripple {
@@ -311,6 +311,374 @@ CreateOffer::checkAcceptAsset(
return tesSUCCESS;
}
bool
CreateOffer::dry_offer(ApplyView& view, Offer const& offer)
{
if (offer.fully_consumed())
return true;
auto const amount = accountFunds(
view,
offer.owner(),
offer.amount().out,
fhZERO_IF_FROZEN,
ctx_.app.journal("View"));
return (amount <= beast::zero);
}
std::pair<bool, Quality>
CreateOffer::select_path(
bool have_direct,
OfferStream const& direct,
bool have_bridge,
OfferStream const& leg1,
OfferStream const& leg2)
{
// If we don't have any viable path, why are we here?!
XRPL_ASSERT(
have_direct || have_bridge,
"ripple::CreateOffer::select_path : valid inputs");
// If there's no bridged path, the direct is the best by default.
if (!have_bridge)
return std::make_pair(true, direct.tip().quality());
Quality const bridged_quality(
composed_quality(leg1.tip().quality(), leg2.tip().quality()));
if (have_direct)
{
// We compare the quality of the composed quality of the bridged
// offers and compare it against the direct offer to pick the best.
Quality const direct_quality(direct.tip().quality());
if (bridged_quality < direct_quality)
return std::make_pair(true, direct_quality);
}
// Either there was no direct offer, or it didn't have a better quality
// than the bridge.
return std::make_pair(false, bridged_quality);
}
bool
CreateOffer::reachedOfferCrossingLimit(Taker const& taker) const
{
auto const crossings =
taker.get_direct_crossings() + (2 * taker.get_bridge_crossings());
// The crossing limit is part of the Ripple protocol and
// changing it is a transaction-processing change.
return crossings >= 850;
}
std::pair<TER, Amounts>
CreateOffer::bridged_cross(
Taker& taker,
ApplyView& view,
ApplyView& view_cancel,
NetClock::time_point const when)
{
auto const& takerAmount = taker.original_offer();
XRPL_ASSERT(
!isXRP(takerAmount.in) && !isXRP(takerAmount.out),
"ripple::CreateOffer::bridged_cross : neither is XRP");
if (isXRP(takerAmount.in) || isXRP(takerAmount.out))
Throw<std::logic_error>("Bridging with XRP and an endpoint.");
OfferStream offers_direct(
view,
view_cancel,
Book(taker.issue_in(), taker.issue_out(), std::nullopt),
when,
stepCounter_,
j_);
OfferStream offers_leg1(
view,
view_cancel,
Book(taker.issue_in(), xrpIssue(), std::nullopt),
when,
stepCounter_,
j_);
OfferStream offers_leg2(
view,
view_cancel,
Book(xrpIssue(), taker.issue_out(), std::nullopt),
when,
stepCounter_,
j_);
TER cross_result = tesSUCCESS;
// Note the subtle distinction here: self-offers encountered in the
// bridge are taken, but self-offers encountered in the direct book
// are not.
bool have_bridge = offers_leg1.step() && offers_leg2.step();
bool have_direct = step_account(offers_direct, taker);
int count = 0;
auto viewJ = ctx_.app.journal("View");
// Modifying the order or logic of the operations in the loop will cause
// a protocol breaking change.
while (have_direct || have_bridge)
{
bool leg1_consumed = false;
bool leg2_consumed = false;
bool direct_consumed = false;
auto const [use_direct, quality] = select_path(
have_direct, offers_direct, have_bridge, offers_leg1, offers_leg2);
// We are always looking at the best quality; we are done with
// crossing as soon as we cross the quality boundary.
if (taker.reject(quality))
break;
count++;
if (use_direct)
{
if (auto stream = j_.debug())
{
stream << count << " Direct:";
stream << " offer: " << offers_direct.tip();
stream << " in: " << offers_direct.tip().amount().in;
stream << " out: " << offers_direct.tip().amount().out;
stream << " owner: " << offers_direct.tip().owner();
stream << " funds: "
<< accountFunds(
view,
offers_direct.tip().owner(),
offers_direct.tip().amount().out,
fhIGNORE_FREEZE,
viewJ);
}
cross_result = taker.cross(offers_direct.tip());
JLOG(j_.debug()) << "Direct Result: " << transToken(cross_result);
if (dry_offer(view, offers_direct.tip()))
{
direct_consumed = true;
have_direct = step_account(offers_direct, taker);
}
}
else
{
if (auto stream = j_.debug())
{
auto const owner1_funds_before = accountFunds(
view,
offers_leg1.tip().owner(),
offers_leg1.tip().amount().out,
fhIGNORE_FREEZE,
viewJ);
auto const owner2_funds_before = accountFunds(
view,
offers_leg2.tip().owner(),
offers_leg2.tip().amount().out,
fhIGNORE_FREEZE,
viewJ);
stream << count << " Bridge:";
stream << " offer1: " << offers_leg1.tip();
stream << " in: " << offers_leg1.tip().amount().in;
stream << " out: " << offers_leg1.tip().amount().out;
stream << " owner: " << offers_leg1.tip().owner();
stream << " funds: " << owner1_funds_before;
stream << " offer2: " << offers_leg2.tip();
stream << " in: " << offers_leg2.tip().amount().in;
stream << " out: " << offers_leg2.tip().amount().out;
stream << " owner: " << offers_leg2.tip().owner();
stream << " funds: " << owner2_funds_before;
}
cross_result = taker.cross(offers_leg1.tip(), offers_leg2.tip());
JLOG(j_.debug()) << "Bridge Result: " << transToken(cross_result);
if (view.rules().enabled(fixTakerDryOfferRemoval))
{
// have_bridge can be true the next time 'round only if
// neither of the OfferStreams are dry.
leg1_consumed = dry_offer(view, offers_leg1.tip());
if (leg1_consumed)
have_bridge &= offers_leg1.step();
leg2_consumed = dry_offer(view, offers_leg2.tip());
if (leg2_consumed)
have_bridge &= offers_leg2.step();
}
else
{
// This old behavior may leave an empty offer in the book for
// the second leg.
if (dry_offer(view, offers_leg1.tip()))
{
leg1_consumed = true;
have_bridge = (have_bridge && offers_leg1.step());
}
if (dry_offer(view, offers_leg2.tip()))
{
leg2_consumed = true;
have_bridge = (have_bridge && offers_leg2.step());
}
}
}
if (cross_result != tesSUCCESS)
{
cross_result = tecFAILED_PROCESSING;
break;
}
if (taker.done())
{
JLOG(j_.debug()) << "The taker reports he's done during crossing!";
break;
}
if (reachedOfferCrossingLimit(taker))
{
JLOG(j_.debug()) << "The offer crossing limit has been exceeded!";
break;
}
// Postcondition: If we aren't done, then we *must* have consumed at
// least one offer fully.
XRPL_ASSERT(
direct_consumed || leg1_consumed || leg2_consumed,
"ripple::CreateOffer::bridged_cross : consumed an offer");
if (!direct_consumed && !leg1_consumed && !leg2_consumed)
Throw<std::logic_error>(
"bridged crossing: nothing was fully consumed.");
}
return std::make_pair(cross_result, taker.remaining_offer());
}
std::pair<TER, Amounts>
CreateOffer::direct_cross(
Taker& taker,
ApplyView& view,
ApplyView& view_cancel,
NetClock::time_point const when)
{
OfferStream offers(
view,
view_cancel,
Book(taker.issue_in(), taker.issue_out(), std::nullopt),
when,
stepCounter_,
j_);
TER cross_result(tesSUCCESS);
int count = 0;
bool have_offer = step_account(offers, taker);
// Modifying the order or logic of the operations in the loop will cause
// a protocol breaking change.
while (have_offer)
{
bool direct_consumed = false;
auto& offer(offers.tip());
// We are done with crossing as soon as we cross the quality boundary
if (taker.reject(offer.quality()))
break;
count++;
if (auto stream = j_.debug())
{
stream << count << " Direct:";
stream << " offer: " << offer;
stream << " in: " << offer.amount().in;
stream << " out: " << offer.amount().out;
stream << "quality: " << offer.quality();
stream << " owner: " << offer.owner();
stream << " funds: "
<< accountFunds(
view,
offer.owner(),
offer.amount().out,
fhIGNORE_FREEZE,
ctx_.app.journal("View"));
}
cross_result = taker.cross(offer);
JLOG(j_.debug()) << "Direct Result: " << transToken(cross_result);
if (dry_offer(view, offer))
{
direct_consumed = true;
have_offer = step_account(offers, taker);
}
if (cross_result != tesSUCCESS)
{
cross_result = tecFAILED_PROCESSING;
break;
}
if (taker.done())
{
JLOG(j_.debug()) << "The taker reports he's done during crossing!";
break;
}
if (reachedOfferCrossingLimit(taker))
{
JLOG(j_.debug()) << "The offer crossing limit has been exceeded!";
break;
}
// Postcondition: If we aren't done, then we *must* have consumed the
// offer on the books fully!
XRPL_ASSERT(
direct_consumed,
"ripple::CreateOffer::direct_cross : consumed an offer");
if (!direct_consumed)
Throw<std::logic_error>(
"direct crossing: nothing was fully consumed.");
}
return std::make_pair(cross_result, taker.remaining_offer());
}
// Step through the stream for as long as possible, skipping any offers
// that are from the taker or which cross the taker's threshold.
// Return false if the is no offer in the book, true otherwise.
bool
CreateOffer::step_account(OfferStream& stream, Taker const& taker)
{
while (stream.step())
{
auto const& offer = stream.tip();
// This offer at the tip crosses the taker's threshold. We're done.
if (taker.reject(offer.quality()))
return true;
// This offer at the tip is not from the taker. We're done.
if (offer.owner() != taker.account())
return true;
}
// We ran out of offers. Can't advance.
return false;
}
std::pair<TER, Amounts>
CreateOffer::flowCross(
PaymentSandbox& psb,
@@ -515,6 +883,21 @@ CreateOffer::flowCross(
return {tecINTERNAL, takerAmount};
}
std::pair<TER, Amounts>
CreateOffer::cross(
Sandbox& sb,
Sandbox& sbCancel,
Amounts const& takerAmount,
std::optional<uint256> const& domainID)
{
PaymentSandbox psbFlow{&sb};
PaymentSandbox psbCancelFlow{&sbCancel};
auto const ret = flowCross(psbFlow, psbCancelFlow, takerAmount, domainID);
psbFlow.apply(sb);
psbCancelFlow.apply(sbCancel);
return ret;
}
std::string
CreateOffer::format_amount(STAmount const& amount)
{
@@ -524,6 +907,20 @@ CreateOffer::format_amount(STAmount const& amount)
return txt;
}
void
CreateOffer::preCompute()
{
cross_type_ = CrossType::IouToIou;
bool const pays_xrp = ctx_.tx.getFieldAmount(sfTakerPays).native();
bool const gets_xrp = ctx_.tx.getFieldAmount(sfTakerGets).native();
if (pays_xrp && !gets_xrp)
cross_type_ = CrossType::IouToXrp;
else if (gets_xrp && !pays_xrp)
cross_type_ = CrossType::XrpToIou;
return Transactor::preCompute();
}
TER
CreateOffer::applyHybrid(
Sandbox& sb,
@@ -687,6 +1084,11 @@ CreateOffer::applyGuts(Sandbox& sb, Sandbox& sbCancel)
// We reverse pays and gets because during crossing we are taking.
Amounts const takerAmount(saTakerGets, saTakerPays);
// The amount of the offer that is unfilled after crossing has been
// performed. It may be equal to the original amount (didn't cross),
// empty (fully crossed), or something in-between.
Amounts place_offer;
JLOG(j_.debug()) << "Attempting cross: "
<< to_string(takerAmount.in.issue()) << " -> "
<< to_string(takerAmount.out.issue());
@@ -699,17 +1101,8 @@ CreateOffer::applyGuts(Sandbox& sb, Sandbox& sbCancel)
stream << " out: " << format_amount(takerAmount.out);
}
// The amount of the offer that is unfilled after crossing has been
// performed. It may be equal to the original amount (didn't cross),
// empty (fully crossed), or something in-between.
Amounts place_offer;
PaymentSandbox psbFlow{&sb};
PaymentSandbox psbCancelFlow{&sbCancel};
std::tie(result, place_offer) =
flowCross(psbFlow, psbCancelFlow, takerAmount, domainID);
psbFlow.apply(sb);
psbCancelFlow.apply(sbCancel);
cross(sb, sbCancel, takerAmount, domainID);
// We expect the implementation of cross to succeed
// or give a tec.

View File

@@ -20,10 +20,10 @@
#ifndef RIPPLE_TX_CREATEOFFER_H_INCLUDED
#define RIPPLE_TX_CREATEOFFER_H_INCLUDED
#include <xrpld/app/tx/detail/OfferStream.h>
#include <xrpld/app/tx/detail/Taker.h>
#include <xrpld/app/tx/detail/Transactor.h>
#include <xrpl/protocol/Quality.h>
namespace ripple {
class PaymentSandbox;
@@ -36,7 +36,8 @@ public:
static constexpr ConsequencesFactoryType ConsequencesFactory{Custom};
/** Construct a Transactor subclass that creates an offer in the ledger. */
explicit CreateOffer(ApplyContext& ctx) : Transactor(ctx)
explicit CreateOffer(ApplyContext& ctx)
: Transactor(ctx), stepCounter_(1000, j_)
{
}
@@ -51,6 +52,10 @@ public:
static TER
preclaim(PreclaimContext const& ctx);
/** Gather information beyond what the Transactor base class gathers. */
void
preCompute() override;
/** Precondition: fee collection is likely. Attempt to create the offer. */
TER
doApply() override;
@@ -68,6 +73,42 @@ private:
beast::Journal const j,
Issue const& issue);
bool
dry_offer(ApplyView& view, Offer const& offer);
static std::pair<bool, Quality>
select_path(
bool have_direct,
OfferStream const& direct,
bool have_bridge,
OfferStream const& leg1,
OfferStream const& leg2);
std::pair<TER, Amounts>
bridged_cross(
Taker& taker,
ApplyView& view,
ApplyView& view_cancel,
NetClock::time_point const when);
std::pair<TER, Amounts>
direct_cross(
Taker& taker,
ApplyView& view,
ApplyView& view_cancel,
NetClock::time_point const when);
// Step through the stream for as long as possible, skipping any offers
// that are from the taker or which cross the taker's threshold.
// Return false if the is no offer in the book, true otherwise.
static bool
step_account(OfferStream& stream, Taker const& taker);
// True if the number of offers that have been crossed
// exceeds the limit.
bool
reachedOfferCrossingLimit(Taker const& taker) const;
// Use the payment flow code to perform offer crossing.
std::pair<TER, Amounts>
flowCross(
@@ -76,6 +117,17 @@ private:
Amounts const& takerAmount,
std::optional<uint256> const& domainID);
// Temporary
// This is a central location that invokes both versions of cross
// so the results can be compared. Eventually this layer will be
// removed once flowCross is determined to be stable.
std::pair<TER, Amounts>
cross(
Sandbox& sb,
Sandbox& sbCancel,
Amounts const& takerAmount,
std::optional<uint256> const& domainID);
static std::string
format_amount(STAmount const& amount);
@@ -87,6 +139,13 @@ private:
STAmount const& saTakerPays,
STAmount const& saTakerGets,
std::function<void(SLE::ref, std::optional<uint256>)> const& setDir);
private:
// What kind of offer we are placing
CrossType cross_type_;
// The number of steps to take through order books while crossing
OfferStream::StepCounter stepCounter_;
};
using OfferCreate = CreateOffer;

View File

@@ -31,11 +31,6 @@ MPTokenIssuanceCreate::preflight(PreflightContext const& ctx)
if (!ctx.rules.enabled(featureMPTokensV1))
return temDISABLED;
if (ctx.tx.isFieldPresent(sfDomainID) &&
!(ctx.rules.enabled(featurePermissionedDomains) &&
ctx.rules.enabled(featureSingleAssetVault)))
return temDISABLED;
if (auto const ret = preflight1(ctx); !isTesSuccess(ret))
return ret;
@@ -53,16 +48,6 @@ MPTokenIssuanceCreate::preflight(PreflightContext const& ctx)
return temMALFORMED;
}
if (auto const domain = ctx.tx[~sfDomainID])
{
if (*domain == beast::zero)
return temMALFORMED;
// Domain present implies that MPTokenIssuance is not public
if ((ctx.tx.getFlags() & tfMPTRequireAuth) == 0)
return temMALFORMED;
}
if (auto const metadata = ctx.tx[~sfMPTokenMetadata])
{
if (metadata->length() == 0 ||
@@ -157,7 +142,6 @@ MPTokenIssuanceCreate::doApply()
.assetScale = tx[~sfAssetScale],
.transferFee = tx[~sfTransferFee],
.metadata = tx[~sfMPTokenMetadata],
.domainId = tx[~sfDomainID],
});
return result ? tesSUCCESS : result.error();
}

View File

@@ -21,7 +21,6 @@
#include <xrpld/app/tx/detail/MPTokenIssuanceSet.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/TxFlags.h>
namespace ripple {
@@ -32,14 +31,6 @@ MPTokenIssuanceSet::preflight(PreflightContext const& ctx)
if (!ctx.rules.enabled(featureMPTokensV1))
return temDISABLED;
if (ctx.tx.isFieldPresent(sfDomainID) &&
!(ctx.rules.enabled(featurePermissionedDomains) &&
ctx.rules.enabled(featureSingleAssetVault)))
return temDISABLED;
if (ctx.tx.isFieldPresent(sfDomainID) && ctx.tx.isFieldPresent(sfHolder))
return temMALFORMED;
if (auto const ret = preflight1(ctx); !isTesSuccess(ret))
return ret;
@@ -57,13 +48,6 @@ MPTokenIssuanceSet::preflight(PreflightContext const& ctx)
if (holderID && accountID == holderID)
return temMALFORMED;
if (ctx.rules.enabled(featureSingleAssetVault))
{
// Is this transaction actually changing anything ?
if (txFlags == 0 && !ctx.tx.isFieldPresent(sfDomainID))
return temMALFORMED;
}
return preflight2(ctx);
}
@@ -113,14 +97,9 @@ MPTokenIssuanceSet::preclaim(PreclaimContext const& ctx)
if (!sleMptIssuance)
return tecOBJECT_NOT_FOUND;
if (!sleMptIssuance->isFlag(lsfMPTCanLock))
{
// For readability two separate `if` rather than `||` of two conditions
if (!ctx.view.rules().enabled(featureSingleAssetVault))
return tecNO_PERMISSION;
else if (ctx.tx.isFlag(tfMPTLock) || ctx.tx.isFlag(tfMPTUnlock))
return tecNO_PERMISSION;
}
// if the mpt has disabled locking
if (!((*sleMptIssuance)[sfFlags] & lsfMPTCanLock))
return tecNO_PERMISSION;
// ensure it is issued by the tx submitter
if ((*sleMptIssuance)[sfIssuer] != ctx.tx[sfAccount])
@@ -138,20 +117,6 @@ MPTokenIssuanceSet::preclaim(PreclaimContext const& ctx)
return tecOBJECT_NOT_FOUND;
}
if (auto const domain = ctx.tx[~sfDomainID])
{
if (not sleMptIssuance->isFlag(lsfMPTRequireAuth))
return tecNO_PERMISSION;
if (*domain != beast::zero)
{
auto const sleDomain =
ctx.view.read(keylet::permissionedDomain(*domain));
if (!sleDomain)
return tecOBJECT_NOT_FOUND;
}
}
return tesSUCCESS;
}
@@ -161,7 +126,6 @@ MPTokenIssuanceSet::doApply()
auto const mptIssuanceID = ctx_.tx[sfMPTokenIssuanceID];
auto const txFlags = ctx_.tx.getFlags();
auto const holderID = ctx_.tx[~sfHolder];
auto const domainID = ctx_.tx[~sfDomainID];
std::shared_ptr<SLE> sle;
if (holderID)
@@ -183,24 +147,6 @@ MPTokenIssuanceSet::doApply()
if (flagsIn != flagsOut)
sle->setFieldU32(sfFlags, flagsOut);
if (domainID)
{
// This is enforced in preflight.
XRPL_ASSERT(
sle->getType() == ltMPTOKEN_ISSUANCE,
"MPTokenIssuanceSet::doApply : modifying MPTokenIssuance");
if (*domainID != beast::zero)
{
sle->setFieldH256(sfDomainID, *domainID);
}
else
{
if (sle->isFieldPresent(sfDomainID))
sle->makeFieldAbsent(sfDomainID);
}
}
view().update(sle);
return tesSUCCESS;

View File

@@ -0,0 +1,863 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2014 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/app/tx/detail/Taker.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/contract.h>
namespace ripple {
static std::string
format_amount(STAmount const& amount)
{
std::string txt = amount.getText();
txt += "/";
txt += to_string(amount.issue().currency);
return txt;
}
BasicTaker::BasicTaker(
CrossType cross_type,
AccountID const& account,
Amounts const& amount,
Quality const& quality,
std::uint32_t flags,
Rate const& rate_in,
Rate const& rate_out,
beast::Journal journal)
: account_(account)
, quality_(quality)
, threshold_(quality_)
, sell_(flags & tfSell)
, original_(amount)
, remaining_(amount)
, issue_in_(remaining_.in.issue())
, issue_out_(remaining_.out.issue())
, m_rate_in(rate_in)
, m_rate_out(rate_out)
, cross_type_(cross_type)
, journal_(journal)
{
XRPL_ASSERT(
remaining_.in > beast::zero,
"ripple::BasicTaker::BasicTaker : positive remaining in");
XRPL_ASSERT(
remaining_.out > beast::zero,
"ripple::BasicTaker::BasicTaker : positive remaining out");
XRPL_ASSERT(
m_rate_in.value, "ripple::BasicTaker::BasicTaker : nonzero rate in");
XRPL_ASSERT(
m_rate_out.value, "ripple::BasicTaker::BasicTaker : nonzero rate out");
// If we are dealing with a particular flavor, make sure that it's the
// flavor we expect:
XRPL_ASSERT(
cross_type != CrossType::XrpToIou ||
(isXRP(issue_in()) && !isXRP(issue_out())),
"ripple::BasicTaker::BasicTaker : valid cross to IOU");
XRPL_ASSERT(
cross_type != CrossType::IouToXrp ||
(!isXRP(issue_in()) && isXRP(issue_out())),
"ripple::BasicTaker::BasicTaker : valid cross to XRP");
// And make sure we're not crossing XRP for XRP
XRPL_ASSERT(
!isXRP(issue_in()) || !isXRP(issue_out()),
"ripple::BasicTaker::BasicTaker : not crossing XRP for XRP");
// If this is a passive order, we adjust the quality so as to prevent offers
// at the same quality level from being consumed.
if (flags & tfPassive)
++threshold_;
}
Rate
BasicTaker::effective_rate(
Rate const& rate,
Issue const& issue,
AccountID const& from,
AccountID const& to)
{
// If there's a transfer rate, the issuer is not involved
// and the sender isn't the same as the recipient, return
// the actual transfer rate.
if (rate != parityRate && from != to && from != issue.account &&
to != issue.account)
{
return rate;
}
return parityRate;
}
bool
BasicTaker::unfunded() const
{
if (get_funds(account(), remaining_.in) > beast::zero)
return false;
JLOG(journal_.debug()) << "Unfunded: taker is out of funds.";
return true;
}
bool
BasicTaker::done() const
{
// We are done if we have consumed all the input currency
if (remaining_.in <= beast::zero)
{
JLOG(journal_.debug())
<< "Done: all the input currency has been consumed.";
return true;
}
// We are done if using buy semantics and we received the
// desired amount of output currency
if (!sell_ && (remaining_.out <= beast::zero))
{
JLOG(journal_.debug()) << "Done: the desired amount has been received.";
return true;
}
// We are done if the taker is out of funds
if (unfunded())
{
JLOG(journal_.debug()) << "Done: taker out of funds.";
return true;
}
return false;
}
Amounts
BasicTaker::remaining_offer() const
{
// If the taker is done, then there's no offer to place.
if (done())
return Amounts(remaining_.in.zeroed(), remaining_.out.zeroed());
// Avoid math altogether if we didn't cross.
if (original_ == remaining_)
return original_;
if (sell_)
{
XRPL_ASSERT(
remaining_.in > beast::zero,
"ripple::BasicTaker::remaining_offer : positive remaining in");
// We scale the output based on the remaining input:
return Amounts(
remaining_.in,
divRound(remaining_.in, quality_.rate(), issue_out_, true));
}
XRPL_ASSERT(
remaining_.out > beast::zero,
"ripple::BasicTaker::remaining_offer : positive remaining out");
// We scale the input based on the remaining output:
return Amounts(
mulRound(remaining_.out, quality_.rate(), issue_in_, true),
remaining_.out);
}
Amounts const&
BasicTaker::original_offer() const
{
return original_;
}
// TODO: the presence of 'output' is an artifact caused by the fact that
// Amounts carry issue information which should be decoupled.
static STAmount
qual_div(STAmount const& amount, Quality const& quality, STAmount const& output)
{
auto result = divide(amount, quality.rate(), output.issue());
return std::min(result, output);
}
static STAmount
qual_mul(STAmount const& amount, Quality const& quality, STAmount const& output)
{
auto result = multiply(amount, quality.rate(), output.issue());
return std::min(result, output);
}
void
BasicTaker::log_flow(char const* description, Flow const& flow)
{
auto stream = journal_.debug();
if (!stream)
return;
stream << description;
if (isXRP(issue_in()))
stream << " order in: " << format_amount(flow.order.in);
else
stream << " order in: " << format_amount(flow.order.in)
<< " (issuer: " << format_amount(flow.issuers.in) << ")";
if (isXRP(issue_out()))
stream << " order out: " << format_amount(flow.order.out);
else
stream << " order out: " << format_amount(flow.order.out)
<< " (issuer: " << format_amount(flow.issuers.out) << ")";
}
BasicTaker::Flow
BasicTaker::flow_xrp_to_iou(
Amounts const& order,
Quality quality,
STAmount const& owner_funds,
STAmount const& taker_funds,
Rate const& rate_out)
{
Flow f;
f.order = order;
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("flow_xrp_to_iou", f);
// Clamp on owner balance
if (owner_funds < f.issuers.out)
{
f.issuers.out = owner_funds;
f.order.out = divide(f.issuers.out, rate_out);
f.order.in = qual_mul(f.order.out, quality, f.order.in);
log_flow("(clamped on owner balance)", f);
}
// Clamp if taker wants to limit the output
if (!sell_ && remaining_.out < f.order.out)
{
f.order.out = remaining_.out;
f.order.in = qual_mul(f.order.out, quality, f.order.in);
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("(clamped on taker output)", f);
}
// Clamp on the taker's funds
if (taker_funds < f.order.in)
{
f.order.in = taker_funds;
f.order.out = qual_div(f.order.in, quality, f.order.out);
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("(clamped on taker funds)", f);
}
// Clamp on remaining offer if we are not handling the second leg
// of an autobridge.
if (cross_type_ == CrossType::XrpToIou && (remaining_.in < f.order.in))
{
f.order.in = remaining_.in;
f.order.out = qual_div(f.order.in, quality, f.order.out);
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("(clamped on taker input)", f);
}
return f;
}
BasicTaker::Flow
BasicTaker::flow_iou_to_xrp(
Amounts const& order,
Quality quality,
STAmount const& owner_funds,
STAmount const& taker_funds,
Rate const& rate_in)
{
Flow f;
f.order = order;
f.issuers.in = multiply(f.order.in, rate_in);
log_flow("flow_iou_to_xrp", f);
// Clamp on owner's funds
if (owner_funds < f.order.out)
{
f.order.out = owner_funds;
f.order.in = qual_mul(f.order.out, quality, f.order.in);
f.issuers.in = multiply(f.order.in, rate_in);
log_flow("(clamped on owner funds)", f);
}
// Clamp if taker wants to limit the output and we are not the
// first leg of an autobridge.
if (!sell_ && cross_type_ == CrossType::IouToXrp)
{
if (remaining_.out < f.order.out)
{
f.order.out = remaining_.out;
f.order.in = qual_mul(f.order.out, quality, f.order.in);
f.issuers.in = multiply(f.order.in, rate_in);
log_flow("(clamped on taker output)", f);
}
}
// Clamp on the taker's input offer
if (remaining_.in < f.order.in)
{
f.order.in = remaining_.in;
f.issuers.in = multiply(f.order.in, rate_in);
f.order.out = qual_div(f.order.in, quality, f.order.out);
log_flow("(clamped on taker input)", f);
}
// Clamp on the taker's input balance
if (taker_funds < f.issuers.in)
{
f.issuers.in = taker_funds;
f.order.in = divide(f.issuers.in, rate_in);
f.order.out = qual_div(f.order.in, quality, f.order.out);
log_flow("(clamped on taker funds)", f);
}
return f;
}
BasicTaker::Flow
BasicTaker::flow_iou_to_iou(
Amounts const& order,
Quality quality,
STAmount const& owner_funds,
STAmount const& taker_funds,
Rate const& rate_in,
Rate const& rate_out)
{
Flow f;
f.order = order;
f.issuers.in = multiply(f.order.in, rate_in);
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("flow_iou_to_iou", f);
// Clamp on owner balance
if (owner_funds < f.issuers.out)
{
f.issuers.out = owner_funds;
f.order.out = divide(f.issuers.out, rate_out);
f.order.in = qual_mul(f.order.out, quality, f.order.in);
f.issuers.in = multiply(f.order.in, rate_in);
log_flow("(clamped on owner funds)", f);
}
// Clamp on taker's offer
if (!sell_ && remaining_.out < f.order.out)
{
f.order.out = remaining_.out;
f.order.in = qual_mul(f.order.out, quality, f.order.in);
f.issuers.out = multiply(f.order.out, rate_out);
f.issuers.in = multiply(f.order.in, rate_in);
log_flow("(clamped on taker output)", f);
}
// Clamp on the taker's input offer
if (remaining_.in < f.order.in)
{
f.order.in = remaining_.in;
f.issuers.in = multiply(f.order.in, rate_in);
f.order.out = qual_div(f.order.in, quality, f.order.out);
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("(clamped on taker input)", f);
}
// Clamp on the taker's input balance
if (taker_funds < f.issuers.in)
{
f.issuers.in = taker_funds;
f.order.in = divide(f.issuers.in, rate_in);
f.order.out = qual_div(f.order.in, quality, f.order.out);
f.issuers.out = multiply(f.order.out, rate_out);
log_flow("(clamped on taker funds)", f);
}
return f;
}
// Calculates the direct flow through the specified offer
BasicTaker::Flow
BasicTaker::do_cross(Amounts offer, Quality quality, AccountID const& owner)
{
auto const owner_funds = get_funds(owner, offer.out);
auto const taker_funds = get_funds(account(), offer.in);
Flow result;
if (cross_type_ == CrossType::XrpToIou)
{
result = flow_xrp_to_iou(
offer,
quality,
owner_funds,
taker_funds,
out_rate(owner, account()));
}
else if (cross_type_ == CrossType::IouToXrp)
{
result = flow_iou_to_xrp(
offer,
quality,
owner_funds,
taker_funds,
in_rate(owner, account()));
}
else
{
result = flow_iou_to_iou(
offer,
quality,
owner_funds,
taker_funds,
in_rate(owner, account()),
out_rate(owner, account()));
}
if (!result.sanity_check())
Throw<std::logic_error>("Computed flow fails sanity check.");
remaining_.out -= result.order.out;
remaining_.in -= result.order.in;
XRPL_ASSERT(
remaining_.in >= beast::zero,
"ripple::BasicTaker::do_cross : minimum remaining in");
return result;
}
// Calculates the bridged flow through the specified offers
std::pair<BasicTaker::Flow, BasicTaker::Flow>
BasicTaker::do_cross(
Amounts offer1,
Quality quality1,
AccountID const& owner1,
Amounts offer2,
Quality quality2,
AccountID const& owner2)
{
XRPL_ASSERT(
!offer1.in.native(),
"ripple::BasicTaker::do_cross : offer1 in is not XRP");
XRPL_ASSERT(
offer1.out.native(),
"ripple::BasicTaker::do_cross : offer1 out is XRP");
XRPL_ASSERT(
offer2.in.native(), "ripple::BasicTaker::do_cross : offer2 in is XRP");
XRPL_ASSERT(
!offer2.out.native(),
"ripple::BasicTaker::do_cross : offer2 out is not XRP");
// If the taker owns the first leg of the offer, then the taker's available
// funds aren't the limiting factor for the input - the offer itself is.
auto leg1_in_funds = get_funds(account(), offer1.in);
if (account() == owner1)
{
JLOG(journal_.trace()) << "The taker owns the first leg of a bridge.";
leg1_in_funds = std::max(leg1_in_funds, offer1.in);
}
// If the taker owns the second leg of the offer, then the taker's available
// funds are not the limiting factor for the output - the offer itself is.
auto leg2_out_funds = get_funds(owner2, offer2.out);
if (account() == owner2)
{
JLOG(journal_.trace()) << "The taker owns the second leg of a bridge.";
leg2_out_funds = std::max(leg2_out_funds, offer2.out);
}
// The amount available to flow via XRP is the amount that the owner of the
// first leg of the bridge has, up to the first leg's output.
//
// But, when both legs of a bridge are owned by the same person, the amount
// of XRP that can flow between the two legs is, essentially, infinite
// since all the owner is doing is taking out XRP of his left pocket
// and putting it in his right pocket. In that case, we set the available
// XRP to the largest of the two offers.
auto xrp_funds = get_funds(owner1, offer1.out);
if (owner1 == owner2)
{
JLOG(journal_.trace())
<< "The bridge endpoints are owned by the same account.";
xrp_funds = std::max(offer1.out, offer2.in);
}
if (auto stream = journal_.debug())
{
stream << "Available bridge funds:";
stream << " leg1 in: " << format_amount(leg1_in_funds);
stream << " leg2 out: " << format_amount(leg2_out_funds);
stream << " xrp: " << format_amount(xrp_funds);
}
auto const leg1_rate = in_rate(owner1, account());
auto const leg2_rate = out_rate(owner2, account());
// Attempt to determine the maximal flow that can be achieved across each
// leg independent of the other.
auto flow1 =
flow_iou_to_xrp(offer1, quality1, xrp_funds, leg1_in_funds, leg1_rate);
if (!flow1.sanity_check())
Throw<std::logic_error>("Computed flow1 fails sanity check.");
auto flow2 =
flow_xrp_to_iou(offer2, quality2, leg2_out_funds, xrp_funds, leg2_rate);
if (!flow2.sanity_check())
Throw<std::logic_error>("Computed flow2 fails sanity check.");
// We now have the maximal flows across each leg individually. We need to
// equalize them, so that the amount of XRP that flows out of the first leg
// is the same as the amount of XRP that flows into the second leg. We take
// the side which is the limiting factor (if any) and adjust the other.
if (flow1.order.out < flow2.order.in)
{
// Adjust the second leg of the offer down:
flow2.order.in = flow1.order.out;
flow2.order.out = qual_div(flow2.order.in, quality2, flow2.order.out);
flow2.issuers.out = multiply(flow2.order.out, leg2_rate);
log_flow("Balancing: adjusted second leg down", flow2);
}
else if (flow1.order.out > flow2.order.in)
{
// Adjust the first leg of the offer down:
flow1.order.out = flow2.order.in;
flow1.order.in = qual_mul(flow1.order.out, quality1, flow1.order.in);
flow1.issuers.in = multiply(flow1.order.in, leg1_rate);
log_flow("Balancing: adjusted first leg down", flow2);
}
if (flow1.order.out != flow2.order.in)
Throw<std::logic_error>("Bridged flow is out of balance.");
remaining_.out -= flow2.order.out;
remaining_.in -= flow1.order.in;
return std::make_pair(flow1, flow2);
}
//==============================================================================
Taker::Taker(
CrossType cross_type,
ApplyView& view,
AccountID const& account,
Amounts const& offer,
std::uint32_t flags,
beast::Journal journal)
: BasicTaker(
cross_type,
account,
offer,
Quality(offer),
flags,
calculateRate(view, offer.in.getIssuer(), account),
calculateRate(view, offer.out.getIssuer(), account),
journal)
, view_(view)
, xrp_flow_(0)
, direct_crossings_(0)
, bridge_crossings_(0)
{
XRPL_ASSERT(
issue_in() == offer.in.issue(),
"ripple::Taker::Taker : issue in is a match");
XRPL_ASSERT(
issue_out() == offer.out.issue(),
"ripple::Taker::Taker : issue out is a match");
if (auto stream = journal_.debug())
{
stream << "Crossing as: " << to_string(account);
if (isXRP(issue_in()))
stream << " Offer in: " << format_amount(offer.in);
else
stream << " Offer in: " << format_amount(offer.in)
<< " (issuer: " << issue_in().account << ")";
if (isXRP(issue_out()))
stream << " Offer out: " << format_amount(offer.out);
else
stream << " Offer out: " << format_amount(offer.out)
<< " (issuer: " << issue_out().account << ")";
stream << " Balance: "
<< format_amount(get_funds(account, offer.in));
}
}
void
Taker::consume_offer(Offer& offer, Amounts const& order)
{
if (order.in < beast::zero)
Throw<std::logic_error>("flow with negative input.");
if (order.out < beast::zero)
Throw<std::logic_error>("flow with negative output.");
JLOG(journal_.debug()) << "Consuming from offer " << offer;
if (auto stream = journal_.trace())
{
auto const& available = offer.amount();
stream << " in:" << format_amount(available.in);
stream << " out:" << format_amount(available.out);
}
offer.consume(view_, order);
}
STAmount
Taker::get_funds(AccountID const& account, STAmount const& amount) const
{
return accountFunds(view_, account, amount, fhZERO_IF_FROZEN, journal_);
}
TER
Taker::transferXRP(
AccountID const& from,
AccountID const& to,
STAmount const& amount)
{
if (!isXRP(amount))
Throw<std::logic_error>("Using transferXRP with IOU");
if (from == to)
return tesSUCCESS;
// Transferring zero is equivalent to not doing a transfer
if (amount == beast::zero)
return tesSUCCESS;
return ripple::transferXRP(view_, from, to, amount, journal_);
}
TER
Taker::redeemIOU(
AccountID const& account,
STAmount const& amount,
Issue const& issue)
{
if (isXRP(amount))
Throw<std::logic_error>("Using redeemIOU with XRP");
if (account == issue.account)
return tesSUCCESS;
// Transferring zero is equivalent to not doing a transfer
if (amount == beast::zero)
return tesSUCCESS;
// If we are trying to redeem some amount, then the account
// must have a credit balance.
if (get_funds(account, amount) <= beast::zero)
Throw<std::logic_error>("redeemIOU has no funds to redeem");
auto ret = ripple::redeemIOU(view_, account, amount, issue, journal_);
if (get_funds(account, amount) < beast::zero)
Throw<std::logic_error>("redeemIOU redeemed more funds than available");
return ret;
}
TER
Taker::issueIOU(
AccountID const& account,
STAmount const& amount,
Issue const& issue)
{
if (isXRP(amount))
Throw<std::logic_error>("Using issueIOU with XRP");
if (account == issue.account)
return tesSUCCESS;
// Transferring zero is equivalent to not doing a transfer
if (amount == beast::zero)
return tesSUCCESS;
return ripple::issueIOU(view_, account, amount, issue, journal_);
}
// Performs funds transfers to fill the given offer and adjusts offer.
TER
Taker::fill(BasicTaker::Flow const& flow, Offer& offer)
{
// adjust offer
consume_offer(offer, flow.order);
TER result = tesSUCCESS;
if (cross_type() != CrossType::XrpToIou)
{
XRPL_ASSERT(
!isXRP(flow.order.in), "ripple::Taker::fill : order in is not XRP");
if (result == tesSUCCESS)
result =
redeemIOU(account(), flow.issuers.in, flow.issuers.in.issue());
if (result == tesSUCCESS)
result =
issueIOU(offer.owner(), flow.order.in, flow.order.in.issue());
}
else
{
XRPL_ASSERT(
isXRP(flow.order.in), "ripple::Taker::fill : order in is XRP");
if (result == tesSUCCESS)
result = transferXRP(account(), offer.owner(), flow.order.in);
}
// Now send funds from the account whose offer we're taking
if (cross_type() != CrossType::IouToXrp)
{
XRPL_ASSERT(
!isXRP(flow.order.out),
"ripple::Taker::fill : order out is not XRP");
if (result == tesSUCCESS)
result = redeemIOU(
offer.owner(), flow.issuers.out, flow.issuers.out.issue());
if (result == tesSUCCESS)
result =
issueIOU(account(), flow.order.out, flow.order.out.issue());
}
else
{
XRPL_ASSERT(
isXRP(flow.order.out), "ripple::Taker::fill : order out is XRP");
if (result == tesSUCCESS)
result = transferXRP(offer.owner(), account(), flow.order.out);
}
if (result == tesSUCCESS)
direct_crossings_++;
return result;
}
// Performs bridged funds transfers to fill the given offers and adjusts offers.
TER
Taker::fill(
BasicTaker::Flow const& flow1,
Offer& leg1,
BasicTaker::Flow const& flow2,
Offer& leg2)
{
// Adjust offers accordingly
consume_offer(leg1, flow1.order);
consume_offer(leg2, flow2.order);
TER result = tesSUCCESS;
// Taker to leg1: IOU
if (leg1.owner() != account())
{
if (result == tesSUCCESS)
result = redeemIOU(
account(), flow1.issuers.in, flow1.issuers.in.issue());
if (result == tesSUCCESS)
result =
issueIOU(leg1.owner(), flow1.order.in, flow1.order.in.issue());
}
// leg1 to leg2: bridging over XRP
if (result == tesSUCCESS)
result = transferXRP(leg1.owner(), leg2.owner(), flow1.order.out);
// leg2 to Taker: IOU
if (leg2.owner() != account())
{
if (result == tesSUCCESS)
result = redeemIOU(
leg2.owner(), flow2.issuers.out, flow2.issuers.out.issue());
if (result == tesSUCCESS)
result =
issueIOU(account(), flow2.order.out, flow2.order.out.issue());
}
if (result == tesSUCCESS)
{
bridge_crossings_++;
xrp_flow_ += flow1.order.out;
}
return result;
}
TER
Taker::cross(Offer& offer)
{
// In direct crossings, at least one leg must not be XRP.
if (isXRP(offer.amount().in) && isXRP(offer.amount().out))
return tefINTERNAL;
auto const amount =
do_cross(offer.amount(), offer.quality(), offer.owner());
return fill(amount, offer);
}
TER
Taker::cross(Offer& leg1, Offer& leg2)
{
// In bridged crossings, XRP must can't be the input to the first leg
// or the output of the second leg.
if (isXRP(leg1.amount().in) || isXRP(leg2.amount().out))
return tefINTERNAL;
auto ret = do_cross(
leg1.amount(),
leg1.quality(),
leg1.owner(),
leg2.amount(),
leg2.quality(),
leg2.owner());
return fill(ret.first, leg1, ret.second, leg2);
}
Rate
Taker::calculateRate(
ApplyView const& view,
AccountID const& issuer,
AccountID const& account)
{
return isXRP(issuer) || (account == issuer) ? parityRate
: transferRate(view, issuer);
}
} // namespace ripple

View File

@@ -0,0 +1,341 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2014 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_BOOK_TAKER_H_INCLUDED
#define RIPPLE_APP_BOOK_TAKER_H_INCLUDED
#include <xrpld/app/tx/detail/Offer.h>
#include <xrpld/ledger/View.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/Quality.h>
#include <xrpl/protocol/Rate.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/TxFlags.h>
namespace ripple {
/** The flavor of an offer crossing */
enum class CrossType { XrpToIou, IouToXrp, IouToIou };
/** State for the active party during order book or payment operations. */
class BasicTaker
{
private:
AccountID account_;
Quality quality_;
Quality threshold_;
bool sell_;
// The original in and out quantities.
Amounts const original_;
// The amounts still left over for us to try and take.
Amounts remaining_;
// The issuers for the input and output
Issue const& issue_in_;
Issue const& issue_out_;
// The rates that will be paid when the input and output currencies are
// transfered and the currency issuer isn't involved:
Rate const m_rate_in;
Rate const m_rate_out;
// The type of crossing that we are performing
CrossType cross_type_;
protected:
beast::Journal const journal_;
struct Flow
{
explicit Flow() = default;
Amounts order;
Amounts issuers;
bool
sanity_check() const
{
using beast::zero;
if (isXRP(order.in) && isXRP(order.out))
return false;
return order.in >= zero && order.out >= zero &&
issuers.in >= zero && issuers.out >= zero;
}
};
private:
void
log_flow(char const* description, Flow const& flow);
Flow
flow_xrp_to_iou(
Amounts const& offer,
Quality quality,
STAmount const& owner_funds,
STAmount const& taker_funds,
Rate const& rate_out);
Flow
flow_iou_to_xrp(
Amounts const& offer,
Quality quality,
STAmount const& owner_funds,
STAmount const& taker_funds,
Rate const& rate_in);
Flow
flow_iou_to_iou(
Amounts const& offer,
Quality quality,
STAmount const& owner_funds,
STAmount const& taker_funds,
Rate const& rate_in,
Rate const& rate_out);
// Calculates the transfer rate that we should use when calculating
// flows for a particular issue between two accounts.
static Rate
effective_rate(
Rate const& rate,
Issue const& issue,
AccountID const& from,
AccountID const& to);
// The transfer rate for the input currency between the given accounts
Rate
in_rate(AccountID const& from, AccountID const& to) const
{
return effective_rate(m_rate_in, original_.in.issue(), from, to);
}
// The transfer rate for the output currency between the given accounts
Rate
out_rate(AccountID const& from, AccountID const& to) const
{
return effective_rate(m_rate_out, original_.out.issue(), from, to);
}
public:
BasicTaker() = delete;
BasicTaker(BasicTaker const&) = delete;
BasicTaker(
CrossType cross_type,
AccountID const& account,
Amounts const& amount,
Quality const& quality,
std::uint32_t flags,
Rate const& rate_in,
Rate const& rate_out,
beast::Journal journal = beast::Journal{beast::Journal::getNullSink()});
virtual ~BasicTaker() = default;
/** Returns the amount remaining on the offer.
This is the amount at which the offer should be placed. It may either
be for the full amount when there were no crossing offers, or for zero
when the offer fully crossed, or any amount in between.
It is always at the original offer quality (quality_)
*/
Amounts
remaining_offer() const;
/** Returns the amount that the offer was originally placed at. */
Amounts const&
original_offer() const;
/** Returns the account identifier of the taker. */
AccountID const&
account() const noexcept
{
return account_;
}
/** Returns `true` if the quality does not meet the taker's requirements. */
bool
reject(Quality const& quality) const noexcept
{
return quality < threshold_;
}
/** Returns the type of crossing that is being performed */
CrossType
cross_type() const
{
return cross_type_;
}
/** Returns the Issue associated with the input of the offer */
Issue const&
issue_in() const
{
return issue_in_;
}
/** Returns the Issue associated with the output of the offer */
Issue const&
issue_out() const
{
return issue_out_;
}
/** Returns `true` if the taker has run out of funds. */
bool
unfunded() const;
/** Returns `true` if order crossing should not continue.
Order processing is stopped if the taker's order quantities have
been reached, or if the taker has run out of input funds.
*/
bool
done() const;
/** Perform direct crossing through given offer.
@return an `Amounts` describing the flow achieved during cross
*/
BasicTaker::Flow
do_cross(Amounts offer, Quality quality, AccountID const& owner);
/** Perform bridged crossing through given offers.
@return a pair of `Amounts` describing the flow achieved during cross
*/
std::pair<BasicTaker::Flow, BasicTaker::Flow>
do_cross(
Amounts offer1,
Quality quality1,
AccountID const& owner1,
Amounts offer2,
Quality quality2,
AccountID const& owner2);
virtual STAmount
get_funds(AccountID const& account, STAmount const& funds) const = 0;
};
//------------------------------------------------------------------------------
class Taker : public BasicTaker
{
public:
Taker() = delete;
Taker(Taker const&) = delete;
Taker(
CrossType cross_type,
ApplyView& view,
AccountID const& account,
Amounts const& offer,
std::uint32_t flags,
beast::Journal journal);
~Taker() = default;
void
consume_offer(Offer& offer, Amounts const& order);
STAmount
get_funds(AccountID const& account, STAmount const& funds) const override;
STAmount const&
get_xrp_flow() const
{
return xrp_flow_;
}
std::uint32_t
get_direct_crossings() const
{
return direct_crossings_;
}
std::uint32_t
get_bridge_crossings() const
{
return bridge_crossings_;
}
/** Perform a direct or bridged offer crossing as appropriate.
Funds will be transferred accordingly, and offers will be adjusted.
@return tesSUCCESS if successful, or an error code otherwise.
*/
/** @{ */
TER
cross(Offer& offer);
TER
cross(Offer& leg1, Offer& leg2);
/** @} */
private:
static Rate
calculateRate(
ApplyView const& view,
AccountID const& issuer,
AccountID const& account);
TER
fill(BasicTaker::Flow const& flow, Offer& offer);
TER
fill(
BasicTaker::Flow const& flow1,
Offer& leg1,
BasicTaker::Flow const& flow2,
Offer& leg2);
TER
transferXRP(
AccountID const& from,
AccountID const& to,
STAmount const& amount);
TER
redeemIOU(
AccountID const& account,
STAmount const& amount,
Issue const& issue);
TER
issueIOU(
AccountID const& account,
STAmount const& amount,
Issue const& issue);
private:
// The underlying ledger entry we are dealing with
ApplyView& view_;
// The amount of XRP that flowed if we were autobridging
STAmount xrp_flow_;
// The number direct crossings that we performed
std::uint32_t direct_crossings_;
// The number autobridged crossings that we performed
std::uint32_t bridge_crossings_;
};
} // namespace ripple
#endif

View File

@@ -111,6 +111,7 @@ private:
std::size_t baseTxCount_ = 0;
bool open_ = true;
bool mock_ = true;
public:
OpenView() = delete;
@@ -187,6 +188,12 @@ public:
*/
OpenView(ReadView const* base, std::shared_ptr<void const> hold = nullptr);
bool
isMock() const
{
return mock_;
}
/** Returns true if this reflects an open ledger. */
bool
open() const override

View File

@@ -21,6 +21,8 @@
#include <xrpl/basics/contract.h>
#include <iostream>
namespace ripple {
class OpenView::txs_iter_impl : public txs_type::iter_base
@@ -77,15 +79,32 @@ public:
OpenView::OpenView(OpenView const& rhs)
: ReadView(rhs)
, TxsRawView(rhs)
, monotonic_resource_{std::make_unique<
boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, txs_{rhs.txs_, monotonic_resource_.get()}
, rules_{rhs.rules_}
, info_{rhs.info_}
, base_{rhs.base_}
, items_{rhs.items_}
, hold_{rhs.hold_}
, open_{rhs.open_} {};
, baseTxCount_{rhs.baseTxCount_}
, open_{rhs.open_}
, mock_{rhs.mock_}
{
// Calculate optimal buffer size based on source data
size_t estimatedNeeds =
rhs.txs_.size() * 300; // rough estimate: 300 bytes per tx entry
size_t bufferSize =
std::max(initialBufferSize, estimatedNeeds * 3 / 2); // 50% headroom
// std::cout << "[OpenView Memory] Copy constructor - Source has "
// << rhs.txs_.size() << " txs"
// << ", estimated needs: " << estimatedNeeds / 1024 << "KB"
// << ", allocating: " << bufferSize / 1024 << "KB" << std::endl;
monotonic_resource_ =
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
bufferSize);
txs_ = txs_map{rhs.txs_, monotonic_resource_.get()};
}
OpenView::OpenView(
open_ledger_t,