NotesWhat is notes.io?

Notes brand slogan

Notes - notes.io

Write a note in this area. It's really easy to share with others. Click here ...
import pyspark
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when

from pyspark.mllib.feature import StandardScaler,PCA
from pyspark.mllib.stat import Statistics

from pyspark.ml.feature import Imputer

from pyspark.ml.feature import VectorAssembler



spark = SparkSession.builder.appName("DataFrame Preprocessing").getOrCreate()
dataset = spark.read.csv("Admission_Prediction.csv",header=True)
dataset.show()


#printschema
dataset.printSchema()

# Nan value handling

#checking for null ir nan type values in our columns
dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

# treating missing value

imputer = Imputer(inputCols=["GRE Score", "TOEFL Score","University Rating"],
outputCols=["GRE Score", "TOEFL Score","University Rating"])
model = imputer.fit(new_data)

imputed_data = model.transform(dataset)
imputed_data.show()
print("*"*100)
imputed_data.select([count(when(col(c).isNull(), c)).alias(c) for c in imputed_data.columns]).show()
imputed_data.count()

features = imputed_data.drop('target')
# we need to convert dataframe intp a RDD to check for correlation
col_names = features.columns
features_rdd = features.rdd
#features_rdd.collect()
features_rdd = features.rdd.map(lambda row: row[0:])
features_rdd.collect()

#Statistics

summary = Statistics.colStats(features_rdd)
print(summary.mean()) # a dense vector containing the mean value for each column
print(summary.variance()) # column-wise variance
print(summary.numNonzeros()) # number of nonzeros in each column
print(summary.normL1())# return a column of normL1 summary

# correaltion
corr_mat=Statistics.corr(features_rdd, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names

corr_df.columns
corr_df.index
corr_df

#VectorAssembler
imputed_data.show()
features = imputed_data.drop('target')
assembler = VectorAssembler(inputCols=features.columns,outputCol="features")
output = assembler.transform(imputed_data)
output.select("features", "target").show(truncate=False)

#Standard Scaling
label = imputed_data.select('target')
label.show()
features = imputed_data.drop('Chance of Admit')
col_names = features.columns
features_rdd = features.rdd.map(lambda row: row[0:])
features_rdd.collect()
scaler1 = StandardScaler().fit(features_rdd)
scaled_features=scaler1.transform(features_rdd)
for data in scaled_features.collect():
print(data)

#PCA
pca = PCA(k=3)
model = pca.fit(scaled_features)
result = model.transform(scaled_features)
result.collect()
type(result)
#store dense vector in a dataframe
df =result.map(lambda x: (x, )).toDF(["PCA_Features"])
df.show(truncate=False)



*****************************************************************
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when
from pyspark.mllib.feature import StandardScaler, PCA
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import Imputer, VectorAssembler

class DataFramePreprocessing:
"""
A class for preprocessing data using PySpark DataFrames.
"""
def __init__(self, file_path):
"""
Initialize the DataFramePreprocessing object.

Args:
file_path (str): Path to the CSV file.
"""
self.spark = SparkSession.builder.appName("DataFrame Preprocessing").getOrCreate()
self.dataset = self.spark.read.csv(file_path, header=True)

def show_dataset(self):
"""
Display the dataset.
"""
self.dataset.show()

def print_schema(self):
"""
Print the schema of the dataset.
"""
self.dataset.printSchema()

def handle_missing_values(self):
"""
Handle missing values in the dataset using Imputer.
"""
imputer = Imputer(inputCols=["GRE Score", "TOEFL Score", "University Rating"],
outputCols=["GRE Score", "TOEFL Score", "University Rating"])
model = imputer.fit(self.dataset)
self.imputed_data = model.transform(self.dataset)

def show_missing_values(self):
"""
Display the count of missing values in each column.
"""
self.imputed_data.select([count(when(col(c).isNull(), c)).alias(c) for c in self.imputed_data.columns]).show()

def compute_statistics(self):
"""
Compute statistics on the dataset.
"""
features = self.imputed_data.drop('target')
col_names = features.columns
features_rdd = features.rdd.map(lambda row: row[0:])

# Statistics
summary = Statistics.colStats(features_rdd)
print("Mean:", summary.mean()) # a dense vector containing the mean value for each column
print("Variance:", summary.variance()) # column-wise variance
print("Num Nonzeros:", summary.numNonzeros()) # number of nonzeros in each column
print("Norm L1:", summary.normL1()) # return a column of normL1 summary

def compute_correlation(self):
"""
Compute the correlation matrix.
"""
features = self.imputed_data.drop('target')
col_names = features.columns
features_rdd = features.rdd.map(lambda row: row[0:])

# Correlation
corr_mat = Statistics.corr(features_rdd, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names
print(corr_df)

def vector_assembly(self):
"""
Use VectorAssembler to assemble features into a single vector.
"""
features = self.imputed_data.drop('target')
assembler = VectorAssembler(inputCols=features.columns, outputCol="features")
self.output = assembler.transform(self.imputed_data)

def standard_scaling(self):
"""
Perform Standard Scaling on the features.
"""
label = self.imputed_data.select('target')
features = self.imputed_data.drop('Chance of Admit')
col_names = features.columns
features_rdd = features.rdd.map(lambda row: row[0:])
scaler = StandardScaler().fit(features_rdd)
scaled_features = scaler.transform(features_rdd)
for data in scaled_features.collect():
print(data)

def apply_pca(self, k=3):
"""
Apply Principal Component Analysis (PCA) to the scaled features.

Args:
k (int): Number of principal components to keep.
"""
pca = PCA(k=k)
model = pca.fit(scaled_features)
result = model.transform(scaled_features)
# Store dense vectors in a DataFrame
df = result.map(lambda x: (x, )).toDF(["PCA_Features"])
df.show(truncate=False)

if __name__ == "__main__":
file_path = "Admission_Prediction.csv"
preprocessing = DataFramePreprocessing(file_path)
preprocessing.show_dataset()
preprocessing.print_schema()
preprocessing.handle_missing_values()
preprocessing.show_missing_values()
preprocessing.compute_statistics()
preprocessing.compute_correlation()
preprocessing.vector_assembly()
preprocessing.standard_scaling()
preprocessing.apply_pca()
****************************************************************************

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA

class DataFramePreprocessing:
"""
A class for preprocessing data using PySpark DataFrames and MLlib.
"""

def __init__(self, file_path):
"""
Initialize the DataFramePreprocessing object.

Args:
file_path (str): Path to the CSV file.
"""
self.spark = SparkSession.builder.appName("DataFrame Preprocessing").getOrCreate()
self.dataset = self.spark.read.csv(file_path, header=True, inferSchema=True)

def show_dataset(self):
"""
Display the dataset.
"""
self.dataset.show()

def print_schema(self):
"""
Print the schema of the dataset.
"""
self.dataset.printSchema()

def handle_missing_values(self):
"""
Handle missing values in the dataset using Imputer.
"""
imputer = Imputer(inputCols=["GRE Score", "TOEFL Score", "University Rating"],
outputCols=["GRE_Score", "TOEFL_Score", "University_Rating"])
imputer_model = imputer.fit(self.dataset)
self.imputed_data = imputer_model.transform(self.dataset)

def show_missing_values(self):
"""
Display the count of missing values in each column.
"""
missing_counts = [col(c).isNull().alias(c) for c in self.imputed_data.columns]
self.imputed_data.select(missing_counts).show()

def compute_statistics(self):
"""
Compute statistics on the dataset.
"""
feature_cols = self.imputed_data.columns[:-1]
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_data = feature_assembler.transform(self.imputed_data)
summary = assembled_data.describe()
summary.show()

def compute_correlation(self):
"""
Compute the correlation matrix.
"""
feature_cols = self.imputed_data.columns[:-1]
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_data = feature_assembler.transform(self.imputed_data)
correlation_matrix = Correlation.corr(assembled_data, "features").collect()[0][0]
corr_df = self.spark.createDataFrame(correlation_matrix.toArray().tolist(), feature_cols)
corr_df.show()

def standard_scaling(self):
"""
Perform Standard Scaling on the features.
"""
feature_cols = self.imputed_data.columns[:-1]
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_data = feature_assembler.transform(self.imputed_data)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(assembled_data)
self.scaled_data = scaler_model.transform(assembled_data)

def apply_pca(self, k=3):
"""
Apply Principal Component Analysis (PCA) to the scaled features.

Args:
k (int): Number of principal components to keep.
"""
pca = PCA(k=k, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(self.scaled_data)
self.pca_result = pca_model.transform(self.scaled_data)

if __name__ == "__main__":
file_path = "Admission_Prediction.csv"
preprocessing = DataFramePreprocessing(file_path)
preprocessing.show_dataset()
preprocessing.print_schema()
preprocessing.handle_missing_values()
preprocessing.show_missing_values()
preprocessing.compute_statistics()
preprocessing.compute_correlation()
preprocessing.standard_scaling()
preprocessing.apply_pca()











     
 
what is notes.io
 

Notes.io is a web-based application for taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes created and continuing...

With notes.io;

  • * You can take a note from anywhere and any device with internet connection.
  • * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
  • * You can quickly share your contents without website, blog and e-mail.
  • * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
  • * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.

Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.

Easy: Notes.io doesn’t require installation. Just write and share note!

Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )

Free: Notes.io works for 12 years and has been free since the day it was started.


You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;


Email: [email protected]

Twitter: http://twitter.com/notesio

Instagram: http://instagram.com/notes.io

Facebook: http://facebook.com/notesio



Regards;
Notes.io Team

     
 
Shortened Note Link
 
 
Looding Image
 
     
 
Long File
 
 

For written notes was greater than 18KB Unable to shorten.

To be smaller than 18KB, please organize your notes, or sign in.