Extend Spark Data Source with DataFrame
In this article, we’ll implement a Spark data source for reading and writing a Google spreadsheet, so that you’ll know how to extend Spark’s data sources yourself.
What does a custom data source look like?
Read data from a Google Spreadsheet into a DataFrame:
```scala
val data = spark.read.format("google-spreadsheet")
```
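Spelled out, a complete read might look like the sketch below. The option keys (spreadsheetId, sheetName, credentialsPath) are placeholders I use for illustration, not necessarily the exact names the project expects.

```scala
// A hedged sketch of a complete read. The option keys are illustrative
// placeholders; check the project's README for the real ones.
val data = spark.read
  .format("google-spreadsheet")
  .option("spreadsheetId", "<your-spreadsheet-id>")        // hypothetical key
  .option("sheetName", "Sheet1")                           // hypothetical key
  .option("credentialsPath", "/path/to/credentials.json")  // hypothetical key
  .load()

data.show()
```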
Write the data of a DataFrame into a Google Spreadsheet:
```scala
df.write.format("google-spreadsheet")
```
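And a complete write, again with made-up option keys:

```scala
// A hedged sketch of a complete write; option keys are illustrative placeholders.
df.write
  .format("google-spreadsheet")
  .option("spreadsheetId", "<your-spreadsheet-id>")        // hypothetical key
  .option("sheetName", "Sheet1")                           // hypothetical key
  .save()
```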
Implement
In total, we’ll implement 4 files:
```
GoogleSpreadsheetDataSource.scala
GoogleSpreadsheetDataSourceReader.scala
GoogleSpreadsheetDataSourceWriter.scala
GoogleSpreadsheetDataSourceException.scala
```
GoogleSpreadsheetDataSource is the main class that defines the data source.
GoogleSpreadsheetDataSourceReader and GoogleSpreadsheetDataSourceWriter tell Spark how to read data from and write data to a sheet, respectively.
GoogleSpreadsheetDataSourceException is just a customized exception.
You can check out the full code here: Liam8/spark-google-spreadsheet-datasource.
Main class
Let’s start with the main class.
```scala
class GoogleSpreadsheetDataSource extends ReadSupport with WriteSupport with DataSourceRegister {
  // ...
}
```
In this class, we can get options from DataSourceOptions to create the Reader and Writer. If you only want to read data, you don’t need to extend WriteSupport.
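Here is a rough sketch of what such a class can look like, assuming Spark 2.4’s DataSource V2 API (where these traits live). The option keys and the constructor parameters of the reader and writer are my own illustrative assumptions, not necessarily what the real project uses.

```scala
import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, ReadSupport, WriteSupport}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.types.StructType

class GoogleSpreadsheetDataSource extends ReadSupport with WriteSupport with DataSourceRegister {

  // The name users pass to .format(...); it must also appear in the
  // META-INF/services registration file described later.
  override def shortName(): String = "google-spreadsheet"

  // Pull the user-supplied options and hand them to the reader.
  // The option keys and constructor parameters are illustrative assumptions.
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new GoogleSpreadsheetDataSourceReader(
      spreadsheetId = options.get("spreadsheetId").orElse(""),
      sheetName = options.get("sheetName").orElse("Sheet1"),
      parallelism = options.getInt("parallelism", 4)
    )

  // Same idea for writing: build a DataSourceWriter from the options.
  override def createWriter(
      writeUUID: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] =
    Optional.of[DataSourceWriter](new GoogleSpreadsheetDataSourceWriter(
      spreadsheetId = options.get("spreadsheetId").orElse(""),
      sheetName = options.get("sheetName").orElse("Sheet1"),
      schema = schema
    ))
}
```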
Reader
In the Reader we mainly need to deal with two things: how to generate partitions, and how to read the data of one partition.
The most important thing is to implement the DataSourceReader.
```scala
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
  // ...
}
```
In the method planInputPartitions, we create a list of GoogleSpreadsheetInputPartition. GoogleSpreadsheetInputPartition is a subclass of InputPartition that holds the metadata of one partition.
```scala
class GoogleSpreadsheetInputPartition(
  // ...
)
```
The constructor parameters startOffset: Int and endOffset: Int tell the current partition from which row to start reading and where to stop.
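A minimal sketch of such a partition class could look like this; everything besides the two offsets (the spreadsheet id, sheet name and schema) is an assumption about what the partition reader will need:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Holds only metadata; Spark serializes this object and ships it to an
// executor, where createPartitionReader() builds the reader that does the work.
class GoogleSpreadsheetInputPartition(
    spreadsheetId: String,
    sheetName: String,
    schema: StructType,
    startOffset: Int,
    endOffset: Int
) extends InputPartition[InternalRow] {

  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new GoogleSpreadsheetInputPartitionReader(
      spreadsheetId, sheetName, schema, startOffset, endOffset)
}
```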
```scala
val rowCount = sheet.getProperties.getGridProperties.getRowCount
```
Here we get the total number of rows in the sheet and divide it by the parallelism to get how many rows should be read by one partition. Then we can calculate the startOffset and endOffset of every partition.
```scala
class GoogleSpreadsheetDataSourceReader(
  // ...
)
```
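Putting the pieces together, a hedged sketch of the reader might look like the following. The schema and the row-count lookup are stubbed placeholders here, and the constructor parameters are assumptions rather than the project’s actual signature:

```scala
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class GoogleSpreadsheetDataSourceReader(
    spreadsheetId: String,
    sheetName: String,
    parallelism: Int
) extends DataSourceReader {

  // Placeholder schema: the real implementation would infer it from the
  // sheet's header row or accept a user-provided schema.
  override def readSchema(): StructType =
    StructType(Seq(StructField("col1", StringType), StructField("col2", StringType)))

  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val rowCount = fetchRowCount()
    // Rows per partition, rounded up so the last partition takes the remainder.
    val rowsPerPartition = math.ceil(rowCount.toDouble / parallelism).toInt
    val partitions: Seq[InputPartition[InternalRow]] =
      (0 until parallelism).map { i =>
        val start = i * rowsPerPartition
        val end = math.min(start + rowsPerPartition, rowCount)
        new GoogleSpreadsheetInputPartition(spreadsheetId, sheetName, readSchema(), start, end)
      }
    partitions.asJava
  }

  // Placeholder: the real implementation gets this from the Sheets API,
  // roughly sheet.getProperties.getGridProperties.getRowCount.
  private def fetchRowCount(): Int = 1000
}
```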
Where we really do the data reading is the class GoogleSpreadsheetInputPartitionReader. I override two methods here: override def get(): InternalRow and override def next(): Boolean. It’s very similar to implementing an iterator: next() returns true or false to indicate whether there is more data to read, and get() returns the next row of data.
To improve performance, next() reads a batch of data from the sheet and buffers it, so get() just returns one row of data from the buffer.
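As a sketch, such a buffered partition reader could look like this; the constructor parameters, the batch size and the stubbed fetchRows helper are illustrative assumptions:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class GoogleSpreadsheetInputPartitionReader(
    spreadsheetId: String,
    sheetName: String,
    schema: StructType,
    startOffset: Int,
    endOffset: Int,
    batchSize: Int = 100
) extends InputPartitionReader[InternalRow] {

  private var currentOffset = startOffset
  private val buffer = mutable.Queue.empty[InternalRow]

  // Like Iterator.hasNext: refill the buffer with one batch from the sheet
  // when it runs dry, then report whether a row is available.
  override def next(): Boolean = {
    if (buffer.isEmpty && currentOffset < endOffset) {
      val batchEnd = math.min(currentOffset + batchSize, endOffset)
      buffer ++= fetchRows(currentOffset, batchEnd)
      currentOffset = batchEnd
    }
    buffer.nonEmpty
  }

  // Like Iterator.next: hand back one buffered row.
  override def get(): InternalRow = buffer.dequeue()

  override def close(): Unit = ()

  // Placeholder: the real implementation calls the Google Sheets API for the
  // rows in [from, until) and converts each row of cells to an InternalRow,
  // using Spark's internal types such as UTF8String for strings.
  private def fetchRows(from: Int, until: Int): Seq[InternalRow] =
    (from until until).map { i =>
      InternalRow(UTF8String.fromString(s"cell-$i-1"), UTF8String.fromString(s"cell-$i-2"))
    }
}
```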
Writer
```scala
class GoogleSpreadsheetDataWriter(
  // ...
)
```
In the file GoogleSpreadsheetDataSourceWriter.scala, the most important class is GoogleSpreadsheetDataWriter. In this class, the method override def write(record: InternalRow): Unit writes the data of one partition into the external system. In my implementation, I put data into a buffer first and write it into the sheet when the buffer is full.
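A hedged sketch of such a buffered writer is below; the constructor parameters, the commit message and the stubbed flush helper are assumptions for illustration:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// Trivial commit message; a real implementation might report row counts etc.
case object GoogleSpreadsheetCommitMessage extends WriterCommitMessage

class GoogleSpreadsheetDataWriter(
    spreadsheetId: String,
    sheetName: String,
    schema: StructType,
    bufferSize: Int = 100
) extends DataWriter[InternalRow] {

  private val buffer = mutable.ArrayBuffer.empty[Seq[Any]]

  // Called once per row of the partition. Copy the values out of the
  // InternalRow (Spark may reuse the instance) and flush when the buffer fills.
  override def write(record: InternalRow): Unit = {
    buffer += record.copy().toSeq(schema)
    if (buffer.size >= bufferSize) flush()
  }

  // Flush whatever is left, then report success for this partition.
  override def commit(): WriterCommitMessage = {
    flush()
    GoogleSpreadsheetCommitMessage
  }

  // Nothing durable to roll back in this sketch; just drop the buffered rows.
  override def abort(): Unit = buffer.clear()

  // Placeholder: the real implementation appends the buffered rows to the
  // sheet through the Google Sheets API (e.g. spreadsheets().values().append).
  private def flush(): Unit =
    if (buffer.nonEmpty) {
      buffer.clear()
    }
}
```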
Register Datasource
If we want the data source to be usable like a built-in one, e.g. spark.read.format("google-spreadsheet"), we need to register it first. This is done with the file /src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
The content of this file is:
```
com.github.liam8.spark.datasource.googlesheet.GoogleSpreadsheetDataSource
```
End
In a word, the core part of a data source implementation is reading and writing the data of one partition. The Spark framework will do most of the rest for you.
By extending the Spark data source this way, you can basically let Spark read and write data with any kind of storage.