Vector Space Classifier using Lucene

This post is reposted from http://sujitpal.blogspot.com/2009/03/vector-space-classifier-using-lucene.html. Many thanks to Salmon Run for the great work.

I have only recently started playing with Lucene’s term API, and it looks really useful. Over the past year, I have tried to go through and understand the ideas presented in the TMAP book, and in the process I have built up a small set of tools to tokenize text, create and normalize term-document matrices, etc. Lucene provides some of this functionality through its term API, but in a more memory-efficient way. I was kind of aware of the term API, but I was too busy learning the basics of IR to worry too much about it, so I never took the time to figure it out earlier.

Anyway, I’ve been playing with classification lately, as you can see from my previous post. This week, I try out another popular classification approach based on the Term Vector Space Model. The idea is to compute the position in term space of the “average” or centroid document for each category, and then to find how “close” the target document is to each of these centroids. The closest centroid wins, i.e., the document is assigned to that centroid’s category.

Training

The classifier is trained with a pre-classified corpus of documents. Each document’s term vector is computed and, based on the document’s category, put into a Term-Document (TD) matrix for that category. Once all documents are read, the centroid document for each set of documents is calculated.
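To make the grouping step concrete, here is a minimal, dependency-free sketch of it (Java 8+). It assumes each training document has already been reduced to its category plus a term-to-frequency map (the job Lucene’s term vectors do in the real classifier), and it uses plain double[][] arrays (rows = terms, columns = documents) in place of commons-math SparseRealMatrix. The class and method names are hypothetical, not part of the author’s code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: groups raw term frequencies into one TD matrix per
// category, with rows = terms (looked up via termIdMap) and columns = docs.
final class CategoryTdMatrices {

  // A training document reduced to its category and term frequencies.
  static final class TrainingDoc {
    final String category;
    final Map<String, Integer> termFreqs;

    TrainingDoc(String category, Map<String, Integer> termFreqs) {
      this.category = category;
      this.termFreqs = termFreqs;
    }
  }

  static Map<String, double[][]> build(List<TrainingDoc> docs,
      Map<String, Integer> termIdMap) {
    // First pass: collect the documents of each category, in input order.
    Map<String, List<TrainingDoc>> byCategory = new HashMap<>();
    for (TrainingDoc doc : docs) {
      byCategory.computeIfAbsent(doc.category, k -> new ArrayList<>()).add(doc);
    }
    // Second pass: fill a terms x docs matrix for each category.
    Map<String, double[][]> result = new HashMap<>();
    for (Map.Entry<String, List<TrainingDoc>> e : byCategory.entrySet()) {
      List<TrainingDoc> catDocs = e.getValue();
      double[][] td = new double[termIdMap.size()][catDocs.size()];
      for (int col = 0; col < catDocs.size(); col++) {
        for (Map.Entry<String, Integer> tf : catDocs.get(col).termFreqs.entrySet()) {
          Integer row = termIdMap.get(tf.getKey());
          if (row != null) { // skip terms outside the vocabulary
            td[row][col] = tf.getValue();
          }
        }
      }
      result.put(e.getKey(), td);
    }
    return result;
  }
}
```

Unknown terms are skipped here, the same way buildMatrixFromIndex() skips them at classification time; with the real classifier this never triggers during training, since termIdMap is built from the same index.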

During the centroid calculation, we normalize each matrix using TF-IDF and then calculate the centroid for the documents in the matrix. A centroid is basically just the row-wise average of the TD matrix. If you think of a column in the TD matrix as representing a single document, then the elements of that column can be considered as the coordinates of a point in n-dimensional space, where n is the number of terms (rows) in our TD matrix. Thus a document whose coordinate for each term is the average of that term’s row across all the document columns represents the centroid of the documents in that category.
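The row-wise average can be sketched on a plain double[][] array (rows = terms, columns = documents) standing in for the commons-math matrix; the class name is hypothetical. Unlike computeCentroid() in the classifier below, this sketch does not apply TF-IDF first, so it shows only the averaging step.

```java
// Minimal sketch: the centroid of a terms x docs matrix is a single column
// whose entry for each term is that row's mean across all document columns.
final class CentroidSketch {
  static double[] centroid(double[][] td) {
    double[] centroid = new double[td.length];
    for (int row = 0; row < td.length; row++) {
      double rowSum = 0.0;
      for (double v : td[row]) {
        rowSum += v;
      }
      // Guard against a matrix with no document columns.
      centroid[row] = td[row].length == 0 ? 0.0 : rowSum / td[row].length;
    }
    return centroid;
  }

  public static void main(String[] args) {
    double[][] td = { {1, 3}, {0, 2}, {4, 0} }; // 3 terms, 2 documents
    System.out.println(java.util.Arrays.toString(centroid(td))); // [2.0, 1.0, 2.0]
  }
}
```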

In my example, the centroids are stored as in-memory member variables of the classifier, which can be accessed during the classification phase via an accessor. Another data structure is the term to position map, also created as a side effect of the training phase and accessible via an accessor. In real-world systems, you may want to train the classifier once and then reuse it many times over different documents, possibly over a period of days or months, so it’s probably better to store this data in a database table or some other persistent medium. If you go the database table route, you can coalesce the two data structures needed by the classify method into a single table by keying the centroid coordinates off the term itself. I haven’t done this because I am lazy, so you are stuck with handing the two data structures to the classify method at the moment.
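A hypothetical sketch of that coalesced structure: one entry per term, holding the term’s weight in every category centroid. It assumes the centroids have been copied into plain double[] arrays indexed by the same row numbers as termIdMap; the class name and the choice to drop zero weights are illustrative, not the author’s design. Each (term, category, weight) triple would map to one row of a database table keyed on the term.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a single "term -> (category -> weight)" table built
// from termIdMap plus per-category centroid vectors (plain double[] arrays
// standing in for the single-column centroid matrices).
final class TermCentroidTable {
  static Map<String, Map<String, Double>> build(
      Map<String, Integer> termIdMap, Map<String, double[]> centroids) {
    Map<String, Map<String, Double>> table = new HashMap<>();
    for (Map.Entry<String, Integer> term : termIdMap.entrySet()) {
      Map<String, Double> weights = new HashMap<>();
      for (Map.Entry<String, double[]> c : centroids.entrySet()) {
        double w = c.getValue()[term.getValue()];
        if (w != 0.0) { // keep only non-zero weights, like a sparse table
          weights.put(c.getKey(), w);
        }
      }
      table.put(term.getKey(), weights);
    }
    return table;
  }
}
```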

Classification

The classification process takes a body of text and the two data structures, creates an in-memory Lucene index from the text, and builds a document matrix out of its term vector. As in the training phase, we pass the raw frequencies through our TF-IDF indexers. The similarity of this document matrix to the centroid matrix of each category is then calculated, and the category whose centroid is most similar to the document is assigned to it. The default similarity implementation used in this classifier is Cosine Similarity.
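Stripped of Lucene and commons-math, the classification step reduces to a nearest-centroid search. The sketch below is a minimal version that assumes the document and the centroids are already TF-IDF-weighted double[] vectors over the same term positions; the class name is hypothetical. It scans once for the highest cosine similarity instead of sorting all categories as classify() does, which picks the same winner barring ties.

```java
import java.util.Map;

// Minimal nearest-centroid sketch using cosine similarity on plain arrays.
final class NearestCentroid {
  static double cosine(double[] a, double[] b) {
    double dot = 0.0, normA = 0.0, normB = 0.0;
    for (int i = 0; i < a.length; i++) {
      dot += a[i] * b[i];
      normA += a[i] * a[i];
      normB += b[i] * b[i];
    }
    if (normA == 0.0 || normB == 0.0) {
      return 0.0; // an all-zero vector has no direction; treat as dissimilar
    }
    return dot / (Math.sqrt(normA) * Math.sqrt(normB));
  }

  // Returns the category whose centroid has the highest cosine similarity.
  static String classify(double[] doc, Map<String, double[]> centroids) {
    String best = null;
    double bestSim = Double.NEGATIVE_INFINITY;
    for (Map.Entry<String, double[]> e : centroids.entrySet()) {
      double sim = cosine(doc, e.getValue());
      if (sim > bestSim) {
        bestSim = sim;
        best = e.getKey();
      }
    }
    return best;
  }
}
```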

Notice that, unlike the binary Naive Bayes classifier from my previous post, this classifier is not binary: the cosine similarity measure can find the best matching category for a document among any number of categories. Of course, it doesn’t have to be this way; a Naive Bayes classifier can be run multiple times to make it non-binary, and a Vector Space classifier can be trained appropriately to make it binary.

Classifier Code

The code for the classifier is shown below. There are a bunch of setters at the beginning, which allow the caller to configure the classifier. Then the caller calls the train() method. Once training is complete, the caller can call the classify() method, which returns a String representing the best category for the document. Another method, getSimilarityMap(), reports the similarity scores of the document for each category, which can be useful for debugging. The test code further down illustrates the usage.

// Source: src/main/java/com/mycompany/myapp/classifiers/LuceneVectorSpaceModelClassifier.java
package com.mycompany.myapp.classifiers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections15.Bag;
import org.apache.commons.collections15.Transformer;
import org.apache.commons.collections15.bag.HashBag;
import org.apache.commons.collections15.comparators.ReverseComparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.linear.RealMatrix;
import org.apache.commons.math.linear.SparseRealMatrix;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field.TermVector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TermEnum;
import org.apache.lucene.index.TermFreqVector;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.RAMDirectory;

import com.mycompany.myapp.clustering.ByValueComparator;
import com.mycompany.myapp.indexers.IdfIndexer;
import com.mycompany.myapp.indexers.TfIndexer;
import com.mycompany.myapp.similarity.AbstractSimilarity;
import com.mycompany.myapp.similarity.CosineSimilarity;

/**
 * Computes the position in term space of the centroid for each
 * category during the training phase. During the classify phase,
 * the position in term space of the document to be classified is
 * computed and the cosine similarity of this document with each
 * of the centroids is computed. The category of the centroid which
 * is closest to the document is assigned to the document.
 */
public class LuceneVectorSpaceModelClassifier {

  private final Log log = LogFactory.getLog(getClass());

  private String indexDir;
  private String categoryFieldName;
  private String bodyFieldName;

  private Analyzer analyzer = new StandardAnalyzer();

  @SuppressWarnings("unchecked")
  private Transformer<RealMatrix,RealMatrix>[] indexers =
    new Transformer[] {
      new TfIndexer(),
      new IdfIndexer()
  };

  private AbstractSimilarity similarity = new CosineSimilarity();

  private Map<String,RealMatrix> centroidMap;
  private Map<String,Integer> termIdMap;
  private Map<String,Double> similarityMap;

  /**
   * Set the directory where the Lucene index is located.
   * @param indexDir the index directory.
   */
  public void setIndexDir(String indexDir) {
    this.indexDir = indexDir;
  }

  /**
   * Set the name of the Lucene field containing the preclassified category.
   * @param categoryFieldName the category field name.
   */
  public void setCategoryFieldName(String categoryFieldName) {
    this.categoryFieldName = categoryFieldName;
  }

  /**
   * Set the name of the Lucene field containing the document body. The
   * document body must have been indexed with TermVector.YES.
   * @param bodyFieldName the name of the document body field.
   */
  public void setBodyFieldName(String bodyFieldName) {
    this.bodyFieldName = bodyFieldName;
  }

  /**
   * The Analyzer used for tokenizing the document body during indexing,
   * and to tokenize the text to be classified. If not specified, the
   * classifier uses the StandardAnalyzer.
   * @param analyzer the analyzer reference.
   */
  public void setAnalyzer(Analyzer analyzer) {
    this.analyzer = analyzer;
  }

  /**
   * A transformer chain of indexers (or normalizers) to normalize the
   * document matrices. If not specified, the default is a chain of TF-IDF.
   * @param indexers the normalizer chain.
   */
  public void setIndexers(
      Transformer<RealMatrix,RealMatrix>[] indexers) {
    this.indexers = indexers;
  }

  /**
   * The Similarity implementation used to calculate the similarity between
   * the text to be classified and the category centroid document matrices.
   * Uses CosineSimilarity if not specified.
   * @param similarity the similarity to use.
   */
  public void setSimilarity(AbstractSimilarity similarity) {
    this.similarity = similarity;
  }

  /**
   * Implements the logic for training the classifier. The input is a Lucene
   * index of preclassified documents. The classifier is provided the name
   * of the field which contains the document body, as well as the name of
   * the field which contains the category name. Additionally, the document
   * body must have had its Term Vectors computed during indexing (using
   * TermVector.YES). The train method uses the Term Vectors to compute a
   * geometrical centroid for each category in the index. The set of category
   * names to their associated centroid document matrix is available via the
   * getCentroidMap() method after training is complete.
   * @throws Exception if one is thrown.
   */
  public void train() throws Exception {
    log.info("Classifier training started");
    IndexReader reader = IndexReader.open(indexDir);
    // Set up a data structure for the term versus the row id in the matrix.
    // This is going to be used for looking up the term's row in the matrix.
    this.termIdMap = computeTermIdMap(reader);
    // Initialize the data structures to hold the td matrices for the various
    // categories.
    Bag<String> docsInCategory = computeDocsInCategory(reader);
    Map<String,Integer> currentDocInCategory =
      new HashMap<String,Integer>();
    Map<String,RealMatrix> categoryTfMap =
      new HashMap<String,RealMatrix>();
    for (String category : docsInCategory.uniqueSet()) {
      int numDocsInCategory = docsInCategory.getCount(category);
      categoryTfMap.put(category,
        new SparseRealMatrix(termIdMap.size(), numDocsInCategory));
      currentDocInCategory.put(category, new Integer(0));
    }
    // extract each document body's TermVector into the td matrix for
    // that document's category
    int numDocs = reader.numDocs();
    for (int i = 0; i < numDocs; i++) {
      Document doc = reader.document(i);
      String category = doc.get(categoryFieldName);
      RealMatrix tfMatrix = categoryTfMap.get(category);
      // get the term frequency map
      TermFreqVector vector = reader.getTermFreqVector(i, bodyFieldName);
      String[] terms = vector.getTerms();
      int[] frequencies = vector.getTermFrequencies();
      for (int j = 0; j < terms.length; j++) {
        int row = termIdMap.get(terms[j]);
        int col = currentDocInCategory.get(category);
        tfMatrix.setEntry(row, col, new Double(frequencies[j]));
      }
      incrementCurrentDoc(currentDocInCategory, category);
    }
    reader.close();
    // compute centroid vectors for each category
    this.centroidMap = new HashMap<String,RealMatrix>();
    for (String category : docsInCategory.uniqueSet()) {
      RealMatrix tdmatrix = categoryTfMap.get(category);
      RealMatrix centroid = computeCentroid(tdmatrix);
      centroidMap.put(category, centroid);
    }
    log.info("Classifier training complete");
  }

  /**
   * Returns the centroid map of category name to TD Matrix containing the
   * centroid document of the category. This data is computed as a side
   * effect of the train() method.
   * @return the centroid map computed from the training.
   */
  public Map<String,RealMatrix> getCentroidMap() {
    return centroidMap;
  }

  /**
   * Returns the map of analyzed terms versus their positions in the centroid
   * matrices. The data is computed as a side-effect of the train() method.
   * @return a Map of analyzed terms to their position in the matrix.
   */
  public Map<String,Integer> getTermIdMap() {
    return termIdMap;
  }

  /**
   * Once the classifier is trained using the train() method, it creates a
   * Map of category to associated centroid documents for each category, and
   * a termIdMap, which is a mapping of tokenized terms to their row numbers in
   * the document matrix for the centroid documents. These two structures are
   * used by the classify method to match up terms from the input text with
   * corresponding terms in the centroids to calculate similarities. Builds
   * a Map of category names and the similarities of the input text to the
   * centroids in each category as a side effect. Returns the category with
   * the highest similarity score, i.e., the category this text should be
   * classified under.
   * @param centroids a Map of category names to centroid document matrices.
   * @param termIdMap a Map of terms to their positions in the document
   *                  matrix.
   * @param text the text to classify.
   * @return the best category for the text.
   * @throws Exception if one is thrown.
   */
  public String classify(Map<String,RealMatrix> centroids,
      Map<String,Integer> termIdMap, String text) throws Exception {
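    // Use the supplied term map: buildMatrixFromIndex() reads the termIdMap
    // field, so without this an untrained classifier instance would fail
    // with a NullPointerException.
    this.termIdMap = termIdMap;
    RAMDirectory ramdir = new RAMDirectory();
    indexDocument(ramdir, "text", text);
    // now find the (normalized) term frequency vector for this text
    RealMatrix docMatrix = buildMatrixFromIndex(ramdir, "text");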
    // compute similarity using passed in Similarity implementation, we
    // use CosineSimilarity by default.
    this.similarityMap = new HashMap<String,Double>();
    for (String category : centroids.keySet()) {
      RealMatrix centroidMatrix = centroids.get(category);
      double sim = similarity.computeSimilarity(docMatrix, centroidMatrix);
      similarityMap.put(category, sim);
    }
    // sort the categories
    List<String> categories = new ArrayList<String>();
    categories.addAll(centroids.keySet());
    Collections.sort(categories,
      new ReverseComparator<String>(
      new ByValueComparator<String,Double>(similarityMap)));
    // return the best category, the similarity map is also available
    // to the client for debugging or display.
    return categories.get(0);
  }

  /**
   * Returns the map of category to similarity with the document after
   * classification. The similarityMap is computed as a side-effect of
   * the classify() method, so the data is interesting only if this method
   * is called after classify() completes successfully.
   * @return map of category to similarity scores for text to classify.
   */
  public Map<String,Double> getSimilarityMap() {
    return similarityMap;
  }

  /**
   * Loops through the IndexReader's TermEnum enumeration, and creates a Map
   * of term to an integer id. This map is going to be used to assign string
   * terms to specific rows in the Term Document Matrix for each category.
   * @param reader a reference to the IndexReader.
   * @return a Map of terms to their integer ids (0-based).
   * @throws Exception if one is thrown.
   */
  private Map<String, Integer> computeTermIdMap(IndexReader reader)
      throws Exception {
    Map<String,Integer> termIdMap =
      new HashMap<String,Integer>();
    int id = 0;
    TermEnum termEnum = reader.terms();
    while (termEnum.next()) {
      String term = termEnum.term().text();
      if (termIdMap.containsKey(term)) {
        continue;
      }
      termIdMap.put(term, id);
      id++;
    }
    return termIdMap;
  }

  /**
   * Loops through the specified IndexReader and returns a Bag of categories
   * and their document counts. We don't use the BitSet/DocIdSet approach
   * here because we don't know how many categories the training documents
   * have been classified into.
   * @param reader the reference to the IndexReader.
   * @return a Bag of category names and counts.
   * @throws Exception if one is thrown.
   */
  private Bag<String> computeDocsInCategory(IndexReader reader)
      throws Exception {
    int numDocs = reader.numDocs();
    Bag<String> docsInCategory = new HashBag<String>();
    for (int i = 0; i < numDocs; i++) {
      Document doc = reader.document(i);
      String category = doc.get(categoryFieldName);
      docsInCategory.add(category);
    }
    return docsInCategory;
  }

  /**
   * Increments the counter for the category to point to the next document
   * index. This is used to manage the document index in the td matrix for
   * the category.
   * @param currDocs the Map of category-wise document Id counters.
   * @param category the category whose document-id we want to increment.
   */
  private void incrementCurrentDoc(Map<String,Integer> currDocs,
      String category) {
    int currentDoc = currDocs.get(category);
    currDocs.put(category, currentDoc + 1);
  }

  /**
   * Compute the centroid document from the TD Matrix. Result is a matrix
   * of term weights but for a single document only.
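   * @param tdmatrix the raw term-document matrix for one category.
   * @return a single-column matrix holding the centroid term weights.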
   */
  private RealMatrix computeCentroid(RealMatrix tdmatrix) {
    tdmatrix = normalizeWithTfIdf(tdmatrix);
    RealMatrix centroid =
      new SparseRealMatrix(tdmatrix.getRowDimension(), 1);
    int numDocs = tdmatrix.getColumnDimension();
    int numTerms = tdmatrix.getRowDimension();
    for (int row = 0; row < numTerms; row++) {
      double rowSum = 0.0D;
      for (int col = 0; col < numDocs; col++) {
        rowSum += tdmatrix.getEntry(row, col);
      }
      centroid.setEntry(row, 0, rowSum / ((double) numDocs));
    }
    return centroid;
  }

  /**
   * Builds an in-memory Lucene index using the text supplied for classification.
   * @param ramdir the RAM Directory reference.
   * @param fieldName the field name to index the text as.
   * @param text the text to index.
   * @throws Exception if one is thrown.
   */
  private void indexDocument(RAMDirectory ramdir, String fieldName,
      String text) throws Exception {
    IndexWriter writer =
      new IndexWriter(ramdir, analyzer, MaxFieldLength.UNLIMITED);
    Document doc = new Document();
    doc.add(new Field(
      fieldName, text, Store.YES, Index.ANALYZED, TermVector.YES));
    writer.addDocument(doc);
    writer.commit();
    writer.close();
  }

  /**
   * Given a Lucene index and a field name with pre-computed TermVectors,
   * creates a document matrix of terms. The document matrix is normalized
   * using the specified indexer chain.
   * @param ramdir the RAM Directory reference.
   * @param fieldName the name of the field to build the matrix from.
   * @return a normalized Document matrix of terms and frequencies.
   * @throws Exception if one is thrown.
   */
  private RealMatrix buildMatrixFromIndex(RAMDirectory ramdir,
      String fieldName) throws Exception {
    IndexReader reader = IndexReader.open(ramdir);
    TermFreqVector vector = reader.getTermFreqVector(0, fieldName);
    String[] terms = vector.getTerms();
    int[] frequencies = vector.getTermFrequencies();
    RealMatrix docMatrix = new SparseRealMatrix(termIdMap.size(), 1);
    for (int i = 0; i < terms.length; i++) {
      String term = terms[i];
      if (termIdMap.containsKey(term)) {
        int row = termIdMap.get(term);
        docMatrix.setEntry(row, 0, frequencies[i]);
      }
    }
    reader.close();
    // normalize the docMatrix using TF-IDF
    docMatrix = normalizeWithTfIdf(docMatrix);
    return docMatrix;
  }

  /**
   * Pass the input TD Matrix through a chain of transformers to normalize
   * the TD Matrix. Here we do TF-IDF normalization, although it is possible
   * to do other types of normalization (such as LSI) by passing in the
   * appropriate chain of normalizers (or indexers).
   * @param docMatrix the un-normalized TD Matrix.
   * @return the normalized TD Matrix.
   */
  private RealMatrix normalizeWithTfIdf(RealMatrix docMatrix) {
    for (Transformer<RealMatrix,RealMatrix> indexer : indexers) {
      docMatrix = indexer.transform(docMatrix);
    }
    return docMatrix;
  }
}

Related Code

I have reused some code that I had written to support other components developed earlier. When I wrote them, I was using the Jama Matrix package. However, I switched sometime late last year to using the linear algebra classes in commons-math instead. I started using commons-math in anticipation of being able to use the SparseRealMatrix implementation, which I had suggested and contributed a first cut for, but the 2.0 release is still not out, so it’s likely you will have to download and build it from the svn repository if you want to run my code. In each subsection below, I point out where you can get the Jama version if you want it.

TfIndexer

This indexer normalizes each term count by dividing it by the total number of terms in the given document. This compensates for document length, so that long documents don’t get higher term weights simply because they are longer. At the end of the normalization, each term count becomes a number between 0 and 1, and the term frequencies for a document sum to 1.
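A dependency-free sketch of the same column normalization on a double[][] array (rows = terms, columns = documents); the class name is hypothetical. It also skips all-zero columns, which would otherwise produce 0/0 = NaN, for example when none of the terms in a document being classified appear in the training vocabulary.

```java
// Minimal sketch of TF normalization on a terms x docs matrix (in place):
// each document column is divided by its sum, so its weights add up to 1.
final class TfSketch {
  static void normalizeColumns(double[][] m) {
    if (m.length == 0) {
      return;
    }
    int numDocs = m[0].length;
    for (int j = 0; j < numDocs; j++) {
      double sum = 0.0;
      for (double[] row : m) {
        sum += row[j];
      }
      if (sum == 0.0) {
        continue; // empty document: leave the column as zeros
      }
      for (double[] row : m) {
        row[j] /= sum;
      }
    }
  }
}
```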

The Jama version of the code can be found in my post IR Math with Java : TF, IDF and LSI.

// Source: src/main/java/com/mycompany/myapp/indexers/TfIndexer.java
package com.mycompany.myapp.indexers;

import org.apache.commons.collections15.Transformer;
import org.apache.commons.math.linear.RealMatrix;

public class TfIndexer implements Transformer<RealMatrix,RealMatrix> {

  public RealMatrix transform(RealMatrix matrix) {
    for (int j = 0; j < matrix.getColumnDimension(); j++) {
      double sum = sum(matrix.getSubMatrix(
        0, matrix.getRowDimension() - 1, j, j));
      if (sum == 0.0D) {
        // empty document column: skip it rather than divide 0 by 0 (NaN)
        continue;
      }
      for (int i = 0; i < matrix.getRowDimension(); i++) {
        matrix.setEntry(i, j, (matrix.getEntry(i, j) / sum));
      }
    }
    return matrix;
  }

  private double sum(RealMatrix colMatrix) {
    double sum = 0.0D;
    for (int i = 0; i < colMatrix.getRowDimension(); i++) {
      sum += colMatrix.getEntry(i, 0);
    }
    return sum;
  }
}

IdfIndexer

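This transformation has the effect of reducing the weight of words that are commonly found in the document set. The raw frequency of term w is multiplied by a factor fw, which equals 1 for a word found in every document and grows as the word gets rarer; after the weighted scores are renormalized per document, common words end up with relatively lower weights. The factor fw is given by the formula: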

  fw = 1 + log(N/nw)
  where:
    N = total number of documents in the collection
    nw = number of documents containing word w
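A minimal sketch of the two phases in plain Java, assuming a double[][] terms-by-documents array and the natural logarithm (Math.log, the same function IdfIndexer uses); the class name is hypothetical. It counts nw once per term row rather than once per matrix cell, which gives the same weights because multiplying by fw ≥ 1 never turns a positive entry into zero.

```java
// Sketch of the two-phase IDF step on a terms x docs matrix (in place):
// phase 1 multiplies each non-zero entry by fw = 1 + ln(N / nw),
// phase 2 rescales each document column to sum to 1.
final class IdfSketch {
  static double idfWeight(int n, int nw) {
    return 1.0 + Math.log((double) n / nw);
  }

  static void apply(double[][] m) {
    if (m.length == 0) {
      return;
    }
    int n = m[0].length; // N: number of documents (columns)
    for (double[] row : m) {
      int nw = 0; // number of documents containing this term
      for (double v : row) {
        if (v > 0.0) {
          nw++;
        }
      }
      if (nw == 0) {
        continue; // term absent everywhere: nothing to weight
      }
      double fw = idfWeight(n, nw); // computed once per term
      for (int j = 0; j < n; j++) {
        if (row[j] > 0.0) {
          row[j] *= fw;
        }
      }
    }
    for (int j = 0; j < n; j++) {
      double sum = 0.0;
      for (double[] row : m) {
        sum += row[j];
      }
      if (sum == 0.0) {
        continue; // empty document column
      }
      for (double[] row : m) {
        row[j] /= sum;
      }
    }
  }
}
```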

The code is shown below. The Jama version of the code can also be found in my post IR Math with Java : TF, IDF and LSI.

// Source: src/main/java/com/mycompany/myapp/indexers/IdfIndexer.java
package com.mycompany.myapp.indexers;

import org.apache.commons.collections15.Transformer;
import org.apache.commons.math.linear.RealMatrix;

public class IdfIndexer implements Transformer<RealMatrix,RealMatrix> {

  public RealMatrix transform(RealMatrix matrix) {
    // Phase 1: apply IDF weight to the raw word frequencies
    int n = matrix.getColumnDimension();
    for (int j = 0; j < matrix.getColumnDimension(); j++) {
      for (int i = 0; i < matrix.getRowDimension(); i++) {
        double dm = countDocsWithWord(
          matrix.getSubMatrix(i, i, 0, matrix.getColumnDimension() - 1));
        double matrixElement = matrix.getEntry(i, j);
        if (matrixElement > 0.0D) {
          matrix.setEntry(i, j,
            matrix.getEntry(i,j) * (1 + Math.log(n) - Math.log(dm)));
        }
      }
    }
    // Phase 2: normalize the word scores for a single document
    for (int j = 0; j < matrix.getColumnDimension(); j++) {
      double sum = sum(matrix.getSubMatrix(
        0, matrix.getRowDimension() - 1, j, j));
      if (sum == 0.0D) {
        // empty document column: skip it rather than divide 0 by 0 (NaN)
        continue;
      }
      for (int i = 0; i < matrix.getRowDimension(); i++) {
        matrix.setEntry(i, j, (matrix.getEntry(i, j) / sum));
      }
    }
    return matrix;
  }

  private double sum(RealMatrix colMatrix) {
    double sum = 0.0D;
    for (int i = 0; i < colMatrix.getRowDimension(); i++) {
      sum += colMatrix.getEntry(i, 0);
    }
    return sum;
  }

  private double countDocsWithWord(RealMatrix rowMatrix) {
    double numDocs = 0.0D;
    for (int j = 0; j < rowMatrix.getColumnDimension(); j++) {
      if (rowMatrix.getEntry(0, j) > 0.0D) {
        numDocs++;
      }
    }
    return numDocs;
  }
}

CosineSimilarity

Cosine Similarity calculates the cosine of the angle between the lines joining the origin of the term space to each document’s position. The higher the value of the cosine, the smaller the angle between the two lines, and hence the more similar the documents. Cosine Similarity is calculated as:

  cos θ = (A • B) / (|A| * |B|)
  where A = document matrix for the first document,
        B = document matrix for the second document.
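A small worked example of the formula, using two made-up three-term document vectors purely for illustration:

```latex
A = (1, 2, 0), \qquad B = (2, 1, 1)

\cos\theta = \frac{A \cdot B}{|A|\,|B|}
           = \frac{1 \cdot 2 + 2 \cdot 1 + 0 \cdot 1}
                  {\sqrt{1^2 + 2^2 + 0^2}\;\sqrt{2^2 + 1^2 + 1^2}}
           = \frac{4}{\sqrt{5}\,\sqrt{6}}
           = \frac{4}{\sqrt{30}} \approx 0.730
```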

The code for the CosineSimilarity class is shown below. The Jama version can be found in my post IR Math with Java : Similarity Measures.

// Source: src/main/java/com/mycompany/myapp/similarity/CosineSimilarity.java
package com.mycompany.myapp.similarity;

import org.apache.commons.math.linear.RealMatrix;

public class CosineSimilarity extends AbstractSimilarity {

  @Override
  public double computeSimilarity(
      RealMatrix sourceDoc, RealMatrix targetDoc) {
    if (sourceDoc.getRowDimension() != targetDoc.getRowDimension() ||
        sourceDoc.getColumnDimension() != targetDoc.getColumnDimension() ||
        sourceDoc.getColumnDimension() != 1) {
      throw new IllegalArgumentException(
        "Matrices are not column matrices or not of the same size");
    }
    double dotProduct = dot(sourceDoc, targetDoc);
    // for a single column, the Frobenius norm is the Euclidean length
    double normProduct =
      sourceDoc.getFrobeniusNorm() * targetDoc.getFrobeniusNorm();
    if (normProduct == 0.0D) {
      // at least one vector is all zeros, so there is no angle to compare
      return 0.0D;
    }
    return dotProduct / normProduct;
  }

  // Sum of the element-wise products of two column matrices. Computed
  // directly rather than via getNorm(), which sums absolute values and so
  // would be wrong for vectors with negative entries (e.g. after LSI).
  private double dot(RealMatrix source, RealMatrix target) {
    double sum = 0.0D;
    for (int row = 0; row < source.getRowDimension(); row++) {
      sum += source.getEntry(row, 0) * target.getEntry(row, 0);
    }
    return sum;
  }
}

Test Code

For the test, we use the same collection of Reuters news items from the TextMine project that was used for testing the Binary Naive Bayes Classifier described in my previous post. The indexing code is pretty much the same, except that we now compute term vectors for the body at indexing time. There is a single test, which trains the classifier and then classifies 5 documents with it. Here is the JUnit test.

// Source: src/test/java/com/mycompany/myapp/classifiers/LuceneVectorSpaceModelClassifierTest.java
package com.mycompany.myapp.classifiers;
 

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Map;

import org.apache.commons.collections15.Transformer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.linear.RealMatrix;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field.TermVector;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.FSDirectory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.mycompany.myapp.indexers.IdfIndexer;
import com.mycompany.myapp.indexers.TfIndexer;
import com.mycompany.myapp.similarity.CosineSimilarity;
import com.mycompany.myapp.summarizers.SummaryAnalyzer;

public class LuceneVectorSpaceModelClassifierTest {

  private static final Log log =
    LogFactory.getLog(LuceneVectorSpaceModelClassifierTest.class);

  private static String INPUT_FILE =
    "src/test/resources/data/sugar-coffee-cocoa-docs.txt";
  private static String INDEX_DIR = "src/test/resources/data/scc-index";
  private static String[] DOCS_TO_CLASSIFY = new String[] {
    "src/test/resources/data/cocoa.txt",
    "src/test/resources/data/cocoa1.txt",
    "src/test/resources/data/cocoa2.txt",
    "src/test/resources/data/coffee.txt",
    "src/test/resources/data/coffee1.txt"
  };

  @BeforeClass
  public static void buildIndex() throws Exception {
    log.debug("Building index...");
    BufferedReader reader = new BufferedReader(new FileReader(INPUT_FILE));
    IndexWriter writer =
      new IndexWriter(FSDirectory.getDirectory(INDEX_DIR),
      new SummaryAnalyzer(), MaxFieldLength.UNLIMITED);
    String line = null;
    int lno = 0;
    StringBuilder bodybuf = new StringBuilder();
    String category = null;
    while ((line = reader.readLine()) != null) {
      if (line.endsWith(".sgm")) {
        // header line
        if (lno > 0) {
          // not the very first line, so dump current body buffer and
          // reinit the buffer.
          writeToIndex(writer, category, bodybuf.toString());
          bodybuf = new StringBuilder();
        }
        category = StringUtils.trim(StringUtils.split(line, ":")[1]);
        continue;
      } else {
        // not a header line, accumulate line into bodybuf
        bodybuf.append(line).append(" ");
      }
      lno++;
    }
    // last record
    writeToIndex(writer, category, bodybuf.toString());
    reader.close();
    writer.commit();
    writer.optimize();
    writer.close();
  }

  private static void writeToIndex(IndexWriter writer, String category,
      String body) throws Exception {
    Document doc = new Document();
    doc.add(new Field("category", category, Store.YES, Index.NOT_ANALYZED));
    doc.add(
      new Field("body", body, Store.YES, Index.ANALYZED, TermVector.YES));
    writer.addDocument(doc);
  }

  @AfterClass
  public static void deleteIndex() throws Exception {
    log.info("Deleting index directory...");
    FileUtils.deleteDirectory(new File(INDEX_DIR));
  }

  @Test
  public void testLuceneVectorSpaceModelClassifier() throws Exception {
    LuceneVectorSpaceModelClassifier classifier =
      new LuceneVectorSpaceModelClassifier();
    // setup
    classifier.setIndexDir(INDEX_DIR);
    classifier.setAnalyzer(new SummaryAnalyzer());
    classifier.setCategoryFieldName("category");
    classifier.setBodyFieldName("body");
    // this is the default but we set it anyway, to illustrate usage
    classifier.setIndexers(new Transformer[] {
      new TfIndexer(),
      new IdfIndexer()
    });
    // this is the default but we set it anyway, to illustrate usage.
    // Similarity need not be set before training, it can be set before
    // the classification step.
    classifier.setSimilarity(new CosineSimilarity());
    // training
    classifier.train();
    // classification
    Map<String,RealMatrix> centroidMap = classifier.getCentroidMap();
    Map<String,Integer> termIdMap = classifier.getTermIdMap();
    String[] categories = centroidMap.keySet().toArray(new String[0]);
    for (String testDoc : DOCS_TO_CLASSIFY) {
      File f = new File(testDoc);
      String category = classifier.classify(centroidMap, termIdMap,
        FileUtils.readFileToString(f, "UTF-8"));
      System.out.println(">>> " + f.getName() +
        " => category: " + category);
      Map<String,Double> similarityMap = classifier.getSimilarityMap();
      String[] pairs = new String[categories.length];
      for (int i = 0; i < categories.length; i++) {
        pairs[i] = categories[i] + ":" + similarityMap.get(categories[i]);
      }
      System.out.println("(" + StringUtils.join(pairs, ", ") + ")");
    }
  }
}
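The test hands the final decision to a pluggable similarity (CosineSimilarity by default). To make the nearest-centroid step concrete outside Lucene, here is a minimal, hypothetical sketch in plain Java. Documents are sparse term → weight maps (assumed to be already TF-IDF weighted), a category centroid is the element-wise average of its documents, and a new document goes to the category whose centroid has the highest cosine similarity. The class and method names are illustrative only; they are not the API of LuceneVectorSpaceModelClassifier.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of nearest-centroid classification with cosine similarity.
 * Vectors are sparse maps of term -> weight; this is not the Lucene classifier's API.
 */
public class NearestCentroidSketch {

  /** Centroid = element-wise average of the document vectors in one category. */
  public static Map<String, Double> centroid(List<Map<String, Double>> docs) {
    Map<String, Double> sum = new HashMap<>();
    for (Map<String, Double> doc : docs) {
      for (Map.Entry<String, Double> e : doc.entrySet()) {
        sum.merge(e.getKey(), e.getValue(), Double::sum);
      }
    }
    // A term missing from a document counts as 0, so divide by the total doc count.
    sum.replaceAll((term, total) -> total / docs.size());
    return sum;
  }

  /** Cosine of the angle between two sparse vectors; 0 if either is all zeros. */
  public static double cosine(Map<String, Double> a, Map<String, Double> b) {
    double dot = 0.0;
    for (Map.Entry<String, Double> e : a.entrySet()) {
      Double w = b.get(e.getKey());
      if (w != null) {
        dot += e.getValue() * w;
      }
    }
    double normA = norm(a);
    double normB = norm(b);
    return (normA == 0.0 || normB == 0.0) ? 0.0 : dot / (normA * normB);
  }

  private static double norm(Map<String, Double> v) {
    double sumSquares = 0.0;
    for (double w : v.values()) {
      sumSquares += w * w;
    }
    return Math.sqrt(sumSquares);
  }

  /** Picks the category whose centroid is most similar; records every score. */
  public static String classify(Map<String, Map<String, Double>> centroids,
      Map<String, Double> doc, Map<String, Double> scores) {
    String best = null;
    double bestScore = Double.NEGATIVE_INFINITY;
    for (Map.Entry<String, Map<String, Double>> e : centroids.entrySet()) {
      double score = cosine(e.getValue(), doc);
      scores.put(e.getKey(), score);
      if (score > bestScore) {
        bestScore = score;
        best = e.getKey();
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Double>> centroids = new LinkedHashMap<>();
    centroids.put("cocoa", centroid(List.of(
        Map.of("cocoa", 1.0, "bean", 0.5), Map.of("cocoa", 0.8))));
    centroids.put("coffee", centroid(List.of(
        Map.of("coffee", 1.0, "bean", 0.5), Map.of("coffee", 0.6, "arabica", 0.4))));
    Map<String, Double> scores = new LinkedHashMap<>();
    String category = classify(centroids, Map.of("cocoa", 2.0, "bean", 1.0), scores);
    System.out.println(category + " " + scores);
  }
}
```

Returning the per-category scores alongside the winner mirrors the similarity map that the test prints for each document.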

Results

Here are the results. They were surprisingly good, so I went back and checked the code to see if I was doing something wrong :-). As you can see, it correctly classified all 5 of my documents.

>>> cocoa.txt => category: cocoa
(cocoa:0.7499364961896885, coffee:0.21387426054867117, sugar:0.15213562681433365)
>>> cocoa1.txt => category: cocoa
(cocoa:0.35404965894048845, coffee:0.15006958907480905, sugar:0.14425804054775068)
>>> cocoa2.txt => category: cocoa
(cocoa:0.2993396230523616, coffee:0.1754388455250711, sugar:0.18650205458278443)
>>> coffee.txt => category: coffee
(cocoa:0.18703846088862733, coffee:0.45354676135783173, sugar:0.20549314483406184)
>>> coffee1.txt => category: coffee
(cocoa:0.1436949323744925, coffee:0.3702669738594301, sugar:0.2316259997838632)

Possible Improvements

With the Naive Bayes approach, I had to enable feature selection and use the top √n terms to get it to classify correctly. I had planned to do something similar here if required, using SVD to extract the principal √n components and computing the similarity against them. It was not needed this time, but it would be easy to add by setting a different chain of indexers.
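The √n feature selection mentioned above can be sketched as follows, assuming each term already has some relevance score (the post does not say which score was used, so the scoring is left to the caller). The hypothetical selectTopSqrtN keeps the ⌊√n⌋ highest-scoring terms of an n-term vocabulary, and project drops every other term from a document vector. The SVD variant would instead project onto the top √n singular vectors and is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of sqrt(n) feature selection: keep only the top
 * floor(sqrt(n)) terms of an n-term vocabulary. Scores are supplied by the caller.
 */
public class SqrtNFeatureSelector {

  public static Set<String> selectTopSqrtN(Map<String, Double> termScores) {
    int k = (int) Math.floor(Math.sqrt(termScores.size()));
    List<Map.Entry<String, Double>> entries = new ArrayList<>(termScores.entrySet());
    // Highest score first; ties broken alphabetically so the result is deterministic.
    entries.sort((x, y) -> {
      int byScore = Double.compare(y.getValue(), x.getValue());
      return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
    });
    Set<String> selected = new LinkedHashSet<>();
    for (Map.Entry<String, Double> e : entries.subList(0, k)) {
      selected.add(e.getKey());
    }
    return selected;
  }

  /** Returns a copy of the document vector restricted to the selected terms. */
  public static Map<String, Double> project(Map<String, Double> doc, Set<String> features) {
    Map<String, Double> projected = new HashMap<>(doc);
    projected.keySet().retainAll(features);
    return projected;
  }

  public static void main(String[] args) {
    Map<String, Double> scores = Map.of(
        "cocoa", 9.0, "coffee", 8.5, "sugar", 7.0, "bean", 3.0, "price", 2.5,
        "ton", 2.0, "export", 1.5, "said", 0.2, "the", 0.1);
    Set<String> features = selectTopSqrtN(scores);   // 9 terms, so k = 3
    System.out.println(features);                    // [cocoa, coffee, sugar]
    System.out.println(project(Map.of("cocoa", 1.0, "the", 5.0), features)); // {cocoa=1.0}
  }
}
```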

Another interesting toolkit to try out for this stuff is the Semantic Vectors project, which seems to be quite promising from the little I understand about it. A commenter on a previous related post pointed me to this – now that I’ve made the leap to using Lucene for the tokenization part, it seems logical to give this a try, something I plan to do next week.

Update 2009-04-26: In recent posts, I have been building on code written and described in previous posts, so there were (and rightly so) quite a few requests for the code. So I’ve created a project on Sourceforge to host the code. You will find the complete source code built so far in the project’s SVN repository.

A Brief Analysis of HBase Row Lock

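Locks are the foundation for atomic read-modify-write operations. As the Google Bigtable paper describes, Bigtable supports only single-row transactions, and HBase designs its Row Lock on the same principle. In practice, with a well-designed schema, most transactional operations can be completed under a single row key, so it is worth understanding how HBase implements Row Lock.
Locks usually become a performance bottleneck, but in HBase a given row key belongs to only one region server, so a lock that only covers a single row has a comparatively small impact on the system.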

First, let's look at how a row lock is used in HBase.

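import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.util.Bytes;

  static public void putData(String tablename, String row, String family,
      String column, String value) throws IOException {
    Configuration config = HBaseConfiguration.create();
    HTable table = new HTable(config, tablename);
    byte[] brow = Bytes.toBytes(row);
    try {
      // Explicitly lock the row on its region server
      RowLock rowLock = table.lockRow(brow);
      try {
        // Pass the lock to the Put so it runs under the lock we already hold;
        // otherwise the server tries to take the row lock again and waits
        Put p = new Put(brow, rowLock);
        p.add(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(p);
      } finally {
        table.unlockRow(rowLock);  // always release, even if the put fails
      }
    } finally {
      table.close();
    }
  }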

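The code above shows that we simply lock a row, and while we hold the lock, our operations on that row are atomic. Of course, this particular code is unnecessary, because HBase automatically locks the row during Get, Put and Delete operations. We only need to add our own row lock when we have to perform multi-step operations.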
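To illustrate why a multi-step read-modify-write needs a lock, here is a plain-Java stand-in (not HBase code): a hypothetical in-memory table with one ReentrantLock per row key. Reading a cell and writing back an incremented value are two separate steps; holding the row's lock across both is what keeps concurrent increments from being lost.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Plain-Java stand-in (not the HBase API): a hypothetical in-memory table with
 * one lock per row key, used to make a two-step read-modify-write atomic.
 */
public class RowLockedCounter {
  private final Map<String, Long> cells = new ConcurrentHashMap<>();
  private final Map<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();

  public void increment(String row) {
    ReentrantLock lock = rowLocks.computeIfAbsent(row, r -> new ReentrantLock());
    lock.lock();                                   // like lockRow
    try {
      long current = cells.getOrDefault(row, 0L);  // step 1: read (like a Get)
      cells.put(row, current + 1);                 // step 2: write (like a Put)
    } finally {
      lock.unlock();                               // like unlockRow, always released
    }
  }

  public long value(String row) {
    return cells.getOrDefault(row, 0L);
  }

  public static void main(String[] args) throws InterruptedException {
    RowLockedCounter table = new RowLockedCounter();
    Runnable task = () -> {
      for (int i = 0; i < 10_000; i++) {
        table.increment("row1");
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(table.value("row1"));      // 20000: no increment is lost
  }
}
```

Without the lock, both threads could read the same value and overwrite each other's increment. For this particular pattern, the ICV mechanism mentioned at the end of this article avoids the explicit lock altogether.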

Now let's look at how the row lock is implemented:


public class RowLock {
  private byte [] row = null;
  private long lockId = -1L;

  /**
   * Creates a RowLock from a row and lock id
   * @param row row to lock on
   * @param lockId the lock id
   */
  public RowLock(final byte [] row, final long lockId) {
    this.row = row;
    this.lockId = lockId;
  }

  /**
   * Creates a RowLock with only a lock id
   * @param lockId lock id
   */
  public RowLock(final long lockId) {
    this.lockId = lockId;
  }

  /**
   * Get the row for this RowLock
   * @return the row
   */
  public byte [] getRow() {
    return row;
  }

  /**
   * Get the lock id from this RowLock
   * @return the lock id
   */
  public long getLockId() {
    return lockId;
  }
}

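It contains two fields: row and lockId.
Once a row lock has been created, how does the region server manage it? Following the call from the client (HTable.lockRow) into the server-side code makes this clear.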

public RowLock lockRow(final byte [] row)
  throws IOException {
    return connection.getRegionServerWithRetries(
      new ServerCallable(connection, tableName, row) {
        public RowLock call() throws IOException {
          long lockId =
              server.lockRow(location.getRegionInfo().getRegionName(), row);
          return new RowLock(row,lockId);
        }
      }
    );
  }

This call uses the region name and the row to obtain a specific RowLock from the region server that hosts the row.

public long lockRow(byte [] regionName, byte [] row)
  throws IOException {
    checkOpen();
    NullPointerException npe = null;
    if(regionName == null) {
      npe = new NullPointerException("regionName is null");
    } else if(row == null) {
      npe = new NullPointerException("row to lock is null");
    }
    if(npe != null) {
      IOException io = new IOException("Invalid arguments to lockRow");
      io.initCause(npe);
      throw io;
    }
    requestCount.incrementAndGet();
    try {
      HRegion region = getRegion(regionName);
      Integer r = region.obtainRowLock(row);
      long lockId = addRowLock(r,region);
      LOG.debug("Row lock " + lockId + " explicitly acquired by client");
      return lockId;
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t,
        "Error obtaining row lock (fsOk: " + this.fsOk + ")"));
    }
  }

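This method locks the row within the region. The key part is the creation of a lease object, which triggers an action when it expires. The default timeout is 60 s, meaning the lock can be held for at most 60 s; on expiry it is handled by RowLockListener. This prevents a user who forgets to release the lock from causing a deadlock in the system. You can change this value with hbase.regionserver.lease.period, but doing so affects other operations, because the lease period is also used for other timeout checks in the system.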

protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException {
    long lockId = -1L;
    lockId = rand.nextLong();
    String lockName = String.valueOf(lockId);
    rowlocks.put(lockName, r);
    this.leases.
      createLease(lockName, new RowLockListener(lockName, region));
    return lockId;
  }

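  // Elsewhere in the region server: the lease period is read from
  // hbase.regionserver.lease.period (HBASE_REGIONSERVER_LEASE_PERIOD_KEY)
  this.leases = new Leases((int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
      HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
      this.threadWakeFrequency);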
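The lease mechanism can be modelled with a small, hypothetical class. This is not HBase's Leases implementation: time is passed in explicitly so the behaviour is deterministic, and checkExpired() stands in for the background thread that wakes up every threadWakeFrequency milliseconds. When a lease expires, its listener (playing the role of RowLockListener) removes the lock from a rowlocks-style map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, single-threaded model of lease-based lock expiry (not HBase's
 * Leases class). checkExpired() stands in for the periodic background thread.
 */
public class LeaseSketch {

  /** Called when a lease runs out. */
  public interface LeaseListener {
    void leaseExpired();
  }

  private static final class Lease {
    final long expiresAt;
    final LeaseListener listener;

    Lease(long expiresAt, LeaseListener listener) {
      this.expiresAt = expiresAt;
      this.listener = listener;
    }
  }

  private final long leasePeriodMillis;
  private final Map<String, Lease> leases = new HashMap<>();

  public LeaseSketch(long leasePeriodMillis) {
    this.leasePeriodMillis = leasePeriodMillis;
  }

  public void createLease(String name, long nowMillis, LeaseListener listener) {
    if (leases.containsKey(name)) {
      throw new IllegalStateException("lease still held: " + name);
    }
    leases.put(name, new Lease(nowMillis + leasePeriodMillis, listener));
  }

  /** A normal unlock cancels the lease so it never fires. */
  public boolean cancelLease(String name) {
    return leases.remove(name) != null;
  }

  /** Removes every lease whose deadline has passed, then notifies its listener. */
  public void checkExpired(long nowMillis) {
    List<Lease> expired = new ArrayList<>();
    Iterator<Lease> it = leases.values().iterator();
    while (it.hasNext()) {
      Lease lease = it.next();
      if (nowMillis >= lease.expiresAt) {
        it.remove();
        expired.add(lease);
      }
    }
    // Listeners run after iteration, so they may safely touch other state.
    for (Lease lease : expired) {
      lease.listener.leaseExpired();
    }
  }

  public static void main(String[] args) {
    Map<String, String> rowlocks = new HashMap<>();  // lockName -> locked row
    LeaseSketch leases = new LeaseSketch(60_000L);   // 60 s, the default described above
    rowlocks.put("42", "row1");
    leases.createLease("42", 0L, () -> rowlocks.remove("42"));
    leases.checkExpired(59_999L);
    System.out.println(rowlocks.containsKey("42"));  // true: lease still valid
    leases.checkExpired(60_000L);
    System.out.println(rowlocks.containsKey("42"));  // false: expired, lock released
  }
}
```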

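The following code shows how a row lock is obtained when a row is operated on. It uses a set, lockedRows, to record which rows are already locked, and a map, lockIds, from lock id to row. If someone already holds the lock, other callers wait here.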

/**
   * Obtains or tries to obtain the given row lock.
   * @param waitForLock if true, will block until the lock is available.
   *        Otherwise, just tries to obtain the lock and returns
   *        null if unavailable.
   */
  private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
  throws IOException {
    checkRow(row);
    startRegionOperation();
    try {
      synchronized (lockedRows) {
        while (lockedRows.contains(row)) {
          if (!waitForLock) {
            return null;
          }
          try {
            lockedRows.wait();
          } catch (InterruptedException ie) {
            // Empty
          }
        }
        // generate a new lockid. Attempt to insert the new [lockid, row].
        // if this lockid already exists in the map then revert and retry
        // We could have first done a lockIds.get, and if it does not exist only
        // then do a lockIds.put, but the hope is that the lockIds.put will
        // mostly return null the first time itself because there won't be
        // too many lockId collisions.
        byte [] prev = null;
        Integer lockId = null;
        do {
          lockId = new Integer(lockIdGenerator++);
          prev = lockIds.put(lockId, row);
          if (prev != null) {
            lockIds.put(lockId, prev);    // revert old value
            lockIdGenerator = rand.nextInt(); // generate new start point
          }
        } while (prev != null);

        lockedRows.add(row);
        lockedRows.notifyAll();
        return lockId;
      }
    } finally {
      closeRegionOperation();
    }
  }
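Releasing a lock is the reverse of acquiring it. The following simplified, hypothetical model keeps the same two structures as internalObtainRowLock (a lockedRows set and a lockIds map), uses Strings instead of byte[] rows, and adds a release method that undoes both and wakes waiting threads with notifyAll. One deliberate difference: it restores the thread's interrupt flag instead of swallowing InterruptedException.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Simplified, hypothetical model of the lockedRows / lockIds bookkeeping in
 * internalObtainRowLock, plus the matching release. Rows are Strings here.
 */
public class RowLockTable {
  private final Set<String> lockedRows = new HashSet<>();
  private final Map<Integer, String> lockIds = new HashMap<>(); // guarded by lockedRows
  private int lockIdGenerator = 1;

  /** Returns a lock id, or null if the row is busy and waitForLock is false. */
  public Integer obtain(String row, boolean waitForLock) {
    synchronized (lockedRows) {
      while (lockedRows.contains(row)) {
        if (!waitForLock) {
          return null;
        }
        try {
          lockedRows.wait();                   // releases the monitor while waiting
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();  // keep the interrupt visible to callers
          return null;
        }
      }
      Integer lockId;
      do {
        lockId = lockIdGenerator++;            // skip ids that are still in use
      } while (lockIds.containsKey(lockId));
      lockIds.put(lockId, row);
      lockedRows.add(row);
      return lockId;
    }
  }

  /** The reverse of obtain: forget the id, free the row, wake up any waiters. */
  public void release(Integer lockId) {
    synchronized (lockedRows) {
      String row = lockIds.remove(lockId);
      if (row != null) {
        lockedRows.remove(row);
        lockedRows.notifyAll();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    RowLockTable table = new RowLockTable();
    Integer first = table.obtain("row1", true);
    System.out.println(table.obtain("row1", false)); // null: row1 is already locked
    Thread waiter = new Thread(() -> {
      Integer id = table.obtain("row1", true);       // blocks until first is released
      System.out.println("waiter got lock " + id);   // prints "waiter got lock 2"
      table.release(id);
    });
    waiter.start();
    Thread.sleep(100);                               // demo only: let the waiter block
    table.release(first);
    waiter.join();
  }
}
```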

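That is how HBase acquires a Row Lock; releasing it is the reverse process. From the above, we can see several issues to consider when using Row Lock.
1. Because the lock's automatic release time defaults to 60 s, an atomic operation that takes too long may have its lock released automatically, which can lead to unpredictable errors. Another possibility is that during large data operations the region server performs table split and move operations; these usually take a long time and can also cause the lock to be released automatically.

2. Row locks can also cause deadlocks. Suppose a region server can accept at most 50 concurrent connections. While one client holds the lock on a row and is operating on it, 50 other connections request the same lock at the same time; since the first client holds it, they can only wait for it to be released. The problem is that the first client now needs to connect to release the lock, but all 50 of the region server's connections are occupied, so the first client cannot connect, and a deadlock results.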
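Scenario 2 can be reproduced in miniature. In the hypothetical simulation below, an ExecutorService with a fixed number of threads plays the role of the region server's request handlers (the 50 connections in the example; in HBase the handler count is configured with hbase.regionserver.handler.count). Client A holds the lock, the waiting clients occupy every handler, and A's unlock request sits in the queue behind them.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical simulation of handler starvation: a server with a small, fixed
 * pool of request handler threads and one explicitly locked row.
 */
public class HandlerStarvationDemo {
  private boolean rowLocked = false;

  synchronized void lockRow() throws InterruptedException {
    while (rowLocked) {
      wait();                    // the handler thread is stuck here
    }
    rowLocked = true;
  }

  synchronized void unlockRow() {
    rowLocked = false;
    notifyAll();
  }

  /** True if the lock holder's unlock request gets a handler within the timeout. */
  public static boolean unlockServed(int handlers, int waitingClients, long timeoutMillis) {
    HandlerStarvationDemo server = new HandlerStarvationDemo();
    ExecutorService handlerPool = Executors.newFixedThreadPool(handlers);
    try {
      // Client A takes the row lock.
      handlerPool.submit(() -> { server.lockRow(); return null; }).get();
      // Other clients ask for the same lock; each request occupies a handler.
      for (int i = 0; i < waitingClients; i++) {
        handlerPool.submit(() -> { server.lockRow(); return null; });
      }
      // Client A's unlock request is queued behind them.
      Future<?> unlock = handlerPool.submit(server::unlockRow);
      unlock.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;              // no free handler: the holder cannot release the lock
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      handlerPool.shutdownNow(); // interrupts the blocked handlers so the demo ends
    }
  }

  public static void main(String[] args) {
    System.out.println(unlockServed(2, 2, 500));   // false: every handler is waiting
    System.out.println(unlockServed(3, 2, 5000));  // true: one handler is left for unlock
  }
}
```

The only difference between the two calls is one spare handler thread, which is enough for the holder's unlock request to run and wake the waiters.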

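For these two reasons, HBase provides other mechanisms for handling the read-modify-write problem: ICV (incrementColumnValue) and CAS (check-and-put). In later articles I will discuss in detail how these two methods are implemented and what problems they may cause.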

REST and Cloud Services

Cloud computing is becoming increasingly popular in the IT industry. The cloud can provide different types of services, and in practice almost all cloud services are delivered through web services.

So what is a web service? Web services are typically application programming interfaces (APIs) or web APIs that can be accessed over a network, such as the Internet, and executed on a remote system hosting the requested services. In common usage the term refers to clients and servers that communicate over the Hypertext Transfer Protocol (HTTP) used on the web.

When designing a modern web services architecture, it helps to recall what Roy Fielding said: “The modern Web architecture emphasizes scalability of component interactions, generality of interfaces, independent deployment of components, and intermediary components to reduce interaction latency, enforce security, and encapsulate legacy systems.” Following this idea, we should focus on how the components of different services interact, how efficiently new components can be added to the system, and how to improve the system's performance. In other words, the problems come down to integration.

Integration styles
There are four application integration styles:

  1. Shared database
  2. File transfer
  3. Remote procedure call
  4. Message bus

The first is the shared database. It is commonly used when an enterprise has multiple applications that are built independently, with different languages and platforms, and needs information to be shared rapidly and consistently. In the Internet environment, however, it raises performance and security issues.

The second is file transfer. It uses an application-level protocol such as FTP or SMTP to transfer files to a location agreed on by both sides. The service provider consumes the file and writes its results to a designated location, from which the client fetches them. This style was used in the 1980s and 1990s, before the web became popular and mature.

The other two styles dominate today's web services, for example SOAP, XML-RPC and REST*. Building on these styles, Paul Prescod, in his paper Roots of the REST/SOAP Debate, identifies three strategies for integration in an Internet or intranet environment.

First, custom protocols: you design your own protocol on top of an existing one by studying the problem domain. This approach is natural but expensive, and the result is hard to reuse and has interoperability problems.

Second, a protocol framework, which is what SOAP does. It keeps creating new protocols and can be used in many areas because they share common features. But it also suffers from interoperability problems, because both endpoints need to agree on how to use the protocol (admittedly, there is WSDL). Another weakness is that all software built on it inherits any problems in the protocol.

Third, the horizontal protocol approach: instead of developing domain-specific protocols, we use general-purpose protocols to transfer domain-specific data. REST (representational state transfer) is an example. Its most obvious weakness is that it inherits the problems of the general-purpose protocols, but in practice a general-purpose protocol must already have been designed and tested carefully. The advantage clearly outweighs the weakness, because this approach solves the interoperability problem, which is very important in modern web architecture.

Introduction to REST
This article gives a short introduction to REST (representational state transfer) and the resource-oriented architecture based on it.

Representational State Transfer is a style of software architecture for distributed hypermedia systems such as the World Wide Web. The term Representational State Transfer was introduced and defined in 2000 by Roy Fielding in his doctoral dissertation.

It has the following properties:

  • Client-server based
    This allows system concerns to be divided between client and server subsystems. Separating the user interface from data-storage concerns yields greater interface portability. Another benefit is that the server is not complicated by user-interface concerns, and this greater simplicity can make scaling and performance improvements easier.
  • Stateless
    This simplifies storage needs compared with stateful architectures. Each request must contain all of the necessary information, and clients must keep track of any state. This again simplifies the server, which can make scaling and performance improvements easier. One disadvantage of stateless architectures is that the same information may be sent repeatedly in many requests.
  • Cache-enabled
    A cache keeps copies of data so that later requests can be answered without redoing the original work; cache retrievals are typically much faster than, for example, database retrievals. REST requires cache constraints on responses: response data must be labeled as either cacheable or non-cacheable. When a response is labeled cacheable, the client may store it and reuse it for later equivalent requests. The advantage is that those requests can be served directly from the client cache; the potential problem is that the cached data may be outdated.
  • Uniform interface
    This constrains the interfaces between all system components to be uniform, which improves understandability and decouples implementations from interfaces.
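To make the cache constraint concrete, here is a hypothetical client-side cache in plain Java. The Response class stands in for an HTTP response whose cacheable label and maximum age would normally come from headers such as Cache-Control, and the server is just a function. The demo also shows the drawback of caching: a cached response can be stale.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical client-side cache: only responses the server labels cacheable
 * are stored, each for maxAgeMillis.
 */
public class ClientCacheSketch {

  /** A response labeled cacheable or not, as a Cache-Control header would do. */
  public static final class Response {
    final String body;
    final boolean cacheable;
    final long maxAgeMillis;

    public Response(String body, boolean cacheable, long maxAgeMillis) {
      this.body = body;
      this.cacheable = cacheable;
      this.maxAgeMillis = maxAgeMillis;
    }
  }

  private static final class Entry {
    final String body;
    final long expiresAt;

    Entry(String body, long expiresAt) {
      this.body = body;
      this.expiresAt = expiresAt;
    }
  }

  private final Map<String, Entry> cache = new HashMap<>();
  private final Function<String, Response> server;   // stands in for a network GET
  private int serverCalls = 0;

  public ClientCacheSketch(Function<String, Response> server) {
    this.server = server;
  }

  public String get(String uri, long nowMillis) {
    Entry cached = cache.get(uri);
    if (cached != null && nowMillis < cached.expiresAt) {
      return cached.body;                              // no round trip to the server
    }
    Response r = server.apply(uri);
    serverCalls++;
    if (r.cacheable) {
      cache.put(uri, new Entry(r.body, nowMillis + r.maxAgeMillis));
    } else {
      cache.remove(uri);
    }
    return r.body;
  }

  public int serverCalls() {
    return serverCalls;
  }

  public static void main(String[] args) {
    int[] price = {100};                               // server-side state that can change
    ClientCacheSketch client = new ClientCacheSketch(uri -> uri.equals("/products/1")
        ? new Response("price=" + price[0], true, 1_000)
        : new Response("cart", false, 0));
    System.out.println(client.get("/products/1", 0));    // price=100 (from the server)
    price[0] = 120;
    System.out.println(client.get("/products/1", 500));  // price=100 (cached, now stale)
    System.out.println(client.get("/products/1", 1500)); // price=120 (expired, refetched)
    client.get("/cart", 0);
    client.get("/cart", 1);
    System.out.println(client.serverCalls());            // 4: /cart is never cached
  }
}
```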

In general, REST is a coordinated set of architectural constraints that attempts to minimize latency and network communication while at the same time maximizing the independence and scalability of component implementations.

Because of the advantages mentioned above, REST is credited with the success of the Web (HTTP 1.1). Sam Ruby named the architecture that follows the REST constraints the resource-oriented architecture (ROA). HTTP 1.1 naturally has these advantages, so ROA uses the HTTP protocol, following the third integration strategy mentioned above.

Resource Oriented Architecture
The ROA is a way of turning a problem into a RESTful web service: an arrangement of URIs, HTTP, and XML that works like the rest of the Web.

The ROA has the following characteristics:

  • Addressability
    An application is addressable if it exposes the interesting aspects of its data set as resources. (1. Bookmarks are a very efficient way to get around the Web, so being able to bookmark every resource, including text, pictures, files and even methods, would be very powerful. 2. If every resource has a URL, it is easy to use, even as the input of another URL; for example, a resource's URL can be passed to a translation service.)
  • Stateless
    When the client makes an HTTP request, it includes all information necessary for the server to fulfill that request. The server never relies on information from previous requests; if that information were important, the client would have sent it again in this request. I think this is the most important feature of ROA, for the following reasons. State would make individual HTTP requests simpler, but it would make the HTTP protocol much more complicated. With statelessness, the server never has to worry about the client timing out, because no interaction lasts longer than a single request. The server never loses track of “where” each client is in the application, because the client sends all necessary information with each request. The client never ends up performing an action in the wrong “working directory” because the server kept some state around without telling the client.

    • With statelessness, it is much easier to distribute an application across load-balanced servers.
    • With statelessness, responses are easy to cache.
    • With statelessness, users can bookmark whatever they want, although this is harder for developers to achieve.
  • Connectedness
    A web service is connected to the extent that you can put the service in different states just by following links and filling out forms.
  • Uniform Interface
    The service uses the HTTP methods as its interface for communication: GET, POST, PUT and DELETE, plus two less common methods, HEAD and OPTIONS.
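The uniform interface and statelessness can be illustrated together with a hypothetical in-memory resource store. Each call to handle carries the method, the URI and the body, so the server keeps no per-client state; the status codes follow standard HTTP conventions (201 Created, 204 No Content, 404 Not Found, 405 Method Not Allowed). This is a sketch of the idea, not a web server.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory resource store exposing only HTTP's uniform interface.
 * Every call carries everything the server needs, so no session state is kept.
 */
public class UniformInterfaceSketch {

  /** Minimal response: an HTTP status code plus an optional body. */
  public static final class Response {
    public final int status;
    public final String body;

    Response(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  private final Map<String, String> resources = new HashMap<>();
  private int nextId = 0;

  public Response handle(String method, String uri, String body) {
    switch (method) {
      case "GET":                              // read a representation
        return resources.containsKey(uri)
            ? new Response(200, resources.get(uri))
            : new Response(404, null);
      case "HEAD":                             // like GET, without the body
        return new Response(resources.containsKey(uri) ? 200 : 404, null);
      case "PUT": {                            // create or replace at a known URI
        boolean existed = resources.containsKey(uri);
        resources.put(uri, body);
        return new Response(existed ? 200 : 201, null);
      }
      case "POST": {                           // create a child; the server picks the URI
        String created = uri + "/" + (++nextId);
        resources.put(created, body);
        return new Response(201, created);     // real HTTP would use a Location header
      }
      case "DELETE":
        return resources.remove(uri) != null
            ? new Response(204, null)
            : new Response(404, null);
      case "OPTIONS":                          // which methods are allowed
        return new Response(200, "GET, HEAD, PUT, POST, DELETE, OPTIONS");
      default:
        return new Response(405, null);        // method not allowed
    }
  }

  public static void main(String[] args) {
    UniformInterfaceSketch api = new UniformInterfaceSketch();
    System.out.println(api.handle("PUT", "/notes/a", "draft").status); // 201
    System.out.println(api.handle("PUT", "/notes/a", "draft").status); // 200
    System.out.println(api.handle("GET", "/notes/a", null).body);      // draft
    System.out.println(api.handle("POST", "/notes", "second").body);   // /notes/1
    System.out.println(api.handle("DELETE", "/notes/a", null).status); // 204
    System.out.println(api.handle("GET", "/notes/a", null).status);    // 404
  }
}
```

Repeating the same PUT leaves the resource in the same state (only the status changes from 201 to 200), whereas each POST creates a new resource with a server-chosen URI.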