Implementing Avro in Spring Cloud Dataflow

Bah91

I am trying to implement a Spring Cloud Dataflow stream that reads records from a database, passes these to a processor which converts into a Avro schema and then pass this to be consumed by a sink application.

I have the data flowing from the SQL DB to my source app and passing the data across via the Kafka binder with no issues by I am running into problems sending the data across from the Processor to the the Sink application serializing/deserializing with Avro.

I have created a avro schema called ech.avsc and have generated a class called EchRecord for it using the avro-maven-plugin within the Processor.

I have added the following dependencies to the pom of both processor and sink

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-schema</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>

I have set the properties of the processor to

spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

on the Sink side the properties look like spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

The Processor application code looks as follows:

@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {

private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);

public static void main(String[] args) {
    SpringApplication.run(EchProcessorApplication.class, args);
}


@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
    return EchRecord.newBuilder()
            .setCallId(11111).build();;
}

On the Sink side the code as it stands looks like as follows:

@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {



    private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroLoggerApplication.class, args);
    }


    @StreamListener(Sink.INPUT)
    public void logHandler(Object data) {

        LOGGER.info("data='{}'", data.toString());
        LOGGER.info("class='{}'", data.getClass());


    }
}

I have a Spring Schema Registry Server running and reachable by both applications and I can see on querying the registry that the schema has been delivered to the server.

I can see if I enable debug logging on the sink application that the contentType is being set correctly on the received messages: contentType=application/vnd.echrecord.v1+avro

In the Sink application I have setup a method with the @StreamListener annotation to retrieve the messages taking in an Object and printing out the data and the class type and it appears to be retrieving a byte array.

How do I go about changing the code of the Sink application to deserialize the Avro message into something that I can retrieve the set data from?

Vinicius Carvalho

A couple of things to try here. On the producing side, since your type is already an Avro type (SpecificRecord or GenericRecord) you don't need the dynamicSchemaGeneration flag, that's meant for reflection based writers, mostly for testing as it has an impact on performance.

Since your sink can see the correct type as you posted, what you need now is to have your type on the sink. So for instance add the type on the sink and annotate the method with the proper type: EchRecord that will give you the right type.

You can also set it to be GenericRecord in order to be able to access it like an object container using record.get(<propertyname>)

Collected from the Internet

Please contact [email protected] to delete if infringement.

edited at
0

Comments

0 comments
Login to comment

Related

From Dev

Activating Avro message converter in Spring Cloud Dataflow

From Dev

Spring Cloud DataFlow automated run

From Dev

spring-cloud-stream kafka avro

From Dev

spring-cloud-stream kafka avro

From Dev

Spring Cloud DataFlow documentation has missing sections

From Dev

Spring Cloud DataFlow documentation has missing sections

From Dev

Spring Cloud Dataflow: how to persist stream definitions

From Dev

How to handle global resources in spring cloud dataflow?

From Dev

Spring Cloud Dataflow with multiple Kafka binders

From Dev

Spring Cloud DataFlow http polling and deduplication

From Dev

Spring Cloud Dataflow - how to pass credentials to task

From Dev

NotSerializableException: org.apache.avro.io.DecoderFactory in Google Cloud Dataflow pipeline

From Dev

convert csv to avro in python using google-cloud-dataflow beam.io.avroio.WriteToAvro(

From Dev

Spring Kafka, Spring Cloud Stream, and Avro compatibility Unknown magic byte

From Dev

Implementing retractions in google dataflow

From Dev

Execute Google Cloud Dataflow pipeline from Spring controller

From Dev

Unable to write streamed data to sink file using Spring cloud dataflow

From Dev

Is it safe to use Spring Cloud Dataflow Local Server on production?

From Dev

Stream CSV file using Spring Cloud Dataflow with Kafka

From Dev

spring-cloud-dataflow-kubernetes app memory limit

From Dev

Spring Cloud DataFlow RabbitMQ min-start-interval

From Dev

ETL on Google Cloud - (Dataflow vs. Spring Batch) -> BigQuery

From Dev

JobInstanceAlreadyCompleteException when running a Batch-Task in Spring cloud Dataflow

From Dev

how to start only one job in a task in spring cloud dataflow

From Dev

Kubernetes/Spring Cloud Dataflow stream > spring.cloud.stream.bindings.output.destination is ignored by producer

From Dev

Cannot deploy stream through spring-cloud-dataflow-server in sap-cloud-foundry

From Dev

Cloud Dataflow Job failed

From Dev

Cloud Dataflow failure recovery

From Java

How to use avro native decoder when using a PollableMessageSource input in spring cloud stream?

Related Related

  1. 1

    Activating Avro message converter in Spring Cloud Dataflow

  2. 2

    Spring Cloud DataFlow automated run

  3. 3

    spring-cloud-stream kafka avro

  4. 4

    spring-cloud-stream kafka avro

  5. 5

    Spring Cloud DataFlow documentation has missing sections

  6. 6

    Spring Cloud DataFlow documentation has missing sections

  7. 7

    Spring Cloud Dataflow: how to persist stream definitions

  8. 8

    How to handle global resources in spring cloud dataflow?

  9. 9

    Spring Cloud Dataflow with multiple Kafka binders

  10. 10

    Spring Cloud DataFlow http polling and deduplication

  11. 11

    Spring Cloud Dataflow - how to pass credentials to task

  12. 12

    NotSerializableException: org.apache.avro.io.DecoderFactory in Google Cloud Dataflow pipeline

  13. 13

    convert csv to avro in python using google-cloud-dataflow beam.io.avroio.WriteToAvro(

  14. 14

    Spring Kafka, Spring Cloud Stream, and Avro compatibility Unknown magic byte

  15. 15

    Implementing retractions in google dataflow

  16. 16

    Execute Google Cloud Dataflow pipeline from Spring controller

  17. 17

    Unable to write streamed data to sink file using Spring cloud dataflow

  18. 18

    Is it safe to use Spring Cloud Dataflow Local Server on production?

  19. 19

    Stream CSV file using Spring Cloud Dataflow with Kafka

  20. 20

    spring-cloud-dataflow-kubernetes app memory limit

  21. 21

    Spring Cloud DataFlow RabbitMQ min-start-interval

  22. 22

    ETL on Google Cloud - (Dataflow vs. Spring Batch) -> BigQuery

  23. 23

    JobInstanceAlreadyCompleteException when running a Batch-Task in Spring cloud Dataflow

  24. 24

    how to start only one job in a task in spring cloud dataflow

  25. 25

    Kubernetes/Spring Cloud Dataflow stream > spring.cloud.stream.bindings.output.destination is ignored by producer

  26. 26

    Cannot deploy stream through spring-cloud-dataflow-server in sap-cloud-foundry

  27. 27

    Cloud Dataflow Job failed

  28. 28

    Cloud Dataflow failure recovery

  29. 29

    How to use avro native decoder when using a PollableMessageSource input in spring cloud stream?

HotTag

Archive