Commit a31ed7f

save

AskAlexSharov committed Oct 20, 2023
2 parents 0695309 + 17a03d5
Showing 5 changed files with 308 additions and 28 deletions.
121 changes: 115 additions & 6 deletions erigon-lib/etl/buffers.go
@@ -36,6 +36,7 @@ const (
// SortableOldestAppearedBuffer - buffer that keeps only the oldest entry per key.
// if v1 was added first under key K and then v2, only v1 will stay
SortableOldestAppearedBuffer
SortableMergeBuffer

//BufIOSize - 128 pages | default is 1 page | increasing over `64 * 4096` doesn't show speedup on SSD/NVMe, but shows speedup on cloud drives
BufIOSize = 128 * 4096
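
A minimal sketch of how the two de-duplicating buffer types behave (assuming the github.com/ledgerwatch/erigon-lib/etl and github.com/c2h5oh/datasize import paths; NewLatestMergedEntryMergedBuffer is added further down in this diff):

package main

import (
	"fmt"

	"github.com/c2h5oh/datasize"
	"github.com/ledgerwatch/erigon-lib/etl"
)

func main() {
	// Oldest-appeared semantics: the first value stored under a key wins.
	oldest := etl.NewOldestEntryBuffer(datasize.ByteSize(64 * 1024))
	oldest.Put([]byte("K"), []byte("v1"))
	oldest.Put([]byte("K"), []byte("v2")) // ignored: "v1" appeared first
	oldest.Sort()
	k, v := oldest.Get(0, nil, nil)
	fmt.Printf("%s => %s\n", k, v) // K => v1

	// Merge semantics: values for the same key are combined by a caller-supplied func.
	merged := etl.NewLatestMergedEntryMergedBuffer(datasize.ByteSize(64*1024),
		func(prev, cur []byte) []byte { return append(prev, cur...) })
	merged.Put([]byte("K"), []byte("v1"))
	merged.Put([]byte("K"), []byte("v2")) // merged with the stored value
	merged.Sort()
	k, v = merged.Get(0, nil, nil)
	fmt.Printf("%s => %s\n", k, v) // K => v1v2
}
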
@@ -211,8 +212,8 @@ func (b *appendSortableBuffer) Put(k, v []byte) {
b.size += len(k)
}
b.size += len(v)
- stored = append(stored, v...)
- b.entries[string(k)] = stored
+ fmt.Printf("put: %d, %x, %x . %x\n", b.size, k, stored, v)
+ b.entries[string(k)] = append(stored, v...)
}

func (b *appendSortableBuffer) Size() int { return b.size }
@@ -222,8 +223,8 @@ func (b *appendSortableBuffer) Len() int {
return len(b.entries)
}
func (b *appendSortableBuffer) Sort() {
- for i := range b.entries {
- 	b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(i), value: b.entries[i]})
+ for key, val := range b.entries {
+ 	b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(key), value: val})
}
sort.Stable(b)
}
@@ -255,6 +256,7 @@ func (b *appendSortableBuffer) Write(w io.Writer) error {
var numBuf [binary.MaxVarintLen64]byte
entries := b.sortedBuf
for _, entry := range entries {
fmt.Printf("write: %x, %x\n", entry.key, entry.value)
lk := int64(len(entry.key))
if entry.key == nil {
lk = -1
@@ -266,7 +268,7 @@
if _, err := w.Write(entry.key); err != nil {
return err
}
- lv := int64(len(entry.key))
+ lv := int64(len(entry.value))
if entry.value == nil {
lv = -1
}
@@ -381,14 +383,16 @@ func (b *oldestEntrySortableBuffer) CheckFlushSize() bool {
return b.size >= b.optimalSize
}

- func getBufferByType(tp int, size datasize.ByteSize) Buffer {
+ func getBufferByType(tp int, size datasize.ByteSize, prevBuf Buffer) Buffer {
switch tp {
case SortableSliceBuffer:
return NewSortableBuffer(size)
case SortableAppendBuffer:
return NewAppendBuffer(size)
case SortableOldestAppearedBuffer:
return NewOldestEntryBuffer(size)
case SortableMergeBuffer:
return NewLatestMergedEntryMergedBuffer(size, prevBuf.(*oldestMergedEntrySortableBuffer).merge)
default:
panic("unknown buffer type " + strconv.Itoa(tp))
}
@@ -402,7 +406,112 @@ func getTypeByBuffer(b Buffer) int {
return SortableAppendBuffer
case *oldestEntrySortableBuffer:
return SortableOldestAppearedBuffer
case *oldestMergedEntrySortableBuffer:
return SortableMergeBuffer
default:
panic(fmt.Sprintf("unknown buffer type: %T ", b))
}
}

func NewLatestMergedEntryMergedBuffer(bufferOptimalSize datasize.ByteSize, merger func([]byte, []byte) []byte) *oldestMergedEntrySortableBuffer {
if merger == nil {
panic("nil merge func")
}
return &oldestMergedEntrySortableBuffer{
entries: make(map[string][]byte),
size: 0,
merge: merger,
optimalSize: int(bufferOptimalSize.Bytes()),
}
}

type oldestMergedEntrySortableBuffer struct {
entries map[string][]byte
merge func([]byte, []byte) []byte
sortedBuf []sortableBufferEntry
size int
optimalSize int
}

func (b *oldestMergedEntrySortableBuffer) Put(k, v []byte) {
prev, ok := b.entries[string(k)]
if ok {
b.size -= len(prev)
// if we already had this entry, merge the new value into the stored one
v = b.merge(prev, v)
b.size += len(v)
} else {
b.size += len(k) + len(v)
}
b.entries[string(k)] = common.Copy(v)
}

func (b *oldestMergedEntrySortableBuffer) Size() int { return b.size }
func (b *oldestMergedEntrySortableBuffer) SizeLimit() int { return b.optimalSize }

func (b *oldestMergedEntrySortableBuffer) Len() int {
return len(b.entries)
}

func (b *oldestMergedEntrySortableBuffer) Sort() {
for k, v := range b.entries {
b.sortedBuf = append(b.sortedBuf, sortableBufferEntry{key: []byte(k), value: v})
}
sort.Stable(b)
}

func (b *oldestMergedEntrySortableBuffer) Less(i, j int) bool {
return bytes.Compare(b.sortedBuf[i].key, b.sortedBuf[j].key) < 0
}

func (b *oldestMergedEntrySortableBuffer) Swap(i, j int) {
b.sortedBuf[i], b.sortedBuf[j] = b.sortedBuf[j], b.sortedBuf[i]
}

func (b *oldestMergedEntrySortableBuffer) Get(i int, keyBuf, valBuf []byte) ([]byte, []byte) {
keyBuf = append(keyBuf, b.sortedBuf[i].key...)
valBuf = append(valBuf, b.sortedBuf[i].value...)
return keyBuf, valBuf
}
func (b *oldestMergedEntrySortableBuffer) Reset() {
b.sortedBuf = nil
b.entries = make(map[string][]byte)
b.size = 0
}
func (b *oldestMergedEntrySortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
b.entries = make(map[string][]byte, predictKeysAmount)
b.sortedBuf = make([]sortableBufferEntry, 0, predictKeysAmount*2)
}

func (b *oldestMergedEntrySortableBuffer) Write(w io.Writer) error {
var numBuf [binary.MaxVarintLen64]byte
entries := b.sortedBuf
for _, entry := range entries {
lk := int64(len(entry.key))
if entry.key == nil {
lk = -1
}
n := binary.PutVarint(numBuf[:], lk)
if _, err := w.Write(numBuf[:n]); err != nil {
return err
}
if _, err := w.Write(entry.key); err != nil {
return err
}
lv := int64(len(entry.value))
if entry.value == nil {
lv = -1
}
n = binary.PutVarint(numBuf[:], lv)
if _, err := w.Write(numBuf[:n]); err != nil {
return err
}
if _, err := w.Write(entry.value); err != nil {
return err
}
}
return nil
}
func (b *oldestMergedEntrySortableBuffer) CheckFlushSize() bool {
return b.size >= b.optimalSize
}
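
Write above emits length-prefixed records: varint key length, key bytes, varint value length, value bytes, where -1 marks a nil slice. A hypothetical stand-alone decoder for that format, as a sketch (package and function names are assumptions, not part of the codebase):

package etlformat

import (
	"bufio"
	"encoding/binary"
	"io"
)

// readSlice reads one varint length prefix and the following bytes.
// A length of -1 decodes to a nil slice, matching the Write convention.
func readSlice(r *bufio.Reader) ([]byte, error) {
	l, err := binary.ReadVarint(r)
	if err != nil {
		return nil, err
	}
	if l < 0 {
		return nil, nil
	}
	buf := make([]byte, l)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// decodeRecords reads key/value pairs until EOF.
func decodeRecords(r *bufio.Reader) (keys, vals [][]byte, err error) {
	for {
		k, err := readSlice(r)
		if err == io.EOF { // clean end of stream between records
			return keys, vals, nil
		}
		if err != nil {
			return nil, nil, err
		}
		v, err := readSlice(r)
		if err != nil {
			return nil, nil, err
		}
		keys, vals = append(keys, k), append(vals, v)
	}
}
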
77 changes: 58 additions & 19 deletions erigon-lib/etl/collector.go
@@ -117,7 +117,7 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
} else {
fullBuf := c.buf
prevLen, prevSize := fullBuf.Len(), fullBuf.SizeLimit()
- c.buf = getBufferByType(c.bufType, datasize.ByteSize(c.buf.SizeLimit()))
+ c.buf = getBufferByType(c.bufType, datasize.ByteSize(c.buf.SizeLimit()), c.buf)

doFsync := !c.autoClean /* is critical collector */
var err error
@@ -149,6 +149,7 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
if c.autoClean {
defer c.Close()
}
args.BufferType = c.bufType

if !c.allFlushed {
if e := c.flushBuffer(true); e != nil {
@@ -181,26 +182,13 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
defer logEvery.Stop()

i := 0
- var prevK []byte
loadNextFunc := func(_, k, v []byte) error {
if i == 0 {
isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
canUseAppend = haveSortingGuaranties && isEndOfBucket
}
i++

- // SortableOldestAppearedBuffer must guarantee that only 1 oldest value of key will appear
- // but because size of buffer is limited - each flushed file does guarantee "oldest appeared"
- // property, but files may overlap. files are sorted, just skip repeated keys here
- if c.bufType == SortableOldestAppearedBuffer {
- 	if bytes.Equal(prevK, k) {
- 		return nil
- 	} else {
- 		// Need to copy k because the underlying space will be re-used for the next key
- 		prevK = common.Copy(k)
- 	}
- }

select {
default:
case <-logEvery.C:
@@ -249,7 +237,7 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
simpleLoad := func(k, v []byte) error {
return loadFunc(k, v, currentTable, loadNextFunc)
}
- if err := mergeSortFiles(c.logPrefix, c.dataProviders, simpleLoad, args); err != nil {
+ if err := mergeSortFiles(c.logPrefix, c.dataProviders, simpleLoad, args, c.buf); err != nil {
return fmt.Errorf("loadIntoTable %s: %w", toBucket, err)
}
//logger.Trace(fmt.Sprintf("[%s] ETL Load done", c.logPrefix), "bucket", bucket, "records", i)
@@ -278,7 +266,7 @@ func (c *Collector) Close() {
// for the next item, which is then added back to the heap.
// The subsequent iterations pop the heap again and load up the provider associated with it to get the next element after processing LoadFunc.
// this continues until all providers have reached their EOF.
- func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleLoadFunc, args TransformArgs) error {
+ func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleLoadFunc, args TransformArgs, buf Buffer) (err error) {
for _, provider := range providers {
if err := provider.Wait(); err != nil {
return err
@@ -297,6 +285,8 @@ func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleL
}
}

var prevK, prevV []byte

// Main loading loop
for h.Len() > 0 {
if err := common.Stopped(args.Quit); err != nil {
@@ -305,16 +295,65 @@ func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleL

element := heapPop(h)
provider := providers[element.TimeIdx]
- err := loadFunc(element.Key, element.Value)
- if err != nil {
- 	return err
- }

// SortableOldestAppearedBuffer must guarantee that only the oldest value of a key appears,
// but because buffer size is limited, each flushed file guarantees the "oldest appeared"
// property only within itself, and files may overlap. Files are sorted, so just skip repeated keys here
if args.BufferType == SortableOldestAppearedBuffer {
if !bytes.Equal(prevK, element.Key) {
if err = loadFunc(element.Key, element.Value); err != nil {
return err
}
// Need to copy k because the underlying space will be re-used for the next key
prevK = common.Copy(element.Key)
}
} else if args.BufferType == SortableAppendBuffer {
if !bytes.Equal(prevK, element.Key) {
if prevK != nil {
if err = loadFunc(prevK, prevV); err != nil {
return err
}
}
// Need to copy k because the underlying space will be re-used for the next key
prevK = common.Copy(element.Key)
prevV = common.Copy(element.Value)
} else {
prevV = append(prevV, element.Value...)
}
} else if args.BufferType == SortableMergeBuffer {
if !bytes.Equal(prevK, element.Key) {
if prevK != nil {
if err = loadFunc(prevK, prevV); err != nil {
return err
}
}
// Need to copy k because the underlying space will be re-used for the next key
prevK = common.Copy(element.Key)
prevV = common.Copy(element.Value)
} else {
prevV = buf.(*oldestMergedEntrySortableBuffer).merge(prevV, element.Value)
}
} else {
if err = loadFunc(element.Key, element.Value); err != nil {
return err
}
}

if element.Key, element.Value, err = provider.Next(element.Key[:0], element.Value[:0]); err == nil {
heapPush(h, element)
} else if !errors.Is(err, io.EOF) {
return fmt.Errorf("%s: error while reading next element from disk: %w", logPrefix, err)
}
}

if args.BufferType == SortableAppendBuffer {
if prevK != nil {
if err = loadFunc(prevK, prevV); err != nil {
return err
}
}
}

return nil
}
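
The duplicate-key handling that loadNextFunc used to do now happens during this k-way merge of flushed files. A self-contained sketch of the pattern, with in-memory sorted slices standing in for the dataProvider streams and hypothetical names throughout (the src tie-break mirrors the heap's provider-index ordering):

package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

type item struct {
	key, val []byte
	src      int // which "file" the element came from
}

type minHeap []item

func (h minHeap) Len() int { return len(h) }
func (h minHeap) Less(i, j int) bool {
	if c := bytes.Compare(h[i].key, h[j].key); c != 0 {
		return c < 0
	}
	return h[i].src < h[j].src // equal keys: older file first
}
func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)   { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	// Two sorted "files" whose key ranges overlap, as flushed buffers can produce.
	files := [][][2]string{
		{{"a", "1"}, {"k", "old"}},
		{{"k", "new"}, {"z", "9"}},
	}
	merge := func(prev, cur []byte) []byte { return append(prev, cur...) }

	h, next := &minHeap{}, make([]int, len(files))
	for i := range files {
		heap.Push(h, item{[]byte(files[i][0][0]), []byte(files[i][0][1]), i})
		next[i] = 1
	}

	var prevK, prevV []byte
	for h.Len() > 0 {
		e := heap.Pop(h).(item)
		if !bytes.Equal(prevK, e.key) {
			if prevK != nil {
				fmt.Printf("%s => %s\n", prevK, prevV) // emit finished key
			}
			// the real code copies here because provider buffers are reused
			prevK, prevV = e.key, e.val
		} else {
			prevV = merge(prevV, e.val) // same key from another file: merge
		}
		if p := next[e.src]; p < len(files[e.src]) { // refill from the same file
			heap.Push(h, item{[]byte(files[e.src][p][0]), []byte(files[e.src][p][1]), e.src})
			next[e.src]++
		}
	}
	if prevK != nil {
		fmt.Printf("%s => %s\n", prevK, prevV) // flush last key: prints "z => 9"; "k" printed as "oldnew"
	}
}
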

4 changes: 2 additions & 2 deletions erigon-lib/etl/dataprovider.go
@@ -72,11 +72,11 @@ func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl lo
w := bufio.NewWriterSize(bufferFile, BufIOSize)
defer w.Flush() //nolint:errcheck

- _, fName := filepath.Split(bufferFile.Name())
if err = b.Write(w); err != nil {
return fmt.Errorf("error writing entries to disk: %w", err)
}
+ _, fName := filepath.Split(bufferFile.Name())
- log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer", logPrefix), "file", fName)
+ log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer file", logPrefix), "name", fName)
return nil
})

2 changes: 1 addition & 1 deletion erigon-lib/etl/etl.go
@@ -88,7 +88,7 @@ func Transform(
if args.BufferSize > 0 {
bufferSize = datasize.ByteSize(args.BufferSize)
}
- buffer := getBufferByType(args.BufferType, bufferSize)
+ buffer := getBufferByType(args.BufferType, bufferSize, nil)
collector := NewCollector(logPrefix, tmpdir, buffer, logger)
defer collector.Close()
