Usage
Examples
The following examples have these spec fields in common:

- version: the current version is "1.0"
- sparkImage: the docker image that will be used by the job, driver and executor pods. This can be provided by the user.
- mode: only cluster is currently supported
- mainApplicationFile: the artifact (Java, Scala or Python) that forms the basis of the Spark job.
- args: the arguments passed directly to the application. In the examples below this is e.g. the input path for part of the public New York taxi dataset.
- sparkConf: Spark configuration settings that are passed directly to spark-submit and which are best defined explicitly by the user. Since the SparkApplication "knows" that there is an external dependency (the S3 bucket where the data and/or the application is located) and how that dependency should be treated (i.e. what type of credential checks are required, if any), it is better to have these things declared together.
- volumes: refers to any volumes needed by the SparkApplication, in this case an underlying PersistentVolumeClaim.
- driver: driver-specific settings, including any volume mounts.
- executor: executor-specific settings, including any volume mounts.

Job-specific settings are annotated below.
Pyspark: externally located artifact and dataset
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-external-dependencies
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny_tlc_report.py (1)
  args:
    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'" (2)
  deps:
    requirements:
      - tabulate==0.8.9 (3)
  sparkConf: (4)
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
    "spark.driver.extraClassPath": "/dependencies/jars/*"
    "spark.executor.extraClassPath": "/dependencies/jars/*"
  volumes:
    - name: job-deps (5)
      persistentVolumeClaim:
        claimName: pvc-ksv
  driver:
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies (6)
  executor:
    instances: 3
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies (6)
1 | Job python artifact (external) |
2 | Job argument (external) |
3 | List of python job requirements: these will be installed in the pods via pip |
4 | Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in s3) |
5 | The name of the volume, backed by a PersistentVolumeClaim that must be pre-existing |
6 | The path on the volume mount: this is referenced in the sparkConf section where the extra class path is defined for the driver and executors |
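The PersistentVolumeClaim itself is not created by the operator. A minimal sketch of a matching claim is shown below; the access mode and requested size are assumptions, and the volume is expected to already contain the dependency jars referenced under /dependencies/jars:

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-ksv
  namespace: default
spec:
  accessModes:
    - ReadWriteOnce   # assumption: choose the access mode supported by your storage class
  resources:
    requests:
      storage: 1Gi    # assumption: size this to hold the dependency jars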
Pyspark: externally located dataset, artifact available via PVC/volume mount
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-image
  namespace: default
spec:
  version: "1.0"
  image: docker.stackable.tech/stackable/ny-tlc-report:0.1.0 (1)
  sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: local:///stackable/spark/jobs/ny_tlc_report.py (2)
  args:
    - "--input 's3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'" (3)
  deps:
    requirements:
      - tabulate==0.8.9 (4)
  sparkConf: (5)
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
  job:
    resources:
      cpu:
        min: "1"
        max: "1"
      memory:
        limit: "1Gi"
  driver:
    resources:
      cpu:
        min: "1"
        max: "1500m"
      memory:
        limit: "1Gi"
  executor:
    instances: 3
    resources:
      cpu:
        min: "1"
        max: "4"
      memory:
        limit: "2Gi"
1 | Job image: this contains the job artifact that will be retrieved from the volume mount backed by the PVC |
2 | Job python artifact (local) |
3 | Job argument (external) |
4 | List of python job requirements: these will be installed in the pods via pip |
5 | Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in an S3 store) |
JVM (Scala): externally located artifact and dataset
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-pvc
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.0-SNAPSHOT.jar (1)
  mainClass: org.example.App (2)
  args:
    - "'s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv'"
  sparkConf: (3)
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
    "spark.driver.extraClassPath": "/dependencies/jars/*"
    "spark.executor.extraClassPath": "/dependencies/jars/*"
  volumes:
    - name: job-deps (4)
      persistentVolumeClaim:
        claimName: pvc-ksv
  driver:
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies (5)
  executor:
    instances: 3
    volumeMounts:
      - name: job-deps
        mountPath: /dependencies (5)
1 | Job artifact located on S3. |
2 | Job main class |
3 | Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources (in this case, in an S3 store, accessed without credentials) |
4 | The name of the volume, backed by a PersistentVolumeClaim that must be pre-existing |
5 | The path on the volume mount: this is referenced in the sparkConf section where the extra class path is defined for the driver and executors |
JVM (Scala): externally located artifact accessed with credentials
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: example-sparkapp-s3-private
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://my-bucket/spark-examples_2.12-3.3.0.jar (1)
  mainClass: org.apache.spark.examples.SparkPi (2)
  s3connection: (3)
    inline:
      host: test-minio
      port: 9000
      accessStyle: Path
      credentials: (4)
        secretClass: s3-credentials-class
  sparkConf: (5)
    spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" (6)
    spark.driver.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
    spark.executor.extraClassPath: "/dependencies/jars/hadoop-aws-3.2.0.jar:/dependencies/jars/aws-java-sdk-bundle-1.11.375.jar"
  executor:
    instances: 3
1 | Job artifact (located in an S3 store) |
2 | Artifact class |
3 | S3 section, specifying the existing secret and S3 end-point (in this case, MinIO) |
4 | Credentials referencing a secretClass (not shown in this example) |
5 | Spark dependencies: the credentials provider (the user knows what is relevant here) plus dependencies needed to access external resources… |
6 | …in this case, in an S3 store, accessed with the credentials defined in the secret |
JVM (Scala): externally located artifact accessed with job arguments provided via configuration map
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-job-arguments (1)
data:
  job-args.txt: |
    s3a://nyc-tlc/trip data/yellow_tripdata_2021-07.csv (2)
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: ny-tlc-report-configmap
  namespace: default
spec:
  version: "1.0"
  sparkImage: docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0
  mode: cluster
  mainApplicationFile: s3a://stackable-spark-k8s-jars/jobs/ny-tlc-report-1.1.0.jar (3)
  mainClass: tech.stackable.demo.spark.NYTLCReport
  volumes:
    - name: cm-job-arguments
      configMap:
        name: cm-job-arguments (4)
  args:
    - "--input /arguments/job-args.txt" (5)
  sparkConf:
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
  driver:
    volumeMounts:
      - name: cm-job-arguments (6)
        mountPath: /arguments (7)
  executor:
    instances: 3
    volumeMounts:
      - name: cm-job-arguments (6)
        mountPath: /arguments (7)
1 | Name of the configuration map |
2 | Argument required by the job |
3 | Job scala artifact that requires an input argument |
4 | The volume backed by the configuration map |
5 | The expected job argument, accessed via the mounted configuration map file |
6 | The name of the volume backed by the configuration map that will be mounted to the driver/executor |
7 | The mount location of the volume (this will contain the file /arguments/job-args.txt) |
S3 bucket specification
You can specify S3 connection details directly inside the SparkApplication
specification or by referring to an external S3Connection
custom resource.
To specify S3 connection details directly as part of the SparkApplication
resource you add an inline connection configuration as shown below.
s3connection: (1)
  inline:
    host: test-minio (2)
    port: 9000 (3)
    accessStyle: Path
    credentials:
      secretClass: s3-credentials-class (4)
1 | Entry point for the S3 connection configuration. |
2 | Connection host. |
3 | Optional connection port. |
4 | Name of the SecretClass providing the credentials; the underlying Secret object is expected to contain the following keys: ACCESS_KEY_ID and SECRET_ACCESS_KEY |
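A sketch of a Secret that could back this SecretClass is shown below. It assumes that the SecretClass s3-credentials-class uses the secret-operator's k8sSearch backend, so that the Secret is matched via the secrets.stackable.tech/class label; the Secret name and key values are placeholders:

---
apiVersion: v1
kind: Secret
metadata:
  name: s3-credentials
  labels:
    secrets.stackable.tech/class: s3-credentials-class  # assumption: SecretClass with a k8sSearch backend
stringData:
  ACCESS_KEY_ID: minioAccessKey        # placeholder
  SECRET_ACCESS_KEY: minioSecretKey    # placeholder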
It is also possible to configure the connection details as a separate Kubernetes resource and only refer to that object from the SparkApplication
like this:
s3connection:
  reference: s3-connection-resource (1)
1 | Name of the connection resource with connection details. |
The resource named s3-connection-resource
is then defined as shown below:
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Connection
metadata:
  name: s3-connection-resource
spec:
  host: test-minio
  port: 9000
  accessStyle: Path
  credentials:
    secretClass: minio-credentials-class
This has the advantage that one connection configuration can be shared across SparkApplications, which reduces the cost of updating these details.
Resource Requests
Stackable operators handle resource requests in a slightly different manner than Kubernetes. Resource requests are defined on role or role group level. See Roles and role groups for details on these concepts. On a role level this means that e.g. all workers will use the same resource requests and limits. This can be further specified on role group level (which takes precedence over the role level) to apply different resources.
This is an example of how to specify CPU and memory resources using the Stackable Custom Resources:
---
apiVersion: example.stackable.tech/v1alpha1
kind: ExampleCluster
metadata:
  name: example
spec:
  workers: # role-level
    config:
      resources:
        cpu:
          min: 300m
          max: 600m
        memory:
          limit: 3Gi
    roleGroups: # role-group-level
      resources-from-role: # role-group 1
        replicas: 1
      resources-from-role-group: # role-group 2
        replicas: 1
        config:
          resources:
            cpu:
              min: 400m
              max: 800m
            memory:
              limit: 4Gi
In this case, the role group resources-from-role will inherit the resources specified on the role level, resulting in a maximum of 3Gi memory and 600m CPU resources.
The role group resources-from-role-group has a maximum of 4Gi memory and 800m CPU resources (which overrides the role CPU resources).
For Java products the actual heap memory used is lower than the specified memory limit because other processes running in the container also require memory. Currently, 80% of the specified memory limit is passed to the JVM.
For memory, only a limit can be specified, which will be set as both memory request and limit in the container. This is to always guarantee a container the full amount of memory during Kubernetes scheduling.
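For example, with a memory limit of 1Gi, both the container request and limit are set to 1Gi, and roughly 1024Mi × 0.8 ≈ 819Mi of that is made available to the JVM heap.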
If no resources are configured explicitly, the operator uses the following defaults:
job:
  resources:
    cpu:
      min: '500m'
      max: "1"
    memory:
      limit: '1Gi'
driver:
  resources:
    cpu:
      min: '1'
      max: "2"
    memory:
      limit: '2Gi'
executor:
  resources:
    cpu:
      min: '1'
      max: "4"
    memory:
      limit: '4Gi'
The default values are most likely not sufficient to run a proper cluster in production. Please adapt according to your requirements. For more details regarding Kubernetes CPU limits see: Assign CPU Resources to Containers and Pods.
Spark allocates a default amount of non-heap memory based on the type of job (JVM or non-JVM). This is taken into account when defining memory settings based exclusively on the resource limits, so that the "declared" value is the actual total value (i.e. including memory overhead). This may result in minor deviations from the stated resource value due to rounding differences.
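To illustrate with rounded numbers (assuming Spark's default memory overhead factor of 0.1 for JVM jobs, which is not configured in these examples): a declared executor memory limit of 4Gi would translate into roughly 4096Mi / 1.1 ≈ 3724Mi of executor memory plus about 372Mi of overhead, so that the two together still add up to the declared 4Gi.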
It is possible to define Spark resources either directly by setting configuration properties listed under sparkConf, or by using resource limits. If both are used, the sparkConf properties take precedence. For the sake of clarity it is recommended to use either one or the other.
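As a sketch of the two alternatives (the values are illustrative only), the executor memory could be declared either through the resources section or through the corresponding Spark property; if both were present, the sparkConf entry would take precedence:

# Via resource limits:
executor:
  instances: 3
  resources:
    memory:
      limit: 2Gi
# Or via a Spark property, which wins if both are set:
sparkConf:
  spark.executor.memory: 2g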
CRD argument coverage
Below are listed the CRD fields that can be defined by the user:
CRD field | Remarks |
---|---|
apiVersion: spark.stackable.tech/v1alpha1 | |
kind: SparkApplication | |
metadata.name | Job name |
spec.version | "1.0" |
spec.mode | Only cluster is currently supported |
spec.image | User-supplied image containing spark-job dependencies that will be copied to the specified volume mount |
spec.sparkImage | Spark image which will be deployed to driver and executor pods, which must contain the spark environment needed by the job e.g. docker.stackable.tech/stackable/spark-k8s:3.3.0-stackable0.3.0 |
spec.sparkImagePullPolicy | Optional Enum (one of Always, IfNotPresent or Never) that determines the pull policy of the Spark image |
spec.sparkImagePullSecrets | An optional list of references to secrets in the same namespace to use for pulling any of the images used by a SparkApplication |
spec.mainApplicationFile | The actual application file that will be called by spark-submit |
spec.mainClass | The main class, i.e. entry point for JVM artifacts |
spec.args | Arguments passed directly to the job artifact |
spec.s3connection | S3 connection specification. See the S3 bucket specification for more details. |
spec.sparkConf | A map of key/value strings that will be passed directly to spark-submit |
spec.deps.requirements | A list of python packages that will be installed via pip |
spec.deps.packages | A list of packages that is passed directly to spark-submit |
spec.deps.excludePackages | A list of excluded packages that is passed directly to spark-submit |
spec.deps.repositories | A list of repositories that is passed directly to spark-submit |
spec.volumes | A list of volumes |
spec.volumes.name | The volume name |
spec.volumes.persistentVolumeClaim | The persistent volume claim backing the volume |
spec.job.resources | Resources specification for the initiating Job |
spec.driver.resources | Resources specification for the driver Pod |
spec.driver.volumeMounts | A list of mounted volumes for the driver |
spec.driver.volumeMounts.name | Name of mount |
spec.driver.volumeMounts.mountPath | Volume mount path |
spec.driver.nodeSelector | A dictionary of labels to use for node selection when scheduling the driver. N.B. this assumes there are no implicit node dependencies (e.g. a PersistentVolumeClaim bound to a specific node) defined elsewhere |
spec.executor.resources | Resources specification for the executor Pods |
spec.executor.instances | Number of executor instances launched for this job |
spec.executor.volumeMounts | A list of mounted volumes for each executor |
spec.executor.volumeMounts.name | Name of mount |
spec.executor.volumeMounts.mountPath | Volume mount path |
spec.executor.nodeSelector | A dictionary of labels to use for node selection when scheduling the executors. N.B. this assumes there are no implicit node dependencies (e.g. a PersistentVolumeClaim bound to a specific node) defined elsewhere |