Uploaded image for project: 'Spring Data Redis'
  1. Spring Data Redis
  2. DATAREDIS-1175

Unable to read a List from a Redis Stream

    XMLWordPrintable

    Details

      Description

      Trying to add Redis Stream support to Spring Integration I was stuck on this issue for a while and I didn't find a solution. 

      We have this simple test:  

      @Test
      @RedisAvailable
      public void explicitSerializationContextTest() {
      
         List<String> messagePayload = Arrays.asList("Hello", "stream", "message");
      
         RedisSerializationContext<String, Object> serializationContext = redisSerializationContext();
         
         // Writes our List to a Redis Stream 
         streamMessageHandler.setSerializationContext(serializationContext);
         streamMessageHandler.afterPropertiesSet();
         handlerAdapter.handleMessage(new GenericMessage<>(messagePayload));
      
         // We want to read our List of String
         ReactiveRedisTemplate<String, List> template = new ReactiveRedisTemplate(redisConnectionFactory,
               serializationContext);
      
         ObjectRecord<String, List> record = template.opsForStream().read(List.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY))
               .blockFirst();
      
         assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
      
         assertThat(record.getValue()).containsExactlyInAnyOrder("stream", "message", "Hello");
      }
      
      // Creates the SerializationContext used above
      private RedisSerializationContext redisSerializationContext() {
      
         RedisSerializer stringSerializer = StringRedisSerializer.UTF_8;
      
         RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext
               .SerializationPair
               .fromSerializer(stringSerializer);
      
         return RedisSerializationContext
               .newSerializationContext()
               .key(stringSerializerPair)
               .value(stringSerializer)
               .hashKey(stringSerializer)
               .hashValue(stringSerializer)
               .build();
      }

       I can see data in the Stream  :

         1) "1593030360926-0"
         2)  1) "_class"
             2) "java.util.Arrays$ArrayList"
             3) "a.[0]._class"
             4) "java.lang.String"
             5) "a.[0]"
             6) "Hello"
             7) "a.[1]._class"
             8) "java.lang.String"
             9) "a.[1]"
            10) "stream"
            11) "a.[2]._class"
            12) "java.lang.String"
            13) "a.[2]"
            14) "message"
            15) "modCount"
            16) "0
      

      But this read method fails:

      template.opsForStream().read(List.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY))
      

      The error message is:

      java.lang.IllegalArgumentException: Value must not be null!
      	at org.springframework.util.Assert.notNull(Assert.java:201)
      	at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
      	at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
      	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:132)
      	at org.springframework.data.redis.core.ReactiveStreamOperations.lambda$read$4(ReactiveStreamOperations.java:426)
      	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
      	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355)
      	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
      	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
      	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
      	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
      	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
      	at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:886)
      	at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:279)
      	at io.lettuce.core.output.StreamingOutput$Subscriber.onNext(StreamingOutput.java:64)
      	at io.lettuce.core.output.StreamReadOutput.complete(StreamReadOutput.java:92)
      	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:401)
      	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:707)
      	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:671)
      	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:666)
      	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:587)
      	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:556)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
      	at java.lang.Thread.run(Thread.java:748)
      	Suppressed: java.lang.Exception: #block terminated with an error
      		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
      		at reactor.core.publisher.Flux.blockFirst(Flux.java:2452)
      		at org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandlerTests.explicitSerializationContextTest(ReactiveRedisStreamMessageHandlerTests.java:119)
      		at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      		at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      		at java.lang.reflect.Method.invoke(Method.java:498)
      		at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      		at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      		at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      		at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      		at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
      		at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
      		at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      		at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
      		at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
      		at org.springframework.integration.redis.rules.RedisAvailableRule$1.evaluate(RedisAvailableRule.java:86)
      		at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
      		at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      		at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
      		at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
      		at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      		at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      		at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      		at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      		at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      		at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      		at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
      		at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
      		at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
      		at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      		at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
      		at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      		at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      		at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
      		at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
      		at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      

      What am I doing  wrong?

      Thanks a lot
       

        Attachments

          Activity

            People

            Assignee:
            cstrobl Christoph Strobl
            Reporter:
            akuma8 akuma8
            Last updater:
            Mark Paluch
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

              Dates

              Created:
              Updated:
              Resolved: