Stacking-Machine-Learning-Method-Pyspark
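This project implements stacking, a popular ensemble learning method in which the predictions of several base models are fed as features to a meta-classifier, with the aim of predicting better than any single base model. It is implemented with PySpark.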
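Machine Learning Algorithms Used
- Naive Bayes (base model in the stack; one binary classifier per category)
- SVM (linear SVM, base model in the stack; one binary classifier per category)
- Logistic Regression (meta-classifier that predicts the final category from the base models' predictions)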
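A stacked model only helps if the meta-classifier learns from base-model predictions on rows those base models were not trained on; predictions on their own training rows are over-confident. The script under To Execute assigns every training row a random `group` (0-4) before calling `gen_meta_features`, whose code (in `submission.py`) is not shown. The following is therefore only a minimal pure-Python sketch of the common k-fold out-of-fold scheme, assuming that is what the groups are for. `fit_majority` and `out_of_fold_predictions` are hypothetical helpers, and the majority-class toy model merely stands in for the Spark NaiveBayes and LinearSVC base models.

```python
from collections import Counter
from typing import Callable, List, Sequence, Tuple

Row = Tuple[Sequence[float], int]         # (features, label)
Model = Callable[[Sequence[float]], int]  # features -> predicted label


def fit_majority(rows: List[Row]) -> Model:
    """Hypothetical toy base model: always predicts the most common training label."""
    majority = Counter(label for _, label in rows).most_common(1)[0][0]
    return lambda features: majority


def out_of_fold_predictions(rows: List[Row], groups: List[int],
                            fit: Callable[[List[Row]], Model], k: int) -> List[int]:
    """For each group g, fit on the rows outside g and predict only the rows inside g,
    so no row is ever predicted by a model that was trained on it."""
    preds: List[int] = [-1] * len(rows)
    for g in range(k):
        model = fit([row for row, grp in zip(rows, groups) if grp != g])
        for i, (row, grp) in enumerate(zip(rows, groups)):
            if grp == g:
                preds[i] = model(row[0])
    return preds


rows = [([0.0], 0), ([1.0], 0), ([2.0], 0), ([3.0], 1), ([4.0], 1), ([5.0], 0)]
groups = [0, 0, 0, 1, 1, 1]
print(out_of_fold_predictions(rows, groups, fit_majority, k=2))  # → [1, 1, 1, 0, 0, 0]
```

The out-of-fold predictions become the meta-classifier's training features. At prediction time the script instead applies base models fitted on the whole training set (`gen_base_pred_pipeline_model`), which is the usual counterpart of out-of-fold training.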
Dataset Description
The dataset consists of sentences from customer reviews of different restaurants. There are 2,241, 800, and 800 customer reviews in the training, development, and test sets, respectively. Note that each customer review contains at least one sentence and may not end with punctuation such as a period or question mark. The task is to predict the category of each customer review from its text using the trained model. The categories are:
- FOOD: reviews that involve comments on the food. e.g. “All the appetizers and salads were fabulous , the steak was mouth watering and the pasta was delicious”
- PAS: reviews that only involve comments on price, ambience, or service. e.g. “Now it ‘s so crowded and loud you ca n’t even talk to the person next to you”
- MISC: reviews that belong to neither of the above categories, including general recommendations and sentences describing the reviewer’s personal experience or context, which usually provide no information on restaurant quality. e.g. “Your friends will thank you for introducing them to this gem!”, “I knew upon visiting NYC that I wanted to try an original deli”
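In the script, these three categories are expected as an integer `label` column (0, 1 or 2) after the feature pipeline from `submission.py` (not shown) has run, so which category maps to which integer is not stated here. Because Spark's `LinearSVC` only supports binary classification, each base model is trained per category on a one-vs-rest label, which `gen_binary_labels` adds as `label_0`, `label_1` and `label_2`. The plain-Python sketch below mirrors that step on a list of labels; `binarize_labels` and the `CATEGORIES` order are hypothetical.

```python
from typing import Dict, List

# Hypothetical category-to-index order; the real mapping is set outside this README (submission.py)
CATEGORIES = ["FOOD", "PAS", "MISC"]


def binarize_labels(labels: List[int], num_classes: int = 3) -> Dict[str, List[float]]:
    """One-vs-rest: label_i is 1.0 where the label equals i, else 0.0."""
    return {f"label_{i}": [1.0 if y == i else 0.0 for y in labels] for i in range(num_classes)}


labels = [CATEGORIES.index(c) for c in ["FOOD", "MISC", "PAS", "FOOD"]]
binary = binarize_labels(labels)
print(labels)             # → [0, 2, 1, 0]
print(binary["label_0"])  # → [1.0, 0.0, 0.0, 1.0]
```

Exactly one of the three binary labels is 1.0 for every review, so each base classifier answers a single "is it this category?" question.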
To Execute
```python
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from submission import base_features_gen_pipeline, gen_meta_features, test_prediction
import random

try:
    # Spark 2.3/2.4: the multi-column one-hot estimator is OneHotEncoderEstimator
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
except ImportError:
    # Spark 3.x renamed OneHotEncoderEstimator to OneHotEncoder
    from pyspark.ml.feature import OneHotEncoder

rseed = 1024
random.seed(rseed)


def gen_binary_labels(df):
    """Add one-vs-rest labels label_0, label_1, label_2 (1.0 where label == i, else 0.0)."""
    df = df.withColumn('label_0', (df['label'] == 0).cast(DoubleType()))
    df = df.withColumn('label_1', (df['label'] == 1).cast(DoubleType()))
    df = df.withColumn('label_2', (df['label'] == 2).cast(DoubleType()))
    return df


# Create a Spark session
conf = SparkConf().setMaster("local[*]").setAppName("lab3")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load the tab-separated training and test data
train_data = spark.read.load("proj2train.csv", format="csv", sep="\t", inferSchema="true", header="true")
test_data = spark.read.load("proj2test.csv", format="csv", sep="\t", inferSchema="true", header="true")

# Task 1.1: build the base feature pipeline, fit it on train_data, then transform train_data
base_features_pipeline = base_features_gen_pipeline()
base_features_pipeline_model = base_features_pipeline.fit(train_data)
training_set = base_features_pipeline_model.transform(train_data)

# Assign each row to one of 5 random groups (0-4) and add the binary labels
training_set = training_set.withColumn('group', (rand(rseed) * 5).cast(IntegerType()))
training_set = gen_binary_labels(training_set)

# Base models: one binary Naive Bayes and one binary linear SVM per category
nb_0 = NaiveBayes(featuresCol='features', labelCol='label_0', predictionCol='nb_pred_0', probabilityCol='nb_prob_0', rawPredictionCol='nb_raw_0')
nb_1 = NaiveBayes(featuresCol='features', labelCol='label_1', predictionCol='nb_pred_1', probabilityCol='nb_prob_1', rawPredictionCol='nb_raw_1')
nb_2 = NaiveBayes(featuresCol='features', labelCol='label_2', predictionCol='nb_pred_2', probabilityCol='nb_prob_2', rawPredictionCol='nb_raw_2')
svm_0 = LinearSVC(featuresCol='features', labelCol='label_0', predictionCol='svm_pred_0', rawPredictionCol='svm_raw_0')
svm_1 = LinearSVC(featuresCol='features', labelCol='label_1', predictionCol='svm_pred_1', rawPredictionCol='svm_raw_1')
svm_2 = LinearSVC(featuresCol='features', labelCol='label_2', predictionCol='svm_pred_2', rawPredictionCol='svm_raw_2')

# Pipeline producing predictions from all base classifiers, fitted on the full training set (used in task 1.3)
gen_base_pred_pipeline = Pipeline(stages=[nb_0, nb_1, nb_2, svm_0, svm_1, svm_2])
gen_base_pred_pipeline_model = gen_base_pred_pipeline.fit(training_set)

# Task 1.2: generate the meta features (nb_pred_*, svm_pred_* and joint_pred_* columns)
meta_features = gen_meta_features(training_set, nb_0, nb_1, nb_2, svm_0, svm_1, svm_2)

# One-hot encode the nine prediction columns and assemble them into a single 'meta_features' vector
pred_cols = ['nb_pred_0', 'nb_pred_1', 'nb_pred_2',
             'svm_pred_0', 'svm_pred_1', 'svm_pred_2',
             'joint_pred_0', 'joint_pred_1', 'joint_pred_2']
vec_cols = ['vec{}'.format(i) for i in range(len(pred_cols))]
onehot_encoder = OneHotEncoder(inputCols=pred_cols, outputCols=vec_cols)
vector_assembler = VectorAssembler(inputCols=vec_cols, outputCol='meta_features')
gen_meta_feature_pipeline = Pipeline(stages=[onehot_encoder, vector_assembler])
gen_meta_feature_pipeline_model = gen_meta_feature_pipeline.fit(meta_features)
meta_features = gen_meta_feature_pipeline_model.transform(meta_features)

# Train the meta classifier: logistic regression on the meta features
lr_model = LogisticRegression(featuresCol='meta_features', labelCol='label', predictionCol='final_prediction',
                              maxIter=20, regParam=1., elasticNetParam=0)
meta_classifier = lr_model.fit(meta_features)

# Task 1.3: predict the test set with the fitted feature, base-model and meta-feature pipelines
pred_test = test_prediction(test_data, base_features_pipeline_model, gen_base_pred_pipeline_model,
                            gen_meta_feature_pipeline_model, meta_classifier)

# Evaluation: F1 score of final_prediction against label
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='final_prediction', metricName='f1')
print(evaluator.evaluate(pred_test))

spark.stop()
```
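The script prints the score of `MulticlassClassificationEvaluator` with `metricName='f1'`, which in Spark is the weighted F1: the F1 of each class, weighted by that class's share of the true labels. To sanity-check the printed value on a small sample outside Spark, a minimal pure-Python sketch of that computation could look like this; `weighted_f1` is a hypothetical helper, not a Spark API, and it assumes every row has the same weight.

```python
from collections import Counter
from typing import Sequence


def weighted_f1(y_true: Sequence[int], y_pred: Sequence[int]) -> float:
    """Per-class F1 averaged with weights proportional to each class's true-label count."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for c, n_c in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == c and p == c)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != c and p == c)
        fn = n_c - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * n_c / total  # weight by class support
    return score


# Per-class F1: 0.5, 0.8, 2/3, each with weight 2/6
print(round(weighted_f1([0, 0, 1, 1, 2, 2], [0, 1, 1, 1, 2, 0]), 4))  # → 0.6556
```

Because of the support weighting, the most frequent category has the largest influence on the reported score.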