feat: Snapshot import feature (#1970)

Implement snapshot import cmd
`clio_snapshot --server --grpc_server 0.0.0.0:12345 --path
<snapshot_path>`

Implement snapshot range cmd
`./clio_snapshot --range --path <snapshot_path>`

Add
LedgerHouses: It is responsible for reading/writing snapshot data
Server: Start grpc server and ws server
This commit is contained in:
cyan317
2025-03-26 09:11:15 +00:00
committed by GitHub
parent 66b3f40268
commit f454076fb6
16 changed files with 869 additions and 148 deletions

View File

@@ -4,20 +4,20 @@ import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"sync"
pb "xrplf/clio/clio_snapshot/org/xrpl/rpc/v1"
"xrplf/clio/clio_snapshot/internal/ledgers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
)
const (
deltaDataFolderDiv = 10000
grpcUser = "clio-snapshot"
markerNum = 16
grpcUser = "clio-snapshot"
markerNum = 16
maxConcurrency = 256
firstAvailableLedger = 32570
)
type gRPCClient struct {
@@ -44,7 +44,25 @@ func createGRPCClient(serverAddr string) (*gRPCClient, error) {
}, nil
}
func getLedgerDeltaData(client pb.XRPLedgerAPIServiceClient, seq uint32, path string) {
func getLedgerDeltaDataInParallel(client pb.XRPLedgerAPIServiceClient, startSeq uint32, endSeq uint32, ledgersHouse *ledgers.LedgersHouse) {
sem := make(chan struct{}, maxConcurrency)
var wg sync.WaitGroup
for i := startSeq; i <= endSeq; i++ {
wg.Add(1)
sem <- struct{}{}
go func(seq uint32) {
defer wg.Done()
log.Printf("Process delta sequence: %d\n", seq)
getLedgerDeltaData(client, seq, ledgersHouse)
<-sem
}(i)
}
wg.Wait()
}
func getLedgerDeltaData(client pb.XRPLedgerAPIServiceClient, seq uint32, ledgersHouse *ledgers.LedgersHouse) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -56,49 +74,27 @@ func getLedgerDeltaData(client pb.XRPLedgerAPIServiceClient, seq uint32, path st
}
request.Ledger = ledger
request.User = grpcUser
request.GetObjectNeighbors = true
request.Transactions = true
request.Expand = true
request.GetObjects = true
// The first available ledger doesn't have diff data
request.GetObjects = firstAvailableLedger != seq
request.GetObjectNeighbors = firstAvailableLedger != seq
response, err := client.GetLedger(ctx, &request)
if err != nil {
log.Fatalf("Error getting ledger data: %v", err)
log.Fatalf("Error getting ledger delta data: %v - seq: %d", err, seq)
}
saveLedgerDeltaData(seq, response, path)
err = ledgersHouse.WriteLedgerDeltaData(seq, response)
if err != nil {
log.Fatalf("Error writing ledger delta data: %v", err)
}
log.Printf("Processing delta sequence: %d\n", seq)
}
func roundDown(n uint32, roundTo uint32) uint32 {
if roundTo == 0 {
return n
}
return n - (n % roundTo)
}
func saveLedgerDeltaData(seq uint32, response *pb.GetLedgerResponse, path string) {
subPath := filepath.Join(path, fmt.Sprintf("ledger_diff_%d", roundDown(seq, deltaDataFolderDiv)))
err := os.MkdirAll(subPath, os.ModePerm)
if err != nil {
log.Fatalf("Error creating directory: %v", err)
}
protoData, err := proto.Marshal(response)
if err != nil {
log.Fatalf("Error marshalling data: %v", err)
}
filePath := filepath.Join(subPath, fmt.Sprintf("%d.dat", seq))
err = os.WriteFile(filePath, protoData, 0644)
if err != nil {
log.Fatalf("failed to write file: %v", err)
}
}
func generateMarkers(markerNum uint32) [][32]byte {
var byteArray [32]byte
@@ -114,19 +110,7 @@ func generateMarkers(markerNum uint32) [][32]byte {
return byteArrayList
}
func saveLedgerData(path string, data *pb.GetLedgerDataResponse) {
protoData, err := proto.Marshal(data)
if err != nil {
log.Fatalf("Error marshalling data: %v", err)
}
err = os.WriteFile(path, protoData, 0644)
if err != nil {
log.Fatalf("failed to write file: %v", err)
}
}
func getLedgerData(client pb.XRPLedgerAPIServiceClient, seq uint32, marker []byte, end []byte, path string) {
func getLedgerData(client pb.XRPLedgerAPIServiceClient, seq uint32, marker []byte, end []byte, ledgerHouse *ledgers.LedgersHouse) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -143,25 +127,22 @@ func getLedgerData(client pb.XRPLedgerAPIServiceClient, seq uint32, marker []byt
}
request.User = grpcUser
subPath := filepath.Join(path, fmt.Sprintf("ledger_data_%d", seq), fmt.Sprintf("marker_%x", marker))
err := os.MkdirAll(subPath, os.ModePerm)
if err != nil {
log.Fatalf("Error creating directory: %v", err)
}
for request.Marker != nil {
res, err := client.GetLedgerData(ctx, &request)
if err != nil {
log.Fatalf("Error getting ledger data: %v", err)
}
filePath := filepath.Join(subPath, fmt.Sprintf("%x.dat", request.Marker))
saveLedgerData(filePath, res)
err = ledgerHouse.WriteLedgerData(seq, request.Marker, res)
if err != nil {
log.Fatalf("Error writing ledger data: %v", err)
}
log.Printf("Saving ledger data %x", request.Marker)
request.Marker = res.Marker
}
}
func getLedgerFullData(client pb.XRPLedgerAPIServiceClient, seq uint32, path string) {
func getLedgerFullData(client pb.XRPLedgerAPIServiceClient, seq uint32, ledgerHouse *ledgers.LedgersHouse) {
log.Printf("Processing full sequence: %d\n", seq)
markers := generateMarkers(markerNum)
@@ -176,11 +157,9 @@ func getLedgerFullData(client pb.XRPLedgerAPIServiceClient, seq uint32, path str
end = markers[i+1][:]
}
fmt.Printf("Got ledger data marker: %x-%x\n", marker, end)
go func() {
defer wg.Done()
getLedgerData(client, seq, marker[:], end, path)
getLedgerData(client, seq, marker[:], end, ledgerHouse)
}()
}
@@ -188,19 +167,7 @@ func getLedgerFullData(client pb.XRPLedgerAPIServiceClient, seq uint32, path str
wg.Wait()
}
func checkPath(path string) {
if _, err := os.Stat(path); os.IsNotExist(err) {
// Create the directory if it doesn't exist
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
log.Fatalf("Error creating directory: %v", err)
}
}
}
func ExportFromFullLedger(grpcServer string, startSeq uint32, endSeq uint32, path string) {
checkPath(path)
client, err := createGRPCClient(grpcServer)
if err != nil {
log.Fatalf("Error creating gRPC client: %v", err)
@@ -212,20 +179,21 @@ func ExportFromFullLedger(grpcServer string, startSeq uint32, endSeq uint32, pat
}
func exportFromFullLedgerImpl(client pb.XRPLedgerAPIServiceClient, startSeq uint32, endSeq uint32, path string) {
ledgersHouse := ledgers.NewLedgersHouse(path)
getLedgerFullData(client, startSeq, path)
getLedgerFullData(client, startSeq, ledgersHouse)
//We need to fetch the ledger header and txs for startSeq as well
for i := startSeq; i <= endSeq; i++ {
getLedgerDeltaData(client, i, path)
getLedgerDeltaDataInParallel(client, startSeq, endSeq, ledgersHouse)
err := ledgersHouse.SetRange(startSeq, endSeq)
if err != nil {
log.Fatalf("Error writing range: %v", err)
}
log.Printf("Exporting from full ledger: %d to %d at path %s\n", startSeq, endSeq, path)
}
func ExportFromDeltaLedger(grpcServer string, startSeq uint32, endSeq uint32, path string) {
checkPath(path)
client, err := createGRPCClient(grpcServer)
if err != nil {
log.Fatalf("Error creating gRPC client: %v", err)
@@ -237,8 +205,27 @@ func ExportFromDeltaLedger(grpcServer string, startSeq uint32, endSeq uint32, pa
}
func exportFromDeltaLedgerImpl(client pb.XRPLedgerAPIServiceClient, startSeq uint32, endSeq uint32, path string) {
for i := startSeq; i <= endSeq; i++ {
getLedgerDeltaData(client, i, path)
ledgersHouse := ledgers.NewLedgersHouse(path)
_, oldEnd, err := ledgersHouse.GetRange()
if err != nil {
log.Fatalf("Can't find existing snapshot to extend: %v", err)
}
if oldEnd < startSeq-1 {
log.Fatalf("Missing delta ledger from %d to %d", oldEnd, startSeq)
}
if oldEnd >= endSeq {
log.Fatalf("The snapshot already contains the requested delta ledger")
}
getLedgerDeltaDataInParallel(client, startSeq, endSeq, ledgersHouse)
err = ledgersHouse.AppendDeltaLedger(startSeq, endSeq)
if err != nil {
log.Fatalf("Error writing new range: %v", err)
}
log.Printf("Exporting from ledger: %d to %d at path %s\n", startSeq, endSeq, path)