Spring Batch
BATCH-1942

Storage of a LinkedList in ExecutionContext doesn't persist after a Job failed

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.0.4
    • Fix Version/s: None
    • Component/s: Core
    • Labels: None

      Description

      In the reader's update()-method I wanted to store a LinkedList of Longs (my items) in the executionContext, so that when the job restarted, the open()-method could read them from there and continue with these items.

      I wrote myself a JUnit test and simulated exceptions for certain calls to the reader until the skip limit was reached, to make the job fail. Before the job failed (I paused the execution in debugging mode) I could see that my LinkedList got persisted in the database. But when I continued and let the job fail, the LinkedList in the database's execution context was empty!

      Even after long debugging I couldn't find the code which caused this. Furthermore, I also stored some other context information as a primitive integer, and this worked fine!
      The final workaround was to avoid the serialisation of the LinkedList and convert the List to a String myself. Putting the String in the ExecutionContext worked fine.
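
      A minimal sketch of that workaround, assuming an internal bufferedItems list inside the reader (field, key and method bodies here are illustrative, not the original code):

      // Hedged sketch: serialise the buffered Longs as a comma-separated String in
      // update() and parse them back in open(), instead of storing the LinkedList itself.
      public void update(ExecutionContext context) {
          StringBuilder sb = new StringBuilder();
          for (Long item : bufferedItems) {
              if (sb.length() > 0) {
                  sb.append(',');
              }
              sb.append(item);
          }
          context.putString("Buffered_Items", sb.toString());
          context.putInt("Number_buff_items", bufferedItems.size());
      }

      public void open(ExecutionContext context) {
          bufferedItems = new LinkedList<Long>();
          String serialized = context.getString("Buffered_Items", "");
          if (serialized.length() > 0) {
              for (String token : serialized.split(",")) {
                  bufferedItems.add(Long.valueOf(token));
              }
          }
      }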

        Activity

        Michael Minella added a comment - - edited

        Do you have a unit test that mimics this behavior? I just wrote a quick job that fails the first time, and I can see that it does have my LinkedList serialized in the execution context (step_execution_context table) and that it is retrieved with all of its elements on restart. Below is the code for the ItemReader I used:

        package org.springsource.batch.reader;
        
        import java.util.LinkedList;
        import java.util.List;
        
        import org.springframework.batch.item.ExecutionContext;
        import org.springframework.batch.item.file.FlatFileItemReader;
        import org.springframework.util.CollectionUtils;
        
        public class FlatFileItemReaderWrapper extends FlatFileItemReader {
        
        	@Override
        	public void update(ExecutionContext context) {
        		List<String> linkedList = new LinkedList<String>();
        		linkedList.add("1");
        		linkedList.add("2");
        		linkedList.add("3");
        
        		context.put("linkedList", linkedList);
        
        		super.update(context);
        	}
        
        	@Override
        	public void open(ExecutionContext context) {
        		List<String> linkedList = (List) context.get("linkedList");
        
        		if(CollectionUtils.isEmpty(linkedList)) {
        			System.out.println("============ Linked list is empty");
        		} else {
        			System.out.println("++++++++++++ LINKED LIST HAS " + linkedList.size() + " ELEMENTS");
        		}
        
        		super.open(context);
        	}
        }
        

        With the code above, the first time my job runs (and fails), I receive the following output:

        ============ Linked list is empty
        

        When restarted, I receive the following:

        ++++++++++++ LINKED LIST HAS 3 ELEMENTS
        

        Finally, in between the two executions, this is the data I see in my step_execution_context table:

        
        mysql> select * from batch_step_execution_context where step_execution_id = (select max(step_execution_id) from batch_step_execution);
        +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
        | STEP_EXECUTION_ID | SHORT_CONTEXT                                                                                                                                                                                                                                                             | SERIALIZED_CONTEXT |
        +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
        |              8271 | {"map":{"entry":[{"string":"StaxEventItemWriter.record.count","long":0},{"string":"StaxEventItemWriter.position","long":49},{"string":"FlatFileItemReader.read.count","int":0},{"string":["been_read","yup"]},{"string":"linkedList","linked-list":{"string":[1,2,3]}}]}} | NULL               |
        +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
        1 row in set (0.00 sec)
        
        Peter Wippermann added a comment -

        I already thought of a Unit test for this but my code is so specific, so I tried to avoid this in the first place. But of course, I can try to prepare one for you tomorrow. Please stay tuned

        Peter Wippermann added a comment -

        Ok, I'm afraid I can't publish more relevant Reader code. In general I can say the reader tries to read a chunk of items first. If there are any exceptions, it will read the items one-by-one.

        Maybe the difference between your scenario and mine is how the job failed?
        This is my unit test to check the failing behaviour. I omitted the restart of the job, which would follow afterwards. Please note that I used EasyMock to check the calls of the Reader against my item-delivering service. I'm also making use of my own Answer class, which can throw exceptions for specific items (a rough sketch of that class follows after the test code).

        Unit test
        //Expected parameters of the method call to answer are: 1) int start 2) int amount
        //This answer will provide items 0-99 in a range list. But the items specified in the array are faulty and will lead to an exception if one of these items is requested.
        final IAnswer<List<Long>> answer = new AnswerWithSelectedFaultyItems(100, new int[]{13, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37});
        
        //The reader will try to read an item chunk of size 10
        EasyMock.expect(serviceCalledByReader.method(0, 10)).andAnswer(answer);
        //Next call leads to exception (faulty item 13!) and reader will try to read a single item then
        for (int x = 10; x <= 13; x++) {
        	EasyMock.expect(serviceCalledByReader.method(x, 10)).andAnswer(answer);
        	EasyMock.expect(serviceCalledByReader.method(x, 1)).andAnswer(answer);
        }
        // Items 14-23 are good again
        EasyMock.expect(serviceCalledByReader.method(14, 10)).andAnswer(answer);
        for (int x = 24; x <= 33; x++) {
        	// Skip limit of 10 will be reached with item 32
        	// Item 33 will cause the job to fail then
        	EasyMock.expect(serviceCalledByReader.method(x, 10)).andAnswer(answer);
        	EasyMock.expect(serviceCalledByReader.method(x, 1)).andAnswer(answer);
        }
        
        EasyMock.replay(serviceCalledByReader);
        final JobExecution firstStart = launchJob(this.currentUniqueJobParameters);
        assertEquals(BatchStatus.FAILED, firstStart.getStatus());
        EasyMock.verify(serviceCalledByReader);
        
        EasyMock.reset(serviceCalledByReader);
        
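        The AnswerWithSelectedFaultyItems class is not shown here; the following is only a rough sketch of what it could look like, under the assumption that it returns a range of Long items and throws for the configured faulty indexes:

        import java.util.HashSet;
        import java.util.LinkedList;
        import java.util.List;
        import java.util.Set;
        
        import org.easymock.EasyMock;
        import org.easymock.IAnswer;
        
        // Illustrative sketch only: answers a mocked service call of the form
        // method(int start, int amount) with the Longs [start, start + amount),
        // throwing when one of the configured faulty item indexes is requested.
        public class AnswerWithSelectedFaultyItems implements IAnswer<List<Long>> {
        
        	private final int totalItems;
        	private final Set<Integer> faultyItems = new HashSet<Integer>();
        
        	public AnswerWithSelectedFaultyItems(int totalItems, int[] faulty) {
        		this.totalItems = totalItems;
        		for (int item : faulty) {
        			faultyItems.add(item);
        		}
        	}
        
        	public List<Long> answer() throws Throwable {
        		// Expected arguments of the mocked call: 1) int start 2) int amount
        		Object[] args = EasyMock.getCurrentArguments();
        		int start = (Integer) args[0];
        		int amount = (Integer) args[1];
        
        		List<Long> result = new LinkedList<Long>();
        		for (int i = start; i < start + amount && i < totalItems; i++) {
        			if (faultyItems.contains(i)) {
        				throw new RuntimeException("Faulty item requested: " + i);
        			}
        			result.add(Long.valueOf(i));
        		}
        		return result;
        	}
        }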

        I hope this helps. If it's still too vague for you to trace this bug, I can accept this ticket being closed.

        Michael Minella added a comment -

        Are you saying that you are performing buffering/scanning like the framework does within your reader? While interesting, I can't think of why that would cause this particular issue.

        Just to confirm, what is the exact exception being thrown that causes the failure, and from what component is it thrown (I'm assuming it's being thrown from the ItemReader)?

        The way I make my job fail is to wrap the reader I posted above in the reader posted below. This reader will throw an OutOfMemoryError the first time the job is run. The second time, it allows the job to finish.

        
        package org.springsource.batch.reader;
        
        import java.util.Random;
        
        import org.springframework.batch.item.ExecutionContext;
        import org.springframework.batch.item.ItemReader;
        import org.springframework.batch.item.ItemStream;
        import org.springframework.batch.item.ItemStreamException;
        import org.springframework.batch.item.ItemStreamReader;
        import org.springframework.batch.item.NonTransientResourceException;
        import org.springframework.batch.item.ParseException;
        import org.springframework.batch.item.UnexpectedInputException;
        
        @SuppressWarnings("rawtypes")
        public class FirstTimeFailureItemReader implements ItemStreamReader {
        
        	private boolean blowUp = true;
        
        	private int failCount = 2;
        
        	private Integer readCount = 0;
        
        	private ItemReader delegate;
        
        	public void setFailCount(int failCount) {
        		this.failCount = failCount;
        	}
        
        	public void open(ExecutionContext executionContext)
        			throws ItemStreamException {
        		if(executionContext.containsKey("been_read")) {
        			blowUp = false;
        		}
        
        		if(delegate != null && delegate instanceof ItemStream) {
        			((ItemStream) delegate).open(executionContext);
        		}
        	}
        
        	public void update(ExecutionContext executionContext)
        			throws ItemStreamException {
        		executionContext.put("been_read", "yup");
        
        		if(delegate != null && delegate instanceof ItemStream) {
        			((ItemStream) delegate).update(executionContext);
        		}
        	}
        
        	public void close() throws ItemStreamException {
        		if(delegate != null && delegate instanceof ItemStream) {
        			((ItemStream) delegate).close();
        		}
        	}
        
        	public Object read() throws Exception, UnexpectedInputException,
        			ParseException, NonTransientResourceException {
        
        		synchronized (readCount) {
        			if(blowUp && readCount == failCount) {
        				readCount++;
        				throw new OutOfMemoryError("exception");
        			} else {
        				readCount++;
        
        				if(delegate != null) {
        					return delegate.read();
        				} else {
        					if(readCount >= 100) {
        						return null;
        					}
        
        					return String.valueOf(new Random().nextInt());
        				}
        			}
        		}
        	}
        
        	public void setDelegate(ItemReader delegate) {
        		this.delegate = delegate;
        	}
        }
        
        Peter Wippermann added a comment -

        Hi Michael,

        yes, I added custom buffering functionality to my own reader. This reader calls a web service, and thus I wanted to avoid calling this service too often. Reading items in a chunk seemed reasonable to me, but this doesn't fit the interface of the read()-method (which only allows returning single items). So I implemented an internal buffer in my reader, so it could read chunks and return items one-by-one.
        I also can't imagine that this is the cause of the bug I faced. But this is the explanation of why I wanted to persist items in the execution context.

        In my test scenario I throw a custom exception (which is configured as "skippable") from the web service call. So this exception is propagated from the reader's read()-method.
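
        For illustration, the buffering in my reader is roughly shaped like this (a minimal sketch with placeholder names, not my actual code; the execution-context handling discussed above is left out):

        import java.util.LinkedList;
        import java.util.List;
        import java.util.Queue;
        
        import org.springframework.batch.item.ItemReader;
        
        // Minimal sketch of a reader that fetches items from a remote service in
        // chunks but hands them to Spring Batch one at a time. ItemService is a
        // placeholder for the web service client.
        public class BufferingServiceItemReader implements ItemReader<Long> {
        
        	public interface ItemService {
        		List<Long> fetch(int startIndex, int amount);
        	}
        
        	private final Queue<Long> buffer = new LinkedList<Long>();
        
        	private int nextStartIndex = 0;
        
        	private int chunkSize = 10;
        
        	private ItemService service;
        
        	public Long read() throws Exception {
        		if (buffer.isEmpty()) {
        			// One remote call fills the buffer for the next chunkSize read() calls.
        			List<Long> chunk = service.fetch(nextStartIndex, chunkSize);
        			if (chunk == null || chunk.isEmpty()) {
        				return null; // end of input
        			}
        			buffer.addAll(chunk);
        			nextStartIndex += chunk.size();
        		}
        		return buffer.poll();
        	}
        
        	public void setService(ItemService service) {
        		this.service = service;
        	}
        }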

        Michael Minella added a comment -

        Two questions:

        1. When you look in the database, are you checking the batch_step_execution_context(correct) or batch_job_execution_context(incorrect)?
        2. You have confirmed that it is not reloaded on restart?

        I'm running out of ideas on how to reproduce the issue. My original reply wasn't testing the job failing after skips, but I've added that (by changing the exception thrown to RuntimeException, commenting out incrementing readCount when the exception is thrown, and configuring the job to skip the RuntimeException), and I can see that the list is getting into the batch_step_execution_context table after the job fails and that it is reloaded on restart. When you add to that the fact that we store other collections in the ExecutionContext as part of the framework without issue (a Map of start-after values within the JdbcPagingItemReader, for example), I think we're stuck. If you can provide a test case that recreates the issue, I'd be more than happy to keep digging, but otherwise I'm inclined to close this issue. Let me know your thoughts.
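
        For reference, the modification to the FirstTimeFailureItemReader's read() method described above amounts to something like this (a sketch of the change, not the exact code I ran):

        	// Sketch of the modified failure branch: throw a RuntimeException (which the
        	// job is now configured to skip) and leave readCount untouched, so the reader
        	// keeps failing until the step's skip limit is exhausted and the job fails.
        	public Object read() throws Exception {
        		synchronized (readCount) {
        			if(blowUp && readCount == failCount) {
        				// readCount++ intentionally removed here
        				throw new RuntimeException("skippable test failure");
        			} else {
        				readCount++;
        
        				if(delegate != null) {
        					return delegate.read();
        				} else {
        					if(readCount >= 100) {
        						return null;
        					}
        
        					return String.valueOf(new Random().nextInt());
        				}
        			}
        		}
        	}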

        Peter Wippermann added a comment -

        1) I was always checking the step's execution context. While I could see during the processing that the persisted items list was not empty, it looked like this after the job failed:

        {"map":{"entry":[
          {"string":"FlatFileItemWriter.written","long":20},
          {"string":"FlatFileItemWriter.current.count","long":70},
          {"string":"Start_From","int":24},
          {"string":"Buffered_Items","linked-list":""},
          {"string":"Number_buff_items","int":3}
        ]}}
        

        Also notice that I stored the number of items I had just persisted in the execution context as well. This figure is correct, but the list is empty, even though I always update both at once. So strange...

        2) Yes, when I restart the job, the open()-method will receive an empty item list from the execution context.

        Peter Wippermann added a comment -

        I'm sorry that I can't provide a working code sample to reproduce this behaviour.
        I agree with you, Michael, that this issue should be closed then. Thanks for your patience anyway.


          People

          • Assignee: Michael Minella
          • Reporter: Peter Wippermann
          • Votes: 0
          • Watchers: 2

            Dates

            • Created:
              Updated:
              Resolved: