Skip to content

Commit

Permalink
feat: auto wire up grafana (#3397)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Nov 15, 2024
1 parent 1389427 commit 00836b3
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 47 deletions.
19 changes: 0 additions & 19 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -280,25 +280,6 @@ debug *args:
dlv_pid=$!
wait "$dlv_pid"

# Run `ftl dev` with the given args after setting the necessary envar.
otel-dev *args:
#!/bin/bash
set -euo pipefail

export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:${OTEL_GRPC_PORT}"
export OTEL_METRIC_EXPORT_INTERVAL=${OTEL_METRIC_EXPORT_INTERVAL}
# Uncomment this line for much richer debug logs
# export FTL_O11Y_LOG_LEVEL="debug"
ftl dev {{args}}

# runs the otel-lgtm observability stack locally, which includes
# an otel collector, loki (for logs), prometheus metrics db (for metrics), tempo (trace storage) and grafana (for visualization)
observe:
docker compose up otel-lgtm

observe-stop:
docker compose down otel-lgtm

localstack:
docker compose up localstack -d --wait

Expand Down
8 changes: 7 additions & 1 deletion backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/TBD54566975/ftl/internal/localdebug"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
)

var _ scaling.RunnerScaling = &localScaling{}
Expand All @@ -42,6 +43,7 @@ type localScaling struct {
prevRunnerSuffix int
ideSupport optional.Option[localdebug.IDEIntegration]
registryConfig artefacts.RegistryConfig
enableOtel bool
}

func (l *localScaling) Start(ctx context.Context, endpoint url.URL, leaser leases.Leaser) error {
Expand Down Expand Up @@ -82,7 +84,7 @@ type runnerInfo struct {
port string
}

func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, registryConfig artefacts.RegistryConfig) (scaling.RunnerScaling, error) {
func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL, configPath string, enableIDEIntegration bool, registryConfig artefacts.RegistryConfig, enableOtel bool) (scaling.RunnerScaling, error) {

cacheDir, err := os.UserCacheDir()
if err != nil {
Expand All @@ -97,6 +99,7 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u
prevRunnerSuffix: -1,
debugPorts: map[string]*localdebug.DebugInfo{},
registryConfig: registryConfig,
enableOtel: enableOtel,
}
if enableIDEIntegration && configPath != "" {
local.ideSupport = optional.Ptr(localdebug.NewIDEIntegration(configPath))
Expand Down Expand Up @@ -201,6 +204,9 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
Deployment: deploymentKey,
DebugPort: debugPort,
Registry: l.registryConfig,
ObservabilityConfig: observability.Config{
ExportOTEL: observability.ExportOTELFlag(l.enableOtel),
},
}

simpleName := fmt.Sprintf("runner%d", keySuffix)
Expand Down
6 changes: 6 additions & 0 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
Expand Down Expand Up @@ -57,11 +58,16 @@ type Config struct {
Deployment string `help:"The deployment this runner is for." env:"FTL_DEPLOYMENT"`
DebugPort int `help:"The port to use for debugging." env:"FTL_DEBUG_PORT"`
Registry artefacts.RegistryConfig `embed:"" prefix:"oci-"`
ObservabilityConfig ftlobservability.Config `embed:"" prefix:"o11y-"`
}

func Start(ctx context.Context, config Config) error {
ctx, doneFunc := context.WithCancel(ctx)
defer doneFunc()
err := ftlobservability.Init(ctx, false, "", "ftl-runner", ftl.Version, config.ObservabilityConfig)
if err != nil {
return fmt.Errorf("failed to initialise observability: %w", err)
}
hostname, err := os.Hostname()
if err != nil {
observability.Runner.StartupFailed(ctx)
Expand Down
10 changes: 3 additions & 7 deletions cmd/ftl-runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (
"github.com/TBD54566975/ftl/backend/runner"
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/observability"
)

var cli struct {
Version kong.VersionFlag `help:"Show version."`
LogConfig log.Config `prefix:"log-" embed:""`
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
RunnerConfig runner.Config `embed:""`
Version kong.VersionFlag `help:"Show version."`
LogConfig log.Config `prefix:"log-" embed:""`
RunnerConfig runner.Config `embed:""`
}

func main() {
Expand All @@ -44,8 +42,6 @@ and route to user code.
})
logger := log.Configure(os.Stderr, cli.LogConfig)
ctx := log.ContextWithLogger(context.Background(), logger)
err = observability.Init(ctx, false, "", "ftl-runner", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")
err = runner.Start(ctx, cli.RunnerConfig)
kctx.FatalIfErrorf(err)
}
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ services:
retries: 60
start_period: 80s
otel-lgtm:
profiles:
- infra
image: grafana/otel-lgtm
platform: linux/amd64
ports:
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_box_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (b *boxRunCmd) Run(
if err != nil {
return fmt.Errorf("failed to create runner port allocator: %w", err)
}
runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, []*url.URL{b.Bind}, "", false, b.Registry)
runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, []*url.URL{b.Bind}, "", false, b.Registry, false)
if err != nil {
return fmt.Errorf("failed to create runner autoscaler: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_schema_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *schemaImportCmd) setup(ctx context.Context) error {
return err
}

err = container.Run(ctx, "ollama/ollama", ollamaContainerName, s.OllamaPort, 11434, optional.Some(ollamaVolume))
err = container.Run(ctx, "ollama/ollama", ollamaContainerName, map[int]int{s.OllamaPort: 11434}, optional.Some(ollamaVolume))
if err != nil {
return err
}
Expand Down
13 changes: 12 additions & 1 deletion frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type serveCmd struct {
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
DatabaseImage string `help:"The container image to start for the database" default:"postgres:15.8" env:"FTL_DATABASE_IMAGE" hidden:""`
RegistryImage string `help:"The container image to start for the image registry" default:"registry:2" env:"FTL_REGISTRY_IMAGE" hidden:""`
GrafanaImage string `help:"The container image to start for the automatic Grafana instance" default:"grafana/otel-lgtm" env:"FTL_GRAFANA_IMAGE" hidden:""`
DisableGrafana bool `help:"Disable the automatic Grafana that is started if no telemetry collector is specified." default:"false"`
controller.CommonConfig
provisioner.CommonProvisionerConfig
}
Expand Down Expand Up @@ -116,6 +118,15 @@ func (s *serveCmd) run(
logger.Debugf("Starting FTL with %d controller(s)", s.Controllers)
}

if !s.DisableGrafana && !bool(s.ObservabilityConfig.ExportOTEL) {
err := dev.SetupGrafana(ctx, s.GrafanaImage)
if err != nil {
return fmt.Errorf("failed to setup grafana image: %w", err)
}
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
os.Setenv("OTEL_METRIC_EXPORT_INTERVAL", "1000")
s.ObservabilityConfig.ExportOTEL = true
}
err := observability.Init(ctx, false, "", "ftl-serve", ftl.Version, s.ObservabilityConfig)
if err != nil {
return fmt.Errorf("observability init failed: %w", err)
Expand Down Expand Up @@ -174,7 +185,7 @@ func (s *serveCmd) run(
provisionerAddresses = append(provisionerAddresses, bind)
}

runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration, registry)
runnerScaling, err := localscaling.NewLocalScaling(bindAllocator, controllerAddresses, projConfig.Path, devMode && !projConfig.DisableIDEIntegration, registry, bool(s.ObservabilityConfig.ExportOTEL))
if err != nil {
return err
}
Expand Down
23 changes: 13 additions & 10 deletions internal/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Pull(ctx context.Context, imageName string) error {
}

// Run starts a new detached container with the given image, name, port map, and (optional) volume mount.
func Run(ctx context.Context, image, name string, hostPort, containerPort int, volume optional.Option[string]) error {
func Run(ctx context.Context, image, name string, hostToContainerPort map[int]int, volume optional.Option[string], env ...string) error {
cli, err := dockerClient.Get(ctx)
if err != nil {
return err
Expand All @@ -96,21 +96,23 @@ func Run(ctx context.Context, image, name string, hostPort, containerPort int, v
}

config := container.Config{
Image: image,
Image: image,
Env: env,
ExposedPorts: map[nat.Port]struct{}{},
}
bindings := nat.PortMap{}
for k, v := range hostToContainerPort {
containerNatPort := nat.Port(fmt.Sprintf("%d/tcp", v))
bindings[containerNatPort] = []nat.PortBinding{{HostPort: strconv.Itoa(k)}}
config.ExposedPorts[containerNatPort] = struct{}{}
}

containerNatPort := nat.Port(fmt.Sprintf("%d/tcp", containerPort))
hostConfig := container.HostConfig{
PublishAllPorts: true,
RestartPolicy: container.RestartPolicy{
Name: container.RestartPolicyAlways,
},
PortBindings: nat.PortMap{
containerNatPort: []nat.PortBinding{
{
HostPort: strconv.Itoa(hostPort),
},
},
},
PortBindings: bindings,
}
if v, ok := volume.Get(); ok {
hostConfig.Binds = []string{v}
Expand All @@ -132,6 +134,7 @@ func Run(ctx context.Context, image, name string, hostPort, containerPort int, v
// RunDB runs a new detached postgres container with the given name and exposed port.
func RunDB(ctx context.Context, name string, port int, image string) error {
cli, err := dockerClient.Get(ctx)

if err != nil {
return err
}
Expand Down
56 changes: 56 additions & 0 deletions internal/dev/grafana.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package dev

import (
"context"
"fmt"
"net"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/container"
"github.com/TBD54566975/ftl/internal/log"
)

// ftlGrafanaName is the name of the docker container that SetupGrafana
// creates or reuses for the local grafana instance.
const ftlGrafanaName = "ftl-otel-lgtm-1"

// SetupGrafana makes sure the docker container named ftlGrafanaName is running
// for the given image and waits until it is reachable.
//
// When container.DoesExist reports that no such container exists, host ports
// 3000, 4317 and 4318 are probed for availability and a new container is
// created with each of those host ports mapped to the same container port.
// Otherwise the existing container is started. In both cases the function
// then blocks in WaitForPortReady on port 3000.
//
// It returns an error if the existence check fails, if any required port
// cannot be bound, if the container cannot be created or started, or if the
// readiness wait fails.
func SetupGrafana(ctx context.Context, image string) error {
	logger := log.FromContext(ctx)

	exists, err := container.DoesExist(ctx, ftlGrafanaName, optional.Some(image))
	if err != nil {
		return fmt.Errorf("failed to check if container exists: %w", err)
	}

	if !exists {
		logger.Debugf("Creating docker container '%s' for grafana", ftlGrafanaName)

		// Single source of truth for the published ports: the slice gives a
		// deterministic probe order and the host->container map is derived
		// from it so the two can never drift apart.
		ports := []int{3000, 4317, 4318}
		portMap := make(map[int]int, len(ports))
		for _, port := range ports {
			// Probe the port by binding to it briefly; a failure here means
			// the container would not be able to publish it.
			l, listenErr := net.Listen("tcp", fmt.Sprintf(":%d", port))
			if listenErr != nil {
				// Keep the underlying cause: the bind can also fail for
				// reasons other than the port being taken.
				return fmt.Errorf("port %d is already in use: %w", port, listenErr)
			}
			if closeErr := l.Close(); closeErr != nil {
				return fmt.Errorf("failed to close listener: %w", closeErr)
			}
			portMap[port] = port
		}

		err = container.Run(ctx, image, ftlGrafanaName, portMap, optional.None[string](), "ENABLE_LOGS_ALL=true", "GF_PATHS_DATA=/data/grafana")
		if err != nil {
			return fmt.Errorf("failed to run grafana container: %w", err)
		}
	} else {
		// Start the existing container
		err = container.Start(ctx, ftlGrafanaName)
		if err != nil {
			return fmt.Errorf("failed to start existing grafana container: %w", err)
		}

		logger.Debugf("Reusing existing docker container %s for grafana", ftlGrafanaName)
	}

	err = WaitForPortReady(ctx, 3000)
	if err != nil {
		return fmt.Errorf("grafana container failed to be healthy: %w", err)
	}

	return nil
}
10 changes: 5 additions & 5 deletions internal/dev/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func SetupRegistry(ctx context.Context, image string, port int) error {
return fmt.Errorf("failed to close listener: %w", err)
}

err = container.Run(ctx, image, ftlRegistryName, port, 5000, optional.None[string]())
err = container.Run(ctx, image, ftlRegistryName, map[int]int{port: 5000}, optional.None[string]())
if err != nil {
return fmt.Errorf("failed to run registry container: %w", err)
}
Expand All @@ -54,24 +54,24 @@ func SetupRegistry(ctx context.Context, image string, port int) error {
logger.Debugf("Reusing existing docker container %s on port %d for image registry", ftlRegistryName, port)
}

err = WaitForRegistryReady(ctx, port)
err = WaitForPortReady(ctx, port)
if err != nil {
return fmt.Errorf("registry container failed to be healthy: %w", err)
}

return nil
}

func WaitForRegistryReady(ctx context.Context, port int) error {
func WaitForPortReady(ctx context.Context, port int) error {

timeout := time.After(10 * time.Minute)
retry := time.NewTicker(5 * time.Millisecond)
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled waiting for registry")
return fmt.Errorf("context cancelled waiting for container")
case <-timeout:
return fmt.Errorf("timed out waiting for registry to be healthy")
return fmt.Errorf("timed out waiting for container to be healthy")
case <-retry.C:
url := fmt.Sprintf("http://127.0.0.1:%d", port)

Expand Down

0 comments on commit 00836b3

Please sign in to comment.