Spring Integration
  1. Spring Integration
  2. INT-2518

Resequencer With Custom Comparator Does Not Release Properly

    Details

      Description

      The ResequencingMessageGroupProcessor allows a custom Comparator to sort the messages in the group. However, the processMessageGroup() method still uses the sequenceNumber header to decide what can be released.

      The messages are sorted in some arbitrary order and then we release the partial group based on the sequence number...

      currentSequence = extractSequenceNumber(message);
      if (currentSequence - 1 > previousSequence) {
          //there is a gap in the sequence here
          break;
      }
      

      Well, it is very likely there will be a gap because the messages aren't sorted by sequenceNumber.

      It seems to me that, if we allow a custom comparator, we must also allow a custom expression to invoke in extractSequenceNumber (or provide some other mechanism to assess what can be released).

      I modified one of the test cases to show this problem

      @Test
      public void testBasicResequencingWithCustomComparator() throws InterruptedException {
      	this.processor.setComparator(new Comparator<Message<?>>() {			
      		@SuppressWarnings({ "unchecked", "rawtypes" })
      		public int compare(Message<?> o1, Message<?> o2) {
      			return ((Comparable)o1.getPayload()).compareTo(o2.getPayload());
      		}
      	});
      	QueueChannel replyChannel = new QueueChannel();
      	Message<?> message1 = createMessage("789", "ABC", 4, 1, replyChannel);
      	Message<?> message2 = createMessage("123", "ABC", 4, 4, replyChannel);
      	Message<?> message3 = createMessage("456", "ABC", 4, 3, replyChannel);
      	Message<?> message4 = createMessage("111", "ABC", 4, 2, replyChannel);		
      	this.resequencer.handleMessage(message1);
      	this.resequencer.handleMessage(message3);
      	this.resequencer.handleMessage(message2);
      	this.resequencer.handleMessage(message4);		
      	Message<?> reply0 = replyChannel.receive(0);
      	Message<?> reply1 = replyChannel.receive(0);
      	Message<?> reply2 = replyChannel.receive(0);
      	Message<?> reply3 = replyChannel.receive(0);
      	assertNotNull(reply0);
      	assertEquals(new Integer(2), reply0.getHeaders().getSequenceNumber());
      	assertNotNull(reply1);
      	assertEquals(new Integer(4), reply1.getHeaders().getSequenceNumber());
      	assertNotNull(reply2);
      	assertEquals(new Integer(3), reply2.getHeaders().getSequenceNumber());
      	assertNotNull(reply3);
      	assertEquals(new Integer(1), reply3.getHeaders().getSequenceNumber());
      }
      

      Only the first message (message4) is released, even though the full "set" of 4 messages are complete in that sequences 1, 2, 3, 4 are all present.

        Issue Links

          Activity

          Hide
          Oleg Zhurakousky added a comment -

          The actual issue is that any Comparator that is not comparing messages based on their sequential position will not work here since the its not about putting messages in order as it is about detecting a gap between the sequences. So in other words the bug is actually in the design.
          What we should have is a new feature/improvement request to expose SpEL expression to allow user to point to a custom header or something in the payload that would represent the sequential position of the Message.
          For now we've decided that fixing this issue means removing setComparator(..) method from the ResequencingMessageGroupProcessor and raise a new JIRA as explained above.

          Show
          Oleg Zhurakousky added a comment - The actual issue is that any Comparator that is not comparing messages based on their sequential position will not work here since the its not about putting messages in order as it is about detecting a gap between the sequences. So in other words the bug is actually in the design. What we should have is a new feature/improvement request to expose SpEL expression to allow user to point to a custom header or something in the payload that would represent the sequential position of the Message. For now we've decided that fixing this issue means removing setComparator(..) method from the ResequencingMessageGroupProcessor and raise a new JIRA as explained above.
          Show
          Oleg Zhurakousky added a comment - PR https://github.com/SpringSource/spring-integration/pull/477
          Hide
          Gary Russell added a comment -

          Work around is to set the sequenceNumber header upstream.

          Show
          Gary Russell added a comment - Work around is to set the sequenceNumber header upstream.

            People

            • Assignee:
              Oleg Zhurakousky
              Reporter:
              Gary Russell
            • Votes:
              0 Vote for this issue
              Watchers:
              0 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Time Tracking

                Estimated:
                Original Estimate - 0.25d
                0.25d
                Remaining:
                Remaining Estimate - 0d
                0d
                Logged:
                Time Spent - 0.25d
                0.25d