Spring Integration / INT-4444

Allow configuring executor for FluxMessageChannel

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 5.0.3
    • Fix Version/s: 5.2 Backlog
    • Component/s: Core
    • Labels:
      None

      Description

      Currently it is not possible to specify an Executor for handling the messages produced by splitting a Flux further downstream. To get this behavior you have to add an extra ExecutorChannel right after the FluxMessageChannel. I think it would be nice to have a way to do this directly on FluxMessageChannel, maybe along with a way to also provide a subscribeOn executor, so you don't have to modify the original Flux.

      .handle((payload, headers) -> Flux.range(0, 10)
          .doOnNext(i -> LOG.info("> " + i))
          .subscribeOn(Schedulers.parallel()))
      .split()
      .channel(new FluxMessageChannel())
      .handle(message -> LOG.info(" -> " + message.getPayload()))
      

      produces:

      [     parallel-1] d.a.Application    : > 0
      [     parallel-1] d.a.Application    :  -> 0
      [     parallel-1] d.a.Application    : > 1
      [     parallel-1] d.a.Application    :  -> 1
      [     parallel-1] d.a.Application    : > 2
      [     parallel-1] d.a.Application    :  -> 2
      [     parallel-1] d.a.Application    : > 3
      [     parallel-1] d.a.Application    :  -> 3
      [     parallel-1] d.a.Application    : > 4
      [     parallel-1] d.a.Application    :  -> 4
      [     parallel-1] d.a.Application    : > 5
      [     parallel-1] d.a.Application    :  -> 5
      [     parallel-1] d.a.Application    : > 6
      [     parallel-1] d.a.Application    :  -> 6
      [     parallel-1] d.a.Application    : > 7
      [     parallel-1] d.a.Application    :  -> 7
      [     parallel-1] d.a.Application    : > 8
      [     parallel-1] d.a.Application    :  -> 8
      [     parallel-1] d.a.Application    : > 9
      [     parallel-1] d.a.Application    :  -> 9
      

      while adding:

      .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
      

      just after the FluxMessageChannel produces the expected handling behavior, with each message handled on a thread from the pool:

      [     parallel-1] d.a.Application    : > 0
      [pool-1-thread-2] d.a.Application    :  -> 0
      [     parallel-1] d.a.Application    : > 1
      [pool-1-thread-3] d.a.Application    :  -> 1
      [     parallel-1] d.a.Application    : > 2
      [pool-1-thread-1] d.a.Application    :  -> 2
      [     parallel-1] d.a.Application    : > 3
      [pool-1-thread-4] d.a.Application    :  -> 3
      [     parallel-1] d.a.Application    : > 4
      [pool-1-thread-5] d.a.Application    :  -> 4
      [     parallel-1] d.a.Application    : > 5
      [pool-1-thread-7] d.a.Application    :  -> 5
      [     parallel-1] d.a.Application    : > 6
      [pool-1-thread-8] d.a.Application    :  -> 6
      [     parallel-1] d.a.Application    : > 7
      [pool-1-thread-2] d.a.Application    :  -> 7
      [     parallel-1] d.a.Application    : > 8
      [pool-1-thread-3] d.a.Application    :  -> 8
      [     parallel-1] d.a.Application    : > 9
      [pool-1-thread-4] d.a.Application    :  -> 9
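
The difference between the two logs comes down to where the hand-off to the handler happens, and it can be reproduced without Spring Integration or Reactor. The following is a minimal plain-JDK sketch of that hand-off, not the FluxMessageChannel implementation. The names `HandoffSketch`, `handlerThreads` and the thread name `producer-1` are hypothetical and exist only for this illustration. Passing `Runnable::run` as the Executor stands in for the direct case (handler on the emitting thread), and a cached thread pool stands in for the extra ExecutorChannel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {

    /**
     * Emits {@code count} items from a single thread named "producer-1" and
     * passes each one to a handler through the given executor. Returns the
     * names of the threads on which the handler actually ran.
     */
    public static List<String> handlerThreads(Executor handlerExecutor, int count)
            throws InterruptedException {
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch handled = new CountDownLatch(count);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                // The executor decides which thread runs the handler.
                handlerExecutor.execute(() -> {
                    threads.add(Thread.currentThread().getName());
                    handled.countDown();
                });
            }
        }, "producer-1");

        producer.start();
        producer.join();
        // Wait until every item has been handled, even on pool threads.
        if (!handled.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("handlers did not finish in time");
        }
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        // Direct hand-off: every handler call runs on "producer-1".
        System.out.println(handlerThreads(Runnable::run, 3));

        // Executor hand-off: handler calls run on pool threads instead.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            System.out.println(handlerThreads(pool, 3));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the direct case the handler never leaves the emitting thread, which matches the first log where both lines show parallel-1. With the pool, the emitting thread only submits work, which matches the second log.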
      

      More details can be found here: https://stackoverflow.com/a/49499907/1776585

            People

            • Assignee:
              abilan Artem Bilan
              Reporter:
              mkadan Mikhail Kadan