Spark Connector User Guide
Overview
The Koverse Spark Connector is built on the Spark 3 DataSourceV2 framework and uses Scala 2.12. It works just like built-in data sources such as the JSON or JDBC data sources.
It’s an intuitive and powerful tool for using Koverse to store your datasets before and after Spark transformations.
Installation
The first step is to add the Spark Connector as a dependency to your Spark application. It can be found at the Maven Central Repository.
Add the Connector to your project configuration file:
Maven:
<dependency>
<groupId>com.koverse</groupId>
<artifactId>kdp-spark-connector_2.12</artifactId>
<version>4.1.2</version>
</dependency>
SBT:
libraryDependencies += "com.koverse" % "kdp-spark-connector_2.12" % "4.0.2"
Gradle:
implementation group: 'com.koverse', name: 'kdp-spark-connector_2.12', version: '4.0.2'
Examples
Using the Koverse Spark Connector in Your Spark Application
The example code below shows how to add the Koverse Spark Connector to your Spark application for reading and writing to Koverse. The Spark Connector requires you to save your connection information as environment variables before running your application. It will authenticate with Koverse or Keycloak before performing any operations.
Koverse Credentials:
export KDP_EMAIL=<kdp_email>
export KDP_PSWD=<kdp_password>
Keycloak Credentials:
export AUTHENTICATE_WITH_KEYCLOAK=true
export KEYCLOAK_BASE_URL=<your_kdp_url>
export KEYCLOAK_REALM=<your_keycloak_realm>
export KEYCLOAK_CLIENT_ID=<your_keycloak_client_id>
export KEYCLOAK_CLIENT_SECRET=<your_keycloak_client_secret>
export KEYCLOAK_USERNAME=<your_keycloak_username>
export KEYCLOAK_PASSWORD=<your_keycloak_password>
The default kdpUrl is https://api.app.koverse.com, although kdpUrl can also be passed in as a data source option.
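For instance, here is a minimal sketch of overriding the default URL when reading; the workspaceId and datasetId values are placeholders and are covered next:

// Sketch: pass "kdpUrl" as a data source option to point at a specific KDP deployment.
// The workspaceId and datasetId values below are placeholders; see the sections that follow.
Dataset<Row> df = spark.read().format("com.koverse.spark.read")
    .option("kdpUrl", "https://api.app.koverse.com")
    .option("workspaceId", "myworkspace")
    .option("datasetId", "<your_dataset_id>")
    .load();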
For all actions you will need to pass in the workspaceId for the datasets you are reading from or writing to.
String workspaceId = "myworkspace";
To read from Koverse, the datasetId of the dataset you want to read is required.
String datasetId = "bca220e3-67fc-4504-9ae8-bedee9d68e6c";
You call the Koverse Spark Connector by specifying the read format and including the additional required options workspaceId and datasetId:
Dataset<Row> randomDF = spark.read().format("com.koverse.spark.read")
.option("workspaceId", workspaceId)
.option("datasetId", datasetId)
.load();
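The snippets in this guide assume an existing SparkSession named spark. As a minimal sketch (the class name and appName below are illustrative, not part of the Connector), a standalone application wrapping the read above might look like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KdpReadExample {  // illustrative class name
    public static void main(String[] args) {
        // Build (or reuse) a SparkSession; the appName is illustrative.
        SparkSession spark = SparkSession.builder()
                .appName("kdp-spark-connector-example")
                .getOrCreate();

        String workspaceId = "myworkspace";
        String datasetId = "bca220e3-67fc-4504-9ae8-bedee9d68e6c";

        // Read the dataset through the Koverse Spark Connector.
        Dataset<Row> randomDF = spark.read().format("com.koverse.spark.read")
                .option("workspaceId", workspaceId)
                .option("datasetId", datasetId)
                .load();

        randomDF.show(10);  // inspect a few records

        spark.stop();
    }
}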
When writing, the Connector creates a new dataset to write to. Pass in the name for your new dataset as well as the workspaceId.
String datasetName = "my-kdp-spark-test";
counts.write().format("com.koverse.spark.write")
.option("workspaceId", workspaceId)
.option("datasetName", datasetName)
.mode("Append").save();
To configure your dataset for attribute-based access control (ABAC) you will need to include the required options labelParser, labeledFields, handlingPolicy, datasetName, and workspaceId. The replacementString option is required if you choose the handlingPolicy of REPLACE. See ABAC for more details.
String parser = "simple-parser";
String abacDatasetName = "my-kdp-spark-abac-test";
String kdpUrl = "https://api.app.koverse.com"; // the default KDP URL; use your deployment's URL if different
labeledData.write().format("com.koverse.spark.write")
.option("labelParser", parser)
.option("labeledFields", "label")
.option("handlingPolicy", "REPLACE")
.option("replacementString", "NOPE")
.option("kdpUrl", kdpUrl)
.option("workspaceId", workspaceId)
.option("datasetName", abacDatasetName)
.mode("Append").save();
For the full example go to our examples repository on GitHub and see the Spark Connector - KdpCount.
Using Koverse Spark Connector in Databricks
Note: Please ensure you set up an account and configure a cluster in Databricks prior to starting these instructions.
In Databricks, select Compute on the left and click on the cluster that will be used to run Koverse jobs.
In the Cluster Configuration select Advanced Options.
Select the Spark tab.
Add the following Environment Variables:
Koverse or Keycloak credentials: see the **Using the Koverse Spark Connector in Your Spark Application** section above.
PYSPARK_PYTHON=/databricks/python3/bin/python3
JNAME=zulu11-ca-amd64
The spark-connector_2.12-<version>.jar will need to be uploaded to the Databricks cluster that Koverse jobs will be run on.
Select the Libraries tab.
Click Install New.
Drag and drop the jar into the Install library dialog.
Click Install.
Restart the cluster.
Now the cluster is ready to run Koverse jobs.
Create a new notebook.
Here is an example of a notebook that reads a dataset called actorfilms, transforms it, and writes it back to Koverse as a new dataset:
NOTE: actorfilms.csv can be found here
import com.koverse.spark.read._
import com.koverse.spark.write._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val actorfilms = spark.read.format("com.koverse.spark.read").option("workspaceId", "<kdp_workspace_id>").option("datasetId","<kdp_dataset_id>").option("kdpUrl","https://api.dev.koverse.com").load()
val top = actorfilms.filter($"Rating" >= 5).filter($"Year" >= 2000).toDF
val ranked = top.drop(col("Actor")).drop(col("ActorID")).dropDuplicates.withColumn("Rank", rank().over(Window.orderBy(desc("Rating"), desc("Votes"), desc("Year")))).toDF
val rounded = ranked.withColumn("Rounded", round($"Rating"))
display(rounded)
ranked.write.format("com.koverse.spark.write").option("kdpUrl", "https://api.dev.koverse.com").option("workspaceId", "emily").option("datasetName", "top_movies").mode("Append").save()

After running the above Spark job you can create a visualization of the data by clicking New Visualization in the results table.
Once the notebook is created, it can be run from the notebook cell.
This notebook can also be run by creating a new job pointing to the notebook in Workflows.
Select Workflows on the left.
Click Create Job.
Name the task.
Select an existing cluster or use shared_job_cluster. If you select shared_job_cluster, make sure you configure it according to steps 1 through 5.iv.