Temporarily disable Avro tests due to Schema Registry dependency issues.

These will be addressed after the M4 release.
Soby Chacko
2019-09-23 13:37:57 -04:00
parent 3773fa2c05
commit 7a532b2bbd
4 changed files with 306 additions and 302 deletions

@@ -86,39 +86,39 @@
            <scope>test</scope>
        </dependency>
        <!-- Following dependencies are only provided for testing and won't be packaged with the binder apps -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-schema-registry-client</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.springframework.cloud</groupId>-->
        <!--            <artifactId>spring-cloud-schema-registry-client</artifactId>-->
        <!--            <scope>test</scope>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.avro</groupId>-->
        <!--            <artifactId>avro</artifactId>-->
        <!--            <version>${avro.version}</version>-->
        <!--            <scope>provided</scope>-->
        <!--        </dependency>-->
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.basedir}/target/generated-test-sources</outputDirectory>
                            <testOutputDirectory>${project.basedir}/target/generated-test-sources</testOutputDirectory>
                            <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!--        <plugin>-->
            <!--            <groupId>org.apache.avro</groupId>-->
            <!--            <artifactId>avro-maven-plugin</artifactId>-->
            <!--            <version>${avro.version}</version>-->
            <!--            <executions>-->
            <!--                <execution>-->
            <!--                    <phase>generate-test-sources</phase>-->
            <!--                    <goals>-->
            <!--                        <goal>schema</goal>-->
            <!--                    </goals>-->
            <!--                    <configuration>-->
            <!--                        <outputDirectory>${project.basedir}/target/generated-test-sources</outputDirectory>-->
            <!--                        <testOutputDirectory>${project.basedir}/target/generated-test-sources</testOutputDirectory>-->
            <!--                        <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>-->
            <!--                    </configuration>-->
            <!--                </execution>-->
            <!--            </executions>-->
            <!--        </plugin>-->
        </plugins>
    </build>
</project>
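
For context, the avro-maven-plugin's schema goal above compiles Avro schema files under src/test/resources/avro into Java test sources; the tests below depend on the generated com.example.Sensor class. As a rough sketch only, a schema matching the setters those tests call might look like the following (the field names and types are inferred from the test code; the actual schema file in the repository may differ):

import org.apache.avro.Schema;

// Sketch only: a Sensor schema inferred from the setters used in the tests
// below (setId, setAcceleration, setVelocity, setTemperature). The real
// schema file in src/test/resources/avro may declare different types or
// additional fields.
public class SensorSchemaSketch {

    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse("{"
                + "\"type\": \"record\", \"name\": \"Sensor\", \"namespace\": \"com.example\","
                + "\"fields\": ["
                + "  {\"name\": \"id\", \"type\": \"string\"},"
                + "  {\"name\": \"acceleration\", \"type\": \"float\"},"
                + "  {\"name\": \"velocity\", \"type\": \"float\"},"
                + "  {\"name\": \"temperature\", \"type\": \"float\"}"
                + "]}");
        // Pretty-print the parsed schema to confirm it is well-formed.
        System.out.println(schema.toString(true));
    }
}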

@@ -14,171 +14,173 @@
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import com.example.Sensor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManagerImpl;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.integration.utils.TestAvroSerializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * @author Soby Chacko
 */
public class PerRecordAvroContentTypeTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
            "received-sensors");

    private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
            .getEmbeddedKafka();

    private static Consumer<String, byte[]> consumer;

    @BeforeClass
    public static void setUp() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("avro-ct-test",
                "false", embeddedKafka);
        // Receive the data as byte[]
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        DefaultKafkaConsumerFactory<String, byte[]> cf = new DefaultKafkaConsumerFactory<>(
                consumerProps);
        consumer = cf.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "received-sensors");
    }

    @AfterClass
    public static void tearDown() {
        consumer.close();
    }

    @Test
    public void testPerRecordAvroConentTypeAndVerifySerialization() throws Exception {
        SpringApplication app = new SpringApplication(SensorCountAvroApplication.class);
        app.setWebApplicationType(WebApplicationType.NONE);

        try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
                "--spring.jmx.enabled=false",
                "--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
                "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
                "--spring.cloud.stream.bindings.input.destination=sensors",
                "--spring.cloud.stream.bindings.output.destination=received-sensors",
                "--spring.cloud.stream.bindings.output.contentType=application/avro",
                "--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=per-record-avro-contentType-test",
                "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
                "--spring.cloud.stream.kafka.streams.binder.brokers="
                        + embeddedKafka.getBrokersAsString())) {

            Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
            // Use a custom Avro test serializer
            senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    TestAvroSerializer.class);
            DefaultKafkaProducerFactory<Integer, Sensor> pf = new DefaultKafkaProducerFactory<>(
                    senderProps);
            try {
                KafkaTemplate<Integer, Sensor> template = new KafkaTemplate<>(pf, true);

                Random random = new Random();
                Sensor sensor = new Sensor();
                sensor.setId(UUID.randomUUID().toString() + "-v1");
                sensor.setAcceleration(random.nextFloat() * 10);
                sensor.setVelocity(random.nextFloat() * 100);
                sensor.setTemperature(random.nextFloat() * 50);
                // Send with the Avro content type set.
                Message<?> message = MessageBuilder.withPayload(sensor)
                        .setHeader("contentType", "application/avro").build();
                template.setDefaultTopic("sensors");
                template.send(message);

                // The serialized byte[] above is received by the binding process and
                // deserialized using the Avro converter. The data is then written to a
                // return topic as byte[] (using the same Avro converter).

                // Receive the byte[] from the return topic
                ConsumerRecord<String, byte[]> cr = KafkaTestUtils
                        .getSingleRecord(consumer, "received-sensors");
                final byte[] value = cr.value();

                // Convert the received byte[] back to an Avro object and verify that it
                // is the same as the one we sent above.
                AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter();
                Message<?> receivedMessage = MessageBuilder.withPayload(value)
                        .setHeader("contentType",
                                MimeTypeUtils.parseMimeType("application/avro"))
                        .build();
                Sensor messageConverted = (Sensor) avroSchemaMessageConverter
                        .fromMessage(receivedMessage, Sensor.class);
                assertThat(messageConverted).isEqualTo(sensor);
            }
            finally {
                pf.destroy();
            }
        }
    }

    @EnableBinding(KafkaStreamsProcessor.class)
    @EnableAutoConfiguration
    static class SensorCountAvroApplication {

        @StreamListener
        @SendTo("output")
        public KStream<?, Sensor> process(@Input("input") KStream<Object, Sensor> input) {
            // Return the same Sensor object unchanged so that we can do test
            // verifications.
            return input.map(KeyValue::new);
        }

        @Bean
        public MessageConverter sensorMessageConverter() throws IOException {
            return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
        }

    }

}

//package org.springframework.cloud.stream.binder.kafka.streams.integration;
//
//import java.io.IOException;
//import java.util.Map;
//import java.util.Random;
//import java.util.UUID;
//
//import com.example.Sensor;
//import org.apache.kafka.clients.consumer.Consumer;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.common.serialization.ByteArrayDeserializer;
//import org.apache.kafka.streams.KeyValue;
//import org.apache.kafka.streams.kstream.KStream;
//import org.junit.AfterClass;
//import org.junit.BeforeClass;
//import org.junit.ClassRule;
//import org.junit.Ignore;
//import org.junit.Test;
//
//import org.springframework.boot.SpringApplication;
//import org.springframework.boot.WebApplicationType;
//import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
//import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
//import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManagerImpl;
//import org.springframework.cloud.stream.annotation.EnableBinding;
//import org.springframework.cloud.stream.annotation.Input;
//import org.springframework.cloud.stream.annotation.StreamListener;
//import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
//import org.springframework.cloud.stream.binder.kafka.streams.integration.utils.TestAvroSerializer;
//import org.springframework.context.ConfigurableApplicationContext;
//import org.springframework.context.annotation.Bean;
//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
//import org.springframework.kafka.core.DefaultKafkaProducerFactory;
//import org.springframework.kafka.core.KafkaTemplate;
//import org.springframework.kafka.test.EmbeddedKafkaBroker;
//import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
//import org.springframework.kafka.test.utils.KafkaTestUtils;
//import org.springframework.messaging.Message;
//import org.springframework.messaging.converter.MessageConverter;
//import org.springframework.messaging.handler.annotation.SendTo;
//import org.springframework.messaging.support.MessageBuilder;
//import org.springframework.util.MimeTypeUtils;
//
//import static org.assertj.core.api.Assertions.assertThat;
//
///**
// * @author Soby Chacko
// */
//public class PerRecordAvroContentTypeTests {
//
//    @ClassRule
//    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
//            "received-sensors");
//
//    private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
//            .getEmbeddedKafka();
//
//    private static Consumer<String, byte[]> consumer;
//
//    @BeforeClass
//    public static void setUp() throws Exception {
//        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("avro-ct-test",
//                "false", embeddedKafka);
//
//        // Receive the data as byte[]
//        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
//                ByteArrayDeserializer.class);
//
//        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//        DefaultKafkaConsumerFactory<String, byte[]> cf = new DefaultKafkaConsumerFactory<>(
//                consumerProps);
//        consumer = cf.createConsumer();
//        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "received-sensors");
//    }
//
//    @AfterClass
//    public static void tearDown() {
//        consumer.close();
//    }
//
//    @Test
//    @Ignore
//    public void testPerRecordAvroConentTypeAndVerifySerialization() throws Exception {
//        SpringApplication app = new SpringApplication(SensorCountAvroApplication.class);
//        app.setWebApplicationType(WebApplicationType.NONE);
//
//        try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
//                "--spring.jmx.enabled=false",
//                "--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
//                "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
//                "--spring.cloud.stream.bindings.input.destination=sensors",
//                "--spring.cloud.stream.bindings.output.destination=received-sensors",
//                "--spring.cloud.stream.bindings.output.contentType=application/avro",
//                "--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=per-record-avro-contentType-test",
//                "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
//                "--spring.cloud.stream.kafka.streams.binder.brokers="
//                        + embeddedKafka.getBrokersAsString())) {
//
//            Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
//            // Use a custom Avro test serializer
//            senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
//                    TestAvroSerializer.class);
//            DefaultKafkaProducerFactory<Integer, Sensor> pf = new DefaultKafkaProducerFactory<>(
//                    senderProps);
//            try {
//                KafkaTemplate<Integer, Sensor> template = new KafkaTemplate<>(pf, true);
//
//                Random random = new Random();
//                Sensor sensor = new Sensor();
//                sensor.setId(UUID.randomUUID().toString() + "-v1");
//                sensor.setAcceleration(random.nextFloat() * 10);
//                sensor.setVelocity(random.nextFloat() * 100);
//                sensor.setTemperature(random.nextFloat() * 50);
//                // Send with the Avro content type set.
//                Message<?> message = MessageBuilder.withPayload(sensor)
//                        .setHeader("contentType", "application/avro").build();
//                template.setDefaultTopic("sensors");
//                template.send(message);
//
//                // The serialized byte[] above is received by the binding process and
//                // deserialized using the Avro converter. The data is then written to a
//                // return topic as byte[] (using the same Avro converter).
//
//                // Receive the byte[] from the return topic
//                ConsumerRecord<String, byte[]> cr = KafkaTestUtils
//                        .getSingleRecord(consumer, "received-sensors");
//                final byte[] value = cr.value();
//
//                // Convert the received byte[] back to an Avro object and verify that it
//                // is the same as the one we sent above.
//                AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter();
//
//                Message<?> receivedMessage = MessageBuilder.withPayload(value)
//                        .setHeader("contentType",
//                                MimeTypeUtils.parseMimeType("application/avro"))
//                        .build();
//                Sensor messageConverted = (Sensor) avroSchemaMessageConverter
//                        .fromMessage(receivedMessage, Sensor.class);
//                assertThat(messageConverted).isEqualTo(sensor);
//            }
//            finally {
//                pf.destroy();
//            }
//        }
//    }
//
//    @EnableBinding(KafkaStreamsProcessor.class)
//    @EnableAutoConfiguration
//    static class SensorCountAvroApplication {
//
//        @StreamListener
//        @SendTo("output")
//        public KStream<?, Sensor> process(@Input("input") KStream<Object, Sensor> input) {
//            // Return the same Sensor object unchanged so that we can do test
//            // verifications.
//            return input.map(KeyValue::new);
//        }
//
//        @Bean
//        public MessageConverter sensorMessageConverter() throws IOException {
//            return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
//        }
//
//    }
//
//}
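
Note that the commented-out replacement above also adds JUnit 4's @Ignore to the test method, so the class would stay skipped even once it is uncommented. As a minimal illustrative sketch (not part of the commit), @Ignore keeps a test compiled but excluded from the run:

import org.junit.Ignore;
import org.junit.Test;

// Minimal sketch of the JUnit 4 @Ignore mechanism used in the commented-out
// code above: the test still compiles, but the runner reports it as skipped.
public class IgnoredTestSketch {

    @Test
    @Ignore("Disabled until the Schema Registry dependency issues are resolved")
    public void avroRoundTrip() {
        // Intentionally empty; never executed while @Ignore is present.
    }
}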

@@ -14,50 +14,50 @@
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.integration.utils;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManagerImpl;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Custom Avro serializer intended to be used for testing only.
 *
 * @param <S> Target type to serialize
 * @author Soby Chacko
 */
public class TestAvroSerializer<S> implements Serializer<S> {

    public TestAvroSerializer() {
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, S data) {
        AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
        Message<?> message = MessageBuilder.withPayload(data).build();
        Map<String, Object> headers = new HashMap<>(message.getHeaders());
        headers.put(MessageHeaders.CONTENT_TYPE, "application/avro");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        final Object payload = avroSchemaMessageConverter
                .toMessage(message.getPayload(), messageHeaders).getPayload();
        return (byte[]) payload;
    }

    @Override
    public void close() {
    }

}

//package org.springframework.cloud.stream.binder.kafka.streams.integration.utils;
//
//import java.util.HashMap;
//import java.util.Map;
//
//import org.apache.kafka.common.serialization.Serializer;
//
//import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
//import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManagerImpl;
//import org.springframework.messaging.Message;
//import org.springframework.messaging.MessageHeaders;
//import org.springframework.messaging.support.MessageBuilder;
//
///**
// * Custom Avro serializer intended to be used for testing only.
// *
// * @param <S> Target type to serialize
// * @author Soby Chacko
// */
//public class TestAvroSerializer<S> implements Serializer<S> {
//
//    public TestAvroSerializer() {
//    }
//
//    @Override
//    public void configure(Map<String, ?> configs, boolean isKey) {
//
//    }
//
//    @Override
//    public byte[] serialize(String topic, S data) {
//        AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
//        Message<?> message = MessageBuilder.withPayload(data).build();
//        Map<String, Object> headers = new HashMap<>(message.getHeaders());
//        headers.put(MessageHeaders.CONTENT_TYPE, "application/avro");
//        MessageHeaders messageHeaders = new MessageHeaders(headers);
//        final Object payload = avroSchemaMessageConverter
//                .toMessage(message.getPayload(), messageHeaders).getPayload();
//        return (byte[]) payload;
//    }
//
//    @Override
//    public void close() {
//
//    }
//
//}
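
For reference, the test above only wires this serializer in through producer properties, but it can also be exercised directly. A small hypothetical sketch (not in the commit), assuming the generated com.example.Sensor class is on the classpath:

import com.example.Sensor;
import org.springframework.cloud.stream.binder.kafka.streams.integration.utils.TestAvroSerializer;

// Hypothetical direct usage: produce the same byte[] the Kafka producer would
// send when TestAvroSerializer is configured as the value serializer.
public class TestAvroSerializerUsage {

    public static void main(String[] args) {
        Sensor sensor = new Sensor();
        sensor.setId("demo-v1");
        sensor.setAcceleration(1.0f);
        sensor.setVelocity(2.0f);
        sensor.setTemperature(3.0f);

        TestAvroSerializer<Sensor> serializer = new TestAvroSerializer<>();
        // The topic argument is unused by this serializer's implementation.
        byte[] bytes = serializer.serialize("sensors", sensor);
        System.out.println("Serialized " + bytes.length + " bytes");
    }
}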

@@ -14,61 +14,63 @@
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.serde;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import com.example.Sensor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;

import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManagerImpl;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Refer to {@link MessageConverterDelegateSerde} for motivations.
 *
 * @author Soby Chacko
 */
public class MessageConverterDelegateSerdeTest {

    @Test
    @SuppressWarnings("unchecked")
    public void testCompositeNonNativeSerdeUsingAvroContentType() {
        Random random = new Random();
        Sensor sensor = new Sensor();
        sensor.setId(UUID.randomUUID().toString() + "-v1");
        sensor.setAcceleration(random.nextFloat() * 10);
        sensor.setVelocity(random.nextFloat() * 100);
        sensor.setTemperature(random.nextFloat() * 50);

        List<MessageConverter> messageConverters = new ArrayList<>();
        messageConverters.add(new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl()));
        CompositeMessageConverterFactory compositeMessageConverterFactory = new CompositeMessageConverterFactory(
                messageConverters, new ObjectMapper());
        MessageConverterDelegateSerde messageConverterDelegateSerde = new MessageConverterDelegateSerde(
                compositeMessageConverterFactory.getMessageConverterForAllRegistered());

        Map<String, Object> configs = new HashMap<>();
        configs.put("valueClass", Sensor.class);
        configs.put("contentType", "application/avro");
        messageConverterDelegateSerde.configure(configs, false);
        final byte[] serialized = messageConverterDelegateSerde.serializer().serialize(null,
                sensor);

        final Object deserialized = messageConverterDelegateSerde.deserializer()
                .deserialize(null, serialized);

        assertThat(deserialized).isEqualTo(sensor);
    }

}

//package org.springframework.cloud.stream.binder.kafka.streams.serde;
//
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//import java.util.Random;
//import java.util.UUID;
//
//import com.example.Sensor;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import org.junit.Ignore;
//import org.junit.Test;
//
//import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
//import org.springframework.cloud.schema.registry.avro.AvroSchemaServiceManagerImpl;
//import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
//import org.springframework.messaging.converter.MessageConverter;
//
//import static org.assertj.core.api.Assertions.assertThat;
//
///**
// * Refer to {@link MessageConverterDelegateSerde} for motivations.
// *
// * @author Soby Chacko
// */
//public class MessageConverterDelegateSerdeTest {
//
//    @Test
//    @SuppressWarnings("unchecked")
//    @Ignore
//    public void testCompositeNonNativeSerdeUsingAvroContentType() {
//        Random random = new Random();
//        Sensor sensor = new Sensor();
//        sensor.setId(UUID.randomUUID().toString() + "-v1");
//        sensor.setAcceleration(random.nextFloat() * 10);
//        sensor.setVelocity(random.nextFloat() * 100);
//        sensor.setTemperature(random.nextFloat() * 50);
//
//        List<MessageConverter> messageConverters = new ArrayList<>();
//        messageConverters.add(new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl()));
//        CompositeMessageConverterFactory compositeMessageConverterFactory = new CompositeMessageConverterFactory(
//                messageConverters, new ObjectMapper());
//        MessageConverterDelegateSerde messageConverterDelegateSerde = new MessageConverterDelegateSerde(
//                compositeMessageConverterFactory.getMessageConverterForAllRegistered());
//
//        Map<String, Object> configs = new HashMap<>();
//        configs.put("valueClass", Sensor.class);
//        configs.put("contentType", "application/avro");
//        messageConverterDelegateSerde.configure(configs, false);
//        final byte[] serialized = messageConverterDelegateSerde.serializer().serialize(null,
//                sensor);
//
//        final Object deserialized = messageConverterDelegateSerde.deserializer()
//                .deserialize(null, serialized);
//
//        assertThat(deserialized).isEqualTo(sensor);
//    }
//
//}
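
Once these tests are restored, the round-trip equality they assert ultimately rests on Avro's generated SpecificRecord classes implementing equals(). A hypothetical cross-check (not part of the commit) that exercises the generated Sensor with plain Avro, independent of the Spring converter and the Schema Registry client:

import java.io.ByteArrayOutputStream;

import com.example.Sensor;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

// Hypothetical cross-check: round-trip the generated Sensor class with plain
// Avro to confirm the generated code itself, no Spring machinery involved.
public class PlainAvroRoundTrip {

    public static void main(String[] args) throws Exception {
        Sensor sensor = new Sensor();
        sensor.setId("test-v1");
        sensor.setAcceleration(1.0f);
        sensor.setVelocity(2.0f);
        sensor.setTemperature(3.0f);

        // Serialize with the schema baked into the generated class.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<>(Sensor.class).write(sensor, encoder);
        encoder.flush();

        // Deserialize and compare; SpecificRecord classes implement equals().
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(out.toByteArray(), null);
        Sensor copy = new SpecificDatumReader<>(Sensor.class).read(null, decoder);
        System.out.println("round trip ok: " + sensor.equals(copy));
    }
}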