Spring Integration / INT-2108

Consider adding support for processing Messages serially and sequentially when Messages belong to the same group while allowing concurrent processing of Messages that belong to different groups

    Details

      Description

      The current prototype is here: https://github.com/olegz/spring-integration/tree/SerialDispatcher (SerialDispatcher branch)

      For now I did not want to change any of the existing core code in the channels or dispatchers, so I provided two new implementations: SerialUnicastingDispatcher and SerialExecutorChannel. As their names suggest, they are variations of UnicastingDispatcher and ExecutorChannel (with a few enhancements).

      A simple, non-namespace-based configuration shows the usage:

      <bean id="serialChannel" class="org.springframework.integration.channel.SerialExecutorChannel">
      	<constructor-arg ref="executor"/>
      </bean>
      	
      <int:logging-channel-adapter channel="serialChannel" log-full-message="true" level="WARN"/>
      
      <task:executor id="executor" pool-size="10"/>
      

      Test case:

      Message<?> messageA = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 3).build();
      Message<?> messageA1 = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 1).build();
      Message<?> messageA2 = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 2).build();
      Message<?> messageA3 = MessageBuilder.withPayload("A").setHeader(MessageHeaders.CORRELATION_ID, "A").setHeader(MessageHeaders.SEQUENCE_NUMBER, 0).build();
      		
      Message<?> messageB = MessageBuilder.withPayload("B").setHeader(MessageHeaders.CORRELATION_ID, "B").setHeader(MessageHeaders.SEQUENCE_NUMBER, 2).build();
      Message<?> messageB1 = MessageBuilder.withPayload("B").setHeader(MessageHeaders.CORRELATION_ID, "B").setHeader(MessageHeaders.SEQUENCE_NUMBER, 1).build();
      Message<?> messageB2 = MessageBuilder.withPayload("B").setHeader(MessageHeaders.CORRELATION_ID, "B").setHeader(MessageHeaders.SEQUENCE_NUMBER, 0).build();
      		
      serialChannel.send(messageB2);
      serialChannel.send(messageA1);
      serialChannel.send(messageA3);
      serialChannel.send(messageB1);
      serialChannel.send(messageA);
      serialChannel.send(messageA2);
      serialChannel.send(messageB);
      

      Output:

      LoggingHandler: [Payload=B][Headers={timestamp=1315348668477, id=e4882612-3b89-4168-8c99-73b12cb97da7, correlationId=B, sequenceNumber=0}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=b2943346-d99b-45ff-93f8-827191d99185, correlationId=A, sequenceNumber=0}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=37140554-ccf1-4a07-bb96-d0bebd71c6a9, correlationId=A, sequenceNumber=1}]
      LoggingHandler: [Payload=B][Headers={timestamp=1315348668477, id=bf792152-157d-4dc5-ae83-f98cb976c22d, correlationId=B, sequenceNumber=1}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=adad4b27-35f7-475a-a3f9-c489216b3414, correlationId=A, sequenceNumber=2}]
      LoggingHandler: [Payload=B][Headers={timestamp=1315348668477, id=e74ffb1f-d585-4391-90cf-848ff62c948a, correlationId=B, sequenceNumber=2}]
      LoggingHandler: [Payload=A][Headers={timestamp=1315348668477, id=7a33c4f8-da5a-4ad5-945c-1e9badff16b7, correlationId=A, sequenceNumber=3}]
      

      As you can see from the output above, there are two groups of Messages that are sent in arbitrary order. They come out in sequence: the two groups are processed concurrently, while individual Messages within the same group are processed serially.

      Based on the current implementation, this SerialUnicastingDispatcher is also a non-buffering re-sequencer. In other words, it re-sequences Messages without storing them in a queue, and therefore it does not require a MessageStore.
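
To make the "serial within a group, concurrent across groups" behaviour concrete, here is a minimal plain-Java sketch. It is not the code from the SerialDispatcher branch: the class name `KeyedSerialExecutor` and its methods are hypothetical, it uses only `java.util.concurrent`, and unlike the prototype described above it keeps waiting tasks in an in-memory queue per key and does not re-sequence by sequence number. The key would play the role of the correlation id.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical helper: tasks sharing a key run one at a time, in submission order. */
final class KeyedSerialExecutor {

    private final Executor delegate;

    // A key is present only while one of its tasks is running; the queue holds the tasks waiting behind it.
    private final Map<Object, Queue<Runnable>> pending = new HashMap<>();

    KeyedSerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    void execute(Object key, Runnable task) {
        synchronized (pending) {
            Queue<Runnable> queue = pending.get(key);
            if (queue != null) {
                // Another task of this group is active: wait behind it.
                queue.add(task);
                return;
            }
            pending.put(key, new ArrayDeque<Runnable>());
        }
        // No active task for this group: hand it to the shared pool right away.
        delegate.execute(() -> runAndDrain(key, task));
    }

    private void runAndDrain(Object key, Runnable task) {
        try {
            task.run();
        } finally {
            Runnable next;
            synchronized (pending) {
                next = pending.get(key).poll();
                if (next == null) {
                    pending.remove(key); // group is idle again
                }
            }
            if (next != null) {
                Runnable following = next;
                // Re-submit instead of looping so one busy group cannot monopolize a pool thread.
                delegate.execute(() -> runAndDrain(key, following));
            }
        }
    }
}
```

With a thread pool as the delegate, tasks submitted under key "A" never overlap each other, while tasks under "A" and "B" may run at the same time.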

          Activity

          Mark Fisher added a comment -

          Moving to 2.2.

          We decided that this can be more general if we consider it as a Semaphore-driven gate. The simple "lock" case can always be handled as a Semaphore with only 1 permit available. In order for a Message to be allowed through the gate it must be able to acquire a permit (either immediately or within a given timeout). At some point downstream (after completing the restricted part of the flow), it will need to release its permit.

          We also discussed the fact that blocking threads that are trying to send is probably not a good approach in terms of scalability (it could lead to OOM or lost messages upon system failure). Buffering Messages in a MessageStore while waiting for the lock is better. That then led to the realization that this might be related to the Aggregator/Resequencer, at least in terms of reusing the CorrelatingMessageHandler. It is closer to the Resequencer in that it releases individual Messages on the other side (instead of aggregating a group into one new Message), but it's simpler than the Resequencer since its "release strategy" is only concerned with the permit acquisition, i.e. it doesn't care at all about any of the other individual Messages from its group. The one part that is different is that somewhere downstream it needs to release its permit. In that regard, it's a bit like the claim-check (with a symmetric 'in' and 'out' pair).
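
For illustration only, the permit-based gate described in this comment might look roughly like the following plain-Java sketch. Nothing here is a Spring Integration API: `GroupGate`, `tryEnter` and `leave` are hypothetical names, and the sketch blocks the calling thread for up to the timeout rather than buffering Messages in a MessageStore, which is the approach the comment prefers. With one permit per group it behaves like the simple "lock" case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical gate: each group key gets its own Semaphore with a fixed number of permits. */
final class GroupGate {

    private final int permitsPerGroup;

    // Semaphores are never removed in this sketch; a real implementation would need some cleanup.
    private final Map<Object, Semaphore> semaphores = new HashMap<>();

    GroupGate(int permitsPerGroup) {
        this.permitsPerGroup = permitsPerGroup;
    }

    private synchronized Semaphore semaphoreFor(Object groupKey) {
        Semaphore semaphore = semaphores.get(groupKey);
        if (semaphore == null) {
            semaphore = new Semaphore(permitsPerGroup, true); // fair: waiters are served in arrival order
            semaphores.put(groupKey, semaphore);
        }
        return semaphore;
    }

    /** The 'in' side: returns true if a permit was acquired within the timeout. */
    boolean tryEnter(Object groupKey, long timeout, TimeUnit unit) {
        try {
            return semaphoreFor(groupKey).tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** The 'out' side: called downstream once the restricted part of the flow is done. */
    void leave(Object groupKey) {
        semaphoreFor(groupKey).release();
    }
}
```

A caller would pair the two sides, for example `if (gate.tryEnter(key, 1, TimeUnit.SECONDS)) { try { /* restricted work */ } finally { gate.leave(key); } }`, which mirrors the symmetric 'in' and 'out' pair mentioned above.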

          Oleg Zhurakousky added a comment -

          This might end up as a no-fix, since this is easily accomplished (see S1 samples). Moving it to M2 for now.


            People

            • Assignee:
              Gary Russell
              Reporter:
              Oleg Zhurakousky
            • Votes:
              2
              Watchers:
              2
