Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Complete
    • Affects Version/s: 5.0.6
    • Fix Version/s: 5.0.7
    • Component/s: Core
    • Labels:

      Description

      I'm not entirely sure about this one, because I don't know whether I'm using the API correctly – the documentation for FluxMessageChannel is somewhat scarce. In any case, running the code below causes Flux instances to remain in memory (forever?) until the application eventually runs out of memory.

      Update: I'm not 100% sure about this code right now… Simulating what happened in our application is a bit hard.

      import org.reactivestreams.Publisher;
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.integration.channel.PublishSubscribeChannel;
      import org.springframework.integration.dsl.IntegrationFlow;
      import org.springframework.integration.dsl.IntegrationFlows;
      import org.springframework.messaging.Message;
      import org.springframework.messaging.MessageChannel;
      import org.springframework.messaging.support.GenericMessage;
      import org.springframework.scheduling.annotation.EnableScheduling;
      import org.springframework.scheduling.annotation.SchedulingConfigurer;
      import org.springframework.scheduling.config.ScheduledTaskRegistrar;
      import reactor.core.publisher.Flux;
      
      import java.util.ArrayList;
      import java.util.List;
      import java.util.stream.IntStream;
      
      import static java.util.stream.Collectors.joining;
      import static org.springframework.integration.dsl.channel.MessageChannels.flux;
      
      /**
       * Self-contained Spring Boot application that attempts to reproduce the
       * memory growth described in this report.
       *
       * <p>A scheduled task repeatedly sends a message carrying {@link #PAYLOAD}
       * to the {@link #input()} channel. {@link #flow()} splits that message
       * through {@link PayloadToFlux}, passes the result through a channel
       * obtained from {@code MessageChannels.flux()}, and prints whatever
       * reaches the end of the flow.
       */
      @Configuration
      @EnableAutoConfiguration
      @EnableScheduling
      public class SpringIntegrationTest implements SchedulingConfigurer {
      
          /**
           * Shared payload sent with every message: 1024 strings, populated by
           * the static initializer below. The same list instance is reused for
           * every message.
           */
          private static final List<String> PAYLOAD = new ArrayList<>(1024);
      
          static {
              // Each entry is the decimal representations of 0..9999 concatenated
              // with no separator (38,890 characters). The string is rebuilt on
              // every iteration rather than computed once and reused.
              for (int i = 0; i < 1024; i++) {
                  PAYLOAD.add(IntStream.range(0, 10_000).mapToObj(String::valueOf).collect(joining()));
              }
          }
      
          /**
           * Starts the application with this class as the configuration source.
           *
           * @param args command-line arguments, forwarded to {@code SpringApplication.run}
           */
          public static void main(String[] args) {
              SpringApplication.run(SpringIntegrationTest.class, args);
          }
      
          /**
           * Registers a fixed-delay task that wraps {@link #PAYLOAD} in a new
           * {@code GenericMessage} and sends it to {@link #input()}.
           *
           * @param taskRegistrar registrar the task is added to
           */
          @Override
          public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
              // NOTE(review): the delay of 100 is presumably in milliseconds - confirm
              // against the ScheduledTaskRegistrar documentation.
              // NOTE(review): input() is invoked directly; this presumably resolves to
              // the same bean instance used by flow() via @Configuration proxying - confirm.
              taskRegistrar.addFixedDelayTask(() -> input().send(new GenericMessage<>(PAYLOAD)), 100);
          }
      
          /**
           * Channel that the scheduled task sends to and that {@link #flow()}
           * starts from.
           *
           * @return a new {@code PublishSubscribeChannel}
           */
          @Bean
          public MessageChannel input() {
              return new PublishSubscribeChannel();
          }
      
          /**
           * Defines the flow under test: input channel, split, flux channel,
           * then print.
           *
           * @return the assembled {@code IntegrationFlow}
           */
          @Bean
          public IntegrationFlow flow() {
              return IntegrationFlows
                      .from(input())
                      // Splitting is delegated to a PayloadToFlux instance, whose only
                      // method returns a Publisher built from the list payload.
                      .split(new PayloadToFlux())
                      // flux() is the static import from MessageChannels.
                      // NOTE(review): presumably yields the FluxMessageChannel the
                      // report is about - confirm.
                      .channel(flux())
                      // Terminal step: print what arrives to standard output.
                      .handle(System.out::println)
                      .get();
          }
      
          /**
           * Object handed to {@code split(...)} in {@link #flow()}.
           * NOTE(review): the DSL presumably discovers {@link #handle} reflectively,
           * since no method name is passed - confirm.
           */
          public static final class PayloadToFlux {
      
              /**
               * Exposes the message's list payload as a reactive publisher.
               *
               * @param message message whose payload is a list of strings
               * @return a {@code Flux} created with {@code Flux.fromStream} over the
               *         payload list's stream
               */
              public Publisher<String> handle(Message<List<String>> message) {
                  return Flux.fromStream(message.getPayload().stream());
              }
          }
      }
      

        Attachments

          Activity

            People

            • Assignee:
              abilan Artem Bilan
              Reporter:
              timurstrekalov Timur Strekalov
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: