Featured image of post Setup a On-Prem Document AI System with Apache Spark, PySpark, PHI-2 and Llama.cpp

Setup a On-Prem Document AI System with Apache Spark, PySpark, PHI-2 and Llama.cpp

Code Walk through -> setting up a local AI system classificatoin, and question-answer style prompts

What is Apache Spark?

Apache Spark is an open-source, distributed computing framework designed for big data processing.

It allows you to process massive datasets across multiple nodes while being fast, scalable, and fairly easy to use.

Unlike traditional Hadoop-based solutions, Spark performs most of its operations in-memory, making it 100x faster than MapReduce in certain cases.

Key Features of Apache Spark:

Lightning Fast: Thanks to in-memory computation.
Distributed Processing: Runs across multiple machines or locally on your computer.
Multi-Language Support: Supports Python (PySpark), Java, Scala, and R.
Integrated Libraries: Includes tools for SQL, machine learning (MLlib), graph processing (GraphX), and streaming.
Easy to Use: Provides an intuitive API for data manipulation.

What is PySpark?

PySpark is the Python API for Apache Spark.

It allows you to leverage the power of Spark while writing Python code.

If you’re already comfortable with pandas, NumPy, or SQL, PySpark will feel quite familiar.

Why Use PySpark Instead of Just Apache Spark?

  • Python-Friendly: Great for those who prefer Python over Scala or Java.
  • Data Science & ML Integration: Easily integrates with libraries like TensorFlow and pandas.
  • Simplifies Big Data Workflows: Write concise Python code while Spark does the heavy lifting.

Step 1: Prerequisites

Before installing Spark, ensure you have:
Java (JDK 8 or later) installed. Run:

1
java -version

Python 3.7+ installed. Run:

1
python --version

Scala (optional, but useful for Spark development):

1
scala -version

Ensure your system has at least 8GB RAM for smooth performance.


Step 2: Download Apache Spark

  1. Visit the official Apache Spark website.
  2. Choose the latest stable release.
  3. Select Pre-built for Apache Hadoop.
  4. Download the .tgz file and extract it.

Step 3: Install Spark on Windows

1. Extract Spark Files

Unzip the downloaded file into C:\spark.

2. Set Environment Variables

Add the following to your system environment variables:

  • SPARK_HOME = C:\spark
  • Add %SPARK_HOME%\bin to PATH.

3. Verify Installation

Open PowerShell and run:

1
spark-shell

If Spark starts, the installation is successful!


Step 4: Install Spark on macOS/Linux

1. Install via Homebrew (macOS only)

1
brew install apache-spark

2. Manually Extract Spark (Linux & macOS)

1
2
tar -xvf spark-*.tgz
mv spark-* /opt/spark

3. Set Environment Variables

Add these lines to ~/.bashrc or ~/.zshrc:

1
2
export SPARK_HOME=/opt/spark
export PATH=$SPARK_HOME/bin:$PATH

Then apply changes:

1
source ~/.bashrc

4. Verify Installation

Run:

1
spark-shell

You should see a Spark REPL session start.


Step 5: Install PySpark

If you plan to use Spark with Python, install PySpark via pip:

1
pip install pyspark

Test PySpark:

1
python -c "import pyspark; print(pyspark.__version__)"


Now, let’s start working with real data!

Now we will feed Spark a set of local documents (like research papers, articles, or logs) and perform basic data processing with PySpark.


Step 1: Prepare Your Data

For this example, we’ll assume you have a directory called documents/ that contains multiple .txt files with research papers or articles.

1. Create a Local Dataset

Make a directory and place some text files in it:

1
2
3
4
mkdir ~/spark_documents
cd ~/spark_documents
echo "This is a test document about Apache Spark." > doc1.txt
echo "Another document discussing distributed computing." > doc2.txt

Alternatively, download some real research papers in .txt format.


Step 2: Start a PySpark Session

Open a Python script or Jupyter Notebook and start PySpark:

1
2
3
4
5
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DocumentProcessing") \
    .getOrCreate()

This initializes Apache Spark for processing.


Step 3: Load Documents into Spark

Now, we’ll load all text files in the spark_documents directory:

1
2
document_df = spark.read.text("~/spark_documents/*")
document_df.show(5, truncate=False)

This will output the first few lines from your documents.


Step 4: Perform Basic Processing

Let’s count the number of lines in our documents:

1
2
line_count = document_df.count()
print(f"Total number of lines in documents: {line_count}")

Or filter lines containing specific words:

1
2
spark_words = document_df.filter(document_df.value.contains("Spark"))
spark_words.show()

This filters out lines that mention “Spark.”


Step 5: Word Count Example

One of the most common text-processing examples is word count:

1
2
3
4
5
from pyspark.sql.functions import explode, split

words_df = document_df.select(explode(split(document_df.value, " ")).alias("word"))
word_count = words_df.groupBy("word").count().orderBy("count", ascending=False)
word_count.show(10)

This tokenizes the documents, counts word occurrences, and sorts them in descending order.


Now, let’s take things up a notch with advanced text analysis, including TF-IDF (Term Frequency-Inverse Document Frequency) and structured queries with PySpark DataFrames.

(WHAT!?!?!- Dont worry, we will explain it…)

Step 1: Recap – Load Documents into Spark

Before diving into advanced analytics, let’s ensure we have our document dataset loaded.

Start by launching a PySpark session:

1
2
3
4
5
6
7
8
9
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("AdvancedTextProcessing") \
    .getOrCreate()

# Load text documents into a DataFrame
document_df = spark.read.text("~/spark_documents/*")
document_df.show(5, truncate=False)

Step 2: Tokenizing Words (Splitting Text into Words)

To analyze text, we first tokenize it into individual words using Spark’s split() function:

1
2
3
4
from pyspark.sql.functions import explode, split

words_df = document_df.select(explode(split(document_df.value, " ")).alias("word"))
words_df.show(10)

This will break sentences into separate words and list them as rows.


Step 3: Removing Stopwords

Common words like “the,” “and,” or “is” don’t add much meaning to text analysis. We can remove them using PySpark’s built-in StopWordsRemover:

1
2
3
4
5
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="word", outputCol="filtered")
filtered_df = remover.transform(words_df)
filtered_df.show(10)

This helps in cleaning up the dataset before applying more advanced analytics.


Step 4: TF-IDF – Identifying Important Words

What is TF-IDF?

TF-IDF (Term Frequency-Inverse Document Frequency) is a technique to find important words in documents. It assigns higher scores to words that appear frequently in a document but rarely across all documents.

Applying TF-IDF in PySpark

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from pyspark.ml.feature import HashingTF, IDF

# Convert words into numerical feature vectors
hashing_tf = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
tf_data = hashing_tf.transform(filtered_df)

# Compute IDF values
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(tf_data)
tf_idf_data = idf_model.transform(tf_data)

# Show TF-IDF scores
tf_idf_data.select("filtered", "features").show(10, truncate=False)

Now, Spark assigns importance scores to words, helping us identify keywords in research papers or most relevant terms in documents.


Step 5: Structured Queries on Documents

Since Spark supports SQL, let’s treat our documents like a database and run SQL queries on them.

Register DataFrame as a Temporary Table

1
document_df.createOrReplaceTempView("documents")

Example Queries:

Find lines containing “machine learning”:

1
spark.sql("SELECT * FROM documents WHERE value LIKE '%machine learning%'").show()

Find the top words in documents:

1
spark.sql("SELECT word, COUNT(*) as count FROM words_df GROUP BY word ORDER BY count DESC").show(10)

Now, let’s take it a step further and apply machine learning to classify documents and analyze sentiment using PySpark MLlib.


Step 1: Understanding Text Classification & Sentiment Analysis

What is Text Classification?

Text classification assigns categories (labels) to text documents. Examples include:

  • Spam detection (spam vs. not spam)
  • News categorization (politics, sports, technology, etc.)
  • Customer feedback tagging (positive, negative, neutral)

What is Sentiment Analysis?

Sentiment analysis determines the emotional tone of text, typically classifying it as positive, negative, or neutral. It is widely used in:

  • Social media monitoring
  • Product review analysis
  • Customer support automation

Step 2: Preparing the Dataset

For this tutorial, let’s assume we have a dataset of customer reviews stored as reviews.csv:

1
2
3
4
review_text,label
"The product is amazing!",positive
"Terrible experience, would not recommend.",negative
"It's okay, not the best but not the worst.",neutral

Load the dataset into Spark:

1
2
3
4
5
6
7
8
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("TextClassification").getOrCreate()

# Load CSV data
reviews_df = spark.read.csv("reviews.csv", header=True, inferSchema=True)
reviews_df.show()

Step 3: Preprocessing Text Data

Before training a model, we need to convert text into a numerical format.

1. Tokenization

Splitting sentences into words:

1
2
3
4
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
reviews_tokenized = tokenizer.transform(reviews_df)

2. Removing Stopwords

1
2
3
4
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
reviews_cleaned = remover.transform(reviews_tokenized)

3. TF-IDF Feature Extraction

1
2
3
4
5
6
7
from pyspark.ml.feature import HashingTF, IDF

hashing_tf = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=20)
tf_data = hashing_tf.transform(reviews_cleaned)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(tf_data)
final_data = idf_model.transform(tf_data)

Step 4: Training a Machine Learning Model

We’ll use Logistic Regression for classification.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Convert text labels into numerical values
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
lr = LogisticRegression(featuresCol="features", labelCol="labelIndex")

# Create a pipeline
pipeline = Pipeline(stages=[indexer, lr])

# Train-test split
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_data)

Step 5: Evaluating the Model

Now, let’s test our model and check the accuracy.

1
2
predictions = model.transform(test_data)
predictions.select("review_text", "label", "prediction").show()

To measure performance:

1
2
3
4
5
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.2f}")

Step 6: Predicting Sentiment on New Text

To classify new customer reviews:

1
2
3
4
5
6
7
8
9
new_reviews = spark.createDataFrame([
    ("I love this product!",),
    ("Worst purchase ever.",),
    ("Meh, it's alright.",)
], ["review_text"])

new_reviews_transformed = idf_model.transform(hashing_tf.transform(remover.transform(tokenizer.transform(new_reviews))))
new_predictions = model.transform(new_reviews_transformed)
new_predictions.select("review_text", "prediction").show()

Now, we’re taking things to the next level by integrating MLlib with PHI-2 and Llama.cpp.
This combo will give us a powerful system to summarize, classify and support question answer style prompts-interactions.


Step 1: Understanding MLlib in relation to PHI-2 and Llama.cpp

MLlib in relation to PHI-2 and Llama.cpp

  • PHI-2: A lightweight AI model from Microsoft, designed for low-resource LLM tasks.
  • Llama.cpp: A high-performance, CPU-friendly framework for running Meta’s Llama models on edge devices.
  • MLlib + PHI-2 + Llama.cpp: Combine Spark’s distributed ML capabilities with efficient, local AI inference for handling large-scale NLP, summarization, and text processing tasks.

Step 2: Why Integrate Spark with PHI-2 and Llama.cpp?

FeatureMLlibPHI-2Llama.cpp
Scalability✅ Distributed ML❌ Local Model✅ Efficient Execution
Ease of Use✅ Built-in ML Algorithms✅ Pre-trained NLP Model✅ Runs on CPU
Low Latency❌ (Distributed Overhead)✅ (Optimized for Speed)✅ (Minimal Compute Requirements)
AI Workloads✅ General ML & NLP✅ NLP Tasks✅ LLM Inference

Real-World Use Cases

🚀 Summarizing Large Datasets (PHI-2 can summarize documents before MLlib classifies them)
📊 AI-Powered Data Analysis (Llama.cpp can generate insights from Spark-based logs)
Real-Time NLP Processing (Combine all three for distributed inference)


Step 3: Setting Up Spark with PHI-2 and Llama.cpp

1. Install Dependencies

1
pip install pyspark llama-cpp-python transformers torch

2. Initialize Spark

1
2
3
4
5
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark+PHI2+Llama") \
    .getOrCreate()

3. Load Documents into Spark

1
document_df = spark.read.text("~/spark_documents/*")

4. Run PHI-2 for Text Summarization

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Load PHI-2 Model
tokenizer = AutoTokenizer.from_pretrained("microsoft/phi-2")
model = AutoModelForSeq2SeqLM.from_pretrained("microsoft/phi-2")

def summarize_text(text):
    inputs = tokenizer(text, return_tensors="pt", max_length=512, truncation=True)
    summary_ids = model.generate(inputs["input_ids"], max_length=100)
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)

# Apply summarization to Spark DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

summarize_udf = udf(summarize_text, StringType())
document_df = document_df.withColumn("summary", summarize_udf(document_df.value))
document_df.show()

5. Run Llama.cpp for Question Answering

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from llama_cpp import Llama

# Load Llama-2 model
llm = Llama(model_path="llama-2-7b.Q4_K.gguf")

def answer_question(text, question):
    response = llm(f"{text}\n\nQuestion: {question}\nAnswer:")
    return response["choices"][0]["text"].strip()

answer_udf = udf(lambda text: answer_question(text, "What is this document about?"), StringType())
document_df = document_df.withColumn("llama_answer", answer_udf(document_df.value))
document_df.show()

Step 4: Benefits of This Setup

Scalability: Spark processes large-scale data efficiently.
Efficiency: PHI-2 compresses large text before MLlib processes it.
Edge Deployment: Llama.cpp runs LLM inference on local machines (no GPU required).
AI-Driven Insights: Enables AI-powered NLP tasks directly within Spark.