From 92cf5933a258142f3818e43d729da571830f6b90 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Nov 2024 12:23:12 -0500 Subject: [PATCH] For larger payloads, stream the internal contents For larger payloads (> ZstdInMemoryDecompressorMaxSize) we were already streaming the internal events as we decompressed the payload, but in the vstreamer we were still reading the entire contents into memory before sending them to the consumer (vplayer). With this we stream the internal contents all the way from the binlog consumer to the vstream consumer so that we do not need to hold the entire contents, which can be 10s of GiBs, in memory all at once. Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 7 ++++ .../tabletserver/vstreamer/vstreamer.go | 33 ++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 1cb38d5cb16..0eb96da843b 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -98,6 +98,11 @@ type TransactionPayload struct { payload []byte reader io.Reader iterator func() (BinlogEvent, error) + // StreamingContents tells the consumer that we are streaming the + // decompressed payload and they should also stream the events. + // This ensures that neither the producer nor the consumer are + // holding the entire payload's contents in memory. + StreamingContents bool } // IsTransactionPayload returns true if a compressed transaction @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error { } compressedTrxPayloadsUsingStream.Add(1) tp.reader = streamDecoder + // Signal the consumer to also stream the contents. + tp.StreamingContents = true return nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index ea7f75cdc38..0d70675d2ea 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } return fmt.Errorf("unexpected server EOF") } - vevents, err := vs.parseEvent(ev) + vevents, err := vs.parseEvent(ev, bufferAndTransmit) if err != nil { vs.vse.errorCounts.Add("ParseEvent", 1) return err @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } // parseEvent parses an event from the binlog and converts it to a list of VEvents. -func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { +// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent +// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need +// to be streamed directly here in order to avoid holding the entire payload's contents, +// which can be 10s of GiBs, all in memory. +func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) { if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -672,12 +676,33 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } return nil, err } - tpvevents, err := vs.parseEvent(tpevent) + tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event if err != nil { return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event") } - vevents = append(vevents, tpvevents...) + if tp.StreamingContents { + //log.Errorf("DEBUG: processing transaction payload's internal events as a stream") + // Transmit each internal event individually to avoid buffering + // the large transaction's entire payload of events in memory, as + // the uncompressed size can be 10s or even 100s of GiBs in size. + for _, tpvevent := range tpvevents { + tpvevent.Timestamp = int64(ev.Timestamp()) + tpvevent.CurrentTime = time.Now().UnixNano() + //log.Errorf("DEBUG: streaming transaction payload's internal event: %v", tpevent) + if err := bufferAndTransmit(tpvevent); err != nil { + if err == io.EOF { + return nil, nil + } + vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1) + return nil, fmt.Errorf("error sending compressed transaction payload's internal event: %v", err) + } + } + } else { // Process the payload's internal events all at once + //log.Errorf("DEBUG: processing transaction payload's internal events all at once: %v", tpvevents) + vevents = append(vevents, tpvevents...) + } } + vs.vse.vstreamerCompressedTransactionsDecoded.Add(1) } for _, vevent := range vevents {