From 6575ac53157ad73b0610da0fb04fa2a7be20366e Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 12 Jul 2021 18:18:54 +0800 Subject: [PATCH 01/10] (chore) initial commit Signed-off-by: Ning Sun --- formats/avro/README.md | 4 + formats/avro/pom.xml | 128 ++++++++++++++++++ .../io.cloudevents.core.format.EventFormat | 1 + .../src/test/resources/v03/min.proto.json | 6 + .../test/resources/v1/binary_ext.proto.json | 12 ++ .../test/resources/v1/json_data.proto.json | 21 +++ .../v1/json_data_with_ext.proto.json | 30 ++++ .../avro/src/test/resources/v1/min.proto.json | 6 + .../test/resources/v1/text_data.proto.json | 18 +++ .../src/test/resources/v1/xml_data.proto.json | 18 +++ pom.xml | 1 + 11 files changed, 245 insertions(+) create mode 100644 formats/avro/README.md create mode 100644 formats/avro/pom.xml create mode 100644 formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat create mode 100644 formats/avro/src/test/resources/v03/min.proto.json create mode 100644 formats/avro/src/test/resources/v1/binary_ext.proto.json create mode 100644 formats/avro/src/test/resources/v1/json_data.proto.json create mode 100644 formats/avro/src/test/resources/v1/json_data_with_ext.proto.json create mode 100644 formats/avro/src/test/resources/v1/min.proto.json create mode 100644 formats/avro/src/test/resources/v1/text_data.proto.json create mode 100644 formats/avro/src/test/resources/v1/xml_data.proto.json diff --git a/formats/avro/README.md b/formats/avro/README.md new file mode 100644 index 000000000..d15345650 --- /dev/null +++ b/formats/avro/README.md @@ -0,0 +1,4 @@ +# CloudEvents Avro Format + +This project provides functionality for the Java SDK to handle the +[avro format](https://github.com/cloudevents/spec/blob/v1.0.1/avro-format.md). diff --git a/formats/avro/pom.xml b/formats/avro/pom.xml new file mode 100644 index 000000000..2e5f6f6ac --- /dev/null +++ b/formats/avro/pom.xml @@ -0,0 +1,128 @@ + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 2.3.0-SNAPSHOT + ../../pom.xml + + + cloudevents-avro + CloudEvents - Avro + jar + + + 1.10.2 + io.cloudevents.formats.avro + + + + + + org.apache.avro + avro-maven-plugin + 1.10.2 + + + generate-sources + + schema + + + ${project.basedir}/src/main/avro/ + + ${project.build.directory}/generated-sources/java/ + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/java/ + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + org.apache.avro + avro + 1.10.2 + + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + + diff --git a/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat b/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat new file mode 100644 index 000000000..31cb85d6d --- /dev/null +++ b/formats/avro/src/main/resources/META-INF/services/io.cloudevents.core.format.EventFormat @@ -0,0 +1 @@ +io.cloudevents.avro.AvroFormat diff --git a/formats/avro/src/test/resources/v03/min.proto.json b/formats/avro/src/test/resources/v03/min.proto.json new file mode 100644 index 000000000..79e252908 --- /dev/null +++ b/formats/avro/src/test/resources/v03/min.proto.json @@ -0,0 +1,6 @@ +{ + "specVersion": "0.3", + "id": "1", + "type": "mock.test", + "source": "http://localhost/source" +} diff --git a/formats/avro/src/test/resources/v1/binary_ext.proto.json b/formats/avro/src/test/resources/v1/binary_ext.proto.json new file mode 100644 index 000000000..34aa91400 --- /dev/null +++ b/formats/avro/src/test/resources/v1/binary_ext.proto.json @@ -0,0 +1,12 @@ +{ + "specVersion": "1.0", + "id": "1", + "type": "mock.test", + "source": "http://localhost/source", + "attributes" : { + "binary": { + "ceBytes" : "4P8ARKo=" + } + } + +} diff --git a/formats/avro/src/test/resources/v1/json_data.proto.json b/formats/avro/src/test/resources/v1/json_data.proto.json new file mode 100644 index 000000000..f3b7d3718 --- /dev/null +++ b/formats/avro/src/test/resources/v1/json_data.proto.json @@ -0,0 +1,21 @@ +{ + "specVersion": "1.0", + "id": "1", + "type": "mock.test", + "source": "http://localhost/source", + "attributes": { + "time": { + "ceTimestamp": "2018-04-26T14:48:09+02:00" + }, + "dataschema": { + "ceUri": "http://localhost/schema" + }, + "datacontenttype": { + "ceString": "application/json" + }, + "subject": { + "ceString": "sub" + } + }, + "textData": "{}" +} diff --git a/formats/avro/src/test/resources/v1/json_data_with_ext.proto.json b/formats/avro/src/test/resources/v1/json_data_with_ext.proto.json new file mode 100644 index 000000000..d88c21734 --- /dev/null +++ b/formats/avro/src/test/resources/v1/json_data_with_ext.proto.json @@ -0,0 +1,30 @@ +{ + "specVersion": "1.0", + "id": "1", + "type": "mock.test", + "source": "http://localhost/source", + "attributes": { + "dataschema": { + "ceUri": "http://localhost/schema" + }, + "datacontenttype": { + "ceString": "application/json" + }, + "subject": { + "ceString": "sub" + }, + "time": { + "ceTimestamp": "2018-04-26T14:48:09+02:00" + }, + "astring": { + "ceString": "aaa" + }, + "aboolean": { + "ceBoolean": true + }, + "anumber": { + "ceInteger": 10 + } + }, + "textData": "{}" +} diff --git a/formats/avro/src/test/resources/v1/min.proto.json b/formats/avro/src/test/resources/v1/min.proto.json new file mode 100644 index 000000000..fbe193671 --- /dev/null +++ b/formats/avro/src/test/resources/v1/min.proto.json @@ -0,0 +1,6 @@ +{ + "id": "1", + "source": "http://localhost/source", + "specVersion": "1.0", + "type": "mock.test" +} diff --git a/formats/avro/src/test/resources/v1/text_data.proto.json b/formats/avro/src/test/resources/v1/text_data.proto.json new file mode 100644 index 000000000..afac81638 --- /dev/null +++ b/formats/avro/src/test/resources/v1/text_data.proto.json @@ -0,0 +1,18 @@ +{ + "specVersion": "1.0", + "id": "1", + "type": "mock.test", + "source": "http://localhost/source", + "attributes": { + "time": { + "ceTimestamp": "2018-04-26T14:48:09+02:00" + }, + "subject": { + "ceString": "sub" + }, + "datacontenttype": { + "ceString": "text/plain" + } + }, + "textData": "Hello World Lorena!" +} diff --git a/formats/avro/src/test/resources/v1/xml_data.proto.json b/formats/avro/src/test/resources/v1/xml_data.proto.json new file mode 100644 index 000000000..b93ddbb68 --- /dev/null +++ b/formats/avro/src/test/resources/v1/xml_data.proto.json @@ -0,0 +1,18 @@ +{ + "specVersion": "1.0", + "id": "1", + "type": "mock.test", + "source": "http://localhost/source", + "attributes": { + "time": { + "ceTimestamp": "2018-04-26T14:48:09+02:00" + }, + "datacontenttype": { + "ceString": "application/xml" + }, + "subject": { + "ceString": "sub" + } + }, + "textData": "" +} diff --git a/pom.xml b/pom.xml index a124a66aa..4e8f961ed 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ core formats/json-jackson formats/protobuf + formats/avro amqp http/basic http/vertx From 662032e80b318416d1874b62d01e0a4c9352f6a4 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Tue, 13 Jul 2021 08:08:52 +0800 Subject: [PATCH 02/10] import avro schema Signed-off-by: Ning Sun --- formats/avro/pom.xml | 4 +- formats/avro/src/main/avro/spec.avsc | 64 ++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 formats/avro/src/main/avro/spec.avsc diff --git a/formats/avro/pom.xml b/formats/avro/pom.xml index 2e5f6f6ac..911a392b3 100644 --- a/formats/avro/pom.xml +++ b/formats/avro/pom.xml @@ -32,7 +32,7 @@ jar - 1.10.2 + 1.9.2 io.cloudevents.formats.avro @@ -96,7 +96,7 @@ org.apache.avro avro - 1.10.2 + ${avro.version} diff --git a/formats/avro/src/main/avro/spec.avsc b/formats/avro/src/main/avro/spec.avsc new file mode 100644 index 000000000..8d9b4994d --- /dev/null +++ b/formats/avro/src/main/avro/spec.avsc @@ -0,0 +1,64 @@ +{ + "namespace":"io.cloudevents", + "type":"record", + "name":"AvroCloudEvent", + "version":"1.0", + "doc":"Avro Event Format for CloudEvents", + "fields":[ + { + "name":"attribute", + "type":{ + "type":"map", + "values":[ + "null", + "boolean", + "int", + "string", + "bytes" + ] + } + }, + { + "name": "data", + "type": [ + "bytes", + "null", + "boolean", + { + "type": "map", + "values": [ + "null", + "boolean", + { + "type": "record", + "name": "AvroCloudEventData", + "doc": "Representation of a JSON Value", + "fields": [ + { + "name": "value", + "type": { + "type": "map", + "values": [ + "null", + "boolean", + { "type": "map", "values": "AvroCloudEventData" }, + { "type": "array", "items": "AvroCloudEventData" }, + "double", + "string" + ] + } + } + ] + }, + "double", + "string" + ] + }, + { "type": "array", "items": "AvroCloudEventData" }, + "double", + "string" + ] + } + ] +} + From 0b6494522bf433b3472cfa571f39dd905d89a8cb Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Tue, 13 Jul 2021 22:09:32 +0800 Subject: [PATCH 03/10] wip add serializer and deserializer for avro Signed-off-by: Ning Sun --- .../io/cloudevents/avro/AvroDeserializer.java | 32 +++++++++ .../java/io/cloudevents/avro/AvroFormat.java | 68 +++++++++++++++++++ .../io/cloudevents/avro/AvroSerializer.java | 54 +++++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java create mode 100644 formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java create mode 100644 formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java new file mode 100644 index 000000000..ad63689af --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -0,0 +1,32 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.avro; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.AvroCloudEvent; +import io.cloudevents.AvroCloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; + +public class AvroDeserializer { + + public static CloudEvent fromAvro(AvroCloudEvent avroCloudEvent) { + + } + +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java new file mode 100644 index 000000000..51b70f6e8 --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java @@ -0,0 +1,68 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.avro; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.AvroCloudEvent; +import io.cloudevents.core.format.EventDeserializationException; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.rw.CloudEventDataMapper; + +public class AvroFormat implements EventFormat { + + public static final String AVRO_CONTENT_TYPE = "application/avro"; + + @Override + public byte[] serialize(CloudEvent event) throws EventSerializationException { + AvroCloudEvent avroCloudEvent = AvroSerializer.toAvro(event); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + try { + AvroCloudEvent.getEncoder().encode(avroCloudEvent, output); + } catch (IOException e) { + throw new EventSerializationException(e); + } + + return output.toByteArray(); + } + + @Override + public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) + throws EventDeserializationException { + ByteArrayInputStream input = new ByteArrayInputStream(bytes); + + try { + AvroCloudEvent avroCloudEvent = AvroCloudEvent.getDecoder().decode(input); + + return AvroDeserializer.fromAvro(avroCloudEvent); + } catch (IOException e) { + throw new EventDeserializationException(e); + } + } + + @Override + public String serializedContentType() { + return AVRO_CONTENT_TYPE; + } +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java new file mode 100644 index 000000000..ab78b5518 --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java @@ -0,0 +1,54 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.cloudevents.avro; + +import java.util.Map; +import java.util.HashMap; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.AvroCloudEvent; +import io.cloudevents.AvroCloudEventData; + +public class AvroSerializer { + + public static final AvroCloudEvent toAvro(CloudEvent e) { + AvroCloudEvent avroCloudEvent = new AvroCloudEvent(); + + Map attrs = new HashMap<>(); + + attrs.put("type", e.getType()); + attrs.put("specversion", e.getSpecVersion().toString()); + attrs.put("id", e.getId()); + attrs.put("source", e.getSource()); + attrs.put("time", e.getTime()); + attrs.put("dataschema", e.getDataSchema()); + attrs.put("contenttype", AvroFormat.AVRO_CONTENT_TYPE); + attrs.put("datacontenttype", e.getDataContentType()); + + avroCloudEvent.setAttribute(attrs); + + // check datacontenttype + CloudEventData cloudEventData = e.getData(); + if (cloudEventData != null) { + avroCloudEvent.setData(cloudEventData.toBytes()); + } + + return avroCloudEvent; + } +} From b0f17b8114b535498e65ec10615696c35c4f02cc Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 14 Jul 2021 23:58:03 +0800 Subject: [PATCH 04/10] wip improve serializer and deserializer Signed-off-by: Ning Sun --- .../io/cloudevents/avro/AvroDeserializer.java | 51 ++++++++++++++++++- .../io/cloudevents/avro/AvroSerializer.java | 20 +++++--- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java index ad63689af..87f5fded7 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -17,16 +17,63 @@ package io.cloudevents.avro; +import java.util.Map; +import java.net.URI; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; + import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; import io.cloudevents.AvroCloudEvent; import io.cloudevents.AvroCloudEventData; import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventReader; +import io.cloudevents.rw.CloudEventDataMapper; +import io.cloudevents.rw.CloudEventWriter; +import io.cloudevents.rw.CloudEventWriterFactory; + +public class AvroDeserializer implements CloudEventReader { + + private final AvroCloudEvent avroCloudEvent; + + public AvroDeserializer(AvroCloudEvent avroCloudEvent) { + this.avroCloudEvent = avroCloudEvent; + } + + @Override + public , R> R read(CloudEventWriterFactory writerFactory, + CloudEventDataMapper mapper) throws CloudEventRWException { + + Map avroCloudEventAttrs = this.avroCloudEvent.getAttribute(); + SpecVersion specVersion = SpecVersion.parse((String)avroCloudEventAttrs.get(CloudEventV1.SPECVERSION)); + final CloudEventWriter writer = writerFactory.create(specVersion); + + for (Map.Entry entry: avroCloudEventAttrs.entrySet()) { + String key = entry.getKey().toString(); -public class AvroDeserializer { + if (key.equals(CloudEventV1.TIME)) { + // OffsetDateTime + Long timeAsLong = (Long) entry.getValue(); + Instant timeAsInstant = Instant.ofEpochMilli(timeAsLong); + OffsetDateTime value = OffsetDateTime.ofInstant(timeAsInstant, ZoneOffset.UTC); + writer.withContextAttribute(key, value); - public static CloudEvent fromAvro(AvroCloudEvent avroCloudEvent) { + } else if (key.equals(CloudEventV1.DATASCHEMA)) { + // URI + URI value = URI.create((String) entry.getValue()); + writer.withContextAttribute(key, value); + } else { + // String + writer.withContextAttribute(key, (String) entry.getValue()); + } + } + // TOOD: data + return writer.end(); } } diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java index ab78b5518..99a9c1de2 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java @@ -22,6 +22,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; +import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.AvroCloudEvent; import io.cloudevents.AvroCloudEventData; @@ -32,14 +33,17 @@ public static final AvroCloudEvent toAvro(CloudEvent e) { Map attrs = new HashMap<>(); - attrs.put("type", e.getType()); - attrs.put("specversion", e.getSpecVersion().toString()); - attrs.put("id", e.getId()); - attrs.put("source", e.getSource()); - attrs.put("time", e.getTime()); - attrs.put("dataschema", e.getDataSchema()); - attrs.put("contenttype", AvroFormat.AVRO_CONTENT_TYPE); - attrs.put("datacontenttype", e.getDataContentType()); + attrs.put(CloudEventV1.TYPE, e.getType()); + attrs.put(CloudEventV1.SPECVERSION, e.getSpecVersion().toString()); + attrs.put(CloudEventV1.ID, e.getId()); + attrs.put(CloudEventV1.SOURCE, e.getSource()); + // convert to long + attrs.put(CloudEventV1.TIME, e.getTime().toInstant().toEpochMilli()); + // convert + attrs.put(CloudEventV1.DATASCHEMA, e.getDataSchema().toString()); + attrs.put(CloudEventV1.SUBJECT, e.getSubject()); + + attrs.put(CloudEventV1.DATACONTENTTYPE, e.getDataContentType()); avroCloudEvent.setAttribute(attrs); From 371d31e5574dca6a7cf594681b547c7b05854fc0 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 15 Jul 2021 23:57:19 +0800 Subject: [PATCH 05/10] fix time serde and add data deserialization support Signed-off-by: Ning Sun --- .../io/cloudevents/avro/AvroDeserializer.java | 19 +++++++++---------- .../java/io/cloudevents/avro/AvroFormat.java | 3 ++- .../io/cloudevents/avro/AvroSerializer.java | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java index 87f5fded7..ea48e232b 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -19,16 +19,12 @@ import java.util.Map; import java.net.URI; -import java.time.Instant; import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; import io.cloudevents.AvroCloudEvent; -import io.cloudevents.AvroCloudEventData; -import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.BytesCloudEventData; import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.rw.CloudEventRWException; import io.cloudevents.rw.CloudEventReader; @@ -57,9 +53,7 @@ public , R> R read(CloudEventWriterFactory w if (key.equals(CloudEventV1.TIME)) { // OffsetDateTime - Long timeAsLong = (Long) entry.getValue(); - Instant timeAsInstant = Instant.ofEpochMilli(timeAsLong); - OffsetDateTime value = OffsetDateTime.ofInstant(timeAsInstant, ZoneOffset.UTC); + OffsetDateTime value = OffsetDateTime.parse((String) entry.getValue()); writer.withContextAttribute(key, value); } else if (key.equals(CloudEventV1.DATASCHEMA)) { @@ -72,8 +66,13 @@ public , R> R read(CloudEventWriterFactory w } } - // TOOD: data - return writer.end(); + byte[] data = (byte[]) this.avroCloudEvent.getData(); + + if (data != null) { + return writer.end(mapper.map(BytesCloudEventData.wrap(data))); + } else { + return writer.end(); + } } } diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java index 51b70f6e8..75f9421dd 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java @@ -24,6 +24,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.AvroCloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.format.EventDeserializationException; import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.format.EventSerializationException; @@ -55,7 +56,7 @@ public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper Date: Thu, 15 Jul 2021 23:59:31 +0800 Subject: [PATCH 06/10] fix maven plugin version Signed-off-by: Ning Sun --- formats/avro/pom.xml | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/formats/avro/pom.xml b/formats/avro/pom.xml index 911a392b3..7dc1f2818 100644 --- a/formats/avro/pom.xml +++ b/formats/avro/pom.xml @@ -59,6 +59,7 @@ org.codehaus.mojo build-helper-maven-plugin + 3.2.0 add-source @@ -74,15 +75,6 @@ - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - From 8d8a6688e7451891aac54b989b6ebb47bc1e9585 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sat, 17 Jul 2021 19:21:07 +0800 Subject: [PATCH 07/10] add avro data wrapper and hides serde classes from public Signed-off-by: Ning Sun --- .../avro/AvroCloudEventDataWrapper.java | 54 +++++++++++++++++++ .../io/cloudevents/avro/AvroDeserializer.java | 2 +- .../io/cloudevents/avro/AvroSerializer.java | 3 +- 3 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java new file mode 100644 index 000000000..a7fe4accd --- /dev/null +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java @@ -0,0 +1,54 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.avro; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.cloudevents.AvroCloudEventData; +import io.cloudevents.CloudEventData; + +/** + * Encode JSON style cloudevent data into Avro format. + * + */ +public class AvroCloudEventDataWrapper implements CloudEventData { + + private AvroCloudEventData avroCloudEventData; + + /** + * Wraps a JSON object-like data structure. + */ + public AvroCloudEventDataWrapper(Map data) { + avroCloudEventData = new AvroCloudEventData(); + avroCloudEventData.setValue(data); + } + + @Override + public byte[] toBytes() { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try { + AvroCloudEventData.getEncoder().encode(this.avroCloudEventData, bytes); + } catch (IOException ignore) { + // ignored + } + + return bytes.toByteArray(); + } +} diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java index ea48e232b..be46246e5 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -32,7 +32,7 @@ import io.cloudevents.rw.CloudEventWriter; import io.cloudevents.rw.CloudEventWriterFactory; -public class AvroDeserializer implements CloudEventReader { +class AvroDeserializer implements CloudEventReader { private final AvroCloudEvent avroCloudEvent; diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java index b4d53a279..95875edb7 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java @@ -24,9 +24,8 @@ import io.cloudevents.CloudEventData; import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.AvroCloudEvent; -import io.cloudevents.AvroCloudEventData; -public class AvroSerializer { +class AvroSerializer { public static final AvroCloudEvent toAvro(CloudEvent e) { AvroCloudEvent avroCloudEvent = new AvroCloudEvent(); From 8505d783d5495288b3fb2ebfbc294fc382d7dd46 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sat, 17 Jul 2021 20:05:57 +0800 Subject: [PATCH 08/10] add test and fix serde issues Signed-off-by: Ning Sun --- formats/avro/pom.xml | 2 +- .../avro/AvroCloudEventDataWrapper.java | 2 +- .../io/cloudevents/avro/AvroDeserializer.java | 39 +++++++----- .../io/cloudevents/avro/AvroSerializer.java | 23 ++++--- .../io/cloudevents/avro/AvroFormatTest.java | 63 +++++++++++++++++++ .../src/test/resources/v03/min.proto.json | 6 -- .../test/resources/v1/binary_ext.proto.json | 12 ---- .../test/resources/v1/json_data.proto.json | 21 ------- .../v1/json_data_with_ext.proto.json | 30 --------- .../avro/src/test/resources/v1/min.proto.json | 6 -- .../test/resources/v1/text_data.proto.json | 18 ------ .../src/test/resources/v1/xml_data.proto.json | 18 ------ 12 files changed, 102 insertions(+), 138 deletions(-) create mode 100644 formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java delete mode 100644 formats/avro/src/test/resources/v03/min.proto.json delete mode 100644 formats/avro/src/test/resources/v1/binary_ext.proto.json delete mode 100644 formats/avro/src/test/resources/v1/json_data.proto.json delete mode 100644 formats/avro/src/test/resources/v1/json_data_with_ext.proto.json delete mode 100644 formats/avro/src/test/resources/v1/min.proto.json delete mode 100644 formats/avro/src/test/resources/v1/text_data.proto.json delete mode 100644 formats/avro/src/test/resources/v1/xml_data.proto.json diff --git a/formats/avro/pom.xml b/formats/avro/pom.xml index 7dc1f2818..b2dbe8d2e 100644 --- a/formats/avro/pom.xml +++ b/formats/avro/pom.xml @@ -50,8 +50,8 @@ ${project.basedir}/src/main/avro/ - ${project.build.directory}/generated-sources/java/ + String diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java index a7fe4accd..cee76cc98 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java @@ -35,7 +35,7 @@ public class AvroCloudEventDataWrapper implements CloudEventData { /** * Wraps a JSON object-like data structure. */ - public AvroCloudEventDataWrapper(Map data) { + public AvroCloudEventDataWrapper(Map data) { avroCloudEventData = new AvroCloudEventData(); avroCloudEventData.setValue(data); } diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java index be46246e5..3452a0480 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -19,6 +19,7 @@ import java.util.Map; import java.net.URI; +import java.nio.ByteBuffer; import java.time.OffsetDateTime; import io.cloudevents.CloudEventData; @@ -43,32 +44,36 @@ public AvroDeserializer(AvroCloudEvent avroCloudEvent) { @Override public , R> R read(CloudEventWriterFactory writerFactory, CloudEventDataMapper mapper) throws CloudEventRWException { - - Map avroCloudEventAttrs = this.avroCloudEvent.getAttribute(); + Map avroCloudEventAttrs = this.avroCloudEvent.getAttribute(); SpecVersion specVersion = SpecVersion.parse((String)avroCloudEventAttrs.get(CloudEventV1.SPECVERSION)); final CloudEventWriter writer = writerFactory.create(specVersion); - for (Map.Entry entry: avroCloudEventAttrs.entrySet()) { + for (Map.Entry entry: avroCloudEventAttrs.entrySet()) { String key = entry.getKey().toString(); - if (key.equals(CloudEventV1.TIME)) { - // OffsetDateTime - OffsetDateTime value = OffsetDateTime.parse((String) entry.getValue()); - writer.withContextAttribute(key, value); - - } else if (key.equals(CloudEventV1.DATASCHEMA)) { - // URI - URI value = URI.create((String) entry.getValue()); - writer.withContextAttribute(key, value); - } else { - // String - writer.withContextAttribute(key, (String) entry.getValue()); + switch(key) { + case CloudEventV1.SPECVERSION: + continue; + case CloudEventV1.TIME: { + // OffsetDateTime + OffsetDateTime value = OffsetDateTime.parse((String) entry.getValue()); + writer.withContextAttribute(key, value); + }; + case CloudEventV1.DATASCHEMA: { + // URI + URI value = URI.create((String) entry.getValue()); + writer.withContextAttribute(key, value); + }; + default: + writer.withContextAttribute(key, (String) entry.getValue()); } } - byte[] data = (byte[]) this.avroCloudEvent.getData(); + ByteBuffer buffer = (ByteBuffer) this.avroCloudEvent.getData(); - if (data != null) { + if (buffer != null) { + byte[] data = new byte[buffer.remaining()]; + buffer.get(data); return writer.end(mapper.map(BytesCloudEventData.wrap(data))); } else { return writer.end(); diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java index 95875edb7..effed9394 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java @@ -17,6 +17,7 @@ package io.cloudevents.avro; +import java.nio.ByteBuffer; import java.util.Map; import java.util.HashMap; @@ -30,18 +31,24 @@ class AvroSerializer { public static final AvroCloudEvent toAvro(CloudEvent e) { AvroCloudEvent avroCloudEvent = new AvroCloudEvent(); - Map attrs = new HashMap<>(); + Map attrs = new HashMap<>(); - attrs.put(CloudEventV1.TYPE, e.getType()); attrs.put(CloudEventV1.SPECVERSION, e.getSpecVersion().toString()); + attrs.put(CloudEventV1.TYPE, e.getType()); attrs.put(CloudEventV1.ID, e.getId()); attrs.put(CloudEventV1.SOURCE, e.getSource()); - // convert to string - attrs.put(CloudEventV1.TIME, e.getTime().toString()); - // convert - attrs.put(CloudEventV1.DATASCHEMA, e.getDataSchema().toString()); - attrs.put(CloudEventV1.SUBJECT, e.getSubject()); + if (e.getTime() != null) { + // convert to string + attrs.put(CloudEventV1.TIME, e.getTime().toString()); + } + + if (e.getDataSchema() != null) { + // convert + attrs.put(CloudEventV1.DATASCHEMA, e.getDataSchema().toString()); + } + + attrs.put(CloudEventV1.SUBJECT, e.getSubject()); attrs.put(CloudEventV1.DATACONTENTTYPE, e.getDataContentType()); avroCloudEvent.setAttribute(attrs); @@ -49,7 +56,7 @@ public static final AvroCloudEvent toAvro(CloudEvent e) { // check datacontenttype CloudEventData cloudEventData = e.getData(); if (cloudEventData != null) { - avroCloudEvent.setData(cloudEventData.toBytes()); + avroCloudEvent.setData(ByteBuffer.wrap(cloudEventData.toBytes())); } return avroCloudEvent; diff --git a/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java new file mode 100644 index 000000000..5de602324 --- /dev/null +++ b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.avro; + +import java.util.Map; +import java.net.URI; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; + +public class AvroFormatTest { + + Map testData = Map.of("name", "Ning", "age", 22.0); + + @Test + public void testSerde() { + EventFormat avroFormat = new AvroFormat(); + CloudEventData cloudEventData = new AvroCloudEventDataWrapper(testData); + + assertThat(cloudEventData).isNotNull(); + assertThat(cloudEventData.toBytes()).isNotNull(); + + CloudEvent cloudEvent = CloudEventBuilder.v1() + .withId("1") + .withType("testdata") + .withSource(URI.create("http://localhost/test")) + .withData("application/avro", cloudEventData) + .build(); + assertThat(cloudEvent).isNotNull(); + assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1); + + byte[] bytes = avroFormat.serialize(cloudEvent); + + assertThat(bytes).isNotNull(); + assertThat(bytes).hasSizeGreaterThan(0); + + CloudEvent cloudEvent2 = avroFormat.deserialize(bytes); + + assertThat(cloudEvent2).isNotNull(); + assertThat(cloudEvent2.getId()).isEqualTo("1"); + assertThat(cloudEvent2.getType()).isEqualTo("testdata"); + } + +} diff --git a/formats/avro/src/test/resources/v03/min.proto.json b/formats/avro/src/test/resources/v03/min.proto.json deleted file mode 100644 index 79e252908..000000000 --- a/formats/avro/src/test/resources/v03/min.proto.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "specVersion": "0.3", - "id": "1", - "type": "mock.test", - "source": "http://localhost/source" -} diff --git a/formats/avro/src/test/resources/v1/binary_ext.proto.json b/formats/avro/src/test/resources/v1/binary_ext.proto.json deleted file mode 100644 index 34aa91400..000000000 --- a/formats/avro/src/test/resources/v1/binary_ext.proto.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "specVersion": "1.0", - "id": "1", - "type": "mock.test", - "source": "http://localhost/source", - "attributes" : { - "binary": { - "ceBytes" : "4P8ARKo=" - } - } - -} diff --git a/formats/avro/src/test/resources/v1/json_data.proto.json b/formats/avro/src/test/resources/v1/json_data.proto.json deleted file mode 100644 index f3b7d3718..000000000 --- a/formats/avro/src/test/resources/v1/json_data.proto.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "specVersion": "1.0", - "id": "1", - "type": "mock.test", - "source": "http://localhost/source", - "attributes": { - "time": { - "ceTimestamp": "2018-04-26T14:48:09+02:00" - }, - "dataschema": { - "ceUri": "http://localhost/schema" - }, - "datacontenttype": { - "ceString": "application/json" - }, - "subject": { - "ceString": "sub" - } - }, - "textData": "{}" -} diff --git a/formats/avro/src/test/resources/v1/json_data_with_ext.proto.json b/formats/avro/src/test/resources/v1/json_data_with_ext.proto.json deleted file mode 100644 index d88c21734..000000000 --- a/formats/avro/src/test/resources/v1/json_data_with_ext.proto.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "specVersion": "1.0", - "id": "1", - "type": "mock.test", - "source": "http://localhost/source", - "attributes": { - "dataschema": { - "ceUri": "http://localhost/schema" - }, - "datacontenttype": { - "ceString": "application/json" - }, - "subject": { - "ceString": "sub" - }, - "time": { - "ceTimestamp": "2018-04-26T14:48:09+02:00" - }, - "astring": { - "ceString": "aaa" - }, - "aboolean": { - "ceBoolean": true - }, - "anumber": { - "ceInteger": 10 - } - }, - "textData": "{}" -} diff --git a/formats/avro/src/test/resources/v1/min.proto.json b/formats/avro/src/test/resources/v1/min.proto.json deleted file mode 100644 index fbe193671..000000000 --- a/formats/avro/src/test/resources/v1/min.proto.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "id": "1", - "source": "http://localhost/source", - "specVersion": "1.0", - "type": "mock.test" -} diff --git a/formats/avro/src/test/resources/v1/text_data.proto.json b/formats/avro/src/test/resources/v1/text_data.proto.json deleted file mode 100644 index afac81638..000000000 --- a/formats/avro/src/test/resources/v1/text_data.proto.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "specVersion": "1.0", - "id": "1", - "type": "mock.test", - "source": "http://localhost/source", - "attributes": { - "time": { - "ceTimestamp": "2018-04-26T14:48:09+02:00" - }, - "subject": { - "ceString": "sub" - }, - "datacontenttype": { - "ceString": "text/plain" - } - }, - "textData": "Hello World Lorena!" -} diff --git a/formats/avro/src/test/resources/v1/xml_data.proto.json b/formats/avro/src/test/resources/v1/xml_data.proto.json deleted file mode 100644 index b93ddbb68..000000000 --- a/formats/avro/src/test/resources/v1/xml_data.proto.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "specVersion": "1.0", - "id": "1", - "type": "mock.test", - "source": "http://localhost/source", - "attributes": { - "time": { - "ceTimestamp": "2018-04-26T14:48:09+02:00" - }, - "datacontenttype": { - "ceString": "application/xml" - }, - "subject": { - "ceString": "sub" - } - }, - "textData": "" -} From c659e701358f875f07538d079d690b1e46bd5512 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 10 Nov 2021 22:17:52 +0800 Subject: [PATCH 09/10] addressing some review comments for avro format Signed-off-by: Ning Sun --- formats/avro/pom.xml | 5 +++-- formats/avro/src/main/avro/spec.avsc | 3 +-- .../avro/AvroCloudEventDataWrapper.java | 16 +++++++--------- .../io/cloudevents/avro/AvroDeserializer.java | 2 +- .../java/io/cloudevents/avro/AvroFormat.java | 12 ++++-------- .../java/io/cloudevents/avro/AvroSerializer.java | 4 ++-- 6 files changed, 18 insertions(+), 24 deletions(-) diff --git a/formats/avro/pom.xml b/formats/avro/pom.xml index b2dbe8d2e..bd730296e 100644 --- a/formats/avro/pom.xml +++ b/formats/avro/pom.xml @@ -33,6 +33,7 @@ 1.9.2 + 3.2.0 io.cloudevents.formats.avro @@ -41,7 +42,7 @@ org.apache.avro avro-maven-plugin - 1.10.2 + ${avro.version} generate-sources @@ -59,7 +60,7 @@ org.codehaus.mojo build-helper-maven-plugin - 3.2.0 + ${build.helper.version} add-source diff --git a/formats/avro/src/main/avro/spec.avsc b/formats/avro/src/main/avro/spec.avsc index 8d9b4994d..8b4a76c6c 100644 --- a/formats/avro/src/main/avro/spec.avsc +++ b/formats/avro/src/main/avro/spec.avsc @@ -1,5 +1,5 @@ { - "namespace":"io.cloudevents", + "namespace":"io.cloudevents.avro", "type":"record", "name":"AvroCloudEvent", "version":"1.0", @@ -61,4 +61,3 @@ } ] } - diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java index cee76cc98..04d389332 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java @@ -18,11 +18,11 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.List; import java.util.Map; -import io.cloudevents.AvroCloudEventData; +import io.cloudevents.avro.AvroCloudEventData; import io.cloudevents.CloudEventData; +import io.cloudevents.core.format.EventDeserializationException; /** * Encode JSON style cloudevent data into Avro format. @@ -30,7 +30,7 @@ */ public class AvroCloudEventDataWrapper implements CloudEventData { - private AvroCloudEventData avroCloudEventData; + private final AvroCloudEventData avroCloudEventData; /** * Wraps a JSON object-like data structure. @@ -42,13 +42,11 @@ public AvroCloudEventDataWrapper(Map data) { @Override public byte[] toBytes() { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream()) { AvroCloudEventData.getEncoder().encode(this.avroCloudEventData, bytes); - } catch (IOException ignore) { - // ignored + return bytes.toByteArray(); + } catch (IOException e) { + throw new EventDeserializationException(e); } - - return bytes.toByteArray(); } } diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java index 3452a0480..d8690d001 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java @@ -24,7 +24,7 @@ import io.cloudevents.CloudEventData; import io.cloudevents.SpecVersion; -import io.cloudevents.AvroCloudEvent; +import io.cloudevents.avro.AvroCloudEvent; import io.cloudevents.core.data.BytesCloudEventData; import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.rw.CloudEventRWException; diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java index 75f9421dd..39fc774a9 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java @@ -23,7 +23,7 @@ import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; -import io.cloudevents.AvroCloudEvent; +import io.cloudevents.avro.AvroCloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.format.EventDeserializationException; import io.cloudevents.core.format.EventFormat; @@ -37,23 +37,19 @@ public class AvroFormat implements EventFormat { @Override public byte[] serialize(CloudEvent event) throws EventSerializationException { AvroCloudEvent avroCloudEvent = AvroSerializer.toAvro(event); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - try { + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { AvroCloudEvent.getEncoder().encode(avroCloudEvent, output); + return output.toByteArray(); } catch (IOException e) { throw new EventSerializationException(e); } - - return output.toByteArray(); } @Override public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper mapper) throws EventDeserializationException { - ByteArrayInputStream input = new ByteArrayInputStream(bytes); - - try { + try (ByteArrayInputStream input = new ByteArrayInputStream(bytes)) { AvroCloudEvent avroCloudEvent = AvroCloudEvent.getDecoder().decode(input); return new AvroDeserializer(avroCloudEvent).read(CloudEventBuilder::fromSpecVersion, mapper); diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java index effed9394..67d34234e 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java @@ -24,11 +24,11 @@ import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; import io.cloudevents.core.v1.CloudEventV1; -import io.cloudevents.AvroCloudEvent; +import io.cloudevents.avro.AvroCloudEvent; class AvroSerializer { - public static final AvroCloudEvent toAvro(CloudEvent e) { + static final AvroCloudEvent toAvro(CloudEvent e) { AvroCloudEvent avroCloudEvent = new AvroCloudEvent(); Map attrs = new HashMap<>(); From 3af281bf964d4a4fb09e901d156ded9c1e9beb03 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 11 Nov 2021 23:30:08 +0800 Subject: [PATCH 10/10] test: add test case for v03 Signed-off-by: Ning Sun --- .../avro/AvroCloudEventDataWrapper.java | 3 +- .../io/cloudevents/avro/AvroFormatTest.java | 37 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java index 04d389332..5dec5ae57 100644 --- a/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java +++ b/formats/avro/src/main/java/io/cloudevents/avro/AvroCloudEventDataWrapper.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Map; +import java.util.Objects; import io.cloudevents.avro.AvroCloudEventData; import io.cloudevents.CloudEventData; @@ -37,7 +38,7 @@ public class AvroCloudEventDataWrapper implements CloudEventData { */ public AvroCloudEventDataWrapper(Map data) { avroCloudEventData = new AvroCloudEventData(); - avroCloudEventData.setValue(data); + avroCloudEventData.setValue(Objects.requireNonNull(data)); } @Override diff --git a/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java index 5de602324..2e7de183b 100644 --- a/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java +++ b/formats/avro/src/test/java/io/cloudevents/avro/AvroFormatTest.java @@ -17,6 +17,7 @@ package io.cloudevents.avro; import java.util.Map; +import java.util.HashMap; import java.net.URI; import io.cloudevents.CloudEvent; @@ -29,7 +30,12 @@ public class AvroFormatTest { - Map testData = Map.of("name", "Ning", "age", 22.0); + public static Map testData = new HashMap<>(); + + static { + testData.put("name", "Ning"); + testData.put("age", 22.0); + } @Test public void testSerde() { @@ -60,4 +66,33 @@ public void testSerde() { assertThat(cloudEvent2.getType()).isEqualTo("testdata"); } + @Test + public void testV03Serde() { + EventFormat avroFormat = new AvroFormat(); + CloudEventData cloudEventData = new AvroCloudEventDataWrapper(testData); + + assertThat(cloudEventData).isNotNull(); + assertThat(cloudEventData.toBytes()).isNotNull(); + + CloudEvent cloudEvent = CloudEventBuilder.v03() + .withId("1") + .withType("testdata") + .withSource(URI.create("http://localhost/test")) + .withData("application/avro", cloudEventData) + .build(); + assertThat(cloudEvent).isNotNull(); + assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V03); + + byte[] bytes = avroFormat.serialize(cloudEvent); + + assertThat(bytes).isNotNull(); + assertThat(bytes).hasSizeGreaterThan(0); + + CloudEvent cloudEvent2 = avroFormat.deserialize(bytes); + + assertThat(cloudEvent2).isNotNull(); + assertThat(cloudEvent2.getId()).isEqualTo("1"); + assertThat(cloudEvent2.getType()).isEqualTo("testdata"); + } + }