Scala Spark에서 csv 파일을 데이터 프레임으로 읽는 동안 스키마 제공
csv 파일을 데이터 프레임으로 읽으려고 합니다.csv 파일을 알고 있기 때문에 데이터 프레임의 스키마가 무엇인지 알고 있습니다.또한 파일을 읽기 위해 스파크 csv 패키지를 사용하고 있습니다.아래와 같이 스키마를 지정하려고 합니다.
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
그런데 제가 작성한 데이터 프레임의 스키마를 확인해보니 자체 스키마를 가져간 것 같습니다.제가 무슨 잘못을 하고 있는 건가요? 제가 말한 스키마를 선택하기 위해 스파크를 만드는 방법은 무엇인가요?
> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
아래 코드를 사용해 보십시오. 스키마를 지정할 필요는 없습니다.interpseSchema를 true로 지정하면 csv 파일에서 가져와야 합니다.
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("header", "true")
.option("inferSchema", "true")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
스키마를 수동으로 지정하려면 다음과 같이 수행할 수 있습니다.
import org.apache.spark.sql.types._
val customSchema = StructType(Array(
StructField("project", StringType, true),
StructField("article", StringType, true),
StructField("requests", IntegerType, true),
StructField("bytes_served", DoubleType, true))
)
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("header", "true")
.schema(customSchema)
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
Python에서 이것을 하는 것에 관심이 있는 사람들을 위해 여기 작동하는 버전이 있습니다.
customSchema = StructType([
StructField("IDGC", StringType(), True),
StructField("SEARCHNAME", StringType(), True),
StructField("PRICE", DoubleType(), True)
])
productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)
testProduct.csv
ID|SEARCHNAME|PRICE
6607|EFKTON75LIN|890.88
6612|EFKTON100HEN|55.66
도움이 되길 바랍니다.
아루나키란 눌루가 제공하는 솔루션을 분석에 사용하고 있습니다(코드 참조).열에 올바른 유형을 할당할 수 있음에도 불구하고 반환되는 모든 값은null. 전에 그 옵션을 시도해 본 적이 있습니다..option("inferSchema", "true")그리고 데이터 프레임에서 올바른 값을 반환합니다(종류는 다르지만).
val customSchema = StructType(Array(
StructField("numicu", StringType, true),
StructField("fecha_solicitud", TimestampType, true),
StructField("codtecnica", StringType, true),
StructField("tecnica", StringType, true),
StructField("finexploracion", TimestampType, true),
StructField("ultimavalidacioninforme", TimestampType, true),
StructField("validador", StringType, true)))
val df_explo = spark.read
.format("csv")
.option("header", "true")
.option("delimiter", "\t")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.schema(customSchema)
.load(filename)
결과
root
|-- numicu: string (nullable = true)
|-- fecha_solicitud: timestamp (nullable = true)
|-- codtecnica: string (nullable = true)
|-- tecnica: string (nullable = true)
|-- finexploracion: timestamp (nullable = true)
|-- ultimavalidacioninforme: timestamp (nullable = true)
|-- validador: string (nullable = true)
표는 다음과 같습니다.
|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
| null| null| null| null| null| null| null|
| null| null| null| null| null| null| null|
| null| null| null| null| null| null| null|
| null| null| null| null| null| null| null|
이전 솔루션에서는 사용자 지정 StructType을 사용했습니다.
spark-sql 2.4.5 (scala version 2.12.10)을 사용하면 이제 스키마를 문자열로 지정할 수 있습니다.schema기능.
import org.apache.spark.sql.SparkSession;
val sparkSession = SparkSession.builder()
.appName("sample-app")
.master("local[2]")
.getOrCreate();
val pageCount = sparkSession.read
.format("csv")
.option("delimiter","|")
.option("quote","")
.schema("project string ,article string ,requests integer ,bytes_served long")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
@Nulu의 답변 덕분에 최소한의 수정으로 pyspark에 적합합니다.
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
customSchema = StructType(Array(
StructField("project", StringType, true),
StructField("article", StringType, true),
StructField("requests", IntegerType, true),
StructField("bytes_served", DoubleType, true)))
pagecount = sc.read.format("com.databricks.spark.csv")
.option("delimiter"," ")
.option("quote","")
.option("header", "false")
.schema(customSchema)
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
스키마 정의를 단순 문자열로 지정
스키마 정의에 관심이 있는 사람이 날짜 및 시간 스탬프가 있는 간단한 문자열인 경우.
터미널 또는 셸에서 데이터 파일 생성
echo "
2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc
2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz
" > data.csv
스키마를 String으로 정의
user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'
데이터 읽기
df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')
df.show(10, False)
+-----------------------+----------+----------+---------+
|timesta |date |first_name|last_name|
+-----------------------+----------+----------+---------+
|2019-07-02 22:11:11.999|2019-01-01| Suresh | abc |
|2019-01-02 22:11:11.001|2020-01-01| Aadi | xyz |
+-----------------------+----------+----------+---------+
스파크가 스키마를 추론하도록 하는 대신 스키마를 명시적으로 정의하는 것도 스파크 읽기 성능을 향상시킵니다.
사용자 지정 스키마, 전체 데모로 작업할 수 있는 방법은 다음과 같습니다.
$> 셸 코드,
echo "
Slingo, iOS
Slingo, Android
" > game.csv
스칼라 코드:
import org.apache.spark.sql.types._
val customSchema = StructType(Array(
StructField("game_id", StringType, true),
StructField("os_id", StringType, true)
))
val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
csv_df.show
csv_df.orderBy(asc("game_id"), desc("os_id")).show
csv_df.createOrReplaceTempView("game_view")
val sort_df = sql("select * from game_view order by game_id, os_id desc")
sort_df.show
// import Library
import java.io.StringReader ;
import au.com.bytecode.opencsv.CSVReader
//filename
var train_csv = "/Path/train.csv";
//read as text file
val train_rdd = sc.textFile(train_csv)
//use string reader to convert in proper format
var full_train_data = train_rdd.map{line => var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext(); }
//declares types
type s = String
// declare case class for schema
case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s,
LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s)
//create DF RDD with custom schema
var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex{(idx,itr)=> if (idx==0) itr.drop(1);
itr.toList.map(x=> trainSchema(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12))).iterator }.toDF
pyspark 2.4 이후에는 간단히 사용할 수 있습니다.header정확한 헤더를 설정하기 위한 파라미터:
data = spark.read.csv('data.csv', header=True)
마찬가지로 스칼라를 사용하는 경우 사용할 수 있습니다.header매개 변수도 마찬가지입니다.
sparkSession과 암묵적으로 이렇게 할 수도 있습니다.
import sparkSession.implicits._
val pagecount:DataFrame = sparkSession.read
.option("delimiter"," ")
.option("quote","")
.option("inferSchema","true")
.csv("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
.toDF("project","article","requests","bytes_served")
스파크 버전이 3.0.1인 경우 다음 Scala 스크립트를 사용할 수 있습니다.
val df = spark.read.format("csv").option("delimiter",",").option("header",true).load("file:///LOCAL_CSV_FILE_PATH")
하지만 이런 식으로 모든 데이터 유형은 다음과String.
CSV를 로드하는 동안 열 이름을 데이터 프레임에 전달할 수 있는 옵션 중 하나입니다.
import pandas
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
print(dataset.head(10))
산출량
sepal-length sepal-width petal-length petal-width class
1 5.1 3.5 1.4 0.2 Iris-setosa
2 4.9 3.0 1.4 0.2 Iris-setosa
3 4.7 3.2 1.3 0.2 Iris-setosa
4 4.6 3.1 1.5 0.2 Iris-setosa
5 5.0 3.6 1.4 0.2 Iris-setosa
6 5.4 3.9 1.7 0.4 Iris-setosa
7 4.6 3.4 1.4 0.3 Iris-setosa
8 5.0 3.4 1.5 0.2 Iris-setosa
9 4.4 2.9 1.4 0.2 Iris-setosa
10 4.9 3.1 1.5 0.1 Iris-setosa
여기 제 해결책은 다음과 같습니다.
import org.apache.spark.sql.types._
val spark = org.apache.spark.sql.SparkSession.builder.
master("local[*]").
appName("Spark CSV Reader").
getOrCreate()
val movie_rating_schema = StructType(Array(
StructField("UserID", IntegerType, true),
StructField("MovieID", IntegerType, true),
StructField("Rating", DoubleType, true),
StructField("Timestamp", TimestampType, true)))
val df_ratings: DataFrame = spark.read.format("csv").
option("header", "true").
option("mode", "DROPMALFORMED").
option("delimiter", ",").
//option("inferSchema", "true").
option("nullValue", "null").
schema(movie_rating_schema).
load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv"
val movie_avg_scores = df_ratings.rdd.map(_.toString()).
map(line => {
// drop "[", "]" and then split the str
val fileds = line.substring(1, line.length() - 1).split(",")
//extract (movie id, average rating)
(fileds(1).toInt, fileds(2).toDouble)
}).
groupByKey().
map(data => {
val avg: Double = data._2.sum / data._2.size
(data._1, avg)
})
언급URL : https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe-in-scala-spark
'programing' 카테고리의 다른 글
| 함수 포인터에 캐스트 (0) | 2023.10.05 |
|---|---|
| 특정 날짜가 있는 레코드를 선택하는 MySql 쿼리 (0) | 2023.10.05 |
| 우커머스: 쿠폰 적용/제거 시 에이잭스를 비활성화하는 방법? (0) | 2023.10.05 |
| 접두사(++x) 및 후픽스(x++) 작업은 어떻게 작동합니까? (0) | 2023.10.05 |
| Carma Jasmine 테스트 실행에서 각도 모듈을 사용할 수 (0) | 2023.10.05 |