### Eclipse Workspace Patch 1.0 #P compass Index: src/main/src/org/compass/needle/gae/GoogleAppEngineMemIndexOutput.java =================================================================== --- src/main/src/org/compass/needle/gae/GoogleAppEngineMemIndexOutput.java (revision 3931) +++ src/main/src/org/compass/needle/gae/GoogleAppEngineMemIndexOutput.java (working copy) @@ -20,11 +20,14 @@ import java.util.ArrayList; import java.util.Collections; +import org.apache.lucene.index.LuceneFileNames; +import org.apache.lucene.store.IndexOutput; +import org.compass.needle.gae.GoogleAppEngineDirectory.Callable; + import com.google.appengine.api.datastore.Blob; import com.google.appengine.api.datastore.Entity; import com.google.appengine.api.datastore.Key; -import org.apache.lucene.index.LuceneFileNames; -import org.apache.lucene.store.IndexOutput; +import com.google.appengine.api.datastore.Transaction; /** * @author kimchy @@ -72,7 +75,8 @@ public void writeByte(byte b) throws IOException { if (bufferPosition == dir.getBucketSize()) { if (seekOccured) { - throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, "Seek occured and overflowed first bucket"); + throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, + "Seek occured and overflowed first bucket"); } flushBucket(); } @@ -91,7 +95,8 @@ while (len > 0) { if (bufferPosition == dir.getBucketSize()) { if (seekOccured) { - throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, "Seek occured and overflowed first bucket"); + throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, + "Seek occured and overflowed first bucket"); } flushBucket(); } @@ -137,7 +142,8 @@ public void seek(long pos) throws IOException { if (pos >= dir.getBucketSize()) { - throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, "seek called outside of first bucket boundries"); + throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, + "seek called outside of first bucket boundries"); } // create the first bucket if still not created if (firstBucketEntity == null) { @@ -183,25 +189,36 @@ } private void flushBucket(long bucketIndex, byte[] buffer, int length) throws IOException { - Entity fileBucketEntity = new Entity(GoogleAppEngineDirectory.CONTENT_KEY_KIND, fileName + bucketIndex, metaDataKey); + Entity fileBucketEntity = new Entity(GoogleAppEngineDirectory.CONTENT_KEY_KIND, fileName + bucketIndex, + metaDataKey); byte[] data = new byte[length]; System.arraycopy(buffer, 0, data, 0, length); fileBucketEntity.setProperty("data", new Blob(data)); flushEntities.add(fileBucketEntity); if (flushEntities.size() >= dir.getFlushRate()) { + //What's going on here? forceFlushBuckets(); } } private void forceFlushBuckets(Entity... additionalEntities) throws IOException { + if (flushEntities.size() == 0 && additionalEntities == null) { return; } + Collections.addAll(flushEntities, additionalEntities); + try { - dir.getDatastoreService().put(flushEntities); - } catch (Exception e) { - throw new GoogleAppEngineDirectoryException(dir.getIndexName(), fileName, "Failed to write buckets", e); + dir.doInTransaction(new Callable() { + + @Override + public Void call(Transaction transaction) throws Exception { + dir.getDatastoreService().put(transaction, flushEntities); + return null; + } + + }); } finally { flushEntities.clear(); } Index: src/main/src/org/compass/needle/gae/GoogleAppEngineDirectoryStore.java =================================================================== --- src/main/src/org/compass/needle/gae/GoogleAppEngineDirectoryStore.java (revision 3931) +++ src/main/src/org/compass/needle/gae/GoogleAppEngineDirectoryStore.java (working copy) @@ -30,9 +30,9 @@ import org.compass.core.util.StringUtils; /** - * A plugin lucene store for Compass. Uses {@link GoogleAppEngineDirectory} - * as Lucene directory implementation. - * + * A plugin lucene store for Compass. Uses {@link GoogleAppEngineDirectory} as + * Lucene directory implementation. + * * @author kimchy */ public class GoogleAppEngineDirectoryStore extends AbstractDirectoryStore implements CompassConfigurable { @@ -44,14 +44,22 @@ public static final String FLUSH_RATE_PROP = "compass.engine.store.gae.flushRate"; /** - * Should the directory cache file meta data instead of fetching it from GAE each time. Defatuls to - * true. + * The GAE documentation recommends retrying transactions several times when + * attempting to maniuplate the datastore. Where appropraite, the directory + * story will attempt transactions this many times. Defaults to 3. */ + public static final String TRANSACTION_RETRY_COUNT_PROP = "compass.engine.store.gae.transactionRetryCount"; + + /** + * Should the directory cache file meta data instead of fetching it from GAE + * each time. Defatuls to true. + */ public static final String CACHE_META_DATA_PROP = "compass.engine.store.gae.cacheMetaData"; /** - * File names based patterns to use in order to choose which Lucene files to store in memcahe and which not. - * Lucene files are explained here: http://lucene.apache.org/java/2_4_1/fileformats.html. + * File names based patterns to use in order to choose which Lucene files to + * store in memcahe and which not. Lucene files are explained here: + * http://lucene.apache.org/java/2_4_1/fileformats.html. */ public static final String MEMCACHE_REGEX_PATTERNS_PROP = "compass.engine.store.gae.memcacheRegexPatterns"; @@ -63,23 +71,30 @@ private boolean cacheMetaData; + private int transactionRetryCount; + private String[] memcaheRegexPatterns; public void configure(CompassSettings settings) throws CompassException { this.indexName = settings.getSetting(CompassEnvironment.CONNECTION).substring(PROTOCOL.length()); + bucketSize = (int) settings.getSettingAsBytes(BUCKET_SIZE_PROP, GoogleAppEngineDirectory.DEFAULT_BUCKET_SIZE); flushRate = settings.getSettingAsInt(FLUSH_RATE_PROP, GoogleAppEngineDirectory.DEFAULT_FLUSH_RATE); cacheMetaData = settings.getSettingAsBoolean(CACHE_META_DATA_PROP, GoogleAppEngineDirectory.DEFAULT_CACHE_META_DATA); + transactionRetryCount = settings.getSettingAsInt(TRANSACTION_RETRY_COUNT_PROP, GoogleAppEngineDirectory.DEFAULT_TRANSACTION_RETRY_COUNT); + String memcaheRegexPatternsConf = settings.getSetting(MEMCACHE_REGEX_PATTERNS_PROP); + if (memcaheRegexPatternsConf != null) { memcaheRegexPatterns = StringUtils.commaDelimitedListToStringArray(memcaheRegexPatternsConf); } else { memcaheRegexPatterns = new String[0]; } + } public Directory open(String subContext, String subIndex) throws SearchEngineException { - return new GoogleAppEngineDirectory(buildFullIndexName(subContext, subIndex), bucketSize, flushRate, cacheMetaData, memcaheRegexPatterns); + return new GoogleAppEngineDirectory(buildFullIndexName(subContext, subIndex), bucketSize, flushRate, cacheMetaData, transactionRetryCount, memcaheRegexPatterns); } @Override @@ -89,20 +104,25 @@ @Override public void cleanIndex(Directory dir, String subContext, String subIndex) throws SearchEngineException { + try { ((GoogleAppEngineDirectory) dir).deleteContent(); } catch (IOException e) { - throw new SearchEngineException("Failed to delete index for sub context [" + subContext + "] and sub index [" + subIndex + "]", e); + throw new SearchEngineException("Failed to delete index for sub context [" + subContext+ "] and sub index [" + subIndex + "]", e); } + } @Override - public CopyFromHolder beforeCopyFrom(String subContext, String subIndex, Directory dir) throws SearchEngineException { + public CopyFromHolder beforeCopyFrom(String subContext, String subIndex, Directory dir) + throws SearchEngineException { + try { ((GoogleAppEngineDirectory) dir).deleteContent(); } catch (IOException e) { throw new SearchEngineException("Failed to delete context before copy from", e); } + return new CopyFromHolder(); } @@ -114,4 +134,5 @@ public String suggestedIndexDeletionPolicy() { return LuceneEnvironment.IndexDeletionPolicy.ExpirationTime.NAME; } + } \ No newline at end of file Index: src/main/src/org/compass/needle/gae/FlushOnCloseGoogleAppEngineIndexOutput.java =================================================================== --- src/main/src/org/compass/needle/gae/FlushOnCloseGoogleAppEngineIndexOutput.java (revision 3931) +++ src/main/src/org/compass/needle/gae/FlushOnCloseGoogleAppEngineIndexOutput.java (working copy) @@ -20,9 +20,12 @@ import java.util.ArrayList; import java.util.List; +import org.apache.lucene.store.IndexOutput; +import org.compass.needle.gae.GoogleAppEngineDirectory.Callable; + import com.google.appengine.api.datastore.Blob; import com.google.appengine.api.datastore.Entity; -import org.apache.lucene.store.IndexOutput; +import com.google.appengine.api.datastore.Transaction; /** * @author kimchy @@ -96,25 +99,29 @@ } public void close() throws IOException { + if (!open) { return; } + open = false; + // flush any buffer we might have flush(); + final List entities = new ArrayList(buckets.size() + 1); + final Entity metaDataEntity = new Entity(GoogleAppEngineDirectory.META_KEY_KIND, fileName, dir.getIndexKey()); - List entities = new ArrayList(buckets.size() + 1); - - Entity metaDataEntity = new Entity(GoogleAppEngineDirectory.META_KEY_KIND, fileName, dir.getIndexKey()); metaDataEntity.setProperty("size", length); metaDataEntity.setProperty("modified", System.currentTimeMillis()); entities.add(metaDataEntity); for (int i = 0; i < (buckets.size()); i++) { - Entity contentEntity = new Entity(GoogleAppEngineDirectory.CONTENT_KEY_KIND, fileName + i, metaDataEntity.getKey()); + Entity contentEntity = new Entity(GoogleAppEngineDirectory.CONTENT_KEY_KIND, fileName + i, metaDataEntity + .getKey()); if (i == (buckets.size() - 1)) { - // the last buffer is (probably) smaller than the bucket size, make sure we use only the part that has data + // the last buffer is (probably) smaller than the bucket size, + // make sure we use only the part that has data byte[] buff = new byte[(int) (length - (bucketSize * i))]; System.arraycopy(buckets.get(i), 0, buff, 0, buff.length); contentEntity.setProperty("data", new Blob(buff)); @@ -123,14 +130,22 @@ } entities.add(contentEntity); } + try { - dir.getDatastoreService().put(entities); - dir.addMetaData(metaDataEntity); + dir.doInTransaction(new Callable() { + + @Override + public Void call(Transaction transaction) throws Exception { + dir.getDatastoreService().put(transaction, entities); + dir.addMetaData(metaDataEntity); + return null; + } + + }); } finally { dir.getOnGoingIndexOutputs().remove(fileName); } - currentBuffer = null; } Index: src/main/src/org/compass/needle/gae/GoogleAppEngineIndexInput.java =================================================================== --- src/main/src/org/compass/needle/gae/GoogleAppEngineIndexInput.java (revision 3931) +++ src/main/src/org/compass/needle/gae/GoogleAppEngineIndexInput.java (working copy) @@ -18,12 +18,12 @@ import java.io.IOException; +import org.apache.lucene.store.IndexInput; + import com.google.appengine.api.datastore.Blob; import com.google.appengine.api.datastore.Entity; -import com.google.appengine.api.datastore.EntityNotFoundException; import com.google.appengine.api.datastore.Key; import com.google.appengine.api.datastore.KeyFactory; -import org.apache.lucene.store.IndexInput; /** * @author kimchy @@ -56,7 +56,7 @@ /** * Returns the current position in this file, where the next read will * occur. - * + * * @see #seek(long) */ public long getFilePointer() { @@ -72,7 +72,7 @@ /** * Reads and returns a single byte. - * + * * @see org.apache.lucene.store.IndexOutput#writeByte(byte) */ public byte readByte() throws IOException { @@ -82,12 +82,14 @@ } /** - * Reads a specified number of bytes into an array at the specified - * offset. - * - * @param b the array to read bytes into - * @param offset the offset in the array to start storing bytes - * @param len the number of bytes to read + * Reads a specified number of bytes into an array at the specified offset. + * + * @param b + * the array to read bytes into + * @param offset + * the offset in the array to start storing bytes + * @param len + * the number of bytes to read * @see org.apache.lucene.store.IndexOutput#writeBytes(byte[],int) */ public void readBytes(byte[] b, int offset, int len) throws IOException { @@ -121,7 +123,7 @@ /** * Sets current position in this file, where the next read will occur. - * + * * @see #getFilePointer() */ public void seek(long pos) throws IOException { @@ -135,26 +137,21 @@ if (bucketIndex == currentBucketIndex) { return; } - // reuse the current bucket entry as the template - try { - Key key = KeyFactory.createKey(metaDataEntity.getKey(), GoogleAppEngineDirectory.CONTENT_KEY_KIND, metaDataEntity.getKey().getName() + bucketIndex); - if (useMemcache) { - currentBucketData = (byte[]) dir.getMemcacheService().get(key); - if (currentBucketData == null) { - Entity bucketEntity = dir.getDatastoreService().get(key); - currentBucketData = ((Blob) bucketEntity.getProperty("data")).getBytes(); - dir.getMemcacheService().put(key, currentBucketData); - } else { - // hit - } - } else { - Entity bucketEntity = dir.getDatastoreService().get(key); + + Key key = KeyFactory.createKey(metaDataEntity.getKey(), GoogleAppEngineDirectory.CONTENT_KEY_KIND, + metaDataEntity.getKey().getName() + bucketIndex); + if (useMemcache) { + currentBucketData = (byte[]) dir.getMemcacheService().get(key); + if (currentBucketData == null) { + Entity bucketEntity = dir.getEntity(key); currentBucketData = ((Blob) bucketEntity.getProperty("data")).getBytes(); + dir.getMemcacheService().put(key, currentBucketData); } - currentBucketIndex = bucketIndex; - } catch (EntityNotFoundException e) { - throw new GoogleAppEngineDirectoryException(dir.getIndexName(), metaDataEntity.getKey().getName(), "bucket [" + bucketIndex + "] not found"); + } else { + Entity bucketEntity = dir.getEntity(key); + currentBucketData = ((Blob) bucketEntity.getProperty("data")).getBytes(); } + currentBucketIndex = bucketIndex; } public Object clone() { Index: src/main/src/org/compass/needle/gae/GoogleAppEngineDirectory.java =================================================================== --- src/main/src/org/compass/needle/gae/GoogleAppEngineDirectory.java (revision 3931) +++ src/main/src/org/compass/needle/gae/GoogleAppEngineDirectory.java (working copy) @@ -18,11 +18,17 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; +import org.apache.lucene.index.LuceneFileNames; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; + import com.google.appengine.api.datastore.DatastoreService; import com.google.appengine.api.datastore.DatastoreServiceFactory; import com.google.appengine.api.datastore.Entity; @@ -32,22 +38,24 @@ import com.google.appengine.api.datastore.KeyFactory; import com.google.appengine.api.datastore.PreparedQuery; import com.google.appengine.api.datastore.Query; +import com.google.appengine.api.datastore.Transaction; import com.google.appengine.api.memcache.MemcacheService; import com.google.appengine.api.memcache.MemcacheServiceFactory; -import org.apache.lucene.index.LuceneFileNames; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; /** - *

If caching of meta data is enabled, we maintain a cache of all the meta data information of files (as Entity). - * The cache is refreshed (cleared and updated) each time {@link #list()} is called (which is called by Lucene when finding segment files - * and opening a deleter. Meaning that we update the cache quite frequently). "Locally" each time a file is created - * or meta data is fetched on cache miss, it is added to the meta data cache. - * - *

Allows to provide a set of regex patterns that will be used to match on (Lucene) file names to choose - * if certain file names will also be cached in memcache as well. - * + *

+ * If caching of meta data is enabled, we maintain a cache of all the meta data + * information of files (as Entity). The cache is refreshed (cleared and + * updated) each time {@link #list()} is called (which is called by Lucene when + * finding segment files and opening a deleter. Meaning that we update the cache + * quite frequently). "Locally" each time a file is created or meta data is + * fetched on cache miss, it is added to the meta data cache. + * + *

+ * Allows to provide a set of regex patterns that will be used to match on + * (Lucene) file names to choose if certain file names will also be cached in + * memcache as well. + * * @author kimchy */ public class GoogleAppEngineDirectory extends Directory { @@ -56,9 +64,10 @@ public static final int DEFAULT_FLUSH_RATE = 50; + public static final int DEFAULT_TRANSACTION_RETRY_COUNT = 3; + public static final boolean DEFAULT_CACHE_META_DATA = true; - static final String META_KEY_KIND = "meta"; static final String CONTENT_KEY_KIND = "content"; @@ -83,10 +92,17 @@ private final Pattern[] memcacheRegexPatterns; - // we store an on going list of created index outputs since Lucene needs them + private final int transactionRetryCount; + + // we store an on going list of created index outputs since Lucene needs + // them // *before* it closes the index output. It calls fileExists in the middle. private Map onGoingIndexOutputs = new ConcurrentHashMap(); + public GoogleAppEngineDirectory(String indexName) { + this(indexName, DEFAULT_BUCKET_SIZE); + } + public GoogleAppEngineDirectory(String indexName, int bucketSize) { this(indexName, bucketSize, DEFAULT_FLUSH_RATE, new String[0]); } @@ -99,13 +115,21 @@ this(indexName, bucketSize, flushRate, DEFAULT_CACHE_META_DATA, memcacheRegexPatterns); } - public GoogleAppEngineDirectory(String indexName, int bucketSize, int flushRate, boolean cacheMetaData, String[] memcacheRegexPatterns) { + public GoogleAppEngineDirectory(String indexName, int bucketSize, int flushRate, boolean cacheMetaData, + String[] memcacheRegexPatterns) { + this(indexName, bucketSize, flushRate, cacheMetaData, DEFAULT_TRANSACTION_RETRY_COUNT, memcacheRegexPatterns); + } + + public GoogleAppEngineDirectory(String indexName, int bucketSize, int flushRate, boolean cacheMetaData, + int transactionRetryCount, String[] memcacheRegexPatterns) { this.indexName = indexName; this.bucketSize = bucketSize; this.flushRate = flushRate; this.cacheMetaData = cacheMetaData; this.datastoreService = DatastoreServiceFactory.getDatastoreService(); this.memcacheService = MemcacheServiceFactory.getMemcacheService(); + this.transactionRetryCount = transactionRetryCount; + memcacheService.setNamespace("index/" + indexName); this.indexKey = KeyFactory.createKey("index", indexName); @@ -115,7 +139,9 @@ if (memcacheRegexPatterns == null) { memcacheRegexPatterns = new String[0]; } + this.memcacheRegexPatterns = new Pattern[memcacheRegexPatterns.length]; + for (int i = 0; i < memcacheRegexPatterns.length; i++) { this.memcacheRegexPatterns[i] = Pattern.compile(memcacheRegexPatterns[i]); } @@ -131,19 +157,38 @@ } public String[] list() throws IOException { - List entities = listQuery.asList(FetchOptions.Builder.withChunkSize(Integer.MAX_VALUE)); - String[] result = new String[entities.size()]; - int i = 0; - for (Entity entity : entities) { - result[i++] = entity.getKey().getName(); - } - if (cacheMetaData) { - cachedMetaData.clear(); - for (Entity entity : entities) { - cachedMetaData.put(entity.getKey().getName(), entity); + + return doInTransaction(new Callable() { + + @Override + public String[] call() throws Exception { + return doList(listQuery); } - } - return result; + + @Override + public String[] call(Transaction transaction) { + Query query = new Query(META_KEY_KIND, indexKey); + PreparedQuery listQuery = datastoreService.prepare(transaction, query); + return doList(listQuery); + } + + private String[] doList(PreparedQuery listQuery) { + List entities = listQuery.asList(FetchOptions.Builder.withChunkSize(Integer.MAX_VALUE)); + String[] result = new String[entities.size()]; + int i = 0; + for (Entity entity : entities) { + result[i++] = entity.getKey().getName(); + } + if (cacheMetaData) { + cachedMetaData.clear(); + for (Entity entity : entities) { + cachedMetaData.put(entity.getKey().getName(), entity); + } + } + return result; + } + }); + } public boolean fileExists(String name) throws IOException { @@ -157,7 +202,7 @@ return false; } // TODO why this does not work? -// return buildMetaDataQuery(name).countEntities() > 0; + // return buildMetaDataQuery(name).countEntities() > 0; } public long fileModified(String name) throws IOException { @@ -170,25 +215,47 @@ datastoreService.put(entity); } - public void deleteFile(String name) throws IOException { - try { - Entity entity = datastoreService.get(buildMetaDataKey(name)); - if (entity != null) { - if (cacheMetaData) { - cachedMetaData.remove(name); + public void deleteFile(final String name) throws IOException { + + doInTransaction(new Callable() { + + @Override + public Void call(Transaction transaction) throws Exception { + + Entity entity; + + try { + entity = datastoreService.get(buildMetaDataKey(name)); + } catch (EntityNotFoundException ex) { + // Nothing to delete, we're okay. + return null; } - List keysToDelete = new ArrayList(); - keysToDelete.add(entity.getKey()); - long size = (Long) entity.getProperty("size"); - long count = Math.round((double) size / bucketSize); - for (int i = 0; i < count; i++) { - keysToDelete.add(KeyFactory.createKey(entity.getKey(), CONTENT_KEY_KIND, name + i)); + + if (entity != null) { + + if (cacheMetaData) { + cachedMetaData.remove(name); + } + + List keysToDelete = new ArrayList(); + + keysToDelete.add(entity.getKey()); + + long size = (Long) entity.getProperty("size"); + long count = Math.round((double) size / bucketSize); + + for (int i = 0; i < count; i++) { + keysToDelete.add(KeyFactory.createKey(entity.getKey(), CONTENT_KEY_KIND, name + i)); + } + + datastoreService.delete(transaction, keysToDelete); + } - datastoreService.delete(keysToDelete); + + return null; } - } catch (EntityNotFoundException e) { - // its fine, just do nothing - } + }); + } public void renameFile(String from, String to) throws IOException { @@ -225,22 +292,32 @@ // nothing to do here } - private Entity fetchMetaData(String name) throws GoogleAppEngineDirectoryException { + private Entity fetchMetaData(final String name) throws GoogleAppEngineDirectoryException { + if (cacheMetaData && !LuceneFileNames.isStaticFile(name)) { Entity entity = cachedMetaData.get(name); if (entity != null) { return entity; } } - try { - Entity entity = datastoreService.get(buildMetaDataKey(name)); - if (entity != null) { - addMetaData(entity); + + return doInTransaction(new Callable() { + + @Override + public Entity call(Transaction transaction) throws GoogleAppEngineDirectoryException { + try { + Entity entity = datastoreService.get(buildMetaDataKey(name)); + if (entity != null) { + addMetaData(entity); + } + return entity; + } catch (EntityNotFoundException e) { + throw new GoogleAppEngineDirectoryException(indexName, name, "Not found"); + } } - return entity; - } catch (EntityNotFoundException e) { - throw new GoogleAppEngineDirectoryException(indexName, name, "Not found"); - } + + }); + } private Key buildMetaDataKey(String name) { @@ -281,4 +358,175 @@ } cachedMetaData.put(metaDataEntity.getKey().getName(), metaDataEntity); } + + int getTransactionRetryCount() { + return transactionRetryCount; + } + + /** + * Runs the given Callable object in its own transaction. It attempts to + * retry the transaction as many times as dictated by the configuration. + * + * @param + * the return type + * @param c + * the callable + * @return + * @throws GoogleAppEngineDirectoryException + */ + T doInTransaction(Callable c) throws GoogleAppEngineDirectoryException { + return doInTransaction(transactionRetryCount, false, c); + } + + /** + * Runs the given Callable object in its own transaction if there is + * currently a transaction running, as not to interfere with the currently + * running transaction. In which case it will invoke + * Callable.call(Transaction). + * + * If there is no running transaction, it will invoke Callable.call() + * without the use of a transaction. + * + * The rationale behind this: + * + * A transaction can take place only within the same entity group on GAE. + * The index itself is not part of the object being indexed, and if an + * object is being changed within the context of an existing transaction the + * indexing process will interfere with the process of updating the data + * itself. However, a separate transaction can be used to write to the index + * so long as the last "current" transaction is put back in place when the + * writing to the index has finished. So if there is a running transaction, + * the following executes within its own transaction and is committed. That + * replaces the transaction state to what it was before this call was made. + * + * The Google App Engine documentation also recommends re-trying the + * transaction several times before giving up so this code also will attempt + * to redo the transaction as many times as it makes sense. + * + * For more information see: Google App Engine: What can be done in a transaction? + * + * @param + * the return type + * @param attempts + * the number of times to retry the transaction + * @param force + * set to true if you want to ignore the current state of the + * transaction and run the given code in a transaction + * regardless. + * @param c + * the callable to run + * @return the object returned by the callable + * @throws GoogleAppEngineDirectoryException + * if there was a problem with the transaction + */ + T doInTransaction(final int attempts, boolean force, Callable c) throws GoogleAppEngineDirectoryException { + int remaining; + + try { + ConcurrentModificationException cme = null; + + // Checks if there is an active transaction + Transaction trans = datastoreService.getCurrentTransaction(null); + + // If there is no currently running transaction that the indexing + // will interfere with, so we just let the call proceed without a + // transaction. + if (!force && trans == null) { + return c.call(); + } + + for (remaining = attempts; remaining > 0; --remaining) { + T r; + trans = datastoreService.beginTransaction(); + + try { + + r = c.call(trans); + + trans.commit(); + + return r; + } catch (ConcurrentModificationException ex) { + // Continues and tries to re-do the transaction. + cme = ex; + continue; + } finally { + // If it gets to this point and the transaction object is + // still active we assume it failed for some reason so we + // roll it back. + if (trans.isActive()) { + trans.rollback(); + } + } + } + + // We tried as many times as we could, so we give up. + throw new GoogleAppEngineAttemptsExpiredException("Datastore too busy to complete transaction.", cme); + + } catch (ConcurrentModificationException ex) { + //Depending on the preferences of the DatastoreService, it may + //run simple calls inside of a transaction. + throw new GoogleAppEngineDirectoryException("Transaction failed.", ex); + } catch (GoogleAppEngineDirectoryException ex) { + throw ex; + } catch (RuntimeException ex) { + throw ex; + } catch (EntityNotFoundException ex) { + String name = ex.getKey().getName(); + String kind = ex.getKey().getKind(); + throw new GoogleAppEngineDirectoryException("Could not find entity [" + name + "] of kind [" + kind + "]", ex); + } catch (Exception ex) { + throw new GoogleAppEngineDirectoryException("Transaction failed.", ex); + } + } + + /** + * Callback class for transactions. Subclasses must override at least one of + * these methods or it will recurse to infinity. + * + * @author patricktwohig + * + * @param + */ + static abstract class Callable implements java.util.concurrent.Callable { + + /** + * Called when there is not transaction active. If code needs to be + * different than that called from within a transaction this method will + * be called. + * + * By default, this method calls this.call(null) + */ + public T call() throws Exception { + return call(null); + } + + /** + * Called if there is a transaction active, or called with null if there + * is not. + * + * By default, this method calls this.call() + * + * @param transaction + * @return + * @throws Exception + */ + public T call(Transaction transaction) throws Exception { + return call(); + } + + } + + Entity getEntity(final Key key) throws GoogleAppEngineDirectoryException { + return doInTransaction(new Callable() { + + @Override + public Entity call(Transaction transaction) throws Exception { + return datastoreService.get(transaction, key); + } + + }); + + } + } Index: src/main/src/org/compass/needle/gae/GoogleAppEngineLockFactory.java =================================================================== --- src/main/src/org/compass/needle/gae/GoogleAppEngineLockFactory.java (revision 3931) +++ src/main/src/org/compass/needle/gae/GoogleAppEngineLockFactory.java (working copy) @@ -17,21 +17,24 @@ package org.compass.needle.gae; import java.io.IOException; -import java.util.ConcurrentModificationException; -import com.google.appengine.api.datastore.Entity; -import com.google.appengine.api.datastore.EntityNotFoundException; -import com.google.appengine.api.datastore.Transaction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockFactory; +import org.compass.needle.gae.GoogleAppEngineDirectory.Callable; +import com.google.appengine.api.datastore.Entity; +import com.google.appengine.api.datastore.EntityNotFoundException; +import com.google.appengine.api.datastore.Transaction; + /** * @author kimchy */ public class GoogleAppEngineLockFactory extends LockFactory { + private static final String LOCK_KIND = "lock"; + private static final Log log = LogFactory.getLog(GoogleAppEngineLockFactory.class); private final GoogleAppEngineDirectory dir; @@ -45,69 +48,100 @@ } public void clearLock(String lockName) throws IOException { - dir.getDatastoreService().delete(dir.getIndexKey().getChild("lock", makeLockName(lockName, dir.getIndexName()))); + new GoogleAppEngineLock(lockName).doRelease(); } - private String makeLockName(String lockName, String indexName) { - // TODO remove the index name in the next release of GAE, bug in the local datastore - return indexName + "-" + lockName; - } - public class GoogleAppEngineLock extends Lock { private final Entity lock; public GoogleAppEngineLock(String lockName) { - this.lock = new Entity("lock", makeLockName(lockName, dir.getIndexName()), dir.getIndexKey()); + lockName +="-" + dir.getIndexName(); + this.lock = new Entity(LOCK_KIND, lockName); } public boolean isLocked() { + try { - try { - dir.getDatastoreService().get(lock.getKey()); - return true; - } catch (EntityNotFoundException e) { - return false; - } - } catch (Exception e1) { + // Does not force a transaction, but uses a transaction only if + // necessary. + return dir.doInTransaction(new Callable() { + + @Override + public Boolean call(Transaction transaction) throws Exception { + try { + dir.getDatastoreService().get(transaction, lock.getKey()); + return true; + } catch (EntityNotFoundException ex) { + return false; + } + } + + }); + } catch (GoogleAppEngineDirectoryException e) { + if (log.isWarnEnabled()) { - log.warn("Failed to check if object is locked on index [" + dir.getIndexName() + "]", e1); + log.warn("Failed to check if object is locked on index [" + dir.getIndexName() + "]", e); } + + return false; } - return false; + } public boolean obtain() throws IOException { - Transaction transaction = dir.getDatastoreService().beginTransaction(); + final int attempts = dir.getTransactionRetryCount(); + try { - dir.getDatastoreService().get(transaction, lock.getKey()); - transaction.commit(); + // Locking needs to be done in a transaction of its own because + // the get and subsequent set are not atomic operations with the + // datastore. + return dir.doInTransaction(attempts, true, new Callable() { + + @Override + public Boolean call(Transaction transaction) throws Exception { + try { + dir.getDatastoreService().get(transaction, lock.getKey()); + return false; + } catch (EntityNotFoundException enfex) { + dir.getDatastoreService().put(transaction, lock); + return true; + } + } + + }); + } catch (GoogleAppEngineAttemptsExpiredException ex) { + // Trying to obtain the lock failed several times. Give up and + // return false. return false; - } catch (EntityNotFoundException e) { - // no lock, continue with trying to create one } - try { - dir.getDatastoreService().put(transaction, lock); - transaction.commit(); - return true; - } catch (ConcurrentModificationException e) { - // someone has created an Entity with the same Key in between get and put - transaction.commit(); - return false; - } } public void release() { + try { - Transaction transaction = dir.getDatastoreService().beginTransaction(); - dir.getDatastoreService().delete(transaction, lock.getKey()); - transaction.commit(); - } catch (Exception e) { + doRelease(); + } catch (GoogleAppEngineDirectoryException ex) { if (log.isWarnEnabled()) { - log.warn("Failed to release lock on index [" + dir.getIndexName() + "]", e); + log.warn("Failed to release lock on index [" + dir.getIndexName() + "]", ex); } } + } + + private void doRelease() throws GoogleAppEngineDirectoryException { + dir.doInTransaction(1, true, new Callable() { + + @Override + public Void call(Transaction transaction) throws Exception { + dir.getDatastoreService().delete(transaction, lock.getKey()); + return null; + } + + }); + + } + } } Index: src/main/src/org/compass/needle/gae/GoogleAppEngineAttemptsExpiredException.java =================================================================== --- src/main/src/org/compass/needle/gae/GoogleAppEngineAttemptsExpiredException.java (revision 0) +++ src/main/src/org/compass/needle/gae/GoogleAppEngineAttemptsExpiredException.java (revision 0) @@ -0,0 +1,31 @@ +package org.compass.needle.gae; + +import java.util.ConcurrentModificationException; + +/** + * This exception arises when the number of prescribed attempts to complete a + * transaction have been used. + * + * @author patricktwohig + * + */ +public class GoogleAppEngineAttemptsExpiredException extends GoogleAppEngineDirectoryException { + + public GoogleAppEngineAttemptsExpiredException(String message, ConcurrentModificationException e) { + super(message, e); + } + + public GoogleAppEngineAttemptsExpiredException(String indexName, String fileName, String message, + ConcurrentModificationException e) { + super(indexName, fileName, message, e); + } + + public GoogleAppEngineAttemptsExpiredException(String indexName, String fileName, String message) { + super(indexName, fileName, message); + } + + public GoogleAppEngineAttemptsExpiredException(String message) { + super(message); + } + +}