Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-metrics throttler: post v21 deprecations and changes #16915

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions go/cmd/vtctldclient/command/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ func init() {
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.MetricName, "metric-name", "", "name of the metric for which we apply a new threshold (requires --threshold). If empty, the default (either 'lag' or 'custom') metric is used.")
UpdateThrottlerConfig.Flags().Float64Var(&updateThrottlerConfigOptions.Threshold, "threshold", 0, "threshold for the either default check (replication lag seconds) or custom check")
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.CustomQuery, "custom-query", "", "custom throttler check query")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckSelf, "check-as-check-self", false, "/throttler/check requests behave as is /throttler/check-self was called")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckShard, "check-as-check-shard", false, "use standard behavior for /throttler/check requests")
UpdateThrottlerConfig.Flags().MarkDeprecated("check-as-check-self", "specify metric with scope in --app-metrics to apply to all checks, or use --scope in CheckThrottler for a specific check")
UpdateThrottlerConfig.Flags().MarkDeprecated("check-as-check-shard", "specify metric with scope in --app-metrics to apply to all checks, or use --scope in CheckThrottler for a specific check")

UpdateThrottlerConfig.Flags().StringVar(&unthrottledAppRule.Name, "unthrottle-app", "", "an app name to unthrottle")
UpdateThrottlerConfig.Flags().StringVar(&throttledAppRule.Name, "throttle-app", "", "an app name to throttle")
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"math/rand/v2"
"net/http"
"os"
"path"
"strings"
Expand All @@ -33,6 +32,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestRevertSchemaChanges(t *testing.T) {
require.Equal(t, 1, len(shards))

throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, http.StatusOK, time.Minute)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute)

t.Run("revertible", testRevertible)
t.Run("revert", testRevert)
Expand Down
38 changes: 4 additions & 34 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,14 @@ func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*vtctl
flags := &throttle.CheckFlags{
Scope: base.ShardScope,
SkipRequestHeartbeats: skipRequestHeartbeats,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*vtctldatapb.CheckThrottlerResponse, error) {
flags := &throttle.CheckFlags{
Scope: base.SelfScope,
MultiMetricsEnabled: true,
Scope: base.SelfScope,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
return resp, err
Expand All @@ -211,7 +209,7 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod
require.NoError(t, err)

time.Sleep(time.Second)
return throttle.ResponseCodeFromStatus(resp.Check.ResponseCode, int(resp.Check.StatusCode))
return resp.Check.ResponseCode
}

// waitForThrottleCheckStatus waits for the tablet to return the provided response code in a throttle check
Expand All @@ -233,7 +231,7 @@ func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode
}
select {
case <-ctx.Done():
return assert.EqualValues(t, wantCode, resp.Check.StatusCode, "response: %+v", resp)
return assert.EqualValues(t, wantCode, resp.Check.ResponseCode, "response: %+v", resp)
case <-ticker.C:
}
}
Expand Down Expand Up @@ -344,13 +342,6 @@ func TestInitialThrottler(t *testing.T) {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}

if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
Expand All @@ -369,13 +360,6 @@ func TestInitialThrottler(t *testing.T) {
for _, metrics := range resp.Check.Metrics {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
Expand Down Expand Up @@ -429,7 +413,7 @@ func TestThrottleViaApplySchema(t *testing.T) {
require.NotNil(t, keyspace.Keyspace.ThrottlerConfig.ThrottledApps)
// ThrottledApps will actually be empty at this point, but more specifically we want to see that "online-ddl" is not there.
appRule, ok := keyspace.Keyspace.ThrottlerConfig.ThrottledApps[throttlerapp.OnlineDDLName.String()]
assert.True(t, ok, "app rule: %v", appRule)
assert.False(t, ok, "app rule: %v", appRule)
})
}

Expand All @@ -451,13 +435,11 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) {
t.Run("validating primary check self", func(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating replica check self", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -485,7 +467,6 @@ func TestLag(t *testing.T) {
t.Run("expecting throttler push back", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusTooManyRequests, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("primary self-check should still be fine", func(t *testing.T) {
Expand All @@ -496,10 +477,6 @@ func TestLag(t *testing.T) {
assert.Equal(t, base.SelfScope.String(), metrics.Scope)
}
// self (on primary) is unaffected by replication lag
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
Expand All @@ -512,7 +489,6 @@ func TestLag(t *testing.T) {
for _, metrics := range resp.Check.Metrics {
assert.Equal(t, base.SelfScope.String(), metrics.Scope)
}
assert.EqualValues(t, http.StatusTooManyRequests, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("exempting test app", func(t *testing.T) {
Expand Down Expand Up @@ -590,13 +566,11 @@ func TestLag(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("replica self-check should be fine", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -641,7 +615,6 @@ func TestCustomQuery(t *testing.T) {
throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("test threads running", func(t *testing.T) {
Expand All @@ -668,7 +641,6 @@ func TestCustomQuery(t *testing.T) {
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusTooManyRequests, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
}
})
Expand All @@ -680,7 +652,6 @@ func TestCustomQuery(t *testing.T) {
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
}
})
Expand All @@ -704,7 +675,6 @@ func TestRestoreDefaultQuery(t *testing.T) {
t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) {
Expand Down
16 changes: 5 additions & 11 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func CheckThrottlerRaw(vtctldProcess *cluster.VtctldClientProcess, tablet *clust
args = append(args, "CheckThrottler")
if flags == nil {
flags = &throttle.CheckFlags{
Scope: base.SelfScope,
MultiMetricsEnabled: true,
Scope: base.SelfScope,
}
}
if appName != "" {
Expand Down Expand Up @@ -124,11 +123,6 @@ func UpdateThrottlerTopoConfigRaw(
args = append(args, "--threshold", fmt.Sprintf("%f", opts.Threshold))
}
args = append(args, "--custom-query", opts.CustomQuery)
if opts.CustomQuery != "" {
args = append(args, "--check-as-check-self")
} else {
args = append(args, "--check-as-check-shard")
}
if appRule != nil {
args = append(args, "--throttle-app", appRule.Name)
args = append(args, "--throttle-app-duration", time.Until(protoutil.TimeFromProto(appRule.ExpiresAt).UTC()).String())
Expand Down Expand Up @@ -485,15 +479,15 @@ func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.L
}
}

func WaitForCheckThrottlerResult(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags, expect int32, timeout time.Duration) (*vtctldatapb.CheckThrottlerResponse, error) {
func WaitForCheckThrottlerResult(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags, expect tabletmanagerdatapb.CheckThrottlerResponseCode, timeout time.Duration) (*vtctldatapb.CheckThrottlerResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
resp, err := CheckThrottler(clusterInstance, tablet, appName, flags)
require.NoError(t, err)
if resp.Check.StatusCode == expect {
if resp.Check.ResponseCode == expect {
return resp, nil
}
select {
Expand Down Expand Up @@ -534,11 +528,11 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat

for {
checkResp, checkErr := http.Get(checkURL)
if checkErr != nil {
if checkErr == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want a require.NoError(t, checkErr) here before the defer, which would let us get rid of the conditional check? Same below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this case, because this is a wait-for function, and I'm OK ignoring the error as long as the function eventually succeeds within the given timeout. So I don't care that there could be multiple errors, and I don't want to report the test as failed.

defer checkResp.Body.Close()
}
selfCheckResp, selfCheckErr := http.Get(selfCheckURL)
if selfCheckErr != nil {
if selfCheckErr == nil {
defer selfCheckResp.Body.Close()
}
if checkErr == nil && selfCheckErr == nil &&
Expand Down
25 changes: 21 additions & 4 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -178,16 +179,32 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s

// waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for
// the provided app name in its self check.
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) {
var gotCode int64
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := throttlerCheckSelf(tablet, throttlerApp)
require.NoError(t, err)

gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode")
require.NoError(t, err)
responseCode, err := jsonparser.GetInt([]byte(output), "ResponseCode")
require.NoError(t, err, "waitForTabletThrottlingStatus output: %v", output)

var gotCode int
switch tabletmanagerdatapb.CheckThrottlerResponseCode(responseCode) {
case tabletmanagerdatapb.CheckThrottlerResponseCode_OK:
gotCode = http.StatusOK
case tabletmanagerdatapb.CheckThrottlerResponseCode_APP_DENIED:
gotCode = http.StatusExpectationFailed
case tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED:
gotCode = http.StatusTooManyRequests
case tabletmanagerdatapb.CheckThrottlerResponseCode_UNKNOWN_METRIC:
gotCode = http.StatusNotFound
case tabletmanagerdatapb.CheckThrottlerResponseCode_INTERNAL_ERROR:
gotCode = http.StatusInternalServerError
default:
gotCode = http.StatusInternalServerError
}

if wantCode == gotCode {
// Wait for any cached check values to be cleared and the new
// status value to be in effect everywhere before returning.
Expand Down
Loading
Loading