본문 바로가기
데이터 엔지니어링

[Pyspark] Pyspark 일단 사용해보기

by 다람이도토리 2024. 12. 1.

Pyspark에 대해 자세하게 뜯어보기 전에, 우선 어떻게 사용하지는지부터 정리해보았습니다.

Pyspark 세션 시작

* 초기 환경 설정을 진행해야 한다. 자바, 파이썬 설치 이후 환경 변수 설정등을 모두 진행 후 시작해야 한다.

from pyspark.sql import SparkSession # 세션 시작시 import


# SparkSession이란, 스파크 응용 프로그램의 통합 진입점.
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

 

데이터 스키마 지정 후 데이터 불러오기

* 초기에 데이터를 불러올 때 컬럼의 자료형이 지정되지 않을 수 있다. 이를 지정하는 과정이다.

from pyspark.sql.types import *  
data_schema = [
    StructField('timestamp', StringType(), True),
    StructField('user_id', StringType(), True),
    StructField('amount', DoubleType(), True),
    StructField('location', StringType(), True),
    StructField('device_type', StringType(), True),
    StructField('is_fraud', IntegerType(), True),
    StructField('age', IntegerType(), True),
    StructField('income', DoubleType(), True),
    StructField('debt', DoubleType(), True),
    StructField('credit_score', IntegerType(), True)
]  # 컬럼명, 타입,  널 허락여부

final_struc = StructType(fields=data_schema)

data = spark.read.csv(
    'C:/Users/USER/Desktop/TIL_new/Data Engineer/spark_tutorial_dataset/fraud_detection_dataset.csv',
    sep = ',',
    header = True,
    schema = final_struc
)
data.printSchema()

 

스파크 데이터프레임 다루기

스파크의 데이터프레임은 rdd이다. rdd에 대해서는 추후 스파크 개념정리에서 정리하고 넘어가고자 한다.
(판다스와의 비교 분석이 필요!)

# 데이터 첫 N개 보기
data.show(3)
# 데이터 행마다 통계 내기
data.describe().show()
# 컬럼 명 맽기
data.columns
# 데이터 개수 세기
data.count()

# data의 기본적인 편집
# 주의사항 : pandas와 다르게, "rdd는 immutable" 하다.
# 즉 하나의 데이터를 원 상태로 보존하면서 계속해서 새로운걸 만들 수 있다.
data2 = data.withColumn('new_timestamp', data.timestamp) # 컬럼 복사, 뒤에서 data 부르면 안바뀌어있음 immutable해서.
data2 = data2.withColumnRenamed('user_id', 'UID') # 컬럼명 변경
data2 = data2.drop('location') # 컬럼 drop
data2.show(5)
# data의 querying : 실제 SQL 문법과 유사함.
data.select(['is_fraud', 'age', 'income', 'debt']).describe().show()
data.groupby('device_type').count().show()

# data filtering 및 간단한 전처리/파생 변수 생성하는 방법.
from pyspark.sql.functions import col, lit, when, avg # filter을 위해 필요.
# 특정 컬럼의 값이 어떻게 되나를 체크하는 용도, show를 붙여야 함을 잊어서는 안된다.
data.filter((col('credit_score')) <= lit(300)).show(5)

# sql에서 case when 역할을 하는 쿼리임. when도 import 붙여야함. alias를 붙여 컬럼명 변경 가능.
data.select('income', 'debt', 'is_fraud', when(data.credit_score>= 500, "high").otherwise("low").alias("c_score_high_low")).show(5)
# 통계량 계산하기(aggregation은 groupby로)
data.groupby('is_fraud').agg(
    avg("debt").alias("AvgDebt"),
    avg("income").alias("AvgIncome")   
).show(5)
# 특정 칼럼 값을 기준으로 정렬하기. 오름차순은 ascending으로! 빚이 가장적은 5명을 출력하기.
data.orderBy("debt", ascending = True).show(5)

 

스파크로 분류 모델 만들어보기

분류에 들어가는 특성을 별도로 Feature vector 로 지정하는 방식을 사용한다. (VectorAssembler)

from pyspark.ml.feature import VectorAssembler # ML 관련 작업시 필요
from pyspark.ml.classification import DecisionTreeClassifier # 분류 모델
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # 정확도


# is_fraud를 예측하는 단순 분류 트리 모델을 우선 만들어보자.
# 우선 tvt split
train_df, test_df = data.randomSplit(weights = [0.8, 0.2], seed = 99)

# Feature Vector로 만들기
ftr_columns = ['amount', 'income', 'debt', 'credit_score']
vec_assembler = VectorAssembler(inputCols=ftr_columns, outputCol='feature')
train_ftr_vec = vec_assembler.transform(train_df)
test_ftr_vec = vec_assembler.transform(test_df)

train_ftr_vec.show(3) # 학습 후의 결과 보기 / 별도의 feature 행이 발생했다.
dt_clf = DecisionTreeClassifier(featuresCol='feature', labelCol='is_fraud',
                                maxDepth=7)
# model fit, pred 과정
dt_model = dt_clf.fit(train_ftr_vec)
train_pred = dt_model.transform(train_ftr_vec)
test_pred = dt_model.transform(test_ftr_vec)

# 정확도를 측정하자.
check = MulticlassClassificationEvaluator(predictionCol='prediction',
                                          labelCol='is_fraud',
                                          metricName='accuracy')
train_acc = check.evaluate(train_pred)
test_acc = check.evaluate(test_pred)
print('Train Acc:', train_acc)
print('Test Acc:', test_acc)

 

데이터 파이프라인 만들기

위의 과정을 압축하여 데이터 파이프라인 형태로 한 번에 만들수도 있다.

from pyspark.ml import Pipeline
train_df, test_df = data.randomSplit(weights = [0.8, 0.2], seed = 99)

# 과정을 합쳐서 한번에 적는다.
ftr_columns = ['amount', 'income', 'debt', 'credit_score']
stage_1 = VectorAssembler(inputCols=ftr_columns, outputCol='feature')
stage_2 = DecisionTreeClassifier(featuresCol='feature', labelCol='is_fraud')

# 파이프라인 만들기
pipes = Pipeline(stages = [stage_1, stage_2])
Pipeline_model = pipes.fit(train_df)

train_pred = Pipeline_model.transform(train_df)
test_pred = Pipeline_model.transform(test_df)

 

Onehot Encoding, Scaling

pandas에서는 알아서 one-hot encoding을 했지만, 여기서는 안 된다.
먼저 사전에, stringindexer로 encoding을 시킨 다음에, one-hot이 진행이 된다.

scaling또한 앞에서 언급한 벡터화 이후에 진행 가능하다.

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler

# encoding 해보기 - device_type에 대해서 작업하기
indexer = StringIndexer(inputCol='device_type', outputCol='device_ind')
en_df = indexer.fit(data).transform(data)
en_df.show(5)

# onehot encoder
# 주의사항 : pyspark에서는 사전 작업인 stringindexer를 해야만 가능하다.
# 내부적으로 문자열을 받아주지 않아...
# dropLast = '마지막 벡터를 드랍할 것인가?' 마지막을 드랍해도 구분이 됨.
oh_en = OneHotEncoder(inputCol='device_ind', outputCol='device_ohe',
                      dropLast = True)
oh_df = oh_en.fit(en_df).transform(en_df)

oh_df.show(5)


# Scaler. 
# pandas에서는 바로 해도 되지만, pyspark에서는 Feature vectorization 되어야함
# VectorAssembler를 하고 진행해야 한다.
ftr_columns = ['amount', 'income', 'debt', 'credit_score']
vec_assembler = VectorAssembler(inputCols=ftr_columns, outputCol='feature')
data_vec = vec_assembler.transform(data)

# True, True일 경우 표준정규분포를 기준으로 계산
# False로 할 경우 데이터들의 평균치로 적용된다.
scaler = StandardScaler(inputCol='feature', outputCol='feature_scaled',
                        withMean=True, withStd=True)
data_scaled = scaler.fit(data_vec).transform(data_vec)
data_scaled.show(5)