Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flesh out vstream client example #17185

Closed

Conversation

derekperkins
Copy link
Member

@derekperkins derekperkins commented Nov 8, 2024

Description

We have decent documentation about how to start a vstream, but not much about how to best consume it. This extends the local example to show how to parse vstream events, check point vgtids, etc.

If this complicates the example too much, I'm happy to put this elsewhere. This is also my first time using vstream, so I looked at some places it is used in the codebase, but it's very possible I'm missing important cases to show. I made comment threads for the specific questions I had.

I was procrastinating something else I have to finish, so I just kept typing and typing on this PR :) At this point, I'd like for this to end up as close to production ready as possible, or at least have stubs + comments for some more complex integrations like DDL handling.

It seems like a little framework could be built around the vstream primitives, where all the looping, heartbeat monitoring, etc., happened inside, and all users would need to provide is some event handlers. I'll probably write something internally first to see how it works out, and then maybe upstream that somewhere if it makes sense.

cc @mattlord

This is based on top of another PR adding time conversion funcs to sqltypes, so once that merges, I'll rebase and this will just be the single example commit

I ran the example from Matt's CDC post, and this was the output from the local example
https://vitess.io/blog/2024-07-29-cdc-vstream/#a-look-under-the-hood-at-vstream

$ go run vstream_client.go
upserting 24 customers
upserting customer 0: &{0 [email protected] false map[] 0001-01-01 00:00:00 +0000 UTC}
upserting customer 1: &{0 [email protected] false map[] 0001-01-01 00:00:00 +0000 UTC}
upserting customer 2: &{0 [email protected] false map[] 0001-01-01 00:00:00 +0000 UTC}
upserting customer 3: &{0 [email protected] false map[] 0001-01-01 00:00:00 +0000 UTC}
...

Checklist

  • "Backport to:" labels have been added if this change should be back-ported to release branches
  • If this change is to be back-ported to previous releases, a justification is included in the PR description
  • Tests were added or are not required
  • Did the new or modified tests pass consistently locally and on CI?
  • Documentation was added or is not required

there are already conversions for other native types, but datetime conversions were only available in the vitessdriver. This moves that implementation into sqltypes, so it can be more easily accessed elsewhere.

Signed-off-by: Derek Perkins <[email protected]>
Copy link
Contributor

vitess-bot bot commented Nov 8, 2024

Review Checklist

Hello reviewers! 👋 Please follow this checklist when reviewing this Pull Request.

General

  • Ensure that the Pull Request has a descriptive title.
  • Ensure there is a link to an issue (except for internal cleanup and flaky test fixes), new features should have an RFC that documents use cases and test cases.

Tests

  • Bug fixes should have at least one unit or end-to-end test, enhancement and new features should have a sufficient number of tests.

Documentation

  • Apply the release notes (needs details) label if users need to know about this change.
  • New features should be documented.
  • There should be some code comments as to why things are implemented the way they are.
  • There should be a comment at the top of each new or modified test to explain what the test does.

New flags

  • Is this flag really necessary?
  • Flag names must be clear and intuitive, use dashes (-), and have a clear help text.

If a workflow is added or modified:

  • Each item in Jobs should be named in order to mark it as required.
  • If the workflow needs to be marked as required, the maintainer team must be notified.

Backward compatibility

  • Protobuf changes should be wire-compatible.
  • Changes to _vt tables and RPCs need to be backward compatible.
  • RPC changes should be compatible with vitess-operator
  • If a flag is removed, then it should also be removed from vitess-operator and arewefastyet, if used there.
  • vtctl command output order should be stable and awk-able.

@vitess-bot vitess-bot bot added NeedsBackportReason If backport labels have been applied to a PR, a justification is required NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsIssue A linked issue is missing for this Pull Request NeedsWebsiteDocsUpdate What it says labels Nov 8, 2024
@github-actions github-actions bot added this to the v22.0.0 milestone Nov 8, 2024
Copy link

codecov bot commented Nov 8, 2024

Codecov Report

Attention: Patch coverage is 86.04651% with 6 lines in your changes missing coverage. Please review.

Project coverage is 67.33%. Comparing base (f6ca94f) to head (7dd91c9).
Report is 8 commits behind head on main.

Files with missing lines Patch % Lines
go/sqltypes/value.go 85.36% 6 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##             main   #17185    +/-   ##
========================================
  Coverage   67.33%   67.33%            
========================================
  Files        1569     1569            
  Lines      252244   252559   +315     
========================================
+ Hits       169858   170072   +214     
- Misses      82386    82487   +101     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@derekperkins derekperkins changed the title Vstream client example flesh out vstream client example Nov 8, 2024

// alter the destination schema based on the DDL event

case binlogdatapb.VEventType_COPY_COMPLETED:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be some sort of defer happening, that resets the destination tables and resets the last vgtid to empty?

Comment on lines +120 to +131
// storeLastVgtid stores the last vgtid processed by the client, so that it can resume from that position on restart.
// Storing a json blob in a database is just one way to do this, you could put it anywhere.
func storeLastVgtid(ctx context.Context, vgtid *binlogdatapb.VGtid) error {
_, err := json.Marshal(vgtid)
if err != nil {
return err
}

return nil
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is storing a full json blob the right thing to do? Do we really only want a subset if we want vstream to handle reshards?

continue
}

row := sqltypes.MakeRowTrusted(fields, rc.After)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MakeRowTrusted has a warning that it shouldn't be used except in special circumstances, which require comments about why it is used. Is there a better way to handle this conversion?

Comment on lines +375 to +385
// TODO: I'm not exactly sure how to handle multiple rows in a single event, so I'm just going to take the last one
for _, rc := range rowEvent.RowChanges {
// ignore deletes
if rc.After == nil {
continue
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love any feedback on a good way to handle this

return nil
}

// ********************************************************************************************************
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to add comments to each case, but it's limited by my understanding, so please feel free to correct anything that isn't right. I think it would make sense to transfer a lot of the comments over to the proto, which has some very useful comments, but most of the type enums have none.


// keep track of the last event time for heartbeat monitoring. We're purposefully not using the event
// timestamp, since that would cause cancellation if the stream was copying, delayed, or lagging.
lastEventReceivedAtUnix.Store(time.Now().Unix())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right value to use to check the heartbeat?

Comment on lines +188 to +190
// if we haven't received an event in twice the heartbeat duration, we'll cancel the context, since
// we're likely disconnected, and exit the goroutine
if tm.Sub(time.Unix(lastEventReceivedAtUnix.Load(), 0)) > heartbeatDur*2 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this heartbeat * 2 to give some buffer, but I don't have any idea if that's too much, not enough, etc

Comment on lines +160 to +161
// the first events will be field events, which contains schema information for any tables that are being streamed
var customerFields, corderFields []*querypb.Field
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I'm thinking, these are per shard, and this only works if schemas are identical across all shards. That may be fine here, but you'd have to account for it in a production setting. Maybe keep a mutex guarded map[keyspace/shard.table]*Fields?

Comment on lines 314 to 327
func initStateTable(ctx context.Context, session *vtgateconn.VTGateSession, stateKeyspace, stateTable string) error {
query := fmt.Sprintf(`create table if not exists %s.%s (
keyspace varbinary(512) not null,
table varbinary(512) not null,
vgtid json,
PRIMARY KEY (keyspace, table),
)`, stateKeyspace, stateTable)
_, err := session.Execute(ctx, query, nil)
if err != nil {
return fmt.Errorf("vstreamer: failed to create state table: %w", err)
}

return nil
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it might fit in the regular vreplication table somehow? Maybe that's shoehorning it where it doesn't go. Some of this decision hinges on whether this is a core vitess thing or a contrib framework.

Comment on lines 162 to 165
// v.shardsByKeyspace, err = getShardsByKeyspace(ctx, v.session)
// if err != nil {
// return nil, err
// }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connecting through this vtgate session bypasses some routing that enables SHOW VITESS_SHARDS, returning this error, so I have it commented out for now

VT05003: unknown database '_vt' in vschema

Comment on lines 167 to 170
err = v.initTables(tables)
if err != nil {
return nil, err
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a consolidation issue to deal with here. Someone could easily add a new table in code and redeploy, expecting it to catch up, but it wouldn't by default. To support that, I think we'd have to store a list of tables in state, and if a new table is added, catch it up to the same vgtid, then do a cutover to be in the same stream. That'd be a much better user experience, but a decent amount of added complexity.

}

// copyRowToStruct builds a customer from a row event
func copyRowToStruct(shard shardConfig, row []sqltypes.Value, vPtr reflect.Value) error {
Copy link
Member Author

@derekperkins derekperkins Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be fleshed out more, as it just does top level field matching for now, doesn't handle pointers, bytes, etc. I think it also deserves an optional scanner interface, for people to do custom unmarshaling.

Comment on lines 386 to 388
// create a new struct for the row
v := reflect.New(table.underlyingType)
table.currentBatch = reflect.Append(table.currentBatch, v)
Copy link
Member Author

@derekperkins derekperkins Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I really like the idea of returning a slice of their custom data type, it makes things like marking something as deleted tough to do. I'm considering returning a []Event something like this, so there it still does all the work of assembling a row, but still lets you dive into the raw data if needed.

type Event struct {
    Data any // this is their custom unmarshaled struct
    Deleted bool

    RawEvent *binlogdatapb.RowEvent
    RowChangeIndex // references the exact row change this was scanned from
}

Also very open to suggestions about how to make the stream useful, I admittedly haven't looked much at prior art with Debezium or other systems

Comment on lines 257 to 259
if table.Keyspace == "" {
return fmt.Errorf("vstreamer: table %v has no keyspace", table)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to support multiple keyspaces in the same stream? At a surface level, it doesn't seem useful, but I also didn't see anything that made me believe it was unsupported at the vstream level

Comment on lines 336 to 343
// if there are no rows, or the value is null, return nil, which will start the stream from the beginning
if len(result.Rows) == 0 {
return nil, nil
}

if result.Rows[0][0].IsNull() {
return nil, nil
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure what the minimal requirement is to bootstrap a vgtid. It seems like I'd probably have to loop through all the keyspaces + shards to make an unique, covering []*binlogdatapb.ShardGtid. Is that right?

Signed-off-by: Derek Perkins <[email protected]>
@derekperkins
Copy link
Member Author

closing in favor of #17221

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Component: Examples Component: VReplication NeedsBackportReason If backport labels have been applied to a PR, a justification is required NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsIssue A linked issue is missing for this Pull Request NeedsWebsiteDocsUpdate What it says Type: Enhancement Logical improvement (somewhere between a bug and feature)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant