Skip to content

Apache Spark and OakVar

OakVar is natively integrated with Apache Spark, allowing annotation on the big genome project scale.

Since OakVar's annotation capability is unlimited through Python, including AI/ML-based annotation, this means that your Apache Spark pipeline can annotate variants in unlimited ways.

Let's see how it is done.

First, create a Spark session.

import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAppName("ov")
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

Then, let's make a test set of variants as a Spark DataFrame.

data = [
        {"chrom": "chr7", "pos":140734758, "ref_base": "T", "alt_base": "C"},
        {"chrom": "chr7", "pos":140734780, "ref_base": "-", "alt_base": "G"},
        {"chrom": "chr7", "pos":140736487, "ref_base": "GTGCGA", "alt_base": "-"},
        {"chrom": "chr7", "pos":140736487, "ref_base": "GTGCGAT", "alt_base": "-"},
        {"chrom": "chr7", "pos":140742186, "ref_base": "-", "alt_base": "T"},
        {"chrom": "chr7", "pos":140753351, "ref_base": "A", "alt_base": "G"},
        {"chrom": "chr7", "pos":140800417, "ref_base": "CT", "alt_base": "-"},
        {"chrom": "chr7", "pos":140800417, "ref_base": "CTG", "alt_base": "-"},
        {"chrom": "chr7", "pos":140807936, "ref_base": "A", "alt_base": "-"},
        {"chrom": "chr7", "pos":140924703, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":148847298, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":27199497, "ref_base": "C", "alt_base": "G"},
        {"chrom": "chr7", "pos":2958506, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":50319062, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":55019278, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":55019338, "ref_base": "G", "alt_base": "A"},
        {"chrom": "chr7", "pos":55181319, "ref_base": "-", "alt_base": "GGGTTG"},
        {"chrom": "chr8", "pos":127738263, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr8", "pos":43018497, "ref_base": "A", "alt_base": "G"},
        {"chrom": "chr9", "pos":107489172, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":130714320, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":132928872, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":136545786, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":21968622, "ref_base": "C", "alt_base": "-"},
        {"chrom": "chr9", "pos":21974827, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":37034031, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":5021988, "ref_base": "A", "alt_base": "T"},
]
for uid in range(len(data)):
    data[uid]["uid"] = uid
df = spark.createDataFrame(data)
print("Original DataFrame")
df.show()

Output:

Original DataFrame
+--------+-----+---------+--------+---+
|alt_base|chrom|      pos|ref_base|uid|
+--------+-----+---------+--------+---+
|       C| chr7|140734758|       T|  0|
|       G| chr7|140734780|       -|  1|
|       -| chr7|140736487|  GTGCGA|  2|
|       -| chr7|140736487| GTGCGAT|  3|
|       T| chr7|140742186|       -|  4|
|       G| chr7|140753351|       A|  5|
|       -| chr7|140800417|      CT|  6|
|       -| chr7|140800417|     CTG|  7|
|       -| chr7|140807936|       A|  8|
|       T| chr7|140924703|       A|  9|
|       T| chr7|148847298|       A| 10|
|       G| chr7| 27199497|       C| 11|
|       T| chr7|  2958506|       A| 12|
|       T| chr7| 50319062|       A| 13|
|       T| chr7| 55019278|       A| 14|
|       A| chr7| 55019338|       G| 15|
|  GGGTTG| chr7| 55181319|       -| 16|
|       T| chr8|127738263|       A| 17|
|       G| chr8| 43018497|       A| 18|
|       T| chr9|107489172|       A| 19|
+--------+-----+---------+--------+---+
only showing top 20 rows

Then, we create a Spark Resilient Distributed Dataset (RDD).

rdd = sc.parallelize(data, 4)

Then, let's define a custom function which will be run in each worker node and for a partition of the RDD.

import oakvar as ov

def get_ov_annotation(iterator):
    mapper = ov.get_mapper("gencode")
    clinvar = ov.get_annotator("clinvar")
    for row in iterator:
        ret = mapper.map(row)
        ret = clinvar.append_annotation(ret)
        yield ret

This function will be run as a standalone function in worker nodes, so the worker nodes should already have OakVar installed and their Python should have access to the installed OakVar package.

This function loads a gencode mapper and a clinvar annotation module, and for variant, runs the mapper and the annotator.

Let's apply this custom function to the variant RDD.

ret = rdd.mapPartitions(get_ov_annotation).collect()

ret will be a list of dict, each dict corresponding to one annotated variant.

Let's create a new RDD with annotated variants. We'll use GENCODE mapper (gencode) and ClinVar annotator (clinvar). They should be already installed in the worker nodes.

schema = ov.lib.util.run.get_spark_schema(["gencode", "clinvar"])
rdd = spark.createDataFrame(ret, schema)
rdd.show()

Output:

+---+-----+---------+--------+--------+----+------+------+-----------------+---+--------------------+--------------------+------+--------------------+------------+--------------------+---------------------+----------------------+--------------------+-----------+-----------------+
|uid|chrom|      pos|ref_base|alt_base|note|coding|  hugo|       transcript| so|             cchange|             achange|exonno|        all_mappings|clinvar__uid|        clinvar__sig|clinvar__disease_refs|clinvar__disease_names|   clinvar__rev_stat|clinvar__id|clinvar__sig_conf|
+---+-----+---------+--------+--------+----+------+------+-----------------+---+--------------------+--------------------+------+--------------------+------------+--------------------+---------------------+----------------------+--------------------+-----------+-----------------+
|  0| chr7|140734758|       T|       C|NULL|     Y|  BRAF|ENST00000646891.2|MIS|           c.2140A>G|         p.Ile714Val|    18|{"BRAF": [["A0A2U...|        NULL|Uncertain signifi...| MONDO:MONDO:00055...|  Colorectal cancer...|criteria provided...|    1410272|             NULL|
|  1| chr7|140734780|       -|       G|NULL|      |  BRAF|ENST00000646891.2|INT|c.2128-10_2128-9insC|
|  2| chr7|140736487|  GTGCGA|       -|NULL|      |  BRAF|ENST00000646891.2|INT|c.2128-1722_2128-...|
|  3| chr7|140736487| GTGCGAT|       -|NULL|      |  BRAF|ENST00000646891.2|INT|c.2128-1723_2128-...|
|  4| chr7|140742186|       -|       T|NULL|      |  BRAF|ENST00000646891.2|INT|      c.1993-2240dup|
|  5| chr7|140753351|       A|       G|NULL|     Y|  BRAF|ENST00000646891.2|MIS|           c.1784T>C|         p.Phe595Se
|  6| chr7|140800417|      CT|       -|NULL|     Y|  BRAF|ENST00000646891.2|FSD|        c.927_928del|  p.Glu309AspfsTer4
|  7| chr7|140800417|     CTG|       -|NULL|     Y|  BRAF|ENST00000646891.2|IND|        c.923_925del|         p.Ala308de
|  8| chr7|140807936|       A|       -|NULL|      |  BRAF|ENST00000646891.2|INT|         c.711+24del|
|  9| chr7|140924703|       A|       T|NULL|     Y|  BRAF|ENST00000646891.2|SYN|              c.1T>A|             p.Met1
| 10| chr7|148847298|       A|       T|NULL|     Y|  EZH2|ENST00000320356.7|SYN|              c.1T>A|             p.Met1
| 11| chr7| 27199497|       C|       G|NULL|     Y|HOXA13|ENST00000649031.1|MIS|            c.581G>C|         p.Cys194Se
| 12| chr7|  2958506|       A|       T|NULL|     Y|CARD11|ENST00000396946.9|SYN|              c.1T>A|             p.Met1
| 13| chr7| 50319062|       A|       T|NULL|     Y| IKZF1|ENST00000331340.8|MLO|              c.1A>T|             p.Met1
| 14| chr7| 55019278|       A|       T|NULL|     Y|  EGFR|ENST00000275493.7|MLO|              c.1A>T|             p.Met1?|     1|{"EGFR": [["P0053...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 15| chr7| 55019338|       G|       A|NULL|     Y|  EGFR|ENST00000275493.7|MIS|             c.61G>A|          p.Ala21Thr|     1|{"EGFR": [["P0053...|        NULL|Uncertain signifi...|      MedGen:CN130014|  EGFR-related lung...|criteria provided...|     848579|             NULL|
| 16| chr7| 55181319|       -|  GGGTTG|NULL|     Y|  EGFR|ENST00000275493.7|INI|c.2309_2310insGGGTTG|p.Asp770delinsGlu...|    20|{"EGFR": [["P0053...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 17| chr8|127738263|       A|       T|NULL|     Y|   MYC|ENST00000377970.6|MLO|              c.1A>T|             p.Met1?|     2|{"CASC11": [["", ...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 18| chr8| 43018497|       A|       G|NULL|     Y| HOOK3|ENST00000307602.9|STL|           c.2156A>G| p.Ter719TrpextTer84|    22|{"HOOK3": [["Q86V...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 19| chr9|107489172|       A|       T|NULL|     Y|  KLF4|ENST00000374672.5|SYN|              c.1T>A|             p.Met1=|     1|{"ENSG00000289987...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
+---+-----+---------+--------+--------+----+------+------+-----------------+---+--------------------+--------------------+------+--------------------+------------+--------------------+---------------------+----------------------+--------------------+-----------+-----------------+
only showing top 20 rows

The annotated variants can be saved as Parquet files.

df.write.parquet("annotated_variants.parquet")

We are considering adding more helper methods to OakVar to make this process more ergonomic. Please let us know what you think at our Discord server at https://discord.gg/wZfkTMKTjG.