python - Conditionally Combining/Reducing key-pairs
I've had this issue for some time now, and I think it has to do with my lack of understanding of how to use combineByKey and reduceByKey, so hopefully someone can clear that up.
I am working with DNA sequences, and I have a procedure that produces a bunch of different versions of a sequence (forwards, backwards, and complemented). I have several reading frames, meaning that for the string abcabc I want the following series of keys: abc abc, a bca bc, and ab cab c.
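To make the three reading frames concrete, here is a minimal sketch in plain Python 3 (no Spark). The helper name `split_reading_frame` is hypothetical and only illustrates the splitting described above; it is not part of the original code.

```python
def split_reading_frame(seq, reading_frame):
    """Split seq into chunks for one reading frame (0, 1 or 2)."""
    chunks = []
    # A non-zero frame starts with a short leading chunk.
    if reading_frame > 0:
        chunks.append(seq[:reading_frame])
    # Everything after the offset is cut into pieces of (at most) 3 characters.
    for i in range(reading_frame, len(seq), 3):
        chunks.append(seq[i:i + 3])
    return chunks


# One list of chunks per reading frame.
frames = {rf: split_reading_frame("abcabc", rf) for rf in range(3)}
```

Here `frames[0]`, `frames[1]` and `frames[2]` correspond to the three series listed above.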
Right now I am using the following function to break things up (I run it in a flatMap procedure):
    # Modified from http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
    def chunkcodons((seq, strand, reading_frame)):
        """
        Yield successive codons from seq
        """
        # Leading partial codon (the first reading_frame characters)
        if reading_frame > 0:
            yield (0, seq[0:reading_frame], strand, reading_frame)
        for i in xrange(reading_frame, len(seq), 3):
            # Progress report every million bases
            if i % 1000000 == 0:
                print "base # = {:,}".format(i)
            yield (i, seq[i:i + 3], strand, reading_frame)
I run it like so:
    reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkcodons)
However, this takes a very long time on a long string of DNA, so I know something has to be wrong.
Therefore, I want to have Spark do this in a more direct fashion by breaking the sequence up by character (i.e. base) and then recombining it 3 at a time. The problem is that I have to combine keys that are not the same, but adjacent. Meaning that if I have (1, 'a'), (2, 'b'), (3, 'c'), ..., I want to be able to generate (1, 'abc').
I have no idea how to do this. I suspect I need to use combineByKey and have it produce output only conditionally. Do I just have it produce output that can be consumed by combineByKey only if it meets my conditions? Is that how I should do it?
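One way to think about "adjacent" keys is to re-key every base by the start position of the chunk it belongs to, so that adjacent positions end up sharing the same key and an ordinary by-key grouping can take over. The sketch below shows that idea in plain Python 3 rather than Spark; the function name `group_adjacent` and the clamping of the leading chunk to position 0 are assumptions made for illustration, not something taken from this thread.

```python
from collections import defaultdict


def group_adjacent(pairs, reading_frame=0):
    """Group (position, base) pairs into chunks keyed by chunk start."""
    groups = defaultdict(list)
    for pos, base in pairs:
        # Start position of the 3-base chunk containing pos in this frame.
        start = ((pos - reading_frame) // 3) * 3 + reading_frame
        # Bases before the frame offset form a short leading chunk at 0.
        start = max(start, 0)
        groups[start].append((pos, base))
    # Sort inside each group so the bases are joined in sequence order.
    return [(start, "".join(base for _, base in sorted(items)))
            for start, items in sorted(groups.items())]


example = group_adjacent([(1, "a"), (2, "b"), (3, "c")], reading_frame=1)
```

In Spark terms, the re-keying would be a `map` over the pairs and the grouping a by-key operation; whether that is faster than the generator approach for a given data set would need to be measured.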
Edit:
Here is my input: [(0, 'a'), (1, 'a'), (2, 'b'), (3, 'a'), (4, 'c'), ....]
I want output like this: [(0, 0, 'aab'), (0, 1, 'abx'), ...] and [(1, 0, 'a'), (1, 1, 'aba'), (1, 2, 'cxx')...].
The format is [(reading frame, first base #, sequence)].
You can try something like this:
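    seq = sc.parallelize(zip(xrange(16), "atcgatgcatgcatgc"))

    (seq
        # Send each base to the three windows that can contain it,
        # keyed by the window's start position
        .flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3)))
        .groupByKey()
        # Join the bases of each window in position order
        .mapValues(lambda x: ''.join(v for (pos, v) in sorted(x)))
        # Drop incomplete windows at both ends of the sequence
        .filter(lambda (pos, codon): len(codon) == 3)
        .map(lambda (pos, codon): (pos % 3, pos, codon))
        .collect())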
and the result:
[(0, 0, 'atc'), (1, 1, 'tcg'), (2, 2, 'cga'), (0, 3, 'gat'), (1, 4, 'atg'), (2, 5, 'tgc'), (0, 6, 'gca'), (1, 7, 'cat'), (2, 8, 'atg'), (0, 9, 'tgc'), (1, 10, 'gca'), (2, 11, 'cat'), (0, 12, 'atg'), (1, 13, 'tgc')]
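For readers without a Spark context at hand, the same re-keying logic can be followed in plain Python 3. This is only a sketch that mirrors the steps of the pipeline above with a dictionary standing in for `groupByKey`; the function name `sliding_codons` is hypothetical.

```python
from collections import defaultdict


def sliding_codons(seq):
    """Return (reading frame, start position, codon) for every 3-base window."""
    groups = defaultdict(list)
    # flatMap step: emit each base under the start positions pos, pos - 1
    # and pos - 2, i.e. under every window that can contain it.
    for pos, base in enumerate(seq):
        for i in range(3):
            groups[pos - i].append((pos, base))
    result = []
    for start in sorted(groups):
        # mapValues step: join the bases of one window in position order.
        codon = "".join(base for _, base in sorted(groups[start]))
        # filter step: windows hanging over either end are shorter than 3.
        if len(codon) == 3:
            result.append((start % 3, start, codon))
    return result


codons = sliding_codons("atcgatgcatgcatgc")
```

The windows keyed by -2, -1, 14 and 15 hold fewer than three bases, which is why the length filter is needed.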
In practice I would try something else:
    from toolz.itertoolz import sliding_window, iterate, map, zip
    from itertools import product
    from numpy import uint8

    def inc(x):
        return x + uint8(1)

    # Create a dictionary mapping each codon to an integer
    mapping = dict(zip(product('atcg', repeat=3), iterate(inc, uint8(0))))

    seq = sc.parallelize(["atcgatgcatgcatgc"])

    (seq
        # Generate pairs (start-position, 3-gram)
        .flatMap(lambda s: zip(iterate(inc, 0), sliding_window(3, s)))
        # Map 3-grams to their respective integers
        .map(lambda (pos, seq): (pos, mapping.get(seq)))
        .collect())
The reading frame is redundant and can be obtained from the start position, so for the moment it is omitted here.
A simple mapping between a codon and a small integer can save a lot of memory and traffic.
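The encoding idea can be tried without Spark, toolz or NumPy. The sketch below is plain Python 3 using only the standard library; the names `CODON_TO_INT`, `encode` and `reading_frame` are hypothetical, and plain `int` values stand in for the `uint8` used above.

```python
from itertools import product

# All 64 codons over the alphabet 'atcg', numbered in product() order.
CODON_TO_INT = {"".join(codon): n
                for n, codon in enumerate(product("atcg", repeat=3))}


def encode(seq):
    """Return (start position, codon number) for every 3-base window."""
    return [(pos, CODON_TO_INT[seq[pos:pos + 3]])
            for pos in range(len(seq) - 2)]


def reading_frame(pos):
    """Recover the reading frame from a window's start position."""
    return pos % 3


encoded = encode("atcgatgc")
```

Because each codon number fits in a single byte, shipping `(position, number)` pairs instead of strings keeps the records small, and the reading frame can be recomputed with `reading_frame` whenever it is needed.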