Uploaded image for project: 'Spring Integration'
  1. Spring Integration
  2. INT-3633

Introduce a MessageSourceAdvice and callback hooks to that class within AbstractPollingEndpoint

    XMLWordPrintable

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Complete
    • Affects Version/s: 4.1.1
    • Fix Version/s: 4.2 M1
    • Component/s: Core
    • Labels:

      Description

      Spring Integration's current offering for "hooks" into the workings of a `Poller` are via the `advice-chain` which only allows a hook into `Callable.call()`. This however is a bit limited since it doesn't allow developers to be able to peak at the number of rows returned by the poller (that's done within the `Message<T> receive()` method).
      I'm also aware that the `PollSkipAdvice` and `PollSkipStrategy` were introduced in 4.1 but they also do not allow for what I'm suggesting.

      If you look at the [Polling Consumer](http://camel.apache.org/polling-consumer.html) of Apache Camel you'll notice that their Polling Consumer offers these options:
      pollStrategy
      greedy
      backoffMultiplier
      backoffIdleThreshold
      backoffErrorThreshold

      I'm proposing that the Spring team introduce a `org.springframework.integration.scheduling.PollingStrategy` interface that looks similar to this:

          
          public interface PollingStrategy {
          	void afterOnInit();
          	void beforeReceive();
          	Message<?> afterReceive(Message<?> returnValue);
          	PollerMetadata getPollerMetadata();
          	void setPollerMetadata(PollerMetadata pollerMetadata);
          	Object getBoundResource();
          	void setBoundResource(Object resourceToBind);
          }
      

      Attached is a class diagram of how the classes I envision might look. Items in yellow currently exist, items in blue are newly introduced. This is preliminary and is expected to be used to spark the conversation.

      Following changes would need to be made:

      Class Changes Required
      `PollingConsumer`
      `SourcePollingChannelAdapter`
      `AbstractPollingEndpoint`
      Callback to `PollingStrategy` for:
      `void afterOnInit()`
      `void beforeReceive()`
      `Message<?> afterReceive(Message<?> returnValue)`
      `PollerMetadata` Add `PollingStrategy` property.
      `SourcePollingChannelAdapterFactoryBean` If `PollerMetadata` contains `PollingStrategy` call these methods on that object:
      `void setPollerMetadata(PollerMetadata pollerMetadata)`
      `void setBoundResource(Object resourceToBind)`

      Implementations of the `PollingStrategy` would vary but some initial ideas (these could be made more generic):
      `PollLessFrequentlyForHighVolumePollingStrategy`
      `PollLessFrequentlyForLowVolumePollingStrategy`
      `PollMoreFrequentlyForHighVolumePollingStrategy`
      `PollMoreFrequentlyForLowVolumePollingStrategy`
      `VariousPollingFrequenciesBasedOnVolumePollingStrategy` (i.e. like a sliding-scale that would have different frequencies depending on different thresholds being met).

      I have initial code I can share for the classes/interfaces in blue. Currently the hook into the framework however is via an `AfterReturningAdvice`.
      Example of a configuration file in Spring Integration 4.1.1:

          <bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger">
          	<constructor-arg name="period" value="20000" />
          </bean>	
          <bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata">
          	<property name="maxMessagesPerPoll" value="1000"/>
          	<property name="trigger" ref="jdbcDynamicTrigger"/>
          </bean>
          <bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy">
          	<property name="newPeriod" value="1"/>
          	<property name="adjustmentThreshold" value="100"/>
          	<property name="pollerMetadata" ref="jdbcPollerMetaData"/>
          </bean>	
          <aop:config>
          	<aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" >
          		<aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/>
          	</aop:aspect>	
          </aop:config>	
          <bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
          	<constructor-arg ref="myDataSource" />
          	<constructor-arg value="SELECT column1, column2 from tableA" />
          	<property name="maxRowsPerPoll" value="100" />
          </bean>	
          <int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean" 
          	channel="inputChannel"
          	auto-startup="false">
          	<int:poller ref="jdbcPollerMetaData" />
          </int:inbound-channel-adapter>
       

      Example of same above configuration if the changes I'm proposing were adopted:

          <int-jdbc:inbound-channel-adapter id="jdbcInAdapter" 
          	channel="inputChannel" data-source="myDataSource"
          	query="SELECT column1, column2 from tableA"
          	max-rows-per-poll="100">
          	<int:poller fixed-delay="10000" trigger="jdbcDynamicTrigger" polling-strategy="pollMoreFrequentlyForHighVolumePollingStrategy" />
          </int-jdbc:inbound-channel-adapter>   
          <bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger">
          	<constructor-arg name="period" value="20000" />
          </bean>	
          <bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy">
          	<property name="newPeriod" value="1"/>
          	<property name="adjustmentThreshold" value="100"/>
          </bean>	
       

        Attachments

          Issue Links

            Activity

              People

              Assignee:
              grussell Gary Russell
              Reporter:
              tonyjoe Anthony Falabella
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Dates

                Created:
                Updated:
                Resolved: