Model klasyfikacyjny XGBoost w Spark Databricks

Tworzymy sesję Sparka i ładujemy dane z Bloba Azure

from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler, StandardScaler

import xgboost as xgb

import pandas as pd

from collections import Counter

storage_account_name = "blobdawidbrejecki"

container_name = "xgboost"  

account_key = ""    

spark = SparkSession.builder.appName("XGBoostClassification").getOrCreate()

spark.conf.set(

    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",

    account_key

)

data = spark.read.csv(

    f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/wsad_z1.csv", header=True, sep=";",

    inferSchema=True

)

Zamiana przecinkow na kropki i zmiana typu danych

data = data.select([regexp_replace(col(c), ",", ".").alias(c) for c in data.columns])

data = data.select([col(c).cast("double").alias(c) for c in data.columns])

Zamiana nulli na 0

data = data.fillna(0)

Macierz korelacji i usunięcie skorelowanych kolumn

corr_matrix = pd.DataFrame(correlation_matrix.toArray(), columns=numeric_cols, index=numeric_cols)

print(corr_matrix)

columns_to_drop = ['VAR9','VAR10','VAR11','VAR12','VAR13','VAR14','VAR26','VAR27',"VAR28", "VAR2", "VAR5", "VAR4", "VAR15", "VAR17", "VAR6","VAR20", "VAR8"]

df=data

Pobieramy nazwy wszystkich zmiennych objasniajacych i łączymy w jeden wektor

columns = [col for col in df.columns]

feature_cols = [col for col in df.columns if col != 'docelowa']

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

df = assembler.transform(df)

Standaryzujemy dane

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)

df_scaled = scaler.fit(df).transform(df)

Dzielimy dane na zbiór testowy i treningowy

train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed=1234)

Nasz zbiór ma niezbalansowaną zmienną objaśnianą. Dlatego redukujemy ten problem używając parametru scale_pos_weight w XGBoost

label_counts = Counter(train_data.select("docelowa").toPandas()["docelowa"])

num_pos = label_counts[1]

num_neg = label_counts[0]

scale_pos_weight = num_neg / num_pos

Przygotowujemy dane do trenowania

train_data_pd = train_data.select("docelowa", "scaled_features").toPandas()

X_train = [x.toArray() for x in train_data_pd["scaled_features"]]

y_train = train_data_pd["docelowa"].values

Trenujemy

dtrain = xgb.DMatrix(X_train, label=y_train)

params = {

    'objective': 'binary:logistic',

    'max_depth': 3,

    'eta': 0.1,

    'eval_metric': 'logloss',

    'scale_pos_weight': scale_pos_weight

}

model = xgb.train(params, dtrain, num_boost_round=10)

Obliczamy accuracy i tworzymy confusion matrix

from sklearn.metrics import confusion_matrix

import seaborn as sns

import matplotlib.pyplot as plt

y_pred = [1 if x > 0.5 else 0 for x in y_pred]  # Zamiana na 0 lub 1 na podstawie progu 0.5

cm = confusion_matrix(y_test, y_pred)

plt.figure(figsize=(6, 4))

sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["0", "1"], yticklabels=["0", "1"])

plt.xlabel("Przewidywane")

plt.ylabel("Rzeczywiste")

plt.title("Macierz Pomyłek")

plt.show()

from sklearn.metrics import accuracy_score

accuracy = accuracy_score(y_test, y_pred)

print(f"Accuracy: {accuracy:.4f}")

Wykres ważności zmiennych objaśniających modelu:

original_columns = feature_cols

importances = model.get_score(importance_type='weight')

sorted_importances = sorted(importances.items(), key=lambda x: x[1], reverse=True)

print("Ważność cech:")

for feature, importance in sorted_importances:

    feature_index = int(feature[1:])

    print(f"Cecha: {original_columns[feature_index]} - Ważność: {importance}")

sorted_columns = [original_columns[int(feature[1:])] for feature, _ in sorted_importances]

importance_values = [importance for _, importance in sorted_importances]

plt.figure(figsize=(10, 6))

plt.barh(sorted_columns, importance_values)

plt.xlabel('Ważność')

plt.ylabel('Cecha')

plt.title('Ważność cech w modelu XGBoost')

plt.show()

Dobrą praktyką jest usuwanie zmiennych najmniej znaczących i obserwowanie zmianę accuracy. Jeżeli jest ona nieznaczna, usuwamy je. Im mniej zmiennych w modelu tym lepiej, model lepiej będzie klasyfikował nowe przypadki.

Wykres korelacji zmiennych mówi nam o kierunku i sile korelacji:

correlations = {}

for feature in feature_cols:

    correlation = df.stat.corr(feature, 'docelowa')

    correlations[feature] = correlation

correlation_df = pd.DataFrame(list(correlations.items()), columns=['Feature', 'Correlation'])

plt.figure(figsize=(10, 6))

sns.barplot(x='Correlation', y='Feature', data=correlation_df, palette='coolwarm')

plt.title('Korelacja cech z zmienną objaśniającą')

plt.xlabel('Korelacja')

plt.ylabel('Cecha')

plt.show()

Zapis modelu do pliku

model.save_model('modelz.bin')

Ładowanie modelu i predykcja na nowych danych

import xgboost as xgb
model = xgb.Booster()
model.load_model('sciezka/modelz.bin')
import pandas as pd
X_test['VAR3'] = pd.to_numeric(X_test['VAR3'], errors='coerce')
X_test['VAR16'] = pd.to_numeric(X_test['VAR16'], errors='coerce')
X_test = X_test.fillna(0)
dtest = xgb.DMatrix(X_test)
y_pred = model.predict(dtest)
X_test['prediction'] = y_pred