Uploaded image for project: 'Spring Integration'
  1. Spring Integration
  2. INT-2518

Resequencer With Custom Comparator Does Not Release Properly

    Details

      Description

      The ResequencingMessageGroupProcessor allows a custom Comparator to sort the messages in the group. However, the processMessageGroup() method still uses the sequenceNumber header to decide what can be released.

      The messages are sorted in some arbitrary order and then we release the partial group based on the sequence number...

      currentSequence = extractSequenceNumber(message);
      if (currentSequence - 1 > previousSequence) {
          //there is a gap in the sequence here
          break;
      }
      

      Well, it is very likely there will be a gap because the messages aren't sorted by sequenceNumber.

      It seems to me that, if we allow a custom comparator, we must also allow a custom expression to invoke in extractSequenceNumber (or provide some other mechanism to assess what can be released).

      I modified one of the test cases to show this problem

      @Test
      public void testBasicResequencingWithCustomComparator() throws InterruptedException {
      	this.processor.setComparator(new Comparator<Message<?>>() {			
      		@SuppressWarnings({ "unchecked", "rawtypes" })
      		public int compare(Message<?> o1, Message<?> o2) {
      			return ((Comparable)o1.getPayload()).compareTo(o2.getPayload());
      		}
      	});
      	QueueChannel replyChannel = new QueueChannel();
      	Message<?> message1 = createMessage("789", "ABC", 4, 1, replyChannel);
      	Message<?> message2 = createMessage("123", "ABC", 4, 4, replyChannel);
      	Message<?> message3 = createMessage("456", "ABC", 4, 3, replyChannel);
      	Message<?> message4 = createMessage("111", "ABC", 4, 2, replyChannel);		
      	this.resequencer.handleMessage(message1);
      	this.resequencer.handleMessage(message3);
      	this.resequencer.handleMessage(message2);
      	this.resequencer.handleMessage(message4);		
      	Message<?> reply0 = replyChannel.receive(0);
      	Message<?> reply1 = replyChannel.receive(0);
      	Message<?> reply2 = replyChannel.receive(0);
      	Message<?> reply3 = replyChannel.receive(0);
      	assertNotNull(reply0);
      	assertEquals(new Integer(2), reply0.getHeaders().getSequenceNumber());
      	assertNotNull(reply1);
      	assertEquals(new Integer(4), reply1.getHeaders().getSequenceNumber());
      	assertNotNull(reply2);
      	assertEquals(new Integer(3), reply2.getHeaders().getSequenceNumber());
      	assertNotNull(reply3);
      	assertEquals(new Integer(1), reply3.getHeaders().getSequenceNumber());
      }
      

      Only the first message (message4) is released, even though the full "set" of 4 messages are complete in that sequences 1, 2, 3, 4 are all present.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                oleg.zhurakousky@springsource.com Oleg Zhurakousky
                Reporter:
                grussell Gary Russell
              • Votes:
                0 Vote for this issue
                Watchers:
                0 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - 0.25d
                  0.25d
                  Remaining:
                  Remaining Estimate - 0d
                  0d
                  Logged:
                  Time Spent - 0.25d
                  0.25d