I have a batch job with two steps. Once all the entities are processed, the flag on each of them is updated to PROCESSED, i.e. it is an all-or-nothing update.
Step 1 is fine and works pretty smoothly.
Step 2 is basically a JpaPagingItemReader with pageSize=4, a set of processors (mostly HTTP calls) and a JpaItemWriter with commit-interval=1. (I know it is recommended to have pageSize equal to the commit interval; it's just what I have.) It is also a multithreaded step with 10 threads doing the job.
That said, on step 2 I have two kinds of queries:

Read: select * from ENTITY where processed = false order by id
      which the pager nests into two queries: select ... from (select .. where rownum < M) where rownum >= N
Write: update ENTITY set .. where id = ID
For some reason, when there are enough entities I get the infamous:

ORA-01555: snapshot too old: rollback segment with name "" too small
I don't know the exact reason for that error (the undo stats don't show anything bad, so hopefully the DBAs will find the culprit soon), but in the meantime I think what the read query does is terribly bad. Such paging queries are hard on a database anyway, and I guess that reading and updating the same entries at the same time may cause exactly that kind of error.
I would like to change the approach taken in step 2. Instead of reading in pages, I would like to read all the ids into memory only once (i.e. give me the ids of all entities I need to process) and then give each thread an id from that list. The first processor in the chain will then load the entity by its id through JPA. That way I continue to update and write entities one by one, while reading the ids only once.
My problem is that I couldn't find an out-of-the-box reader for this. Is there anything I can use?
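The idea above can be sketched outside Spring Batch in a few lines. This is a minimal, hypothetical illustration, not the framework reader itself; the class and method names (IdSnapshot, next) are mine:

```java
import java.util.List;

// Snapshot the ids once, then hand them out one at a time
// to worker threads. Illustrative only, not Spring Batch API.
public class IdSnapshot {

    private final List<Long> ids;   // read from the database exactly once
    private int current = -1;

    public IdSnapshot(List<Long> unprocessedIds) {
        this.ids = List.copyOf(unprocessedIds);
    }

    /** Thread-safe: returns the next id, or null when the snapshot is exhausted. */
    public synchronized Long next() {
        ++current;
        return current < ids.size() ? ids.get(current) : null;
    }
}
```

Each worker thread loops on next() until it gets null, loading and updating one entity per id, so the expensive paged read against a moving table disappears.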
Well, I implemented the solution myself, based on this and this. In fact I didn't use those directly, but my implementation is quite close.
Basically, this is how it looks (I don't have the code at hand, so this is from memory):
    import java.util.List;
    import com.google.common.collect.ImmutableList;
    import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
    import org.springframework.util.Assert;

    public class MyUnprocessedIdReader extends AbstractItemCountingItemStreamItemReader<Long> {

        private final Object lock = new Object();
        private final MyObjectsRepository repo;

        private boolean initialized = false;
        private List<Long> ids;
        private int current = -1;

        public MyUnprocessedIdReader(MyObjectsRepository repo) {
            this.repo = repo;
            setName("myUnprocessedIdReader");
            setSaveState(false); // shared by 10 threads, so don't track restart state
        }

        @Override
        protected void doOpen() {
            synchronized (lock) {
                Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
                this.ids = ImmutableList.copyOf(repo.findAllUnprocessed());
                this.initialized = true;
            }
        }

        @Override
        protected Long doRead() {
            synchronized (lock) {
                if (ids == null || !initialized) {
                    throw new IllegalStateException("Have you opened the reader?");
                }
                ++current;
                if (current < ids.size()) {
                    return ids.get(current);
                } else {
                    return null; // signals Spring Batch that the input is exhausted
                }
            }
        }

        @Override
        protected void doClose() {
            synchronized (lock) {
                this.initialized = false;
                this.current = -1;
                this.ids = null;
            }
        }
    }
My repository uses JPA, so under the hood it does something like entityManager.createQuery("select obj.id from Objects obj where obj.processed = false order by obj.id asc", Long.class).getResultList().
Also, I have added one more processor to the chain:
    import org.springframework.batch.item.ItemProcessor;

    public class LoadProcessor implements ItemProcessor<Long, MyObject> {

        private final MyObjectsRepository repo;

        public LoadProcessor(MyObjectsRepository repo) {
            this.repo = repo;
        }

        @Override
        public MyObject process(Long id) {
            return repo.findById(id);
        }
    }
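To show the behaviour in isolation, here is a standalone sketch with the Spring Batch interface and the repository stubbed out. MyObject, the in-memory map and the demo class are hypothetical stand-ins, not the real entities:

```java
import java.util.Map;

// Standalone illustration of LoadProcessor: it turns an id into the
// loaded entity. The repository is stubbed with an in-memory map.
public class LoadProcessorDemo {

    record MyObject(long id, String name) {}

    interface MyObjectsRepository {
        MyObject findById(long id);
    }

    static class LoadProcessor {
        private final MyObjectsRepository repo;

        LoadProcessor(MyObjectsRepository repo) {
            this.repo = repo;
        }

        // In the real step this would implement ItemProcessor<Long, MyObject>
        MyObject process(Long id) {
            return repo.findById(id);
        }
    }

    public static void main(String[] args) {
        Map<Long, MyObject> db = Map.of(1L, new MyObject(1L, "first"));
        LoadProcessor p = new LoadProcessor(db::get);
        System.out.println(p.process(1L)); // loads the entity for id 1
    }
}
```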
Someone may say that this is less scalable than using a cursor, and there is also contention on the read method. However, it is a very simple solution which does its job well as long as the number of unprocessed ids is not too huge. Also, the processing threads spend most of their time calling external REST services, so the contention on read will never be the bottleneck.
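For what it's worth, if the synchronized read ever did become a bottleneck, the lock could be swapped for an atomic cursor. This is a hypothetical lock-free variant of the same idea, not the implementation above; the names are illustrative:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Same one-shot id handout, but using an atomic index instead of a
// lock, so concurrent readers never block each other.
public class LockFreeIdSnapshot {

    private final List<Long> ids;
    private final AtomicInteger cursor = new AtomicInteger(0);

    public LockFreeIdSnapshot(List<Long> unprocessedIds) {
        this.ids = List.copyOf(unprocessedIds);
    }

    /** Returns the next id, or null when the snapshot is exhausted. */
    public Long next() {
        int i = cursor.getAndIncrement();
        return i < ids.size() ? ids.get(i) : null;
    }
}
```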
P.S. I will post an update later on whether this solved the ORA-01555 issue or not.