How to access Socket directly with the TCP function of Spring Integration

When building a TCP client with the TCP support of Spring Integration, I occasionally needed to work with the underlying Socket directly, so this is a note on how to do that.

The case in which I needed direct access to the Socket was a connection requirement: disconnect with FIN on a normal close, and with RST on an abnormal close (when some error occurs). The TCP support of Spring Integration lets you specify FIN or RST as the disconnection method in the connection factory settings, but as far as I could find, it cannot switch between the two depending on conditions.
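As a rough illustration of what switching between FIN and RST means at the plain `java.net.Socket` level, here is a minimal sketch that uses only the JDK. The class and method names (`DisconnectModeSketch`, `prepareClose`, `lingerOf`) are hypothetical and not part of Spring Integration. It assumes the common TCP stack behavior where enabling SO_LINGER with a timeout of 0 makes `close()` reset the connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

// Hypothetical helper, for illustration only.
final class DisconnectModeSketch {

  /**
   * Chooses how a later close() ends the connection.
   * abnormal = true  -> SO_LINGER on with a timeout of 0, so close() typically sends RST.
   * abnormal = false -> SO_LINGER off, so close() performs the normal FIN sequence.
   */
  static void prepareClose(Socket socket, boolean abnormal) {
    try {
      socket.setSoLinger(abnormal, 0);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns the SO_LINGER timeout in seconds, or -1 when the option is disabled. */
  static int lingerOf(Socket socket) {
    try {
      return socket.getSoLinger();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    try (Socket socket = new Socket()) { // unconnected; enough to inspect the option
      prepareClose(socket, true);
      System.out.println("abnormal: linger=" + lingerOf(socket));
      prepareClose(socket, false);
      System.out.println("normal: linger=" + lingerOf(socket));
    }
  }
}
```

The point of the sketch is that the choice is a per-Socket option that has to be set before `close()` is called, which is why access to the Socket instance itself is needed to decide at runtime.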

Use of TcpSocketSupport

Spring Integration provides the TcpSocketSupport strategy interface (spring.io/spring-integration/docs/5.1.2.RELEASE/reference/htmlsingle/#_the_literal_tcpsocketsupport_literal_strategy_interface) so that arbitrary processing can be applied to the Socket created on the framework side (in the case of NIO, the Socket associated with the SocketChannel). By default, the DefaultTcpSocketSupport class is applied, and it does not perform any processing that changes the state of the Socket.

Applying an implementation that disconnects with RST when an exception occurs

The TCP support of Spring Integration provides a mechanism for handling events (connection, disconnection, error detection, and so on) that occur during processing inside Spring Integration. The code introduced here handles those events and, after detecting an exception, disconnects with RST. Note that this is not code that has been (or will be) applied to a real application, so it cannot be used as is. It is only a sample.

package com.example.demo;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.tcp.serializer.TcpCodecs;

@SpringBootApplication
public class SprIntDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(SprIntDemoApplication.class, args);
  }

  // TCP Server
  @Bean
  public IntegrationFlow integrationInboundFlow() {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .get()))
        .transform(Transformers.objectToString()) // byte[] -> String
        .transform(m -> m) //Reply the received message as it is
        .get();
  }

  // TCP Client
  @Bean
  public IntegrationFlow integrationOutboundFlow(ApplicationEventPublisher publisher) {
    AbstractClientConnectionFactory factory = Tcp.nioClient("localhost", 5555)
        .serializer(TcpCodecs.crlf())
        .deserializer(TcpCodecs.crlf())
        .tcpSocketSupport(socketManager()) //Apply support class to manage generated Socket
        .get();
    factory.setApplicationEventPublisher(publisher);
    return flow -> flow.handle(Tcp.outboundGateway(factory))
        .transform(Transformers.objectToString());  // byte[] -> String
  }

  @Bean
  public SocketManager socketManager() {
    return new SocketManager();
  }

  @Bean
  public MessagingTemplate messagingTemplate() {
    return new MessagingTemplate();
  }

  static class SocketManager extends DefaultTcpSocketSupport {
    private final Map<Integer, Socket> sockets = new ConcurrentHashMap<>();

    @Override
    public void postProcessSocket(Socket socket) {
      super.postProcessSocket(socket);
      sockets.put(socket.getLocalPort(), socket); //Save it internally so that you can access the Socket when an abnormality is detected.
    }

    @EventListener
    public void handleTcpConnectionExceptionEvent(TcpConnectionExceptionEvent event) {
      try {
        int localPort = ((TcpConnection) event.getSource()).getSocketInfo().getLocalPort();
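        Socket socket = sockets.get(localPort);
        // The Socket may already have been removed by the close event, so guard against null
        if (socket != null && !socket.isClosed() && !(event.getCause() instanceof SoftEndOfStreamException)) {
          socket.setSoLinger(true, 0); // Configure the Socket to disconnect with RST
        }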
      } catch (SocketException e) {
        // ignore
      }
    }

    @EventListener
    public void handleTcpConnectionCloseEvent(TcpConnectionCloseEvent event) {
      sockets.remove(((TcpConnection) event.getSource()).getSocketInfo().getLocalPort()); //Clean Sockets that are no longer needed after disconnection
    }

  }

}

package com.example.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpIntDemoApplicationTests {

  @Autowired
  MessagingTemplate template;

  @Test
  public void contextLoads() {
    Message<?> reply = template.sendAndReceive("integrationOutboundFlow.input",
        MessageBuilder.withPayload("hello!").build());
    System.out.println("reply: " + reply);
  }

}

By the way, if you run the above code as is, no error (exception) occurs during processing. To generate an error, the server side has to disconnect the Socket in the middle of processing, but I will omit that part in this entry.
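Although the entry leaves the server-side part out, one possible way to provoke an error could be sketched with a plain JDK server that answers with an unterminated fragment and then closes. Everything here (`FaultyServerSketch`, `serveOnce`, `probe`) is hypothetical and not taken from the original application. The sketch also assumes that the client's CRLF deserializer treats a close in the middle of a message as an error rather than as a soft end of stream, which is not verified here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a server that disconnects during processing.
final class FaultyServerSketch {

  /** Accepts one connection, reads one line, replies with a fragment that has no CRLF, then closes. */
  static Thread serveOnce(ServerSocket server) {
    Thread thread = new Thread(() -> {
      try (Socket peer = server.accept()) {
        InputStream in = peer.getInputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
          // discard the request up to the end of the line
        }
        OutputStream out = peer.getOutputStream();
        out.write("partial".getBytes(StandardCharsets.US_ASCII)); // terminator omitted on purpose
        out.flush();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
    thread.setDaemon(true);
    thread.start();
    return thread;
  }

  /** Plain-socket check: sends one request and returns everything received before the peer closed. */
  static String probe() {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
      serveOnce(server);
      try (Socket client = new Socket(loopback, server.getLocalPort())) {
        OutputStream out = client.getOutputStream();
        out.write("hello!\r\n".getBytes(StandardCharsets.US_ASCII));
        out.flush();
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        InputStream in = client.getInputStream();
        int b;
        while ((b = in.read()) != -1) { // read until the server closes the connection
          received.write(b);
        }
        return new String(received.toByteArray(), StandardCharsets.US_ASCII);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("received before close: " + probe());
  }
}
```

To try it against the sample client, such a server would have to listen on port 5555 in place of the Spring Integration server flow. The `probe` method only confirms with a plain Socket that the reply ends without a terminator.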

Summary

Reading the documentation, I get the feeling that I am using TcpSocketSupport beyond its intended role, but it was helpful that a mechanism for getting at the Socket is provided.
