OK, I found the bug. It's not in Hibernate, as I thought in my earlier post; it's in ScrollableHibernateIndexEntitiesIndexer.
The main problem is the prev/item mechanism used in lines 119-129:
Object prev = null;
while (cursor.next()) {
Object item = cursor.get(0);
if (item != prev && prev != null) {
buffer.put(prev);
}
prev = item;
}
if (prev != null) {
buffer.put(prev);
}
When buffer.put(prev) is called and the buffer is full, it calls session.clear(). This is very bad for the object "prev" that was passed in, because it is now detached from the session. When Compass tries to index it on the next RowBuffer.flush(), it will fail if it has any lazy associations. So this is a logic bug.
My suggestion is to remove the prev/item mechanism and just put items into the buffer, flushing the buffer when it actually becomes full rather than when you try to add past the buffer size, because in the latter case the session.clear() would affect the current item.
/*
*
*
*
*/
package org.compass.gps.device.hibernate.indexer;
import java.util.HashMap;
import java.util.Map;
import java.util.Collection;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.compass.core.CompassSession;
import org.compass.gps.device.hibernate.HibernateGpsDevice;
import org.compass.gps.device.hibernate.HibernateGpsDeviceException;
import org.compass.gps.device.hibernate.entities.EntityInformation;
import org.compass.gps.device.support.parallel.IndexEntity;
import org.hibernate.Criteria;
import org.hibernate.Query;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.criterion.Order;
import org.hibernate.metadata.ClassMetadata;
/**
*
*
*
* @author kimchy
*/
public class ScrollableHibernateIndexEntitiesIndexer implements HibernateIndexEntitiesIndexer {
private static final Log log = LogFactory.getLog(ScrollableHibernateIndexEntitiesIndexer.class);
private HibernateGpsDevice device;
private boolean performOrderById = true;
private Map<String, Boolean> performOrderByPerEntity = new HashMap<String, Boolean>();
public void setHibernateGpsDevice(HibernateGpsDevice device) { this.device = device; }
/**
* Should this indxer order by the ids when <code>Criteria</code> is available.
* Defaults to <code>true</code>.
*/
public void setPerformOrderById(boolean performOrderById) { this.performOrderById = performOrderById; }
/**
* Should this indxer order by the ids when <code>Criteria</code> is available for
* the given entity. Defaults to {@link #setPerformOrderById(boolean)}.
*/
public void setPerformOrderById(String entity, boolean performOrderById) { performOrderByPerEntity.put(entity, performOrderById); }
public void performIndex(CompassSession session, IndexEntity[] entities) {
for (IndexEntity entity : entities) {
EntityInformation entityInformation = (EntityInformation) entity;
if (device.isFilteredForIndex(entityInformation.getName())) { continue; }
if (!device.isRunning()) { return; }
ScrollableResults cursor = null;
Session hibernateSession = device.getSessionFactory().openSession();
Transaction hibernateTransaction = null;
try {
hibernateTransaction = hibernateSession.beginTransaction();
if (log.isDebugEnabled()) { log.debug(device.buildMessage("Indexing entities [" + entityInformation.getName() + "] using query [" + entityInformation.getQueryProvider() + "]")); }
Criteria criteria = entityInformation.getQueryProvider().createCriteria(hibernateSession, entityInformation);
if (criteria != null) {
if (performOrderById) {
Boolean performOrder = performOrderByPerEntity.get(entityInformation.getName());
if (performOrder == null || performOrder) { ClassMetadata metadata = hibernateSession.getSessionFactory().getClassMetadata(entityInformation.getName()); criteria.addOrder(Order.asc(metadata.getIdentifierPropertyName())); }
}
criteria.setFetchSize(device.getFetchCount());
cursor = criteria.scroll(ScrollMode.FORWARD_ONLY);
} else { Query query = entityInformation.getQueryProvider().createQuery(hibernateSession, entityInformation); cursor = query.scroll(ScrollMode.FORWARD_ONLY); }
// store things in row buffer to allow using batch fetching in Hibernate
RowBuffer buffer = new RowBuffer(session, hibernateSession, device.getFetchCount());
while (cursor.next()) { buffer.put(cursor.get(0)); }
buffer.close();
cursor.close();
hibernateTransaction.commit();
} catch (Exception e) {
log.error(device.buildMessage("Failed to index the database"), e);
if (cursor != null) {
try { cursor.close(); } catch (Exception e1) { log.warn(device.buildMessage("Failed to close cursor on error, ignoring"), e1); }
}
if (hibernateTransaction != null) {
try { hibernateTransaction.rollback(); } catch (Exception e1) { log.warn("Failed to rollback Hibernate", e1); }
}
if (!(e instanceof HibernateGpsDeviceException)) { throw new HibernateGpsDeviceException(device.buildMessage("Failed to index the database"), e); }
throw (HibernateGpsDeviceException) e;
} finally { hibernateSession.close(); session.close(); }
}
}
private class RowBuffer {
private Collection<Object> buffer;
private int bufferSize = 0;
private CompassSession compassSession;
private Session hibernateSession;
RowBuffer(CompassSession compassSession, Session hibernateSession, int fetchCount) { this.compassSession = compassSession; this.hibernateSession = hibernateSession; this.bufferSize = fetchCount; this.buffer = new ArrayList<Object>(this.bufferSize); }
public void put(Object row) { buffer.add(row); if (buffer.size() >= bufferSize) flush(); }
public void close() { flush(); buffer = null; }
private void flush() {
for (Object item : buffer) { compassSession.create(item); }
// clear buffer and sessions to allow for GC
buffer.clear();
hibernateSession.clear();
compassSession.evictAll();
}
}
}