fix: Support Delete NFT (#1695)

Fixes #1677
This commit is contained in:
Peter Chen
2024-10-25 11:27:02 -04:00
committed by Alex Kremer
parent ffc9deb0f8
commit 081adf1cae
3 changed files with 198 additions and 104 deletions

View File

@@ -60,6 +60,10 @@ var (
skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()
skipAccTransactionsTable = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool()
skipNFTokenTable = app.Flag("skip-nf-tokens", "Whether to skip deletion from nf_tokens table").Default("false").Bool()
skipIssuerNFTokenTable = app.Flag("skip-issuer-nf-tokens-v2", "Whether to skip deletion from issuer_nf_tokens_v2 table").Default("false").Bool()
skipNFTokenURITable = app.Flag("skip-nf-tokens-uri", "Whether to skip deletion from nf_token_uris table").Default("false").Bool()
skipNFTokenTransactionsTable = app.Flag("skip-nf-token-transactions", "Whether to skip deletion from nf_token_transactions table").Default("false").Bool()
workerCount = 1 // the calculated number of parallel goroutines the client should run
ranges []*util.TokenRange // the calculated ranges to be executed in parallel
@@ -90,6 +94,11 @@ func main() {
SkipLedgersTable: *skipLedgersTable,
SkipWriteLatestLedger: *skipWriteLatestLedger,
SkipAccTransactionsTable: *skipAccTransactionsTable,
SkipNFTokenTable: *skipNFTokenTable,
SkipIssuerNFTokenTable: *skipIssuerNFTokenTable,
SkipNFTokenURITable: *skipNFTokenURITable,
SkipNFTokenTransactionsTable: *skipNFTokenTransactionsTable,
WorkerCount: workerCount,
Ranges: ranges,
RangesRead: ledgerOrTokenRange,
@@ -171,6 +180,10 @@ Skip deletion of:
- ledger_transactions table : %t
- ledgers table : %t
- account_tx table : %t
- nf_tokens table : %t
- issuer_nf_tokens_v2 table : %t
- nf_token_uris table : %t
- nf_token_transactions table : %t
Will update ledger_range : %t
@@ -194,6 +207,10 @@ Will update ledger_range : %t
*skipLedgerTransactionsTable,
*skipLedgersTable,
*skipAccTransactionsTable,
*skipNFTokenTable,
*skipIssuerNFTokenTable,
*skipNFTokenURITable,
*skipNFTokenTransactionsTable,
!*skipWriteLatestLedger,
)
@@ -264,6 +281,15 @@ func prepareResume(cmd *string) {
// should be already deleted so we skip for deletion
tableFound := false
switch scanner.Text() {
case "nf_token_transactions":
*skipNFTokenURITable = true
fallthrough
case "nf_token_uris":
*skipNFTokenTable = true
fallthrough
case "nf_tokens":
*skipAccTransactionsTable = true
fallthrough
case "account_tx":
*skipLedgersTable = true
fallthrough

View File

@@ -47,6 +47,10 @@ type Settings struct {
SkipLedgersTable bool
SkipWriteLatestLedger bool
SkipAccTransactionsTable bool
SkipNFTokenTable bool
SkipIssuerNFTokenTable bool
SkipNFTokenURITable bool
SkipNFTokenTransactionsTable bool
WorkerCount int
Ranges []*util.TokenRange
@@ -54,44 +58,6 @@ type Settings struct {
Command string
}
type Marker struct {
cmd string
file *os.File
}
func NewMarker(cmd string) *Marker {
return &Marker{cmd: cmd}
}
func CloseMarker(m *Marker) {
if m.file != nil {
m.file.Close()
}
os.Remove("continue.txt")
}
func (m *Marker) EnterTable(table string) error {
// Create the file
file, err := os.OpenFile("continue.txt", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
m.file = file
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
fmt.Fprintf(m.file, "%s\n", m.cmd)
m.file.WriteString(fmt.Sprintf("%s\n", table))
return nil
}
func (m *Marker) MarkProgress(x int64, y int64) {
fmt.Fprintf(m.file, "%d, %d \n", x, y)
}
func (m *Marker) ExitTable() {
m.file.Close()
m.file = nil
os.Remove("continue.txt")
}
type Cass interface {
GetLedgerRange() (uint64, uint64, error)
DeleteBefore(ledgerIdx uint64)
@@ -227,7 +193,7 @@ func (c *ClioCass) pruneData(
// successor queries
if !c.settings.SkipSuccessorTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("successor"); err != nil {
return err
}
@@ -235,7 +201,7 @@ func (c *ClioCass) pruneData(
log.Println("Generating delete queries for successor table")
rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
"SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?",
"DELETE FROM successor WHERE key = ? AND seq = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false})
"DELETE FROM successor WHERE key = ? AND seq = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{})
log.Printf("Total delete queries: %d\n", deleteCount)
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalRows += rowsCount
@@ -247,7 +213,7 @@ func (c *ClioCass) pruneData(
// objects queries
if !c.settings.SkipObjectsTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("objects"); err != nil {
return err
}
@@ -255,7 +221,7 @@ func (c *ClioCass) pruneData(
log.Println("Generating delete queries for objects table")
rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
"SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?",
"DELETE FROM objects WHERE key = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: true})
"DELETE FROM objects WHERE key = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{})
log.Printf("Total delete queries: %d\n", deleteCount)
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
@@ -267,7 +233,7 @@ func (c *ClioCass) pruneData(
// ledger_hashes queries
if !c.settings.SkipLedgerHashesTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("ledger_hashes"); err != nil {
return err
}
@@ -287,7 +253,7 @@ func (c *ClioCass) pruneData(
// transactions queries
if !c.settings.SkipTransactionsTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("transactions"); err != nil {
return err
}
@@ -306,8 +272,9 @@ func (c *ClioCass) pruneData(
}
// diff queries
if !c.settings.SkipDiffTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("diff"); err != nil {
return err
}
@@ -324,7 +291,7 @@ func (c *ClioCass) pruneData(
// ledger_transactions queries
if !c.settings.SkipLedgerTransactionsTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("ledger_transactions"); err != nil {
return err
}
@@ -341,7 +308,7 @@ func (c *ClioCass) pruneData(
// ledgers queries
if !c.settings.SkipLedgersTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("ledgers"); err != nil {
return err
}
@@ -358,7 +325,7 @@ func (c *ClioCass) pruneData(
// account_tx queries
if !c.settings.SkipAccTransactionsTable {
marker := NewMarker(c.settings.Command)
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("account_tx"); err != nil {
return err
}
@@ -366,7 +333,7 @@ func (c *ClioCass) pruneData(
log.Println("Generating delete queries for account transactions table")
rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
"SELECT account, seq_idx FROM account_tx WHERE token(account) >= ? AND token(account) <= ?",
"DELETE FROM account_tx WHERE account = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false})
"DELETE FROM account_tx WHERE account = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{})
log.Printf("Total delete queries: %d\n", deleteCount)
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalRows += rowsCount
@@ -376,7 +343,67 @@ func (c *ClioCass) pruneData(
marker.ExitTable()
}
// TODO: take care of nft tables and other stuff like that
// nf_token queries
if !c.settings.SkipNFTokenTable {
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("nf_tokens"); err != nil {
return err
}
log.Println("Generating delete queries for nft tokens table")
rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
"SELECT token_id, sequence FROM nf_tokens WHERE token(token_id) >= ? AND token(token_id) <= ?",
"DELETE FROM nf_tokens WHERE token_id = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{})
log.Printf("Total delete queries: %d\n", deleteCount)
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalRows += rowsCount
totalErrors += errCount
totalDeletes += deleteCount
marker.ExitTable()
}
// issuer_nf_tokens_v2: skipped because table is small and not trivial to delete
// nf_token_URI queries
if !c.settings.SkipNFTokenURITable {
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("nf_token_uris"); err != nil {
return err
}
log.Println("Generating delete queries for nft token URI table")
rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
"SELECT token_id, sequence FROM nf_token_uris WHERE token(token_id) >= ? AND token(token_id) <= ?",
"DELETE FROM nf_token_uris WHERE token_id = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{})
log.Printf("Total delete queries: %d\n", deleteCount)
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalRows += rowsCount
totalErrors += errCount
totalDeletes += deleteCount
marker.ExitTable()
}
// nf_token_transactions
if !c.settings.SkipNFTokenTransactionsTable {
marker := util.NewMarker(c.settings.Command)
if err := marker.EnterTable("nf_token_transactions"); err != nil {
return err
}
log.Println("Generating delete queries for nft token transactions table")
rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
"SELECT token_id, seq_idx FROM nf_token_transactions WHERE token(token_id) >= ? AND token(token_id) <= ?",
"DELETE FROM nf_token_transactions WHERE token_id = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{})
log.Printf("Total delete queries: %d\n", deleteCount)
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalRows += rowsCount
totalErrors += errCount
totalDeletes += deleteCount
marker.ExitTable()
}
if !c.settings.SkipWriteLatestLedger {
var (
@@ -424,7 +451,7 @@ func (c *ClioCass) prepareAndExecuteSimpleDeleteQueries(
info.Data = append(info.Data, deleteParams{Seq: i})
// for every 1000 queries in data, delete
if len(info.Data) == 1000 {
_, err := c.performDeleteQueries(&info, session, colSettings)
err := c.performDeleteQueries(&info, session, colSettings)
atomic.AddUint64(&totalDeletes, uint64(len(info.Data)))
atomic.AddUint64(&totalErrors, err)
info = deleteInfo{Query: deleteQueryTemplate}
@@ -432,7 +459,7 @@ func (c *ClioCass) prepareAndExecuteSimpleDeleteQueries(
}
// delete the rest of queries if exists
if len(info.Data) > 0 {
_, err := c.performDeleteQueries(&info, session, colSettings)
err := c.performDeleteQueries(&info, session, colSettings)
atomic.AddUint64(&totalDeletes, uint64(len(info.Data)))
atomic.AddUint64(&totalErrors, err)
}
@@ -543,7 +570,7 @@ func (c *ClioCass) prepareAccTxnDelete(
}
func (c *ClioCass) prepareAndExecuteDeleteQueries(
marker *Marker,
marker *util.Marker,
fromLedgerIdx maybe.Maybe[uint64],
toLedgerIdx maybe.Maybe[uint64],
queryTemplate string,
@@ -612,6 +639,8 @@ func (c *ClioCass) prepareAndExecuteDeleteQueries(
prepareDeleteResult = c.prepareAccTxnDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved)
} else if method.deleteGeneral.HasValue() && method.deleteGeneral.Value() {
prepareDeleteResult = c.prepareDefaultDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved)
} else {
log.Fatal("Deletion method not supported")
}
if !prepareDeleteResult {
@@ -620,10 +649,9 @@ func (c *ClioCass) prepareAndExecuteDeleteQueries(
atomic.AddUint64(&totalErrors, 1)
}
if len(nextPageState) == 0 {
// Checks for delete queries after iterating all pages
// Checks for delete queries when there are queries available to delete
if len(info.Data) > 0 {
_, numErr := c.performDeleteQueries(&info, session, colSettings)
numErr := c.performDeleteQueries(&info, session, colSettings)
atomic.AddUint64(&totalErrors, numErr)
atomic.AddUint64(&totalDeletes, uint64(len(info.Data)))
if totalDeletes >= counter {
@@ -633,6 +661,8 @@ func (c *ClioCass) prepareAndExecuteDeleteQueries(
// reset back to the deleted query template after finishing executing delete
info = deleteInfo{Query: deleteQueryTemplate}
}
if len(nextPageState) == 0 {
break
}
pageState = nextPageState
@@ -681,10 +711,9 @@ func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams {
return chunks
}
func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session, colSettings columnSettings) (uint64, uint64) {
func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session, colSettings columnSettings) uint64 {
var wg sync.WaitGroup
var sessionCreationWaitGroup sync.WaitGroup
var totalDeletes uint64
var totalErrors uint64
chunks := c.splitDeleteWork(info)
@@ -727,11 +756,6 @@ func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session
log.Printf("DELETE ERROR: %s\n", err)
fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq))
atomic.AddUint64(&totalErrors, 1)
} else {
atomic.AddUint64(&totalDeletes, 1)
if atomic.LoadUint64(&totalDeletes)%10000 == 0 {
log.Printf("... %d deletes ...\n", totalDeletes)
}
}
}
}
@@ -739,7 +763,7 @@ func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session
}
wg.Wait()
return totalDeletes, totalErrors
return totalErrors
}
func (c *ClioCass) updateLedgerRange(newStartLedger maybe.Maybe[uint64], newEndLedger maybe.Maybe[uint64]) error {

View File

@@ -0,0 +1,44 @@
package util
import (
"fmt"
"os"
)
type Marker struct {
Cmd string
File *os.File
}
func NewMarker(cmd string) *Marker {
return &Marker{Cmd: cmd}
}
func CloseMarker(m *Marker) {
if m.File != nil {
m.File.Close()
}
os.Remove("continue.txt")
}
func (m *Marker) EnterTable(table string) error {
// Create the file
file, err := os.OpenFile("continue.txt", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
m.File = file
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
fmt.Fprintf(m.File, "%s\n", m.Cmd)
m.File.WriteString(fmt.Sprintf("%s\n", table))
return nil
}
func (m *Marker) MarkProgress(x int64, y int64) {
fmt.Fprintf(m.File, "%d, %d \n", x, y)
}
func (m *Marker) ExitTable() {
m.File.Close()
m.File = nil
os.Remove("continue.txt")
}