Skip to content

Commit

Permalink
For larger payloads, stream the internal contents
Browse files Browse the repository at this point in the history
For larger payloads (> ZstdInMemoryDecompressorMaxSize) we were already
streaming the internal events as we decompressed the payload, but in
the vstreamer we were still reading the entire contents into memory
before sending them to the consumer (vplayer).

With this we stream the internal contents all the way from the binlog
consumer to the vstream consumer so that we do not need to hold the
entire contents, which can be 10s of GiBs, in memory all at once.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 15, 2024
1 parent 9fdfdb9 commit 92cf593
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
7 changes: 7 additions & 0 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type TransactionPayload struct {
payload []byte
reader io.Reader
iterator func() (BinlogEvent, error)
// StreamingContents tells the consumer that we are streaming the
// decompressed payload and they should also stream the events.
// This ensures that neither the producer nor the consumer are
// holding the entire payload's contents in memory.
StreamingContents bool
}

// IsTransactionPayload returns true if a compressed transaction
Expand Down Expand Up @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error {
}
compressedTrxPayloadsUsingStream.Add(1)
tp.reader = streamDecoder
// Signal the consumer to also stream the contents.
tp.StreamingContents = true
return nil
}

Expand Down
33 changes: 29 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
return fmt.Errorf("unexpected server EOF")
}
vevents, err := vs.parseEvent(ev)
vevents, err := vs.parseEvent(ev, bufferAndTransmit)
if err != nil {
vs.vse.errorCounts.Add("ParseEvent", 1)
return err
Expand Down Expand Up @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}

// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {
// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent
// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need
// to be streamed directly here in order to avoid holding the entire payload's contents,
// which can be 10s of GiBs, all in memory.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) {
if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
Expand Down Expand Up @@ -672,12 +676,33 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
return nil, err
}
tpvevents, err := vs.parseEvent(tpevent)
tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event
if err != nil {
return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event")
}
vevents = append(vevents, tpvevents...)
if tp.StreamingContents {
//log.Errorf("DEBUG: processing transaction payload's internal events as a stream")
// Transmit each internal event individually to avoid buffering
// the large transaction's entire payload of events in memory, as
// the uncompressed size can be 10s or even 100s of GiBs in size.
for _, tpvevent := range tpvevents {
tpvevent.Timestamp = int64(ev.Timestamp())
tpvevent.CurrentTime = time.Now().UnixNano()
//log.Errorf("DEBUG: streaming transaction payload's internal event: %v", tpevent)
if err := bufferAndTransmit(tpvevent); err != nil {
if err == io.EOF {
return nil, nil
}
vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1)
return nil, fmt.Errorf("error sending compressed transaction payload's internal event: %v", err)
}
}
} else { // Process the payload's internal events all at once
//log.Errorf("DEBUG: processing transaction payload's internal events all at once: %v", tpvevents)
vevents = append(vevents, tpvevents...)
}
}

vs.vse.vstreamerCompressedTransactionsDecoded.Add(1)
}
for _, vevent := range vevents {
Expand Down

0 comments on commit 92cf593

Please sign in to comment.