Spark의 CSV 파일에서 헤더를 건너 뛰려면 어떻게하나요?
읽을 Spark 컨텍스트에 3 개의 파일 경로를 제공하고 각 파일의 첫 번째 행에 스키마가 있다고 가정합니다. 헤더에서 스키마 줄을 어떻게 건너 뛸 수 있습니까?
val rdd=sc.textFile("file1,file2,file3")
이제이 rdd에서 헤더 행을 어떻게 건너 뛸 수 있습니까?
첫 번째 레코드에 헤더 행이 하나만있는 경우이를 필터링하는 가장 효율적인 방법은 다음과 같습니다.
rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
물론 내부에 헤더 줄이 많은 파일이 많은 경우에는 도움이되지 않습니다. 실제로 이렇게 만든 세 개의 RDD를 결합 할 수 있습니다.
당신은 또한 단지 쓸 수 filter
헤더가 될 수 있습니다 만 라인을 일치합니다. 이것은 매우 간단하지만 덜 효율적입니다.
이에 상응하는 Python :
from itertools import islice
rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header) #filter out header
Spark 2.0에서는 CSV 리더가 Spark에 빌드되므로 다음과 같이 CSV 파일을 쉽게로드 할 수 있습니다.
spark.read.option("header","true").csv("filePath")
에서 스파크 2.0 이후 당신이 할 수있는 것은 사용입니다 SparkSession 이 한 라이너로 수행하려면 :
val spark = SparkSession.builder.config(conf).getOrCreate()
그리고 @SandeepPurohit이 말했듯이 :
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
질문이 해결 되었기를 바랍니다.
추신 : SparkSession은 Spark 2.0에 도입 된 새로운 진입 점 이며 spark_sql 패키지 에서 찾을 수 있습니다.
PySpark에서 데이터 프레임을 사용하고 헤더를 True로 설정할 수 있습니다.
df = spark.read.csv(dataPath, header=True)
각 파일을 개별적으로로드하고 필터링 file.zipWithIndex().filter(_._2 > 0)
한 다음 모든 파일 RDD를 통합 할 수 있습니다.
파일 수가 너무 많으면 공용체가 StackOverflowExeption
.
filter()
헤더를 제거하려면 첫 번째 열 이름을 필터링하여 PySpark 의 메서드를 사용합니다 .
# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)
# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)
# Check your result
for i in filterDD.take(5) : print (i)
2018 년 작업 (Spark 2.3)
파이썬
df = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
스칼라
val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
PD1 : myManualSchema는 내가 작성한 미리 정의 된 스키마입니다. 해당 코드 부분을 건너 뛸 수 있습니다.
read()
명령에 전달하는 옵션입니다 .
context = new org.apache.spark.sql.SQLContext(sc)
var data = context.read.option("header","true").csv("<path>")
또는 spark-csv 패키지를 사용할 수 있습니다 (또는 Spark 2.0에서는 기본적으로 CSV로 사용 가능함). 이것은 (원하는대로) 각 파일의 헤더를 예상합니다.
schema = StructType([
StructField('lat',DoubleType(),True),
StructField('lng',DoubleType(),True)])
df = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true',
delimiter="\t",
treatEmptyValuesAsNulls=True,
mode="DROPMALFORMED").load(input_file,schema=schema)
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
case (fileName, stream)=>
val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
(fileName, header)
}.collect().toMap
val fileNameHeaderBr = sc.broadcast(fileNameHeader)
// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
if(iter.hasNext){
val firstLine = iter.next()
println(s"Comparing with firstLine $firstLine")
if(firstLine == fileNameHeaderBr.value.head._2)
new WrappedIterator(null, iter)
else
new WrappedIterator(firstLine, iter)
}
else {
iter
}
).collect().foreach(println)
class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
var isFirstIteration = true
override def hasNext: Boolean = {
if (isFirstIteration && firstLine != null){
true
}
else{
iter.hasNext
}
}
override def next(): String = {
if (isFirstIteration){
println(s"For the first time $firstLine")
isFirstIteration = false
if (firstLine != null){
firstLine
}
else{
println(s"Every time $firstLine")
iter.next()
}
}
else {
iter.next()
}
}
}
파이썬 개발자를 위해. 나는 spark2.0으로 테스트했습니다. 처음 14 개 행을 제거한다고 가정 해 보겠습니다.
sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn은 df 함수입니다. 따라서 아래는 위에서 사용 된 RDD 스타일에서 작동하지 않습니다.
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
참고 URL : https://stackoverflow.com/questions/27854919/how-do-i-skip-a-header-from-csv-files-in-spark
'Nice programing' 카테고리의 다른 글
Spring이 존재하는 경우 Bean xml 구성 파일을 찾을 수 없습니다. (0) | 2020.11.12 |
---|---|
Android Studio 인수에 대한 runProguard () 메서드를 찾을 수 없습니까? (0) | 2020.11.12 |
프로그래밍 방식으로 Android 앱을 종료하는 방법은 무엇입니까? (0) | 2020.11.12 |
Excel에서 열의 모든 셀에 동일한 텍스트 추가 (0) | 2020.11.12 |
반응 형으로 텍스트 정렬을 변경 하시겠습니까 (Bootstrap 3 사용)? (0) | 2020.11.12 |