Spring XD / XD-2873

Creating streams with Kafka as the message bus sporadically throws TopicNotFoundException

    Details

    • Type: Bug
    • Status: Done
    • Priority: Critical
    • Resolution: Invalid
    • Affects Version/s: 1.1.1
    • Fix Version/s: 1.2 GA
    • Component/s: None
    • Labels:
      None
    • Story Points:
      3
    • Sprint:
      Sprint 49

      Description

      XD version: Spring XD 1.1.1.RELEASE
      1 admin on its own (on-metal) Rackspace machine
      2 containers, each on its own (on-metal) Rackspace machine
      1 ZooKeeper node colocated with the admin

      While running XD performance tests on Rackspace with Kafka as the transport, we occasionally get the following exception:

      2015-03-26 18:36:30,677 1.1.1.RELEASE  INFO DeploymentsPathChildrenCache-0 server.DeploymentListener - Path cache event: path=/deployments/modules/allocated/4c3c9ccf-44db-4772-87c2-70c63b82c3aa/foo3.sink.throughput.1, type=CHILD_ADDED
      2015-03-26 18:36:30,685 1.1.1.RELEASE  INFO DeploymentsPathChildrenCache-0 server.DeploymentListener - Deploying module 'throughput' for stream 'foo3'
      2015-03-26 18:36:30,820 1.1.1.RELEASE  INFO DeploymentsPathChildrenCache-0 server.DeploymentListener - Deploying module [[email protected] moduleName = 'throughput', moduleLabel = 'throughput', group = 'foo3', sourceChannelName = [null], sinkChannelName = [null], index = 1, type = sink, parameters = map[[empty]], children = list[[empty]]]
      2015-03-26 18:36:31,372 1.1.1.RELEASE ERROR DeploymentsPathChildrenCache-0 server.DeploymentListener - Exception deploying module
      org.springframework.integration.kafka.core.TopicNotFoundException: No topic named 'foo3.0' found
      	at org.springframework.integration.kafka.core.DefaultConnectionFactory.getPartitions(DefaultConnectionFactory.java:209)
      	at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus.createKafkaConsumer(KafkaMessageBus.java:640)
      	at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus.bindConsumer(KafkaMessageBus.java:454)
      	at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageConsumer(AbstractMessageBusBinderPlugin.java:275)
      	at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:158)
      	at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
      	at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
      	at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
      	at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
      	at org.springframework.xd.dirt.server.DeploymentListener.deployModule(DeploymentListener.java:363)
      	at org.springframework.xd.dirt.server.DeploymentListener.deployStreamModule(DeploymentListener.java:332)
      	at org.springframework.xd.dirt.server.DeploymentListener.onChildAdded(DeploymentListener.java:179)
      	at org.springframework.xd.dirt.server.DeploymentListener.childEvent(DeploymentListener.java:147)
      	at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:509)
      	at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:503)
      	at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92)
      	at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
      	at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
      	at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
      	at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
      	at org.apache.curator.framework.recipes.cache.PathChildrenCache$10.run(PathChildrenCache.java:762)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      

      Stream definition used to reproduce the exception:
      stream create foo4 --definition "load-generator --messageSize=1000 --messageCount=10000000 | throughput" --deploy

      After the failed deployment, I destroy the stream and recreate it, and it then deploys without error.
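
      The destroy-and-recreate workaround can be sketched as the XD shell session below. The report does not show the exact commands, so this is an assumption. The stream name and definition are copied from the create command above.

      ```
      xd:> stream destroy --name foo4
      xd:> stream create foo4 --definition "load-generator --messageSize=1000 --messageCount=10000000 | throughput" --deploy
      ```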

              People

              Assignee:
              mbogoevici Marius Bogoevici
              Reporter:
              grenfro Glenn Renfro
