  Spring AMQP / AMQP-690

Fix race condition causing SimpleMessageListenerContainer to drop consumers to zero

      Description

      A SimpleMessageListenerContainer that scales its consumers up and down can reach a point where its consumer count is 0. The cause is a subtle race condition in the considerStoppingAConsumer method.

      To illustrate, consider a SimpleMessageListenerContainer configured with a minimum of 1 and a maximum of 2 consumers. Suppose it has scaled up to two consumers (call them C1 and C2) and the queue load that justified them has passed (the consumers no longer see n consecutive successful polls, etc.), so both consumers start to hit the considerStoppingAConsumer method.

      So at this point C1 comes along and starts executing:

      private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
      	synchronized (consumersMonitor) {
      		if (this.consumers != null && this.consumers.size() > concurrentConsumers) {
      			long now = System.currentTimeMillis();
      			if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
      				consumer.basicCancel();
      				this.consumers.put(consumer, false);
      				if (logger.isDebugEnabled()) {
      					logger.debug("Idle consumer terminating: " + consumer);
      				}
      				this.lastConsumerStopped = now;
      			}
      		}
      	}
      }
      

      All it does is call basicCancel and flip the boolean in the "consumers" map. The important point is that the main condition for a consumer to cancel itself depends on the size of this map, and that size does not change here. So C1 hits this code and cancels itself; as a result, its AsyncMessageProcessingConsumer breaks out of its loop and starts gracefully shutting down the consumer. At the end of this process the AsyncMessageProcessingConsumer acquires the consumers monitor and executes:

      synchronized (consumersMonitor) {
      	if (SimpleMessageListenerContainer.this.consumers != null) {
      		SimpleMessageListenerContainer.this.consumers.remove(this.consumer);
      	}
      }
      

      This would all be fine were it not for the fact that, between these two events, C2 can start executing considerStoppingAConsumer and see the map at the very same size that C1 saw (C1's AsyncMessageProcessingConsumer has not yet reached the point of removing its consumer from the map). C2 therefore cancels itself as well, and the consumer count drops to zero.
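
      The interleaving described above can be replayed deterministically with a small stand-alone model. This is only a sketch under stated assumptions, not the container's code: the class ConsumerScaleDownSketch, its methods, and the countActiveOnly switch are hypothetical names introduced for illustration, consumers are plain strings, and the stopConsumerMinInterval guard is left out (it is assumed to have already elapsed when C2 runs its check). The second run shows one possible guard, counting only entries still flagged active, which is not necessarily the change adopted for this issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Single-threaded replay of the reported interleaving; every name here is hypothetical. */
final class ConsumerScaleDownSketch {

    private final Object consumersMonitor = new Object();
    // consumer name -> "active" flag, mirroring the "consumers" map in the report
    private final Map<String, Boolean> consumers = new LinkedHashMap<>();
    private final int concurrentConsumers;   // configured minimum
    private final boolean countActiveOnly;   // false = size-based check as reported

    ConsumerScaleDownSketch(int concurrentConsumers, boolean countActiveOnly, String... names) {
        this.concurrentConsumers = concurrentConsumers;
        this.countActiveOnly = countActiveOnly;
        for (String name : names) {
            consumers.put(name, true);
        }
    }

    /** Phase 1: the idle check. Returns true if the consumer decided to cancel itself. */
    boolean considerStoppingAConsumer(String consumer) {
        synchronized (consumersMonitor) {
            long candidates = countActiveOnly
                    ? consumers.values().stream().filter(active -> active).count()
                    : consumers.size();
            if (candidates > concurrentConsumers) {
                // The flag is flipped, but the entry stays in the map until phase 2.
                consumers.put(consumer, false);
                return true;
            }
            return false;
        }
    }

    /** Phase 2: runs later, at the end of the consumer's graceful shutdown. */
    void finishShutdown(String consumer) {
        synchronized (consumersMonitor) {
            consumers.remove(consumer);
        }
    }

    int consumerCount() {
        synchronized (consumersMonitor) {
            return consumers.size();
        }
    }

    /** Replays: C1 checks, C2 checks before C1 is removed, then the shutdowns complete. */
    static int replay(boolean countActiveOnly) {
        ConsumerScaleDownSketch sketch = new ConsumerScaleDownSketch(1, countActiveOnly, "C1", "C2");
        boolean c1Stops = sketch.considerStoppingAConsumer("C1");
        boolean c2Stops = sketch.considerStoppingAConsumer("C2");
        if (c1Stops) {
            sketch.finishShutdown("C1");
        }
        if (c2Stops) {
            sketch.finishShutdown("C2");
        }
        return sketch.consumerCount();
    }

    public static void main(String[] args) {
        System.out.println("size-based check, consumers left: " + replay(false));
        System.out.println("active-only check, consumers left: " + replay(true));
    }
}
```

      With the size-based check both C1 and C2 pass the condition and the model ends with no consumers; with the active-only check C2 sees a single active consumer, declines to stop, and the configured minimum of 1 is preserved.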

              People

              • Assignee:
                abilan Artem Bilan
                Reporter:
                zaharidichev@gmail.com Zahari Dichev