From fee9af03daf23ff8ca40234d4d72200a6349a310 Mon Sep 17 00:00:00 2001 From: Shubham Nazare Date: Wed, 10 May 2023 11:35:22 +0530 Subject: [PATCH 1/6] feat!: add pubsub source and schema validation Signed-off-by: Shubham Nazare --- sc-poc/go.mod | 1 + sc-poc/go.sum | 3 + ...space-cloud.io_compiledgraphqlsources.yaml | 3 +- .../core.space-cloud.io_graphqlsources.yaml | 3 +- .../core.space-cloud.io_jwthsasecrets.yaml | 3 +- .../crds/core.space-cloud.io_opapolicies.yaml | 3 +- .../core.space-cloud.io_openapisources.yaml | 3 +- .../core.space-cloud.io_pubsubchannels.yaml | 73 +++++++ sc-poc/modules/pubsub/app.go | 31 ++- sc-poc/modules/pubsub/operations.go | 58 +++++- sc-poc/modules/pubsub/types.go | 12 +- sc-poc/modules/pubsub/websocket.go | 54 ++++- .../pkg/apis/core/v1alpha1/pubsub_channel.go | 55 +++++ .../core/v1alpha1/zz_generated.deepcopy.go | 144 +++++++++++++ .../typed/core/v1alpha1/core_client.go | 5 + .../core/v1alpha1/fake/fake_core_client.go | 4 + .../core/v1alpha1/fake/fake_pubsubchannel.go | 142 +++++++++++++ .../core/v1alpha1/generated_expansion.go | 2 + .../typed/core/v1alpha1/pubsubchannel.go | 195 ++++++++++++++++++ .../core/v1alpha1/interface.go | 7 + .../core/v1alpha1/pubsubchannel.go | 90 ++++++++ .../informers/externalversions/generic.go | 2 + .../core/v1alpha1/expansion_generated.go | 8 + .../listers/core/v1alpha1/pubsubchannel.go | 99 +++++++++ sc-poc/sources/pubsub_channel/source.go | 42 ++++ sc-poc/sources/sources.go | 1 + 26 files changed, 1009 insertions(+), 34 deletions(-) create mode 100644 sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml create mode 100644 sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go create mode 100644 sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_pubsubchannel.go create mode 100644 sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/pubsubchannel.go create mode 100644 sc-poc/pkg/client/informers/externalversions/core/v1alpha1/pubsubchannel.go create mode 100644 sc-poc/pkg/client/listers/core/v1alpha1/pubsubchannel.go create mode 100644 sc-poc/sources/pubsub_channel/source.go diff --git a/sc-poc/go.mod b/sc-poc/go.mod index 8bde916f8..145fac63f 100644 --- a/sc-poc/go.mod +++ b/sc-poc/go.mod @@ -20,6 +20,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.13.0 + github.com/xeipuuv/gojsonschema v1.2.0 go.uber.org/zap v1.24.0 k8s.io/apimachinery v0.26.1 k8s.io/client-go v0.26.1 diff --git a/sc-poc/go.sum b/sc-poc/go.sum index 83461c433..999fbbd47 100644 --- a/sc-poc/go.sum +++ b/sc-poc/go.sum @@ -759,10 +759,13 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8= github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yashtewari/glob-intersection v0.1.0 h1:6gJvMYQlTDOL3dMsPF6J0+26vwX9MB8/1q3uAdhmTrg= diff --git a/sc-poc/manifests/crds/core.space-cloud.io_compiledgraphqlsources.yaml b/sc-poc/manifests/crds/core.space-cloud.io_compiledgraphqlsources.yaml index 8f44ddce1..2405cc65d 100644 --- a/sc-poc/manifests/crds/core.space-cloud.io_compiledgraphqlsources.yaml +++ b/sc-poc/manifests/crds/core.space-cloud.io_compiledgraphqlsources.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.10.0 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (devel) name: compiledgraphqlsources.core.space-cloud.io spec: group: core.space-cloud.io diff --git a/sc-poc/manifests/crds/core.space-cloud.io_graphqlsources.yaml b/sc-poc/manifests/crds/core.space-cloud.io_graphqlsources.yaml index c4727ec77..601b01222 100644 --- a/sc-poc/manifests/crds/core.space-cloud.io_graphqlsources.yaml +++ b/sc-poc/manifests/crds/core.space-cloud.io_graphqlsources.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.10.0 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (devel) name: graphqlsources.core.space-cloud.io spec: group: core.space-cloud.io diff --git a/sc-poc/manifests/crds/core.space-cloud.io_jwthsasecrets.yaml b/sc-poc/manifests/crds/core.space-cloud.io_jwthsasecrets.yaml index 98ba78819..94a3a9e80 100644 --- a/sc-poc/manifests/crds/core.space-cloud.io_jwthsasecrets.yaml +++ b/sc-poc/manifests/crds/core.space-cloud.io_jwthsasecrets.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.10.0 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (devel) name: jwthsasecrets.core.space-cloud.io spec: group: core.space-cloud.io diff --git a/sc-poc/manifests/crds/core.space-cloud.io_opapolicies.yaml b/sc-poc/manifests/crds/core.space-cloud.io_opapolicies.yaml index b9f3db0b2..f0bae5f85 100644 --- a/sc-poc/manifests/crds/core.space-cloud.io_opapolicies.yaml +++ b/sc-poc/manifests/crds/core.space-cloud.io_opapolicies.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.10.0 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (devel) name: opapolicies.core.space-cloud.io spec: group: core.space-cloud.io diff --git a/sc-poc/manifests/crds/core.space-cloud.io_openapisources.yaml b/sc-poc/manifests/crds/core.space-cloud.io_openapisources.yaml index 45d802e59..0d919b65a 100644 --- a/sc-poc/manifests/crds/core.space-cloud.io_openapisources.yaml +++ b/sc-poc/manifests/crds/core.space-cloud.io_openapisources.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.10.0 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (devel) name: openapisources.core.space-cloud.io spec: group: core.space-cloud.io diff --git a/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml b/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml new file mode 100644 index 000000000..d6ebac5ce --- /dev/null +++ b/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml @@ -0,0 +1,73 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: (devel) + name: pubsubchannels.core.space-cloud.io +spec: + group: core.space-cloud.io + names: + kind: PubsubChannel + listKind: PubsubChannelList + plural: pubsubchannels + singular: pubsubchannel + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: PubsubChannel is the schema for the pubsub channel + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: PubsubChannel describes the specification of the pubsub channel + properties: + channel: + description: Channel describes the name of the pubsub channel + type: string + payload: + additionalProperties: + description: ChannelSchema defines the schema of the payload that + the channel accepts + properties: + items: + description: Items list the items of the array + properties: + additionalProperties: {} + description: Properties list additional properties of the object + type: object + required: + description: Required specifies the required properties of the + object + items: + type: string + type: array + type: + description: Type defines the type of the data + type: string + type: object + description: Payload describes the payload schema of the channel + type: object + required: + - channel + type: object + status: + description: PubsubChannel defines the observed state of the pubsub channel + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/sc-poc/modules/pubsub/app.go b/sc-poc/modules/pubsub/app.go index 6e0893d89..ac21d4773 100644 --- a/sc-poc/modules/pubsub/app.go +++ b/sc-poc/modules/pubsub/app.go @@ -6,7 +6,9 @@ import ( "go.uber.org/zap" "github.com/spacecloud-io/space-cloud/managers/apis" + "github.com/spacecloud-io/space-cloud/managers/source" "github.com/spacecloud-io/space-cloud/modules/pubsub/connectors" + "github.com/spacecloud-io/space-cloud/sources/pubsub_channel" ) var connectorPool = caddy.NewUsagePool() @@ -27,6 +29,7 @@ type App struct { // For internal usage logger *zap.Logger asyncapiDoc *AsyncAPI + channels []Channel } // CaddyModule returns the Caddy module information. @@ -52,10 +55,32 @@ func (a *App) Provision(ctx caddy.Context) error { connector := val.(*connectors.Connector) a.pubSub = connector.GetPubsubClient() - channels := a.Channels() - for path, channel := range channels.Channels { + // Get all space-cloud defined pubsub channel sources + a.createInternalChannels() + + // Get all user defined pubsub channel sources + sourceManT, err := ctx.App("source") + if err != nil { + a.logger.Error("Unable to load the source manager", zap.Error(err)) + } + sourceMan := sourceManT.(*source.App) + sources := sourceMan.GetSources("pubsub") + for _, src := range sources { + channelSrc := src.(*pubsub_channel.PubsubChannelSource) + topic := Channel{ + Name: channelSrc.Spec.Channel, + Payload: ChannelPayload{ + Schema: channelSrc.Spec.Payload, + }, + } + + a.channels = append(a.channels, topic) + } + + // Generate publish and subscribe API for each channel + for path, channel := range a.Channels().Channels { // Get the publish and subscribe API of the channel - publisherAPI := a.getPublisherAPI(path, channel.Name) + publisherAPI := a.getPublisherAPI(path, channel) subscriberAPI := a.getSubscriberAPI(path, channel.Name) a.apis = append(a.apis, publisherAPI, subscriberAPI) diff --git a/sc-poc/modules/pubsub/operations.go b/sc-poc/modules/pubsub/operations.go index d9ac598c3..f254739f2 100644 --- a/sc-poc/modules/pubsub/operations.go +++ b/sc-poc/modules/pubsub/operations.go @@ -3,8 +3,10 @@ package pubsub import ( "context" "encoding/json" + "strings" "github.com/ThreeDotsLabs/watermill/message" + "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" ) // Publish publishes message to a topic @@ -28,19 +30,44 @@ func (a *App) Subscribe(ctx context.Context, clientID, topic string, options Sub return messages, nil } -// Channels return channels with their schema -func (a *App) Channels() ChannelsWithSchema { - return ChannelsWithSchema{ - Channels: map[string]Channel{ - "/sc/api": { - Name: "api-provision", - Payload: ChannelPayload{ - Schema: map[string]interface{}{ - "$ref": "#/components/schemas/APIManMsg", +func (a *App) createInternalChannels() { + openapiProvisionChannel := Channel{ + Name: "openapi-provision", + Payload: ChannelPayload{ + Schema: map[string]*v1alpha1.ChannelSchema{ + "doc": { + Type: "string", + }, + }, + }, + } + + asyncapiProvisionChannel := Channel{ + Name: "asyncapi-provision", + Payload: ChannelPayload{ + Schema: map[string]*v1alpha1.ChannelSchema{ + "doc": { + Type: "object", + Properties: map[string]*v1alpha1.ChannelSchema{ + "name": { + Type: "string", + }, + "age": { + Type: "integer", + }, }, + Required: []string{"name"}, }, }, }, + } + a.channels = append(a.channels, openapiProvisionChannel, asyncapiProvisionChannel) +} + +// Channels return channels with their schema +func (a *App) Channels() ChannelsWithSchema { + channels := ChannelsWithSchema{ + Channels: make(map[string]Channel), Components: &Components{ Schemas: map[string]interface{}{ "APIManMsg": map[string]interface{}{ @@ -50,4 +77,17 @@ func (a *App) Channels() ChannelsWithSchema { }, }, } + + for _, topic := range a.channels { + channelPath := getChannelPath(topic.Name) + channels.Channels[channelPath] = topic + } + return channels +} + +func getChannelPath(name string) string { + if name[0] != '/' { + name = "/" + name + } + return strings.ReplaceAll(name, "-", "/") } diff --git a/sc-poc/modules/pubsub/types.go b/sc-poc/modules/pubsub/types.go index 5db21a354..d6ddd5ad8 100644 --- a/sc-poc/modules/pubsub/types.go +++ b/sc-poc/modules/pubsub/types.go @@ -1,5 +1,9 @@ package pubsub +import ( + v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" +) + type ( EventType string ) @@ -19,7 +23,7 @@ type Message struct { // PublishMessage defines the type for publishing a message type PublishMessage struct { ID string `json:"id"` - MetaData map[string]string `json:"metadata"` + MetaData map[string]string `json:"metadata,omitempty"` Payload interface{} `json:"payload"` } @@ -50,9 +54,9 @@ type Channel struct { // ChannelPayload define channel's payload type ChannelPayload struct { - Schema map[string]interface{} `json:"schema,omitempty"` - Example interface{} `json:"example,omitempty"` - Examples []interface{} `json:"examples,omitempty"` + Schema map[string]*v1alpha1.ChannelSchema `json:"schema,omitempty"` + Example interface{} `json:"example,omitempty"` + Examples []interface{} `json:"examples,omitempty"` } // Components stores the components for the schema refs diff --git a/sc-poc/modules/pubsub/websocket.go b/sc-poc/modules/pubsub/websocket.go index 8b0855ca9..01657fdfe 100644 --- a/sc-poc/modules/pubsub/websocket.go +++ b/sc-poc/modules/pubsub/websocket.go @@ -11,9 +11,11 @@ import ( "github.com/google/uuid" "github.com/mitchellh/mapstructure" "github.com/segmentio/ksuid" + "github.com/xeipuuv/gojsonschema" "go.uber.org/zap" "github.com/spacecloud-io/space-cloud/managers/apis" + "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" ) // TODO: channels may or may not have prefix slash @@ -24,15 +26,27 @@ func (a *App) GetAPIRoutes() apis.APIs { } // getPublishAPI creates a websocket API for sending messages in the channel -func (a *App) getPublisherAPI(channelPath, channelName string) *apis.API { +func (a *App) getPublisherAPI(channelPath string, channel Channel) *apis.API { + // Create a schema validator for incoming messages + channelSchema := v1alpha1.ChannelSchema{ + Type: "object", + Properties: make(map[string]*v1alpha1.ChannelSchema), + } + channelSchema.Properties = channel.Payload.Schema + schemaLoader := gojsonschema.NewGoLoader(channelSchema) + schemaValidator, err := gojsonschema.NewSchema(schemaLoader) + if err != nil { + a.logger.Error("could not create schema validator for channel", zap.String("channel", channel.Name), zap.Error(err)) + } + return &apis.API{ - Name: fmt.Sprintf("%s-publisher", channelName), + Name: fmt.Sprintf("%s-publisher", channel.Name), Path: fmt.Sprintf("/v1/pubsub/default%s/producer", channelPath), Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Create the websocket connection conn, _, _, err := ws.UpgradeHTTP(r, w) if err != nil { - a.logger.Error("could not establish websocket connection", zap.String("channel", channelName), zap.Error(err)) + a.logger.Error("could not establish websocket connection", zap.String("channel", channel.Name), zap.Error(err)) return } defer conn.Close() @@ -41,32 +55,56 @@ func (a *App) getPublisherAPI(channelPath, channelName string) *apis.API { // Get the message from the websocket connection data, _, err := wsutil.ReadClientData(conn) if err != nil { - a.logger.Error("could not read client data or the connection is closed", zap.String("channel", channelName), zap.Error(err)) + a.logger.Error("could not read client data or the connection is closed", zap.String("channel", channel.Name), zap.Error(err)) return } + // Unmarshal message var message Message err = json.Unmarshal(data, &message) if err != nil { - a.logger.Error("could not unmarshal data", zap.String("channel", channelName), zap.Error(err)) + a.logger.Error("could not unmarshal data", zap.String("channel", channel.Name), zap.Error(err)) continue } + // Handle events of type message if message.Event == MessageEvent { var pubMsg PublishMessage err = mapstructure.Decode(message.Data, &pubMsg) if err != nil { - a.logger.Error("could not decode data", zap.String("channel", channelName), zap.Error(err)) + a.logger.Error("could not decode data", zap.String("channel", channel.Name), zap.Error(err)) continue } + // Create a ID if not ID is not already present if pubMsg.ID == "" { pubMsg.ID = ksuid.New().String() } - err = a.Publish(channelName, pubMsg, PublishOptions{}) + // Validate schema of the message + documentLoader := gojsonschema.NewGoLoader(pubMsg.Payload) + result, err := schemaValidator.Validate(documentLoader) + if err != nil { + a.logger.Error("could not validate schema for channel", zap.String("channel", channel.Name), zap.Error(err)) + } + + if !result.Valid() { + var errMsgs []string + for _, desc := range result.Errors() { + errMsgs = append(errMsgs, fmt.Sprint(desc)) + } + + b, _ := json.Marshal(errMsgs) + err = wsutil.WriteServerText(conn, b) + if err != nil { + a.logger.Error("could not send message to the websocket", zap.Error(err)) + } + continue + } + + err = a.Publish(channel.Name, pubMsg, PublishOptions{}) if err != nil { - a.logger.Error("could not publish client message", zap.String("channel", channelName), zap.Error(err)) + a.logger.Error("could not publish client message", zap.String("channel", channel.Name), zap.Error(err)) } } } diff --git a/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go b/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go new file mode 100644 index 000000000..a9e422ca0 --- /dev/null +++ b/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go @@ -0,0 +1,55 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status + +// PubsubChannel is the schema for the pubsub channel +type PubsubChannel struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec PubsubChannelSpec `json:"spec,omitempty"` + Status PubsubChannelStatus `json:"status,omitempty"` +} + +// PubsubChannel describes the specification of the pubsub channel +type PubsubChannelSpec struct { + // Channel describes the name of the pubsub channel + Channel string `json:"channel"` + // Payload describes the payload schema of the channel + Payload map[string]*ChannelSchema `json:"payload,omitempty"` +} + +// ChannelSchema defines the schema of the payload that the channel accepts +type ChannelSchema struct { + // Type defines the type of the data + Type string `json:"type,omitempty"` + + // Items list the items of the array + Items *ChannelSchema `json:"items,omitempty"` + + // Properties list additional properties of the object + Properties map[string]*ChannelSchema `json:"properties,omitempty"` + // Required specifies the required properties of the object + Required []string `json:"required,omitempty"` +} + +// PubsubChannel defines the observed state of the pubsub channel +type PubsubChannelStatus struct { + // TODO: Add state to show if sync succeeded or if there was an error +} + +// +kubebuilder:object:root=true + +// OPAPolicyList contains a list of PubsubChannel +type PubsubChannelList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []PubsubChannel `json:"items"` +} diff --git a/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go b/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go index 0738fba02..8c82e59df 100644 --- a/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go +++ b/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go @@ -34,6 +34,46 @@ func (in *AuthSecret) DeepCopy() *AuthSecret { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelSchema) DeepCopyInto(out *ChannelSchema) { + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = new(ChannelSchema) + (*in).DeepCopyInto(*out) + } + if in.Properties != nil { + in, out := &in.Properties, &out.Properties + *out = make(map[string]*ChannelSchema, len(*in)) + for key, val := range *in { + var outVal *ChannelSchema + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(ChannelSchema) + (*in).DeepCopyInto(*out) + } + (*out)[key] = outVal + } + } + if in.Required != nil { + in, out := &in.Required, &out.Required + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelSchema. +func (in *ChannelSchema) DeepCopy() *ChannelSchema { + if in == nil { + return nil + } + out := new(ChannelSchema) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CompiledGraphqlSource) DeepCopyInto(out *CompiledGraphqlSource) { *out = *in @@ -585,6 +625,110 @@ func (in *OpenAPISourceStatus) DeepCopy() *OpenAPISourceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PubsubChannel) DeepCopyInto(out *PubsubChannel) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PubsubChannel. +func (in *PubsubChannel) DeepCopy() *PubsubChannel { + if in == nil { + return nil + } + out := new(PubsubChannel) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PubsubChannel) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PubsubChannelList) DeepCopyInto(out *PubsubChannelList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PubsubChannel, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PubsubChannelList. +func (in *PubsubChannelList) DeepCopy() *PubsubChannelList { + if in == nil { + return nil + } + out := new(PubsubChannelList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PubsubChannelList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PubsubChannelSpec) DeepCopyInto(out *PubsubChannelSpec) { + *out = *in + if in.Payload != nil { + in, out := &in.Payload, &out.Payload + *out = make(map[string]*ChannelSchema, len(*in)) + for key, val := range *in { + var outVal *ChannelSchema + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(ChannelSchema) + (*in).DeepCopyInto(*out) + } + (*out)[key] = outVal + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PubsubChannelSpec. +func (in *PubsubChannelSpec) DeepCopy() *PubsubChannelSpec { + if in == nil { + return nil + } + out := new(PubsubChannelSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PubsubChannelStatus) DeepCopyInto(out *PubsubChannelStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PubsubChannelStatus. +func (in *PubsubChannelStatus) DeepCopy() *PubsubChannelStatus { + if in == nil { + return nil + } + out := new(PubsubChannelStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceRef) DeepCopyInto(out *ResourceRef) { *out = *in diff --git a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/core_client.go b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/core_client.go index 441603e5c..7d7ea723e 100644 --- a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/core_client.go +++ b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/core_client.go @@ -33,6 +33,7 @@ type CoreV1alpha1Interface interface { JwtHSASecretsGetter OPAPoliciesGetter OpenAPISourcesGetter + PubsubChannelsGetter } // CoreV1alpha1Client is used to interact with features provided by the core.space-cloud.io group. @@ -60,6 +61,10 @@ func (c *CoreV1alpha1Client) OpenAPISources(namespace string) OpenAPISourceInter return newOpenAPISources(c, namespace) } +func (c *CoreV1alpha1Client) PubsubChannels(namespace string) PubsubChannelInterface { + return newPubsubChannels(c, namespace) +} + // NewForConfig creates a new CoreV1alpha1Client for the given config. // NewForConfig is equivalent to NewForConfigAndClient(c, httpClient), // where httpClient was generated with rest.HTTPClientFor(c). diff --git a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_core_client.go b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_core_client.go index 4f9dcf7e5..19c0fa6d2 100644 --- a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_core_client.go +++ b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_core_client.go @@ -48,6 +48,10 @@ func (c *FakeCoreV1alpha1) OpenAPISources(namespace string) v1alpha1.OpenAPISour return &FakeOpenAPISources{c, namespace} } +func (c *FakeCoreV1alpha1) PubsubChannels(namespace string) v1alpha1.PubsubChannelInterface { + return &FakePubsubChannels{c, namespace} +} + // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. func (c *FakeCoreV1alpha1) RESTClient() rest.Interface { diff --git a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_pubsubchannel.go b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_pubsubchannel.go new file mode 100644 index 000000000..58fb9c60a --- /dev/null +++ b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/fake/fake_pubsubchannel.go @@ -0,0 +1,142 @@ +/* +Copyright The Space Cloud Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakePubsubChannels implements PubsubChannelInterface +type FakePubsubChannels struct { + Fake *FakeCoreV1alpha1 + ns string +} + +var pubsubchannelsResource = schema.GroupVersionResource{Group: "core.space-cloud.io", Version: "v1alpha1", Resource: "pubsubchannels"} + +var pubsubchannelsKind = schema.GroupVersionKind{Group: "core.space-cloud.io", Version: "v1alpha1", Kind: "PubsubChannel"} + +// Get takes name of the pubsubChannel, and returns the corresponding pubsubChannel object, and an error if there is any. +func (c *FakePubsubChannels) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.PubsubChannel, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(pubsubchannelsResource, c.ns, name), &v1alpha1.PubsubChannel{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PubsubChannel), err +} + +// List takes label and field selectors, and returns the list of PubsubChannels that match those selectors. +func (c *FakePubsubChannels) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.PubsubChannelList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(pubsubchannelsResource, pubsubchannelsKind, c.ns, opts), &v1alpha1.PubsubChannelList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.PubsubChannelList{ListMeta: obj.(*v1alpha1.PubsubChannelList).ListMeta} + for _, item := range obj.(*v1alpha1.PubsubChannelList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested pubsubChannels. +func (c *FakePubsubChannels) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(pubsubchannelsResource, c.ns, opts)) + +} + +// Create takes the representation of a pubsubChannel and creates it. Returns the server's representation of the pubsubChannel, and an error, if there is any. +func (c *FakePubsubChannels) Create(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.CreateOptions) (result *v1alpha1.PubsubChannel, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(pubsubchannelsResource, c.ns, pubsubChannel), &v1alpha1.PubsubChannel{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PubsubChannel), err +} + +// Update takes the representation of a pubsubChannel and updates it. Returns the server's representation of the pubsubChannel, and an error, if there is any. +func (c *FakePubsubChannels) Update(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.UpdateOptions) (result *v1alpha1.PubsubChannel, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(pubsubchannelsResource, c.ns, pubsubChannel), &v1alpha1.PubsubChannel{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PubsubChannel), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakePubsubChannels) UpdateStatus(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.UpdateOptions) (*v1alpha1.PubsubChannel, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(pubsubchannelsResource, "status", c.ns, pubsubChannel), &v1alpha1.PubsubChannel{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PubsubChannel), err +} + +// Delete takes name of the pubsubChannel and deletes it. Returns an error if one occurs. +func (c *FakePubsubChannels) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(pubsubchannelsResource, c.ns, name, opts), &v1alpha1.PubsubChannel{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakePubsubChannels) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(pubsubchannelsResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.PubsubChannelList{}) + return err +} + +// Patch applies the patch and returns the patched pubsubChannel. +func (c *FakePubsubChannels) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.PubsubChannel, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(pubsubchannelsResource, c.ns, name, pt, data, subresources...), &v1alpha1.PubsubChannel{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PubsubChannel), err +} diff --git a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/generated_expansion.go b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/generated_expansion.go index 43d530e51..20e3a6b27 100644 --- a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/generated_expansion.go +++ b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/generated_expansion.go @@ -27,3 +27,5 @@ type JwtHSASecretExpansion interface{} type OPAPolicyExpansion interface{} type OpenAPISourceExpansion interface{} + +type PubsubChannelExpansion interface{} diff --git a/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/pubsubchannel.go b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/pubsubchannel.go new file mode 100644 index 000000000..70f6ec6bd --- /dev/null +++ b/sc-poc/pkg/client/clientset/versioned/typed/core/v1alpha1/pubsubchannel.go @@ -0,0 +1,195 @@ +/* +Copyright The Space Cloud Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" + scheme "github.com/spacecloud-io/space-cloud/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// PubsubChannelsGetter has a method to return a PubsubChannelInterface. +// A group's client should implement this interface. +type PubsubChannelsGetter interface { + PubsubChannels(namespace string) PubsubChannelInterface +} + +// PubsubChannelInterface has methods to work with PubsubChannel resources. +type PubsubChannelInterface interface { + Create(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.CreateOptions) (*v1alpha1.PubsubChannel, error) + Update(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.UpdateOptions) (*v1alpha1.PubsubChannel, error) + UpdateStatus(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.UpdateOptions) (*v1alpha1.PubsubChannel, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.PubsubChannel, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.PubsubChannelList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.PubsubChannel, err error) + PubsubChannelExpansion +} + +// pubsubChannels implements PubsubChannelInterface +type pubsubChannels struct { + client rest.Interface + ns string +} + +// newPubsubChannels returns a PubsubChannels +func newPubsubChannels(c *CoreV1alpha1Client, namespace string) *pubsubChannels { + return &pubsubChannels{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the pubsubChannel, and returns the corresponding pubsubChannel object, and an error if there is any. +func (c *pubsubChannels) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.PubsubChannel, err error) { + result = &v1alpha1.PubsubChannel{} + err = c.client.Get(). + Namespace(c.ns). + Resource("pubsubchannels"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of PubsubChannels that match those selectors. +func (c *pubsubChannels) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.PubsubChannelList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.PubsubChannelList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("pubsubchannels"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested pubsubChannels. +func (c *pubsubChannels) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("pubsubchannels"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a pubsubChannel and creates it. Returns the server's representation of the pubsubChannel, and an error, if there is any. +func (c *pubsubChannels) Create(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.CreateOptions) (result *v1alpha1.PubsubChannel, err error) { + result = &v1alpha1.PubsubChannel{} + err = c.client.Post(). + Namespace(c.ns). + Resource("pubsubchannels"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(pubsubChannel). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a pubsubChannel and updates it. Returns the server's representation of the pubsubChannel, and an error, if there is any. +func (c *pubsubChannels) Update(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.UpdateOptions) (result *v1alpha1.PubsubChannel, err error) { + result = &v1alpha1.PubsubChannel{} + err = c.client.Put(). + Namespace(c.ns). + Resource("pubsubchannels"). + Name(pubsubChannel.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(pubsubChannel). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *pubsubChannels) UpdateStatus(ctx context.Context, pubsubChannel *v1alpha1.PubsubChannel, opts v1.UpdateOptions) (result *v1alpha1.PubsubChannel, err error) { + result = &v1alpha1.PubsubChannel{} + err = c.client.Put(). + Namespace(c.ns). + Resource("pubsubchannels"). + Name(pubsubChannel.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(pubsubChannel). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the pubsubChannel and deletes it. Returns an error if one occurs. +func (c *pubsubChannels) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("pubsubchannels"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *pubsubChannels) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("pubsubchannels"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched pubsubChannel. +func (c *pubsubChannels) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.PubsubChannel, err error) { + result = &v1alpha1.PubsubChannel{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("pubsubchannels"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/interface.go b/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/interface.go index ccc8c0e9c..04d5bb6b3 100644 --- a/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/interface.go +++ b/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/interface.go @@ -34,6 +34,8 @@ type Interface interface { OPAPolicies() OPAPolicyInformer // OpenAPISources returns a OpenAPISourceInformer. OpenAPISources() OpenAPISourceInformer + // PubsubChannels returns a PubsubChannelInformer. + PubsubChannels() PubsubChannelInformer } type version struct { @@ -71,3 +73,8 @@ func (v *version) OPAPolicies() OPAPolicyInformer { func (v *version) OpenAPISources() OpenAPISourceInformer { return &openAPISourceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } + +// PubsubChannels returns a PubsubChannelInformer. +func (v *version) PubsubChannels() PubsubChannelInformer { + return &pubsubChannelInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/pubsubchannel.go b/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/pubsubchannel.go new file mode 100644 index 000000000..c15ad4cef --- /dev/null +++ b/sc-poc/pkg/client/informers/externalversions/core/v1alpha1/pubsubchannel.go @@ -0,0 +1,90 @@ +/* +Copyright The Space Cloud Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + corev1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" + versioned "github.com/spacecloud-io/space-cloud/pkg/client/clientset/versioned" + internalinterfaces "github.com/spacecloud-io/space-cloud/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/client/listers/core/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// PubsubChannelInformer provides access to a shared informer and lister for +// PubsubChannels. +type PubsubChannelInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.PubsubChannelLister +} + +type pubsubChannelInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewPubsubChannelInformer constructs a new informer for PubsubChannel type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewPubsubChannelInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredPubsubChannelInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredPubsubChannelInformer constructs a new informer for PubsubChannel type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredPubsubChannelInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CoreV1alpha1().PubsubChannels(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CoreV1alpha1().PubsubChannels(namespace).Watch(context.TODO(), options) + }, + }, + &corev1alpha1.PubsubChannel{}, + resyncPeriod, + indexers, + ) +} + +func (f *pubsubChannelInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredPubsubChannelInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *pubsubChannelInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&corev1alpha1.PubsubChannel{}, f.defaultInformer) +} + +func (f *pubsubChannelInformer) Lister() v1alpha1.PubsubChannelLister { + return v1alpha1.NewPubsubChannelLister(f.Informer().GetIndexer()) +} diff --git a/sc-poc/pkg/client/informers/externalversions/generic.go b/sc-poc/pkg/client/informers/externalversions/generic.go index 906e1ff2b..c843bcd95 100644 --- a/sc-poc/pkg/client/informers/externalversions/generic.go +++ b/sc-poc/pkg/client/informers/externalversions/generic.go @@ -63,6 +63,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Core().V1alpha1().OPAPolicies().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("openapisources"): return &genericInformer{resource: resource.GroupResource(), informer: f.Core().V1alpha1().OpenAPISources().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("pubsubchannels"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Core().V1alpha1().PubsubChannels().Informer()}, nil } diff --git a/sc-poc/pkg/client/listers/core/v1alpha1/expansion_generated.go b/sc-poc/pkg/client/listers/core/v1alpha1/expansion_generated.go index a8f9b219a..70fc46357 100644 --- a/sc-poc/pkg/client/listers/core/v1alpha1/expansion_generated.go +++ b/sc-poc/pkg/client/listers/core/v1alpha1/expansion_generated.go @@ -57,3 +57,11 @@ type OpenAPISourceListerExpansion interface{} // OpenAPISourceNamespaceListerExpansion allows custom methods to be added to // OpenAPISourceNamespaceLister. type OpenAPISourceNamespaceListerExpansion interface{} + +// PubsubChannelListerExpansion allows custom methods to be added to +// PubsubChannelLister. +type PubsubChannelListerExpansion interface{} + +// PubsubChannelNamespaceListerExpansion allows custom methods to be added to +// PubsubChannelNamespaceLister. +type PubsubChannelNamespaceListerExpansion interface{} diff --git a/sc-poc/pkg/client/listers/core/v1alpha1/pubsubchannel.go b/sc-poc/pkg/client/listers/core/v1alpha1/pubsubchannel.go new file mode 100644 index 000000000..0abf6879d --- /dev/null +++ b/sc-poc/pkg/client/listers/core/v1alpha1/pubsubchannel.go @@ -0,0 +1,99 @@ +/* +Copyright The Space Cloud Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// PubsubChannelLister helps list PubsubChannels. +// All objects returned here must be treated as read-only. +type PubsubChannelLister interface { + // List lists all PubsubChannels in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.PubsubChannel, err error) + // PubsubChannels returns an object that can list and get PubsubChannels. + PubsubChannels(namespace string) PubsubChannelNamespaceLister + PubsubChannelListerExpansion +} + +// pubsubChannelLister implements the PubsubChannelLister interface. +type pubsubChannelLister struct { + indexer cache.Indexer +} + +// NewPubsubChannelLister returns a new PubsubChannelLister. +func NewPubsubChannelLister(indexer cache.Indexer) PubsubChannelLister { + return &pubsubChannelLister{indexer: indexer} +} + +// List lists all PubsubChannels in the indexer. +func (s *pubsubChannelLister) List(selector labels.Selector) (ret []*v1alpha1.PubsubChannel, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.PubsubChannel)) + }) + return ret, err +} + +// PubsubChannels returns an object that can list and get PubsubChannels. +func (s *pubsubChannelLister) PubsubChannels(namespace string) PubsubChannelNamespaceLister { + return pubsubChannelNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// PubsubChannelNamespaceLister helps list and get PubsubChannels. +// All objects returned here must be treated as read-only. +type PubsubChannelNamespaceLister interface { + // List lists all PubsubChannels in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.PubsubChannel, err error) + // Get retrieves the PubsubChannel from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.PubsubChannel, error) + PubsubChannelNamespaceListerExpansion +} + +// pubsubChannelNamespaceLister implements the PubsubChannelNamespaceLister +// interface. +type pubsubChannelNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all PubsubChannels in the indexer for a given namespace. +func (s pubsubChannelNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.PubsubChannel, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.PubsubChannel)) + }) + return ret, err +} + +// Get retrieves the PubsubChannel from the indexer for a given namespace and name. +func (s pubsubChannelNamespaceLister) Get(name string) (*v1alpha1.PubsubChannel, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("pubsubchannel"), name) + } + return obj.(*v1alpha1.PubsubChannel), nil +} diff --git a/sc-poc/sources/pubsub_channel/source.go b/sc-poc/sources/pubsub_channel/source.go new file mode 100644 index 000000000..72f3716f3 --- /dev/null +++ b/sc-poc/sources/pubsub_channel/source.go @@ -0,0 +1,42 @@ +package pubsub_channel + +import ( + "github.com/caddyserver/caddy/v2" + "github.com/spacecloud-io/space-cloud/managers/source" + "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var pubsubchannelsResource = schema.GroupVersionResource{Group: "core.space-cloud.io", Version: "v1alpha1", Resource: "pubsubchannels"} + +func init() { + source.RegisterSource(PubsubChannelSource{}, pubsubchannelsResource) +} + +// PubsubChannelSource describes a PubsubChannel source +type PubsubChannelSource struct { + v1alpha1.PubsubChannel +} + +// CaddyModule returns the Caddy module information. +func (PubsubChannelSource) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: caddy.ModuleID(source.GetModuleName(pubsubchannelsResource)), + New: func() caddy.Module { return new(PubsubChannelSource) }, + } +} + +// GetPriority returns the priority of the source. +func (s *PubsubChannelSource) GetPriority() int { + return 0 +} + +// GetProviders returns the providers this source is applicable for +func (s *PubsubChannelSource) GetProviders() []string { + return []string{"pubsub"} +} + +// Interface guard +var ( + _ source.Source = (*PubsubChannelSource)(nil) +) diff --git a/sc-poc/sources/sources.go b/sc-poc/sources/sources.go index 4430d37dd..a911b8cbe 100644 --- a/sc-poc/sources/sources.go +++ b/sc-poc/sources/sources.go @@ -5,4 +5,5 @@ import ( _ "github.com/spacecloud-io/space-cloud/sources/auth/opapolicy" _ "github.com/spacecloud-io/space-cloud/sources/compiledgraphql" _ "github.com/spacecloud-io/space-cloud/sources/graphql" + _ "github.com/spacecloud-io/space-cloud/sources/pubsub_channel" ) From 5d65c6f1ad6853bb43a9bf6e0f871f654381db02 Mon Sep 17 00:00:00 2001 From: Shubham Nazare Date: Sun, 14 May 2023 21:11:06 +0530 Subject: [PATCH 2/6] add interface for pubsub channel and revamp internal channels, schema validation and pubsub channel source type Signed-off-by: Shubham Nazare --- .../core.space-cloud.io_pubsubchannels.yaml | 37 ++++++------ sc-poc/modules/pubsub/app.go | 10 +--- sc-poc/modules/pubsub/asyncapi.go | 56 +++++++++++++------ sc-poc/modules/pubsub/operations.go | 32 +++-------- sc-poc/modules/pubsub/types.go | 16 +++++- sc-poc/modules/pubsub/websocket.go | 15 ++--- .../pkg/apis/core/v1alpha1/pubsub_channel.go | 6 +- .../core/v1alpha1/zz_generated.deepcopy.go | 20 +++---- sc-poc/sources/pubsub_channel/source.go | 11 ++++ 9 files changed, 109 insertions(+), 94 deletions(-) diff --git a/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml b/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml index d6ebac5ce..22aa80b84 100644 --- a/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml +++ b/sc-poc/manifests/crds/core.space-cloud.io_pubsubchannels.yaml @@ -38,27 +38,28 @@ spec: description: Channel describes the name of the pubsub channel type: string payload: - additionalProperties: - description: ChannelSchema defines the schema of the payload that - the channel accepts + description: Payload describes the payload schema of the channel + properties: + additionalProperties: + description: AdditionalProperties defines if the schema accepts + properties other than the ones mentioned + format: byte + type: string + items: + description: Items list the items of the array properties: + additionalProperties: {} + description: Properties list additional properties of the object + type: object + required: + description: Required specifies the required properties of the + object items: - description: Items list the items of the array - properties: - additionalProperties: {} - description: Properties list additional properties of the object - type: object - required: - description: Required specifies the required properties of the - object - items: - type: string - type: array - type: - description: Type defines the type of the data type: string - type: object - description: Payload describes the payload schema of the channel + type: array + type: + description: Type defines the type of the data + type: string type: object required: - channel diff --git a/sc-poc/modules/pubsub/app.go b/sc-poc/modules/pubsub/app.go index ac21d4773..26fc77137 100644 --- a/sc-poc/modules/pubsub/app.go +++ b/sc-poc/modules/pubsub/app.go @@ -8,7 +8,6 @@ import ( "github.com/spacecloud-io/space-cloud/managers/apis" "github.com/spacecloud-io/space-cloud/managers/source" "github.com/spacecloud-io/space-cloud/modules/pubsub/connectors" - "github.com/spacecloud-io/space-cloud/sources/pubsub_channel" ) var connectorPool = caddy.NewUsagePool() @@ -66,13 +65,8 @@ func (a *App) Provision(ctx caddy.Context) error { sourceMan := sourceManT.(*source.App) sources := sourceMan.GetSources("pubsub") for _, src := range sources { - channelSrc := src.(*pubsub_channel.PubsubChannelSource) - topic := Channel{ - Name: channelSrc.Spec.Channel, - Payload: ChannelPayload{ - Schema: channelSrc.Spec.Payload, - }, - } + channelSrc := src.(Source) + topic := channelSrc.GetChannel() a.channels = append(a.channels, topic) } diff --git a/sc-poc/modules/pubsub/asyncapi.go b/sc-poc/modules/pubsub/asyncapi.go index d6c676db1..db8cc61d6 100644 --- a/sc-poc/modules/pubsub/asyncapi.go +++ b/sc-poc/modules/pubsub/asyncapi.go @@ -84,29 +84,51 @@ func (a *App) generateASyncAPIDoc() *AsyncAPI { Subscribe: &Operation{ ID: "producerSubscribe" + getID(channelObj.Name), Message: MessageOneOrMany{ - MessageEntity: MessageEntity{ - Name: "Acknowledgement", - ContentType: "application/json", - Payload: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "event": map[string]interface{}{ - "type": "string", + OneOf: []MessageEntity{ + { + Name: "Acknowledgement", + ContentType: "application/json", + Payload: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "event": map[string]interface{}{ + "type": "string", + }, + "data": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "id": map[string]interface{}{ + "type": "string", + }, + "ack": map[string]interface{}{ + "type": "boolean", + }, + }, + "required": []string{"id", "ack"}, + }, }, - "data": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "id": map[string]interface{}{ + "required": []string{"event", "data"}, + }, + }, + { + Name: "Error", + ContentType: "application/json", + Payload: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "message": map[string]interface{}{ + "type": "string", + }, + "errors": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ "type": "string", }, - "ack": map[string]interface{}{ - "type": "boolean", - }, + "required": []string{"message"}, }, - "required": []string{"id", "ack"}, }, + "required": []string{"event", "data"}, }, - "required": []string{"event", "data"}, }, }, }, diff --git a/sc-poc/modules/pubsub/operations.go b/sc-poc/modules/pubsub/operations.go index f254739f2..4acd81a83 100644 --- a/sc-poc/modules/pubsub/operations.go +++ b/sc-poc/modules/pubsub/operations.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "encoding/json" + "fmt" "strings" "github.com/ThreeDotsLabs/watermill/message" @@ -34,10 +35,9 @@ func (a *App) createInternalChannels() { openapiProvisionChannel := Channel{ Name: "openapi-provision", Payload: ChannelPayload{ - Schema: map[string]*v1alpha1.ChannelSchema{ - "doc": { - Type: "string", - }, + Schema: &v1alpha1.ChannelSchema{ + Type: "object", + AdditionalProperties: json.RawMessage(fmt.Sprintf(`%t`, true)), }, }, } @@ -45,19 +45,9 @@ func (a *App) createInternalChannels() { asyncapiProvisionChannel := Channel{ Name: "asyncapi-provision", Payload: ChannelPayload{ - Schema: map[string]*v1alpha1.ChannelSchema{ - "doc": { - Type: "object", - Properties: map[string]*v1alpha1.ChannelSchema{ - "name": { - Type: "string", - }, - "age": { - Type: "integer", - }, - }, - Required: []string{"name"}, - }, + Schema: &v1alpha1.ChannelSchema{ + Type: "object", + AdditionalProperties: json.RawMessage(fmt.Sprintf(`%t`, true)), }, }, } @@ -68,14 +58,6 @@ func (a *App) createInternalChannels() { func (a *App) Channels() ChannelsWithSchema { channels := ChannelsWithSchema{ Channels: make(map[string]Channel), - Components: &Components{ - Schemas: map[string]interface{}{ - "APIManMsg": map[string]interface{}{ - "type": "object", - "additionalProperties": true, - }, - }, - }, } for _, topic := range a.channels { diff --git a/sc-poc/modules/pubsub/types.go b/sc-poc/modules/pubsub/types.go index d6ddd5ad8..07ec8dc84 100644 --- a/sc-poc/modules/pubsub/types.go +++ b/sc-poc/modules/pubsub/types.go @@ -54,9 +54,9 @@ type Channel struct { // ChannelPayload define channel's payload type ChannelPayload struct { - Schema map[string]*v1alpha1.ChannelSchema `json:"schema,omitempty"` - Example interface{} `json:"example,omitempty"` - Examples []interface{} `json:"examples,omitempty"` + Schema *v1alpha1.ChannelSchema `json:"schema,omitempty"` + Example interface{} `json:"example,omitempty"` + Examples []interface{} `json:"examples,omitempty"` } // Components stores the components for the schema refs @@ -122,3 +122,13 @@ type ServerItem struct { type AsyncAPIComponents struct { Schemas map[string]interface{} `json:"schemas,omitempty"` } + +// WebsocketErrorMessage defines the error message in websocket connection +type WebsocketErrorMessage struct { + Message string `json:"message"` + Errors []string `json:"errors,omitempty"` +} + +type Source interface { + GetChannel() Channel +} diff --git a/sc-poc/modules/pubsub/websocket.go b/sc-poc/modules/pubsub/websocket.go index 01657fdfe..39ef8654f 100644 --- a/sc-poc/modules/pubsub/websocket.go +++ b/sc-poc/modules/pubsub/websocket.go @@ -15,7 +15,6 @@ import ( "go.uber.org/zap" "github.com/spacecloud-io/space-cloud/managers/apis" - "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" ) // TODO: channels may or may not have prefix slash @@ -28,11 +27,7 @@ func (a *App) GetAPIRoutes() apis.APIs { // getPublishAPI creates a websocket API for sending messages in the channel func (a *App) getPublisherAPI(channelPath string, channel Channel) *apis.API { // Create a schema validator for incoming messages - channelSchema := v1alpha1.ChannelSchema{ - Type: "object", - Properties: make(map[string]*v1alpha1.ChannelSchema), - } - channelSchema.Properties = channel.Payload.Schema + channelSchema := channel.Payload.Schema schemaLoader := gojsonschema.NewGoLoader(channelSchema) schemaValidator, err := gojsonschema.NewSchema(schemaLoader) if err != nil { @@ -89,9 +84,10 @@ func (a *App) getPublisherAPI(channelPath string, channel Channel) *apis.API { } if !result.Valid() { - var errMsgs []string + var errMsgs WebsocketErrorMessage + errMsgs.Message = "Payload of invalid format provided" for _, desc := range result.Errors() { - errMsgs = append(errMsgs, fmt.Sprint(desc)) + errMsgs.Errors = append(errMsgs.Errors, fmt.Sprint(desc)) } b, _ := json.Marshal(errMsgs) @@ -102,8 +98,7 @@ func (a *App) getPublisherAPI(channelPath string, channel Channel) *apis.API { continue } - err = a.Publish(channel.Name, pubMsg, PublishOptions{}) - if err != nil { + if err := a.Publish(channel.Name, pubMsg, PublishOptions{}); err != nil { a.logger.Error("could not publish client message", zap.String("channel", channel.Name), zap.Error(err)) } } diff --git a/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go b/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go index a9e422ca0..a3f50d74f 100644 --- a/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go +++ b/sc-poc/pkg/apis/core/v1alpha1/pubsub_channel.go @@ -1,6 +1,8 @@ package v1alpha1 import ( + "encoding/json" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -23,7 +25,7 @@ type PubsubChannelSpec struct { // Channel describes the name of the pubsub channel Channel string `json:"channel"` // Payload describes the payload schema of the channel - Payload map[string]*ChannelSchema `json:"payload,omitempty"` + Payload *ChannelSchema `json:"payload,omitempty"` } // ChannelSchema defines the schema of the payload that the channel accepts @@ -38,6 +40,8 @@ type ChannelSchema struct { Properties map[string]*ChannelSchema `json:"properties,omitempty"` // Required specifies the required properties of the object Required []string `json:"required,omitempty"` + // AdditionalProperties defines if the schema accepts properties other than the ones mentioned + AdditionalProperties json.RawMessage `json:"additionalProperties,omitempty"` } // PubsubChannel defines the observed state of the pubsub channel diff --git a/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go b/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go index 8c82e59df..35822a67b 100644 --- a/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go +++ b/sc-poc/pkg/apis/core/v1alpha1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1alpha1 import ( + "encoding/json" "k8s.io/apimachinery/pkg/runtime" ) @@ -62,6 +63,11 @@ func (in *ChannelSchema) DeepCopyInto(out *ChannelSchema) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.AdditionalProperties != nil { + in, out := &in.AdditionalProperties, &out.AdditionalProperties + *out = make(json.RawMessage, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelSchema. @@ -689,18 +695,8 @@ func (in *PubsubChannelSpec) DeepCopyInto(out *PubsubChannelSpec) { *out = *in if in.Payload != nil { in, out := &in.Payload, &out.Payload - *out = make(map[string]*ChannelSchema, len(*in)) - for key, val := range *in { - var outVal *ChannelSchema - if val == nil { - (*out)[key] = nil - } else { - in, out := &val, &outVal - *out = new(ChannelSchema) - (*in).DeepCopyInto(*out) - } - (*out)[key] = outVal - } + *out = new(ChannelSchema) + (*in).DeepCopyInto(*out) } } diff --git a/sc-poc/sources/pubsub_channel/source.go b/sc-poc/sources/pubsub_channel/source.go index 72f3716f3..58e5a8e41 100644 --- a/sc-poc/sources/pubsub_channel/source.go +++ b/sc-poc/sources/pubsub_channel/source.go @@ -3,6 +3,7 @@ package pubsub_channel import ( "github.com/caddyserver/caddy/v2" "github.com/spacecloud-io/space-cloud/managers/source" + "github.com/spacecloud-io/space-cloud/modules/pubsub" "github.com/spacecloud-io/space-cloud/pkg/apis/core/v1alpha1" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -36,7 +37,17 @@ func (s *PubsubChannelSource) GetProviders() []string { return []string{"pubsub"} } +func (s *PubsubChannelSource) GetChannel() pubsub.Channel { + return pubsub.Channel{ + Name: s.Spec.Channel, + Payload: pubsub.ChannelPayload{ + Schema: s.Spec.Payload, + }, + } +} + // Interface guard var ( _ source.Source = (*PubsubChannelSource)(nil) + _ pubsub.Source = (*PubsubChannelSource)(nil) ) From b492befd765c014d35e52ee8e5a490c677f6dbd8 Mon Sep 17 00:00:00 2001 From: Shubham Nazare Date: Wed, 24 May 2023 18:57:37 +0530 Subject: [PATCH 3/6] apply changes Signed-off-by: Shubham Nazare --- sc-poc/modules/pubsub/app.go | 9 +++++---- sc-poc/modules/pubsub/asyncapi.go | 5 ++++- sc-poc/modules/pubsub/types.go | 1 + 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sc-poc/modules/pubsub/app.go b/sc-poc/modules/pubsub/app.go index 26fc77137..0e8b7886c 100644 --- a/sc-poc/modules/pubsub/app.go +++ b/sc-poc/modules/pubsub/app.go @@ -65,10 +65,11 @@ func (a *App) Provision(ctx caddy.Context) error { sourceMan := sourceManT.(*source.App) sources := sourceMan.GetSources("pubsub") for _, src := range sources { - channelSrc := src.(Source) - topic := channelSrc.GetChannel() - - a.channels = append(a.channels, topic) + channelSrc, ok := src.(Source) + if ok { + topic := channelSrc.GetChannel() + a.channels = append(a.channels, topic) + } } // Generate publish and subscribe API for each channel diff --git a/sc-poc/modules/pubsub/asyncapi.go b/sc-poc/modules/pubsub/asyncapi.go index db8cc61d6..d1e526d1b 100644 --- a/sc-poc/modules/pubsub/asyncapi.go +++ b/sc-poc/modules/pubsub/asyncapi.go @@ -111,11 +111,14 @@ func (a *App) generateASyncAPIDoc() *AsyncAPI { }, }, { - Name: "Error", + Name: "PublishError", ContentType: "application/json", Payload: map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ + "id": map[string]interface{}{ + "type": "string", + }, "message": map[string]interface{}{ "type": "string", }, diff --git a/sc-poc/modules/pubsub/types.go b/sc-poc/modules/pubsub/types.go index 07ec8dc84..bf57224fa 100644 --- a/sc-poc/modules/pubsub/types.go +++ b/sc-poc/modules/pubsub/types.go @@ -125,6 +125,7 @@ type AsyncAPIComponents struct { // WebsocketErrorMessage defines the error message in websocket connection type WebsocketErrorMessage struct { + ID string `json:"id"` Message string `json:"message"` Errors []string `json:"errors,omitempty"` } From 04f857d91d2a11fd149bc4341f23a41bc477d81a Mon Sep 17 00:00:00 2001 From: Shubham Nazare Date: Thu, 25 May 2023 18:58:09 +0530 Subject: [PATCH 4/6] pubsub source implements interface Signed-off-by: Shubham Nazare --- sc-poc/sources/pubsub_channel/source.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sc-poc/sources/pubsub_channel/source.go b/sc-poc/sources/pubsub_channel/source.go index 58e5a8e41..9fdaf120c 100644 --- a/sc-poc/sources/pubsub_channel/source.go +++ b/sc-poc/sources/pubsub_channel/source.go @@ -37,6 +37,7 @@ func (s *PubsubChannelSource) GetProviders() []string { return []string{"pubsub"} } +// GetChannel returns the channel of this source. func (s *PubsubChannelSource) GetChannel() pubsub.Channel { return pubsub.Channel{ Name: s.Spec.Channel, From bc9e7e87a491084284a0bc4d2443b5c0f3a621cd Mon Sep 17 00:00:00 2001 From: Shubham Nazare Date: Mon, 29 May 2023 19:28:18 +0530 Subject: [PATCH 5/6] merge publisherror with acknowledgement Signed-off-by: Shubham Nazare --- sc-poc/modules/pubsub/asyncapi.go | 33 ++++++++++--------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/sc-poc/modules/pubsub/asyncapi.go b/sc-poc/modules/pubsub/asyncapi.go index d1e526d1b..42df39bc9 100644 --- a/sc-poc/modules/pubsub/asyncapi.go +++ b/sc-poc/modules/pubsub/asyncapi.go @@ -103,6 +103,16 @@ func (a *App) generateASyncAPIDoc() *AsyncAPI { "ack": map[string]interface{}{ "type": "boolean", }, + "message": map[string]interface{}{ + "type": "string", + }, + "errors": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ + "type": "string", + }, + "required": []string{"message"}, + }, }, "required": []string{"id", "ack"}, }, @@ -110,29 +120,6 @@ func (a *App) generateASyncAPIDoc() *AsyncAPI { "required": []string{"event", "data"}, }, }, - { - Name: "PublishError", - ContentType: "application/json", - Payload: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "id": map[string]interface{}{ - "type": "string", - }, - "message": map[string]interface{}{ - "type": "string", - }, - "errors": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{ - "type": "string", - }, - "required": []string{"message"}, - }, - }, - "required": []string{"event", "data"}, - }, - }, }, }, }, From 6f5a29119ac8b4fbd77ba79a1729058e885c93e8 Mon Sep 17 00:00:00 2001 From: Shubham Nazare Date: Wed, 31 May 2023 16:18:34 +0530 Subject: [PATCH 6/6] remove oneOf from producerSubscribe Signed-off-by: Shubham Nazare --- sc-poc/modules/pubsub/asyncapi.go | 56 +++++++++++++++---------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/sc-poc/modules/pubsub/asyncapi.go b/sc-poc/modules/pubsub/asyncapi.go index 42df39bc9..9b6da7c6d 100644 --- a/sc-poc/modules/pubsub/asyncapi.go +++ b/sc-poc/modules/pubsub/asyncapi.go @@ -84,41 +84,39 @@ func (a *App) generateASyncAPIDoc() *AsyncAPI { Subscribe: &Operation{ ID: "producerSubscribe" + getID(channelObj.Name), Message: MessageOneOrMany{ - OneOf: []MessageEntity{ - { - Name: "Acknowledgement", - ContentType: "application/json", - Payload: map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "event": map[string]interface{}{ - "type": "string", - }, - "data": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "id": map[string]interface{}{ - "type": "string", - }, - "ack": map[string]interface{}{ - "type": "boolean", - }, - "message": map[string]interface{}{ + MessageEntity: MessageEntity{ + Name: "Acknowledgement", + ContentType: "application/json", + Payload: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "event": map[string]interface{}{ + "type": "string", + }, + "data": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "id": map[string]interface{}{ + "type": "string", + }, + "ack": map[string]interface{}{ + "type": "boolean", + }, + "message": map[string]interface{}{ + "type": "string", + }, + "errors": map[string]interface{}{ + "type": "array", + "items": map[string]interface{}{ "type": "string", }, - "errors": map[string]interface{}{ - "type": "array", - "items": map[string]interface{}{ - "type": "string", - }, - "required": []string{"message"}, - }, + "required": []string{"message"}, }, - "required": []string{"id", "ack"}, }, + "required": []string{"id", "ack"}, }, - "required": []string{"event", "data"}, }, + "required": []string{"event", "data"}, }, }, },