Support Protocol Buffers for Spring Cloud Stream


Why Spring Cloud Stream and Protocol Buffers

Put very simply:

_I want to make messaging easier._

Here is why.

In addition to synchronous API calls, recent systems are increasingly using asynchronous messaging for scalability.

The problem then is how to manage the message schema, because it is shared across multiple systems.

What we want is a schema definition that can be shared between systems and evolved in a backward-compatible way, serialization format included, much like an API definition for REST.

JSON is often used for messaging, and JSON Schema is one of the major ways to define its schema, but those definitions are by no means easy to work with.

That makes Protocol Buffers, which gRPC also uses, a good candidate.

Protocol Buffers provides an IDL (Interface Definition Language), so you can define a schema concisely without being tied to the conventions of JSON Schema.

So let's try messaging with Protocol Buffers on Spring Cloud Stream, the Spring messaging framework that is widely used in Java.

Basic mechanism

Spring Cloud Stream uses JSON as its default conversion format, but it provides the MessageConverter interface as an extension point for supporting other formats.

So what needs to be done is simple. There are three steps:

  1. Create a MessageConverter implementation for Protocol Buffers
  2. Bean registration in your application
  3. Specify Protocol Buffers MIME for content-type of stream in application.yaml (properties)
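As a rough mental model of why the content-type matters, the sketch below shows a registry that picks a converter by MIME type. It is a deliberately simplified, standard-library-only illustration and not Spring's actual code: the class name ConverterRegistrySketch and its methods are hypothetical, and the "protobuf" entry simply assumes the payload is already a byte array.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified stand-in for the idea behind MessageConverter; NOT Spring's API.
final class ConverterRegistrySketch {

    // MIME type -> function that serializes a payload to bytes
    private final Map<String, Function<Object, byte[]>> converters = new LinkedHashMap<>();

    void register(String mimeType, Function<Object, byte[]> converter) {
        converters.put(mimeType, converter);
    }

    // Look up the converter registered for the binding's content-type.
    byte[] serialize(String contentType, Object payload) {
        Function<Object, byte[]> converter = converters.get(contentType);
        if (converter == null) {
            throw new IllegalArgumentException("No converter for " + contentType);
        }
        return converter.apply(payload);
    }

    public static void main(String[] args) {
        ConverterRegistrySketch registry = new ConverterRegistrySketch();
        registry.register("text/plain", p -> p.toString().getBytes(StandardCharsets.UTF_8));
        // Hypothetical protobuf entry: assumes the payload is already serialized bytes.
        registry.register("application/x-protobuf", p -> (byte[]) p);

        byte[] out = registry.serialize("text/plain", "hello");
        System.out.println(out.length); // prints 5
    }
}
```

The three steps map onto this model: step 1 writes the converter, step 2 registers it, and step 3 tells the binding which MIME type to look up.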

Implementation method

Let's implement each step in order.

1. Create a MessageConverter implementation for Protocol Buffers

This is a sample, so detailed error handling is omitted, but the MessageConverter implementation class looks like this.

import java.lang.reflect.Method;

import com.google.protobuf.AbstractMessageLite;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;

public class ProtobufMessageConverter extends AbstractMessageConverter {

	public ProtobufMessageConverter() {
		// Declare the MIME type this converter handles for Protocol Buffers
		super(new MimeType("application", "x-protobuf"));
	}

	@Override
	protected boolean supports(Class<?> clazz) {
		// Support any class derived from the superclass of Protocol Buffers messages
		return AbstractMessageLite.class.isAssignableFrom(clazz);
	}

	@Override
	protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
		// Serialize to a byte array using Protocol Buffers itself
		return ((AbstractMessageLite) payload).toByteArray();
	}

	@Override
	@lombok.SneakyThrows
	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
		Object payload = message.getPayload();

		// Use the MethodParameter passed as conversionHint to find the type the receiver expects
		if (conversionHint instanceof MethodParameter) {
			MethodParameter param = (MethodParameter) conversionHint;
			param = param.nestedIfOptional();
			// If the handler takes Message<T>, look at T instead
			if (Message.class.isAssignableFrom(param.getParameterType())) {
				param = param.nested();
			}
			Class<?> clazz = param.getNestedParameterType();
			// Call the static parseFrom(byte[]) method of the generated class
			Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
			return parseFrom.invoke(null, payload);
		}
		// Without a hint, pass the payload through unchanged
		return payload;
	}
}

How convertFromInternal should work depends on the design of the message receiving side, so other approaches are possible, but this time I keep the implementation simple.
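One of those other approaches might be to keep the reflective parseFrom lookup but cache it per class and report failures explicitly. The sketch below shows only that idea, using nothing but the standard library. ReflectiveParserSketch and FakeTask are hypothetical names; FakeTask is a stand-in that merely mimics the static parseFrom(byte[]) method of a generated Protocol Buffers class and is not real protobuf code.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReflectiveParserSketch {

    // Hypothetical stand-in for a generated class: it only mimics the
    // static parseFrom(byte[]) factory method.
    static final class FakeTask {
        final String id;

        private FakeTask(String id) {
            this.id = id;
        }

        public static FakeTask parseFrom(byte[] data) {
            return new FakeTask(new String(data, StandardCharsets.UTF_8));
        }
    }

    // Cache so the reflective method lookup happens once per target class.
    private static final Map<Class<?>, Method> PARSERS = new ConcurrentHashMap<>();

    static Object parse(Class<?> targetClass, byte[] payload) {
        Method parseFrom = PARSERS.computeIfAbsent(targetClass, c -> {
            try {
                return c.getMethod("parseFrom", byte[].class);
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(c.getName() + " has no parseFrom(byte[])", e);
            }
        });
        try {
            // Static method, so the target instance is null.
            return parseFrom.invoke(null, payload);
        } catch (InvocationTargetException e) {
            // Surface the exception thrown inside parseFrom itself.
            throw new IllegalStateException("Failed to parse payload", e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("parseFrom is not accessible", e);
        }
    }

    public static void main(String[] args) {
        Object task = parse(FakeTask.class, "xxx-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(((FakeTask) task).id); // prints xxx-1
    }
}
```

In a real converter the same parse helper could be called from convertFromInternal with the class resolved from the MethodParameter; whether the caching is worth it depends on message volume.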

2. Bean registration in your application

Define a bean in each application that uses the created MessageConverter.

@Configuration
class Config {
	@Bean
	@StreamMessageConverter
	public MessageConverter messageConverter() {
		return new ProtobufMessageConverter();
	}
}

3. Specify Protocol Buffers MIME for content-type of stream in application.yaml (properties)

Set the content-type on each binding you use so that the stream recognizes the Protocol Buffers MIME type.

Sender

application.yaml


spring:
  cloud:
    stream:
      bindings:
        output:
          destination: tasks
          content-type: application/x-protobuf # MIME type declared in ProtobufMessageConverter
          producer:
            partitionKeyExpression: "1"

Receiver

application.yaml


spring:
  cloud:
    stream:
      bindings:
        input:
          destination: tasks
          content-type: application/x-protobuf # MIME type declared in ProtobufMessageConverter

This completes the support for Protocol Buffers.
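If an application uses application.properties instead of YAML, the same settings would look roughly like the fragment below. This is a sketch that assumes the same binding names (output and input) as the YAML above; it was not taken from the original sample.

```properties
# Sender (application.properties)
spring.cloud.stream.bindings.output.destination=tasks
spring.cloud.stream.bindings.output.content-type=application/x-protobuf
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=1

# Receiver (application.properties)
spring.cloud.stream.bindings.input.destination=tasks
spring.cloud.stream.bindings.input.content-type=application/x-protobuf
```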

How to use

The rest is the same as using Spring Cloud Stream and Protocol Buffers as usual.

Schema definition

task.proto


syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.task";

message TaskCreated {
    string id = 1;
    string name = 2;
}

message TaskStarted {
    string id = 1;
}

message TaskDone {
    string id = 1;
}

Sending side

@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {

	Source source;

	void create() {
		TaskCreated task = // ...
		Message<TaskCreated> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void start() {
		TaskStarted task = // ...
		Message<TaskStarted> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void done() {
		TaskDone task = // ...
		Message<TaskDone> message = new GenericMessage<>(task);
		source.output().send(message);
	}
}

Receiving side

@EnableBinding(Sink.class)
class Subscriber {

	@StreamListener(Sink.INPUT)
	void handle(TaskCreated message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskStarted message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskDone message) {
		System.out.println(message);
	}
}

Output

> id: "xxx-1"
name: "Task 1"

> id: "xxx-1"

> id: "xxx-1"

If the sending side sends a Protocol Buffers class as the Message payload and the receiving side uses @StreamListener to declare the class it wants to receive, both sides can use messaging without being aware of how the serialization is implemented.

Spring Cloud Stream is not limited to Protocol Buffers: with a MessageConverter you can use any format, which is convenient because the messaging format can be chosen as needed, independently of the application code!

Source code

The source code used this time is below.
