programing

Scala Spark에서 csv 파일을 데이터 프레임으로 읽는 동안 스키마 제공

elecom 2023. 10. 5. 21:13
반응형

Scala Spark에서 csv 파일을 데이터 프레임으로 읽는 동안 스키마 제공

csv 파일을 데이터 프레임으로 읽으려고 합니다.csv 파일을 알고 있기 때문에 데이터 프레임의 스키마가 무엇인지 알고 있습니다.또한 파일을 읽기 위해 스파크 csv 패키지를 사용하고 있습니다.아래와 같이 스키마를 지정하려고 합니다.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

그런데 제가 작성한 데이터 프레임의 스키마를 확인해보니 자체 스키마를 가져간 것 같습니다.제가 무슨 잘못을 하고 있는 건가요? 제가 말한 스키마를 선택하기 위해 스파크를 만드는 방법은 무엇인가요?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

아래 코드를 사용해 보십시오. 스키마를 지정할 필요는 없습니다.interpseSchema를 true로 지정하면 csv 파일에서 가져와야 합니다.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

스키마를 수동으로 지정하려면 다음과 같이 수행할 수 있습니다.

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true))
)

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Python에서 이것을 하는 것에 관심이 있는 사람들을 위해 여기 작동하는 버전이 있습니다.

customSchema = StructType([
    StructField("IDGC", StringType(), True),        
    StructField("SEARCHNAME", StringType(), True),
    StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)

testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66

도움이 되길 바랍니다.

아루나키란 눌루가 제공하는 솔루션을 분석에 사용하고 있습니다(코드 참조).열에 올바른 유형을 할당할 수 있음에도 불구하고 반환되는 모든 값은null. 전에 그 옵션을 시도해 본 적이 있습니다..option("inferSchema", "true")그리고 데이터 프레임에서 올바른 값을 반환합니다(종류는 다르지만).

val customSchema = StructType(Array(
    StructField("numicu", StringType, true),
    StructField("fecha_solicitud", TimestampType, true),
    StructField("codtecnica", StringType, true),
    StructField("tecnica", StringType, true),
    StructField("finexploracion", TimestampType, true),
    StructField("ultimavalidacioninforme", TimestampType, true),
    StructField("validador", StringType, true)))

val df_explo = spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", "\t")
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
        .schema(customSchema)
        .load(filename)

결과

root


|-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)

표는 다음과 같습니다.

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|

이전 솔루션에서는 사용자 지정 StructType을 사용했습니다.

spark-sql 2.4.5 (scala version 2.12.10)을 사용하면 이제 스키마를 문자열로 지정할 수 있습니다.schema기능.

import org.apache.spark.sql.SparkSession;

val sparkSession = SparkSession.builder()
            .appName("sample-app")
            .master("local[2]")
            .getOrCreate();

val pageCount = sparkSession.read
  .format("csv")
  .option("delimiter","|")
  .option("quote","")
  .schema("project string ,article string ,requests integer ,bytes_served long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

@Nulu의 답변 덕분에 최소한의 수정으로 pyspark에 적합합니다.

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

customSchema = StructType(Array(
    StructField("project", StringType, true),
    StructField("article", StringType, true),
    StructField("requests", IntegerType, true),
    StructField("bytes_served", DoubleType, true)))

pagecount = sc.read.format("com.databricks.spark.csv")
         .option("delimiter"," ")
         .option("quote","")
         .option("header", "false")
         .schema(customSchema)
         .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

스키마 정의를 단순 문자열로 지정

스키마 정의에 관심이 있는 사람이 날짜시간 스탬프가 있는 간단한 문자열인 경우.

터미널 또는 셸에서 데이터 파일 생성

echo " 
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
" > data.csv

스키마를 String으로 정의

    user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'

데이터 읽기

    df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')

    df.show(10, False)

    +-----------------------+----------+----------+---------+
    |timesta                |date      |first_name|last_name|
    +-----------------------+----------+----------+---------+
    |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
    |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
    +-----------------------+----------+----------+---------+

스파크가 스키마를 추론하도록 하는 대신 스키마를 명시적으로 정의하는 것도 스파크 읽기 성능을 향상시킵니다.

사용자 지정 스키마, 전체 데모로 작업할 수 있는 방법은 다음과 같습니다.

$> 셸 코드,

echo "
Slingo, iOS 
Slingo, Android
" > game.csv

스칼라 코드:

import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("game_id", StringType, true),
  StructField("os_id", StringType, true)
))

val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show 

csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = sql("select * from game_view order by game_id, os_id desc")
sort_df.show 
// import Library
import java.io.StringReader ;

import au.com.bytecode.opencsv.CSVReader

//filename

var train_csv = "/Path/train.csv";

//read as text file

val train_rdd = sc.textFile(train_csv)   

//use string reader to convert in proper format

var full_train_data  = train_rdd.map{line =>  var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext();  }   

//declares  types

type s = String

// declare case class for schema

case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s,
    LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s)

//create DF RDD with custom schema 

var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex{(idx,itr)=> if (idx==0) itr.drop(1); 
                     itr.toList.map(x=> trainSchema(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12))).iterator }.toDF

pyspark 2.4 이후에는 간단히 사용할 수 있습니다.header정확한 헤더를 설정하기 위한 파라미터:

data = spark.read.csv('data.csv', header=True)

마찬가지로 스칼라를 사용하는 경우 사용할 수 있습니다.header매개 변수도 마찬가지입니다.

sparkSession과 암묵적으로 이렇게 할 수도 있습니다.

import sparkSession.implicits._
val pagecount:DataFrame = sparkSession.read
.option("delimiter"," ")
.option("quote","")
.option("inferSchema","true")
.csv("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
.toDF("project","article","requests","bytes_served")

스파크 버전이 3.0.1인 경우 다음 Scala 스크립트를 사용할 수 있습니다.

val df = spark.read.format("csv").option("delimiter",",").option("header",true).load("file:///LOCAL_CSV_FILE_PATH")

하지만 이런 식으로 모든 데이터 유형은 다음과String.

CSV를 로드하는 동안 열 이름을 데이터 프레임에 전달할 수 있는 옵션 중 하나입니다.

import pandas
    names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
    dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
print(dataset.head(10))

산출량

    sepal-length  sepal-width  petal-length  petal-width        class
1            5.1          3.5           1.4          0.2  Iris-setosa
2            4.9          3.0           1.4          0.2  Iris-setosa
3            4.7          3.2           1.3          0.2  Iris-setosa
4            4.6          3.1           1.5          0.2  Iris-setosa
5            5.0          3.6           1.4          0.2  Iris-setosa
6            5.4          3.9           1.7          0.4  Iris-setosa
7            4.6          3.4           1.4          0.3  Iris-setosa
8            5.0          3.4           1.5          0.2  Iris-setosa
9            4.4          2.9           1.4          0.2  Iris-setosa
10           4.9          3.1           1.5          0.1  Iris-setosa

여기 제 해결책은 다음과 같습니다.

import org.apache.spark.sql.types._
  val spark = org.apache.spark.sql.SparkSession.builder.
  master("local[*]").
  appName("Spark CSV Reader").
  getOrCreate()

val movie_rating_schema = StructType(Array(
  StructField("UserID", IntegerType, true),
  StructField("MovieID", IntegerType, true),
  StructField("Rating", DoubleType, true),
  StructField("Timestamp", TimestampType, true)))

val df_ratings: DataFrame = spark.read.format("csv").
  option("header", "true").
  option("mode", "DROPMALFORMED").
  option("delimiter", ",").
  //option("inferSchema", "true").
  option("nullValue", "null").
  schema(movie_rating_schema).
  load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv"

val movie_avg_scores = df_ratings.rdd.map(_.toString()).
  map(line => {
    // drop "[", "]" and then split the str 
    val fileds = line.substring(1, line.length() - 1).split(",")
    //extract (movie id, average rating)
    (fileds(1).toInt, fileds(2).toDouble)
  }).
  groupByKey().
  map(data => {
    val avg: Double = data._2.sum / data._2.size
    (data._1, avg)
  })

언급URL : https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe-in-scala-spark

반응형