Uploaded image for project: 'Spring Integration'
  1. Spring Integration
  2. INT-2108

Consider adding support for processing Messages serially and sequentially when Messages belong to the same group while allowing concurrent processing of Messages that belong to different groups

    Details

      Description

      The current prototype is here: https://github.com/olegz/spring-integration/tree/SerialDispatcher (SerialDispatcher branch)

      Currently I didn't want to change any of the existing core code to either channels and/or dispatchers so I ended up providing implementations of SerialUnicastingDispatcher and SerialExecutorChannel. As you can gather from their names they are variations of UnicastingDispatcher and ExecutorChannel (with few enhancements)

      Simple non-namespace based configuration shows its usage:

      <bean id="serialChannel" class="org.springframework.integration.channel.SerialExecutorChannel">
      	<constructor-arg ref="executor"/>
      </bean>
      	
      <int:logging-channel-adapter channel="serialChannel" log-full-message="true" level="WARN"/>
      
      <task:executor id="executor" pool-size="10"/>
      

      Test case:

      Message<?> messageA = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 3).build();
      Message<?> messageA1 = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 1).build();
      Message<?> messageA2 = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 2).build();
      Message<?> messageA3 = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 0).build();
      		
      Message<?> messageB = MessageBuilder.withPayload("B").setHeader(MessageHeaders.CORRELATION_ID, "B").setHeader(MessageHeaders.SEQUENCE_NUMBER, 2).build();
      Message<?> messageB1 = MessageBuilder.withPayload("B").setHeader(MessageHeaders.CORRELATION_ID, "B").setHeader(MessageHeaders.SEQUENCE_NUMBER, 1).build();
      Message<?> messageB2 = MessageBuilder.withPayload("B").setHeader(MessageHeaders.CORRELATION_ID, "B").setHeader(MessageHeaders.SEQUENCE_NUMBER, 0).build();
      		
      serialChannel.send(messageB2);
      serialChannel.send(messageA1);
      serialChannel.send(messageA3);
      serialChannel.send(messageB1);
      serialChannel.send(messageA);
      serialChannel.send(messageA2);
      serialChannel.send(messageB);
      

      Output:

      LoggingHandler: [Payload=B][Headers={timestamp=1315348668477, id=e4882612-3b89-4168-8c99-73b12cb97da7, correlationId=B, sequenceNumber=0}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=b2943346-d99b-45ff-93f8-827191d99185, correlationId=A, sequenceNumber=0}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=37140554-ccf1-4a07-bb96-d0bebd71c6a9, correlationId=A, sequenceNumber=1}]
      LoggingHandler: [Payload=B][Headers={timestamp=1315348668477, id=bf792152-157d-4dc5-ae83-f98cb976c22d, correlationId=B, sequenceNumber=1}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=adad4b27-35f7-475a-a3f9-c489216b3414, correlationId=A, sequenceNumber=2}]
      LoggingHandler: [Payload=B][Headers={timestamp=1315348668477, id=e74ffb1f-d585-4391-90cf-848ff62c948a, correlationId=B, sequenceNumber=2}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=7a33c4f8-da5a-4ad5-945c-1e9badff16b7, correlationId=A, sequenceNumber=3}]
      

      As you can see form above there are two groups of Messages that are sent in sporadic order. The come out in sequence and 2 groups are processed concurrently while individual Messages within the same group are processed serially.

      Based on the current implementation this SerialUnicastingDispatcher is also a non-buffering re-sequencer. In other words it re-sequences Messages without storing them in the queue, therefore it does not require MessageStore

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                grussell Gary Russell
                Reporter:
                oleg.zhurakousky@springsource.com Oleg Zhurakousky
              • Votes:
                2 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated: