Extend Spark Data Source with DataFrame

In this article, we'll implement a Spark data source for reading and writing Google Spreadsheets, so that you'll learn how to extend Spark's data sources yourself.

What does a custom data source look like?

Read data from a Google Spreadsheet into a DataFrame:

val data = spark.read.format("google-spreadsheet")
  .option("credentialsPath", credentialFile)
  .option("spreadsheetId", spreadsheetId)
  .option("sheetName", sheetName1)
  .load()

Write the data of a DataFrame into a Google Spreadsheet:

df.write.format("google-spreadsheet")
  .option("credentialsPath", credentialFile)
  .option("spreadsheetId", spreadsheetId)
  .option("sheetName", sheetName)
  .mode(SaveMode.Overwrite)
  .save()

Implementation

In total, we’ll implement 4 files:

GoogleSpreadsheetDataSource.scala

GoogleSpreadsheetDataSourceException.scala

GoogleSpreadsheetDataSourceReader.scala

GoogleSpreadsheetDataSourceWriter.scala
  • GoogleSpreadsheetDataSource is the main class that defines the data source.

  • GoogleSpreadsheetDataSourceReader and GoogleSpreadsheetDataSourceWriter tell Spark how to read from and write to a sheet, respectively.

  • GoogleSpreadsheetDataSourceException is just a custom exception.

You can find the full code at Liam8/spark-google-spreadsheet-datasource.

Main class

Let’s start with the main class.

package com.github.liam8.spark.datasource.googlesheet

// Imports below assume the Spark 2.x Data Source V2 API (org.apache.spark.sql.sources.v2).
import java.util.Optional

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.sheets.v4.{Sheets, SheetsScopes}
import com.google.auth.http.HttpCredentialsAdapter
import com.google.auth.oauth2.GoogleCredentials
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.sources.v2.{DataSourceOptions, ReadSupport, WriteSupport}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SaveMode, SparkSession}

class GoogleSpreadsheetDataSource extends ReadSupport with WriteSupport with DataSourceRegister {

  override def createReader(options: DataSourceOptions): DataSourceReader = {
    createReader(null, options)
  }

  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
    new GoogleSpreadsheetDataSourceReader(
      options.get("spreadsheetId").get(),
      options.get("sheetName").get(),
      options.get("credentialsPath").get(),
      options.getInt("bufferSizeOfEachPartition", 100),
      Option(schema),
      options.getBoolean("firstRowAsHeader", true),
      options.getInt("parallelism",
        SparkSession.getActiveSession.get.sparkContext.defaultParallelism)
    )
  }

  // The short name used in spark.read.format("google-spreadsheet").
  override def shortName(): String = "google-spreadsheet"

  override def createWriter(
    writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions
  ): Optional[DataSourceWriter] = {
    Optional.of(new GoogleSpreadsheetDataSourceWriter(
      mode,
      options.get("spreadsheetId").get(),
      options.get("sheetName").get(),
      options.get("credentialsPath").get(),
      options.getInt("bufferSizeOfEachPartition", 10),
      schema,
      options.getBoolean("firstRowAsHeader", true)
    ))
  }
}

object GoogleSpreadsheetDataSource {
  // Build a Sheets client from a credentials file on the classpath.
  def buildSheet(credentialsPath: String): Sheets =
    new Sheets.Builder(
      GoogleNetHttpTransport.newTrustedTransport,
      JacksonFactory.getDefaultInstance,
      new HttpCredentialsAdapter(GoogleCredentials.fromStream(
        this.getClass.getClassLoader.getResourceAsStream(credentialsPath)
      ).createScoped(SheetsScopes.SPREADSHEETS))
    ).setApplicationName("GoogleSpreadsheetDataSourceReader").build()
}

In this class, we read the user-supplied options from DataSourceOptions and use them to create the Reader and the Writer. If you only need to read data, you don't have to extend WriteSupport.

Reader

In the Reader we mainly need to deal with two things: how to generate the partitions, and how to read the data of one partition.

The most important thing is to implement the DataSourceReader.

override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
  val sheets = sheetService.spreadsheets().get(spreadsheetId)
    .setFields("sheets.properties").execute().getSheets
  // sheetName is case insensitive
  val tmpSheetName = sheetName.toLowerCase
  val sheet = sheets.asScala.find(_.getProperties.getTitle.toLowerCase == tmpSheetName).getOrElse(
    throw GoogleSpreadsheetDataSourceException(s"Can't find the sheet named $sheetName")
  )
  val rowCount = sheet.getProperties.getGridProperties.getRowCount
  // use floating-point division so the ceiling actually takes effect
  val step = Math.ceil(rowCount.toDouble / parallelism).toInt
  val start = if (firstRowAsHeader) 2 else 1
  Range.inclusive(start, rowCount, step).map { i =>
    new GoogleSpreadsheetInputPartition(
      credentialsPath,
      spreadsheetId,
      sheetName,
      i,
      Math.min(i + step - 1, rowCount),
      bufferSizeOfEachPartition,
      originalSchema,
      prunedSchema
    ).asInstanceOf[InputPartition[InternalRow]]
  }.toList.asJava
}

In the method planInputPartitions, we create a list of GoogleSpreadsheetInputPartition.

GoogleSpreadsheetInputPartition is a subclass of InputPartition that holds the metadata of one partition.

class GoogleSpreadsheetInputPartition(
  credentialsPath: String,
  spreadsheetId: String,
  sheetName: String,
  startOffset: Int,
  endOffset: Int,
  bufferSize: Int,
  schema: StructType,
  prunedSchema: Option[StructType]
) extends InputPartition[InternalRow] {

The parameters startOffset: Int, endOffset: Int tell the current partition at which row to start reading and where to stop.
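Besides holding this metadata, the partition also acts as the factory for the reader that runs on each executor. A minimal sketch of that factory method might look as follows; the exact constructor arguments of GoogleSpreadsheetInputPartitionReader are my assumption, not taken from the article, so check the repo for the real signature.

  // Sketch only: the real constructor arguments live in the repo.
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new GoogleSpreadsheetInputPartitionReader(
      credentialsPath, spreadsheetId, sheetName,
      startOffset, endOffset, bufferSize, schema, prunedSchema
    )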

val rowCount = sheet.getProperties.getGridProperties.getRowCount
val step = Math.ceil(rowCount.toDouble / parallelism).toInt

Here we get the total number of rows in the sheet and divide it by the parallelism to get how many rows each partition should read. From that we can calculate the startOffset and endOffset of every partition. For example, with rowCount = 1000, parallelism = 4 and firstRowAsHeader = true, the step is 250, so the partitions cover rows 2-251, 252-501, 502-751, and 752-1000.

class GoogleSpreadsheetDataSourceReader(
  spreadsheetId: String,
  sheetName: String,
  credentialsPath: String,
  bufferSizeOfEachPartition: Int,
  var schema: Option[StructType] = None,
  firstRowAsHeader: Boolean,
  parallelism: Int
) extends DataSourceReader with SupportsPushDownRequiredColumns
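The Reader mixes in SupportsPushDownRequiredColumns, which is how Spark pushes column pruning down to our source: Spark calls pruneColumns with only the columns the query needs, and readSchema should then report that pruned schema. A minimal sketch of these two hooks inside the Reader could look like the following; prunedSchema and inferSchema() are names I made up for illustration, so the repo's real code will differ.

  // Sketch only: prunedSchema and inferSchema() are assumed names, not the repo's code.
  private var prunedSchema: Option[StructType] = None

  // Spark hands us the subset of columns the query actually needs.
  override def pruneColumns(requiredSchema: StructType): Unit = {
    prunedSchema = Some(requiredSchema)
  }

  // Report the schema we will actually produce: the pruned schema if any,
  // else the user-supplied schema, else whatever inference the reader does
  // (e.g. reading the header row when firstRowAsHeader is true).
  override def readSchema(): StructType =
    prunedSchema.orElse(schema).getOrElse(inferSchema())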

The actual data reading happens in the class GoogleSpreadsheetInputPartitionReader, where I override two methods: override def get(): InternalRow and override def next(): Boolean. It works much like an iterator: next() returns true or false to tell whether there is more data to read, and get() returns the next row of data.

To improve performance, next() reads a batch of rows from the sheet and buffers them, so get() simply returns one row from the buffer.
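A simplified sketch of that buffered reader is shown below. The constructor parameters are trimmed down and fetchBatch is a hypothetical helper, so treat this as an outline of the idea rather than the repo's actual code.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

import scala.collection.mutable

// Sketch only: parameters are trimmed and fetchBatch is a hypothetical helper.
class GoogleSpreadsheetInputPartitionReader(
  startOffset: Int,
  endOffset: Int,
  bufferSize: Int
) extends InputPartitionReader[InternalRow] {

  private val buffer = mutable.Queue.empty[InternalRow]
  private var nextRowToFetch = startOffset

  // Refill the buffer from the Sheets API when it runs dry, at most
  // bufferSize rows at a time and never past endOffset.
  override def next(): Boolean = {
    while (buffer.isEmpty && nextRowToFetch <= endOffset) {
      val upTo = math.min(nextRowToFetch + bufferSize - 1, endOffset)
      buffer ++= fetchBatch(nextRowToFetch, upTo)
      nextRowToFetch = upTo + 1
    }
    buffer.nonEmpty
  }

  // Hand Spark one row from the buffer.
  override def get(): InternalRow = buffer.dequeue()

  override def close(): Unit = ()

  // Hypothetical helper: the real implementation calls the Sheets values API
  // for the given row range and converts each row to an InternalRow.
  private def fetchBatch(from: Int, to: Int): Seq[InternalRow] = ???
}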

Writer

class GoogleSpreadsheetDataWriter(
  saveMode: SaveMode,
  schema: StructType,
  credentialsPath: String,
  spreadsheetId: String,
  sheetName: String,
  bufferSize: Int,
  firstRowAsHeader: Boolean
) extends DataWriter[InternalRow]

In the file GoogleSpreadsheetDataSourceWriter.scala, the most important class is GoogleSpreadsheetDataWriter. Its method override def write(record: InternalRow): Unit writes the data of one partition to the external system. In my implementation, I put rows into a buffer first and write them to the sheet when the buffer is full.
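A rough sketch of that buffering logic follows. BufferedSheetDataWriter is a stripped-down stand-in for GoogleSpreadsheetDataWriter, and flushToSheet is a hypothetical helper I made up for illustration, so the real writer in the repo differs in the details.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

import scala.collection.mutable

// Sketch only: a stand-in for GoogleSpreadsheetDataWriter with a trimmed constructor.
class BufferedSheetDataWriter(bufferSize: Int) extends DataWriter[InternalRow] {

  private val buffer = mutable.ArrayBuffer.empty[InternalRow]

  // Buffer each incoming row and flush to the sheet when the buffer is full.
  override def write(record: InternalRow): Unit = {
    buffer += record.copy() // copy: Spark may reuse the InternalRow instance
    if (buffer.size >= bufferSize) flushToSheet()
  }

  // Flush whatever is left, then report success to the driver.
  override def commit(): WriterCommitMessage = {
    flushToSheet()
    new WriterCommitMessage {} // a trivial, empty commit message
  }

  // Nothing clever on failure in this sketch; rows already flushed stay in the sheet.
  override def abort(): Unit = buffer.clear()

  // Hypothetical helper: the real implementation converts the buffered rows
  // to cell values and appends them through the Sheets values API.
  private def flushToSheet(): Unit = {
    // ... call the Google Sheets API here ...
    buffer.clear()
  }
}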

Register the Data Source

If we want the data source to be usable like a built-in one, e.g. spark.read.format("google-spreadsheet"), we need to register it first. This is done with the file /src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.

The content of this file is:

com.github.liam8.spark.datasource.googlesheet.GoogleSpreadsheetDataSource
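This is the standard Java ServiceLoader mechanism, which lets Spark map the short name returned by shortName() to our class. Even without registering, you could still load the data source by its fully qualified class name, for example:

val data = spark.read
  .format("com.github.liam8.spark.datasource.googlesheet.GoogleSpreadsheetDataSource")
  .option("credentialsPath", credentialFile)
  .option("spreadsheetId", spreadsheetId)
  .option("sheetName", sheetName)
  .load()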

End

In a word, the core part of a data source implementation is reading and writing the data of one partition. The Spark framework does most of the rest for you.

By extending the Spark data source this way, you can basically make Spark read from and write to any storage system.

Check out the repo on GitHub: Liam8/spark-google-spreadsheet-datasource.