Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pubsub source and schema validation #1679

Merged
merged 6 commits into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sc-poc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/segmentio/ksuid v1.0.4
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.13.0
github.com/xeipuuv/gojsonschema v1.2.0
go.uber.org/zap v1.24.0
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
Expand Down
3 changes: 3 additions & 0 deletions sc-poc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,10 +759,13 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8=
github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yashtewari/glob-intersection v0.1.0 h1:6gJvMYQlTDOL3dMsPF6J0+26vwX9MB8/1q3uAdhmTrg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
YourTechBud marked this conversation as resolved.
Show resolved Hide resolved
name: compiledgraphqlsources.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: graphqlsources.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
3 changes: 1 addition & 2 deletions sc-poc/manifests/crds/core.space-cloud.io_jwthsasecrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: jwthsasecrets.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
3 changes: 1 addition & 2 deletions sc-poc/manifests/crds/core.space-cloud.io_opapolicies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: opapolicies.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: openapisources.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
73 changes: 73 additions & 0 deletions sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
name: pubsubchannels.core.space-cloud.io
spec:
group: core.space-cloud.io
names:
kind: PubsubChannel
listKind: PubsubChannelList
plural: pubsubchannels
singular: pubsubchannel
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: PubsubChannel is the schema for the pubsub channel
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: PubsubChannel describes the specification of the pubsub channel
properties:
channel:
description: Channel describes the name of the pubsub channel
type: string
payload:
additionalProperties:
description: ChannelSchema defines the schema of the payload that
the channel accepts
properties:
items:
description: Items list the items of the array
properties:
additionalProperties: {}
description: Properties list additional properties of the object
type: object
required:
description: Required specifies the required properties of the
object
items:
type: string
type: array
type:
description: Type defines the type of the data
type: string
type: object
description: Payload describes the payload schema of the channel
type: object
required:
- channel
type: object
status:
description: PubsubChannel defines the observed state of the pubsub channel
type: object
type: object
served: true
storage: true
subresources:
status: {}
31 changes: 28 additions & 3 deletions sc-poc/modules/pubsub/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"go.uber.org/zap"

"github.com/spacecloud-io/space-cloud/managers/apis"
"github.com/spacecloud-io/space-cloud/managers/source"
"github.com/spacecloud-io/space-cloud/modules/pubsub/connectors"
"github.com/spacecloud-io/space-cloud/sources/pubsub_channel"
)

var connectorPool = caddy.NewUsagePool()
Expand All @@ -27,6 +29,7 @@ type App struct {
// For internal usage
logger *zap.Logger
asyncapiDoc *AsyncAPI
channels []Channel
}

// CaddyModule returns the Caddy module information.
Expand All @@ -52,10 +55,32 @@ func (a *App) Provision(ctx caddy.Context) error {
connector := val.(*connectors.Connector)
a.pubSub = connector.GetPubsubClient()

channels := a.Channels()
for path, channel := range channels.Channels {
// Get all space-cloud defined pubsub channel sources
a.createInternalChannels()

// Get all user defined pubsub channel sources
sourceManT, err := ctx.App("source")
if err != nil {
a.logger.Error("Unable to load the source manager", zap.Error(err))
}
sourceMan := sourceManT.(*source.App)
sources := sourceMan.GetSources("pubsub")
for _, src := range sources {
channelSrc := src.(*pubsub_channel.PubsubChannelSource)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make an interface in this package itself

topic := Channel{
Name: channelSrc.Spec.Channel,
Payload: ChannelPayload{
Schema: channelSrc.Spec.Payload,
},
}

a.channels = append(a.channels, topic)
}

// Generate publish and subscribe API for each channel
for path, channel := range a.Channels().Channels {
// Get the publish and subscribe API of the channel
publisherAPI := a.getPublisherAPI(path, channel.Name)
publisherAPI := a.getPublisherAPI(path, channel)
subscriberAPI := a.getSubscriberAPI(path, channel.Name)

a.apis = append(a.apis, publisherAPI, subscriberAPI)
Expand Down
58 changes: 49 additions & 9 deletions sc-poc/modules/pubsub/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package pubsub
import (
"context"
"encoding/json"
"strings"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1"
)

// Publish publishes message to a topic
Expand All @@ -28,19 +30,44 @@ func (a *App) Subscribe(ctx context.Context, clientID, topic string, options Sub
return messages, nil
}

// Channels return channels with their schema
func (a *App) Channels() ChannelsWithSchema {
return ChannelsWithSchema{
Channels: map[string]Channel{
"/sc/api": {
Name: "api-provision",
Payload: ChannelPayload{
Schema: map[string]interface{}{
"$ref": "#/components/schemas/APIManMsg",
func (a *App) createInternalChannels() {
openapiProvisionChannel := Channel{
Name: "openapi-provision",
Payload: ChannelPayload{
Schema: map[string]*v1alpha1.ChannelSchema{
"doc": {
Type: "string",
},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this schema for? Why not keep the entire schema as type = object & AdditionalProperties true

},
},
}

asyncapiProvisionChannel := Channel{
Name: "asyncapi-provision",
Payload: ChannelPayload{
Schema: map[string]*v1alpha1.ChannelSchema{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

"doc": {
Type: "object",
Properties: map[string]*v1alpha1.ChannelSchema{
"name": {
Type: "string",
},
"age": {
Type: "integer",
},
},
Required: []string{"name"},
},
},
},
}
a.channels = append(a.channels, openapiProvisionChannel, asyncapiProvisionChannel)
}

// Channels return channels with their schema
func (a *App) Channels() ChannelsWithSchema {
channels := ChannelsWithSchema{
Channels: make(map[string]Channel),
Components: &Components{
Schemas: map[string]interface{}{
"APIManMsg": map[string]interface{}{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this if its no longer needed

Expand All @@ -50,4 +77,17 @@ func (a *App) Channels() ChannelsWithSchema {
},
},
}

for _, topic := range a.channels {
channelPath := getChannelPath(topic.Name)
channels.Channels[channelPath] = topic
}
return channels
}

func getChannelPath(name string) string {
if name[0] != '/' {
name = "/" + name
}
return strings.ReplaceAll(name, "-", "/")
}
12 changes: 8 additions & 4 deletions sc-poc/modules/pubsub/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package pubsub

import (
v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1"
)

type (
EventType string
)
Expand All @@ -19,7 +23,7 @@ type Message struct {
// PublishMessage defines the type for publishing a message
type PublishMessage struct {
ID string `json:"id"`
MetaData map[string]string `json:"metadata"`
MetaData map[string]string `json:"metadata,omitempty"`
Payload interface{} `json:"payload"`
}

Expand Down Expand Up @@ -50,9 +54,9 @@ type Channel struct {

// ChannelPayload define channel's payload
type ChannelPayload struct {
Schema map[string]interface{} `json:"schema,omitempty"`
Example interface{} `json:"example,omitempty"`
Examples []interface{} `json:"examples,omitempty"`
Schema map[string]*v1alpha1.ChannelSchema `json:"schema,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Schema map[string]*v1alpha1.ChannelSchema `json:"schema,omitempty"`
Schema *v1alpha1.ChannelSchema `json:"schema,omitempty"`

Example interface{} `json:"example,omitempty"`
Examples []interface{} `json:"examples,omitempty"`
}

// Components stores the components for the schema refs
Expand Down
Loading