Apache Spark
OakVar is natively integrated with Apache Spark, allowing annotation at the scale of large genome projects.
Because OakVar annotation modules are written in Python, they can implement virtually any annotation logic, including AI/ML-based methods, and a Spark pipeline that uses OakVar inherits the same flexibility.
Let's see how it is done.
First, create a Spark session.
```python
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session for the "ov" application
spark = SparkSession.builder.appName("ov").getOrCreate()
# The underlying SparkContext is used later to build an RDD
sc = spark.sparkContext
```
Then, let's make a test set of variants as a Spark DataFrame.
```python
# Test variants: "-" marks an empty allele (insertions have ref_base "-",
# deletions have alt_base "-")
data = [
    {"chrom": "chr7", "pos": 140734758, "ref_base": "T", "alt_base": "C"},
    {"chrom": "chr7", "pos": 140734780, "ref_base": "-", "alt_base": "G"},
    {"chrom": "chr7", "pos": 140736487, "ref_base": "GTGCGA", "alt_base": "-"},
    {"chrom": "chr7", "pos": 140736487, "ref_base": "GTGCGAT", "alt_base": "-"},
    {"chrom": "chr7", "pos": 140742186, "ref_base": "-", "alt_base": "T"},
    {"chrom": "chr7", "pos": 140753351, "ref_base": "A", "alt_base": "G"},
    {"chrom": "chr7", "pos": 140800417, "ref_base": "CT", "alt_base": "-"},
    {"chrom": "chr7", "pos": 140800417, "ref_base": "CTG", "alt_base": "-"},
    {"chrom": "chr7", "pos": 140807936, "ref_base": "A", "alt_base": "-"},
    {"chrom": "chr7", "pos": 140924703, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr7", "pos": 148847298, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr7", "pos": 27199497, "ref_base": "C", "alt_base": "G"},
    {"chrom": "chr7", "pos": 2958506, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr7", "pos": 50319062, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr7", "pos": 55019278, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr7", "pos": 55019338, "ref_base": "G", "alt_base": "A"},
    {"chrom": "chr7", "pos": 55181319, "ref_base": "-", "alt_base": "GGGTTG"},
    {"chrom": "chr8", "pos": 127738263, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr8", "pos": 43018497, "ref_base": "A", "alt_base": "G"},
    {"chrom": "chr9", "pos": 107489172, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr9", "pos": 130714320, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr9", "pos": 132928872, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr9", "pos": 136545786, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr9", "pos": 21968622, "ref_base": "C", "alt_base": "-"},
    {"chrom": "chr9", "pos": 21974827, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr9", "pos": 37034031, "ref_base": "A", "alt_base": "T"},
    {"chrom": "chr9", "pos": 5021988, "ref_base": "A", "alt_base": "T"},
]
# Give every variant a unique ID
for uid, variant in enumerate(data):
    variant["uid"] = uid
df = spark.createDataFrame(data)
print("Original DataFrame")
df.show()
```
```text
Original DataFrame
+--------+-----+---------+--------+---+
|alt_base|chrom|      pos|ref_base|uid|
+--------+-----+---------+--------+---+
|       C| chr7|140734758|       T|  0|
|       G| chr7|140734780|       -|  1|
|       -| chr7|140736487|  GTGCGA|  2|
|       -| chr7|140736487| GTGCGAT|  3|
|       T| chr7|140742186|       -|  4|
|       G| chr7|140753351|       A|  5|
|       -| chr7|140800417|      CT|  6|
|       -| chr7|140800417|     CTG|  7|
|       -| chr7|140807936|       A|  8|
|       T| chr7|140924703|       A|  9|
|       T| chr7|148847298|       A| 10|
|       G| chr7| 27199497|       C| 11|
|       T| chr7|  2958506|       A| 12|
|       T| chr7| 50319062|       A| 13|
|       T| chr7| 55019278|       A| 14|
|       A| chr7| 55019338|       G| 15|
|  GGGTTG| chr7| 55181319|       -| 16|
|       T| chr8|127738263|       A| 17|
|       G| chr8| 43018497|       A| 18|
|       T| chr9|107489172|       A| 19|
+--------+-----+---------+--------+---+
only showing top 20 rows
```
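Before handing a large variant set to Spark, it can help to validate the records on the driver so that malformed input fails early. The following is a minimal sketch, assuming the record layout used above: each record is a dict with `chrom`, `pos`, `ref_base` and `alt_base`, with uppercase bases and `-` marking an empty allele. The helper `validate_variant` is hypothetical and is not part of OakVar, and its rules are illustrative rather than an official OakVar input specification.

```python
import re

# Hypothetical validator for the record layout used in this tutorial;
# it is not an official OakVar input specification.
REQUIRED_KEYS = ("chrom", "pos", "ref_base", "alt_base")
# Uppercase bases only, or a single "-" for an empty allele
ALLELE_RE = re.compile(r"[ACGTN]+|-")


def validate_variant(record: dict) -> list:
    """Return a list of problems found in one variant record (empty if none)."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in record]
    if problems:
        return problems
    pos = record["pos"]
    if not isinstance(pos, int) or pos < 1:
        problems.append(f"position must be a positive integer: {pos}")
    for key in ("ref_base", "alt_base"):
        if not ALLELE_RE.fullmatch(str(record[key])):
            problems.append(f"invalid allele in {key}: {record[key]}")
    if record["ref_base"] == "-" and record["alt_base"] == "-":
        problems.append("ref_base and alt_base cannot both be empty")
    return problems


variants = [
    {"chrom": "chr7", "pos": 140734758, "ref_base": "T", "alt_base": "C"},
    {"chrom": "chr7", "pos": 140736487, "ref_base": "GTGCGA", "alt_base": "-"},
    {"chrom": "chr7", "pos": 0, "ref_base": "A", "alt_base": "X"},  # deliberately invalid
]
for i, variant in enumerate(variants):
    print(i, validate_variant(variant))
```

Records that report problems can then be fixed or dropped before calling `spark.createDataFrame`.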
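Next, create a Spark Resilient Distributed Dataset (RDD) from the same list of variants, split into 4 partitions.
```python
# The second argument (numSlices) sets the number of partitions
rdd = sc.parallelize(data, 4)
```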
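Each partition is later processed by one task, so the partition count controls how the annotation work is spread out. The sketch below illustrates contiguous, near-equal slicing with the same start/end arithmetic that Spark's Scala `ParallelCollectionRDD` uses for indexed collections. It is only an illustration: PySpark first serializes Python objects in batches, so the actual partition boundaries for Python data may differ. `slice_positions` and `split_into_partitions` are hypothetical helpers, not Spark APIs.

```python
def slice_positions(length: int, num_slices: int):
    """Yield (start, end) index pairs for num_slices contiguous slices."""
    if num_slices < 1:
        raise ValueError("num_slices must be positive")
    for i in range(num_slices):
        start = (i * length) // num_slices
        end = ((i + 1) * length) // num_slices
        yield start, end


def split_into_partitions(items: list, num_slices: int) -> list:
    """Split items into num_slices contiguous, near-equal lists (some may be empty)."""
    return [items[start:end] for start, end in slice_positions(len(items), num_slices)]


# 27 records, like the test set above, split 4 ways
partitions = split_into_partitions(list(range(27)), 4)
print([len(p) for p in partitions])
```

With more partitions than records, some partitions are empty, which is harmless for a per-partition function.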
Next, define a custom function that Spark runs on the worker nodes, once for each partition of the RDD.
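```python
import oakvar as ov

def get_ov_annotation(iterator):
    # Load the mapper and the annotator once per partition, not once per variant
    mapper = ov.get_mapper("gencode")
    clinvar = ov.get_annotator("clinvar")
    for row in iterator:
        # Map the variant with GENCODE, then append the ClinVar annotation
        ret = mapper.map(row)
        ret = clinvar.append_annotation(ret)
        yield ret
```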
Because this function runs as a standalone function on the worker nodes, OakVar must already be installed on every worker node and be importable by the Python interpreter that Spark uses there.
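One way to spot-check that requirement is to run a small import check on the workers. The function below is a minimal, hypothetical sketch that uses only the standard library: it reports the host name, whether a package can be found by the current Python interpreter, and its installed version if distribution metadata exists. It checks only the `oakvar` package itself, not installed OakVar modules such as `gencode` or `clinvar`.

```python
import importlib.util
import socket
from importlib import metadata


def check_package(name: str) -> dict:
    """Report whether `name` is importable on this host and, if so, its version."""
    importable = importlib.util.find_spec(name) is not None
    version = None
    if importable:
        try:
            version = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Importable (e.g. a standard-library module) but no distribution metadata
            version = None
    return {
        "host": socket.gethostname(),
        "package": name,
        "importable": importable,
        "version": version,
    }


def check_partition(_rows):
    """Shaped like a mapPartitions function: ignores its rows, yields one report."""
    yield check_package("oakvar")


print(check_package("json"))
```

On a cluster, something like `sc.parallelize(range(8), 8).mapPartitions(check_partition).collect()` would return one report per partition. Spark does not guarantee that those partitions land on different workers, so treat the result as a spot check rather than a complete inventory.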
`get_ov_annotation` loads the `gencode` mapper and the `clinvar` annotation module once per partition, and then, for each variant in the partition, runs the mapper followed by the annotator.
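Loading a mapper or an annotator has a one-time setup cost, and `mapPartitions` lets that cost be paid once per partition rather than once per variant. The pure-Python sketch below imitates the pattern locally, without Spark or OakVar: `FakeAnnotator` is a hypothetical stand-in for a loaded OakVar module that counts how often it is constructed, and `run_map_partitions` plays the role of Spark calling the function on each partition and collecting the results.

```python
from itertools import chain


class FakeAnnotator:
    """Hypothetical stand-in for an OakVar module; counts how often it is loaded."""

    loads = 0

    def __init__(self):
        FakeAnnotator.loads += 1  # simulate the one-time setup cost

    def annotate(self, row: dict) -> dict:
        # Return a new dict so the input row is left untouched
        return {**row, "annotated": True}


def annotate_partition(iterator):
    """Same shape as get_ov_annotation: set up once, then yield one result per row."""
    annotator = FakeAnnotator()
    for row in iterator:
        yield annotator.annotate(row)


def run_map_partitions(partitions, func):
    """Local imitation of rdd.mapPartitions(func).collect() over pre-split partitions."""
    return list(chain.from_iterable(func(iter(p)) for p in partitions))


partitions = [[{"uid": 0}, {"uid": 1}], [{"uid": 2}], [{"uid": 3}, {"uid": 4}]]
results = run_map_partitions(partitions, annotate_partition)
print(len(results), FakeAnnotator.loads)
```

Five variants are annotated but the annotator is constructed only three times, once per partition. With a plain `map`, setup would run for every variant unless it were cached some other way.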
Let's apply this custom function to the variant RDD.
```python
# collect() returns all annotated variants to the driver as a Python list
ret = rdd.mapPartitions(get_ov_annotation).collect()
```
`ret` is a list of `dict`s, each corresponding to one annotated variant.
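Because `ret` is an ordinary Python list on the driver, it can be inspected with plain Python before building a DataFrame. The sketch below counts annotated variants per gene and lists those with a ClinVar match. It assumes, as the column names in the annotated table below suggest, that each dict uses keys such as `hugo`, `so` and `clinvar__id`, with `None` where there is no value. The sample records are taken from that table and trimmed to four fields; they are named `sample_ret` so they do not overwrite the real `ret`.

```python
from collections import Counter

# A few annotated records from the table in this tutorial, trimmed to four fields.
# Key names follow the DataFrame column names; real dicts contain more keys.
sample_ret = [
    {"uid": 0, "hugo": "BRAF", "so": "MIS", "clinvar__id": 1410272},
    {"uid": 14, "hugo": "EGFR", "so": "MLO", "clinvar__id": None},
    {"uid": 15, "hugo": "EGFR", "so": "MIS", "clinvar__id": 848579},
    {"uid": 17, "hugo": "MYC", "so": "MLO", "clinvar__id": None},
]


def count_by(records, key):
    """Count records per value of `key`, skipping records where it is missing or None."""
    return Counter(r[key] for r in records if r.get(key) is not None)


def with_clinvar_match(records):
    """Return the uids of records that were matched to a ClinVar entry."""
    return [r["uid"] for r in records if r.get("clinvar__id") is not None]


print(count_by(sample_ret, "hugo"))
print(with_clinvar_match(sample_ret))
```

For large result sets, the same aggregations are better done on the Spark DataFrame itself, since `collect()` already requires the whole result to fit in driver memory.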
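Now create a Spark DataFrame from the annotated variants. `ov.lib.util.run.get_spark_schema` takes the names of the modules used above, the GENCODE mapper (`gencode`) and the ClinVar annotator (`clinvar`), and returns a schema for the annotated output. Both modules must already be installed on the worker nodes.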
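```python
# Schema for the annotated output of the gencode and clinvar modules
schema = ov.lib.util.run.get_spark_schema(["gencode", "clinvar"])
annotated_df = spark.createDataFrame(ret, schema)
annotated_df.show()
```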
```text
|uid|chrom| pos|ref_base|alt_base|note|coding| hugo| transcript| so| cchange| achange|exonno| all_mappings|clinvar__uid| clinvar__sig|clinvar__disease_refs|clinvar__disease_names| clinvar__rev_stat|clinvar__id|clinvar__sig_conf|
| 0| chr7|140734758| T| C|NULL| Y| BRAF|ENST00000646891.2|MIS| c.2140A>G| p.Ile714Val| 18|{"BRAF": [["A0A2U...| NULL|Uncertain signifi...| MONDO:MONDO:00055...| Colorectal cancer...|criteria provided...| 1410272| NULL|
| 1| chr7|140734780| -| G|NULL| | BRAF|ENST00000646891.2|INT|c.2128-10_2128-9insC|
| 2| chr7|140736487| GTGCGA| -|NULL| | BRAF|ENST00000646891.2|INT|c.2128-1722_2128-...|
| 3| chr7|140736487| GTGCGAT| -|NULL| | BRAF|ENST00000646891.2|INT|c.2128-1723_2128-...|
| 4| chr7|140742186| -| T|NULL| | BRAF|ENST00000646891.2|INT| c.1993-2240dup|
| 5| chr7|140753351| A| G|NULL| Y| BRAF|ENST00000646891.2|MIS| c.1784T>C| p.Phe595Se
| 6| chr7|140800417| CT| -|NULL| Y| BRAF|ENST00000646891.2|FSD| c.927_928del| p.Glu309AspfsTer4
| 7| chr7|140800417| CTG| -|NULL| Y| BRAF|ENST00000646891.2|IND| c.923_925del| p.Ala308de
| 8| chr7|140807936| A| -|NULL| | BRAF|ENST00000646891.2|INT| c.711+24del|
| 9| chr7|140924703| A| T|NULL| Y| BRAF|ENST00000646891.2|SYN| c.1T>A| p.Met1
| 10| chr7|148847298| A| T|NULL| Y| EZH2|ENST00000320356.7|SYN| c.1T>A| p.Met1
| 11| chr7| 27199497| C| G|NULL| Y|HOXA13|ENST00000649031.1|MIS| c.581G>C| p.Cys194Se
| 12| chr7| 2958506| A| T|NULL| Y|CARD11|ENST00000396946.9|SYN| c.1T>A| p.Met1
| 13| chr7| 50319062| A| T|NULL| Y| IKZF1|ENST00000331340.8|MLO| c.1A>T| p.Met1
| 14| chr7| 55019278| A| T|NULL| Y| EGFR|ENST00000275493.7|MLO| c.1A>T| p.Met1?| 1|{"EGFR": [["P0053...| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 15| chr7| 55019338| G| A|NULL| Y| EGFR|ENST00000275493.7|MIS| c.61G>A| p.Ala21Thr| 1|{"EGFR": [["P0053...| NULL|Uncertain signifi...| MedGen:CN130014| EGFR-related lung...|criteria provided...| 848579| NULL|
| 16| chr7| 55181319| -| GGGTTG|NULL| Y| EGFR|ENST00000275493.7|INI|c.2309_2310insGGGTTG|p.Asp770delinsGlu...| 20|{"EGFR": [["P0053...| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 17| chr8|127738263| A| T|NULL| Y| MYC|ENST00000377970.6|MLO| c.1A>T| p.Met1?| 2|{"CASC11": [["", ...| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 18| chr8| 43018497| A| G|NULL| Y| HOOK3|ENST00000307602.9|STL| c.2156A>G| p.Ter719TrpextTer84| 22|{"HOOK3": [["Q86V...| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
| 19| chr9|107489172| A| T|NULL| Y| KLF4|ENST00000374672.5|SYN| c.1T>A| p.Met1=| 1|{"ENSG00000289987...| NULL| NULL| NULL| NULL| NULL| NULL| NULL|
only showing top 20 rows
```
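When `createDataFrame` receives dicts together with an explicit schema, PySpark looks up each field's value in the dict by field name, so the column order comes from the schema and a key that is absent from a dict becomes a null. That is why variants without a ClinVar match still fit the same schema as variants with one. The pure-Python sketch below mimics only this alignment step; `FIELDS` is a shortened, hypothetical subset of the real schema returned by `get_spark_schema`, and `to_row` is a hypothetical helper, not a PySpark function.

```python
# Hypothetical, shortened field list; the real schema comes from get_spark_schema.
FIELDS = ["uid", "chrom", "pos", "hugo", "so", "clinvar__id"]


def to_row(record: dict, fields=FIELDS) -> tuple:
    """Order values by field name; missing keys become None (a null in Spark)."""
    return tuple(record.get(name) for name in fields)


records = [
    {"uid": 15, "chrom": "chr7", "pos": 55019338, "hugo": "EGFR", "so": "MIS",
     "clinvar__id": 848579},
    # Keys in a different order, and no clinvar__id key at all
    {"so": "MLO", "uid": 14, "chrom": "chr7", "pos": 55019278, "hugo": "EGFR"},
]
rows = [to_row(r) for r in records]
print(rows)
```

Whether an annotator omits a key or sets it to `None`, the resulting column value is the same null.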
The annotated DataFrame can then be saved as Parquet files, for example with Spark's standard `DataFrame.write.parquet()` writer.
We are considering adding more helper methods to OakVar to make this process more ergonomic. Please share your thoughts on our Discord server at https://discord.gg/wZfkTMKTjG.