Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pubsub source and schema validation #1679

Merged
merged 6 commits into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sc-poc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/segmentio/ksuid v1.0.4
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.13.0
github.com/xeipuuv/gojsonschema v1.2.0
go.uber.org/zap v1.24.0
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
Expand Down
3 changes: 3 additions & 0 deletions sc-poc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,10 +759,13 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8=
github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yashtewari/glob-intersection v0.1.0 h1:6gJvMYQlTDOL3dMsPF6J0+26vwX9MB8/1q3uAdhmTrg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
YourTechBud marked this conversation as resolved.
Show resolved Hide resolved
name: compiledgraphqlsources.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: graphqlsources.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
3 changes: 1 addition & 2 deletions sc-poc/manifests/crds/core.space-cloud.io_jwthsasecrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: jwthsasecrets.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
3 changes: 1 addition & 2 deletions sc-poc/manifests/crds/core.space-cloud.io_opapolicies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: opapolicies.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: (devel)
name: openapisources.core.space-cloud.io
spec:
group: core.space-cloud.io
Expand Down
74 changes: 74 additions & 0 deletions sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
name: pubsubchannels.core.space-cloud.io
spec:
group: core.space-cloud.io
names:
kind: PubsubChannel
listKind: PubsubChannelList
plural: pubsubchannels
singular: pubsubchannel
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: PubsubChannel is the schema for the pubsub channel
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: PubsubChannel describes the specification of the pubsub channel
properties:
channel:
description: Channel describes the name of the pubsub channel
type: string
payload:
description: Payload describes the payload schema of the channel
properties:
additionalProperties:
description: AdditionalProperties defines if the schema accepts
properties other than the ones mentioned
format: byte
type: string
items:
description: Items list the items of the array
properties:
additionalProperties: {}
description: Properties list additional properties of the object
type: object
required:
description: Required specifies the required properties of the
object
items:
type: string
type: array
type:
description: Type defines the type of the data
type: string
type: object
required:
- channel
type: object
status:
description: PubsubChannel defines the observed state of the pubsub channel
type: object
type: object
served: true
storage: true
subresources:
status: {}
26 changes: 23 additions & 3 deletions sc-poc/modules/pubsub/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.uber.org/zap"

"github.com/spacecloud-io/space-cloud/managers/apis"
"github.com/spacecloud-io/space-cloud/managers/source"
"github.com/spacecloud-io/space-cloud/modules/pubsub/connectors"
)

Expand All @@ -27,6 +28,7 @@ type App struct {
// For internal usage
logger *zap.Logger
asyncapiDoc *AsyncAPI
channels []Channel
}

// CaddyModule returns the Caddy module information.
Expand All @@ -52,10 +54,28 @@ func (a *App) Provision(ctx caddy.Context) error {
connector := val.(*connectors.Connector)
a.pubSub = connector.GetPubsubClient()

channels := a.Channels()
for path, channel := range channels.Channels {
// Get all space-cloud defined pubsub channel sources
a.createInternalChannels()

// Get all user defined pubsub channel sources
sourceManT, err := ctx.App("source")
if err != nil {
a.logger.Error("Unable to load the source manager", zap.Error(err))
}
sourceMan := sourceManT.(*source.App)
sources := sourceMan.GetSources("pubsub")
for _, src := range sources {
channelSrc, ok := src.(Source)
if ok {
topic := channelSrc.GetChannel()
a.channels = append(a.channels, topic)
}
}

// Generate publish and subscribe API for each channel
for path, channel := range a.Channels().Channels {
// Get the publish and subscribe API of the channel
publisherAPI := a.getPublisherAPI(path, channel.Name)
publisherAPI := a.getPublisherAPI(path, channel)
subscriberAPI := a.getSubscriberAPI(path, channel.Name)

a.apis = append(a.apis, publisherAPI, subscriberAPI)
Expand Down
59 changes: 42 additions & 17 deletions sc-poc/modules/pubsub/asyncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,54 @@ func (a *App) generateASyncAPIDoc() *AsyncAPI {
Subscribe: &Operation{
ID: "producerSubscribe" + getID(channelObj.Name),
Message: MessageOneOrMany{
MessageEntity: MessageEntity{
Name: "Acknowledgement",
ContentType: "application/json",
Payload: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"event": map[string]interface{}{
"type": "string",
OneOf: []MessageEntity{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need of a oneOf if you are returning just a single object

{
Name: "Acknowledgement",
ContentType: "application/json",
Payload: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"event": map[string]interface{}{
"type": "string",
},
"data": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"id": map[string]interface{}{
"type": "string",
},
"ack": map[string]interface{}{
"type": "boolean",
},
},
"required": []string{"id", "ack"},
},
},
"data": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"id": map[string]interface{}{
"required": []string{"event", "data"},
},
},
{
Name: "PublishError",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge publish error with the Acknowledge message?

ContentType: "application/json",
Payload: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"id": map[string]interface{}{
"type": "string",
},
"message": map[string]interface{}{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accept a message id as well. We need to tell the publisher which message didn't publish. Also you can make json schema directly from a struct without having to make such maps. Look at this library: https://github.com/invopop/jsonschema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this in a separate PR? I will make an issue for it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"type": "string",
},
"errors": map[string]interface{}{
"type": "array",
"items": map[string]interface{}{
"type": "string",
},
"ack": map[string]interface{}{
"type": "boolean",
},
"required": []string{"message"},
},
"required": []string{"id", "ack"},
},
"required": []string{"event", "data"},
},
"required": []string{"event", "data"},
},
},
},
Expand Down
56 changes: 39 additions & 17 deletions sc-poc/modules/pubsub/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package pubsub
import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1"
)

// Publish publishes message to a topic
Expand All @@ -28,26 +31,45 @@ func (a *App) Subscribe(ctx context.Context, clientID, topic string, options Sub
return messages, nil
}

// Channels return channels with their schema
func (a *App) Channels() ChannelsWithSchema {
return ChannelsWithSchema{
Channels: map[string]Channel{
"/sc/api": {
Name: "api-provision",
Payload: ChannelPayload{
Schema: map[string]interface{}{
"$ref": "#/components/schemas/APIManMsg",
},
},
func (a *App) createInternalChannels() {
openapiProvisionChannel := Channel{
Name: "openapi-provision",
Payload: ChannelPayload{
Schema: &v1alpha1.ChannelSchema{
Type: "object",
AdditionalProperties: json.RawMessage(fmt.Sprintf(`%t`, true)),
},
},
Components: &Components{
Schemas: map[string]interface{}{
"APIManMsg": map[string]interface{}{
"type": "object",
"additionalProperties": true,
},
}

asyncapiProvisionChannel := Channel{
Name: "asyncapi-provision",
Payload: ChannelPayload{
Schema: &v1alpha1.ChannelSchema{
Type: "object",
AdditionalProperties: json.RawMessage(fmt.Sprintf(`%t`, true)),
},
},
}
a.channels = append(a.channels, openapiProvisionChannel, asyncapiProvisionChannel)
}

// Channels return channels with their schema
func (a *App) Channels() ChannelsWithSchema {
channels := ChannelsWithSchema{
Channels: make(map[string]Channel),
}

for _, topic := range a.channels {
channelPath := getChannelPath(topic.Name)
channels.Channels[channelPath] = topic
}
return channels
}

func getChannelPath(name string) string {
if name[0] != '/' {
name = "/" + name
}
return strings.ReplaceAll(name, "-", "/")
}
23 changes: 19 additions & 4 deletions sc-poc/modules/pubsub/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package pubsub

import (
v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1"
)

type (
EventType string
)
Expand All @@ -19,7 +23,7 @@ type Message struct {
// PublishMessage defines the type for publishing a message
type PublishMessage struct {
ID string `json:"id"`
MetaData map[string]string `json:"metadata"`
MetaData map[string]string `json:"metadata,omitempty"`
Payload interface{} `json:"payload"`
}

Expand Down Expand Up @@ -50,9 +54,9 @@ type Channel struct {

// ChannelPayload define channel's payload
type ChannelPayload struct {
Schema map[string]interface{} `json:"schema,omitempty"`
Example interface{} `json:"example,omitempty"`
Examples []interface{} `json:"examples,omitempty"`
Schema *v1alpha1.ChannelSchema `json:"schema,omitempty"`
Example interface{} `json:"example,omitempty"`
Examples []interface{} `json:"examples,omitempty"`
}

// Components stores the components for the schema refs
Expand Down Expand Up @@ -118,3 +122,14 @@ type ServerItem struct {
type AsyncAPIComponents struct {
Schemas map[string]interface{} `json:"schemas,omitempty"`
}

// WebsocketErrorMessage defines the error message in websocket connection
type WebsocketErrorMessage struct {
ID string `json:"id"`
Message string `json:"message"`
YourTechBud marked this conversation as resolved.
Show resolved Hide resolved
Errors []string `json:"errors,omitempty"`
}

type Source interface {
GetChannel() Channel
}
Loading