python - Conditionally Combining/Reducing key-pairs


I've had this issue for some time now, and I think it comes down to my lack of understanding of how to use combineByKey and reduceByKey, so hopefully someone can clear this up.

I am working with DNA sequences, and I have a procedure that produces a bunch of different versions of a sequence (forwards, backwards, and complemented). I also have several reading frames, meaning that for the string abcabc I want the following series of keys: abc abc, a bca bc, ab cab c.

Right now I am using the following function to break things up (I run it in a flatMap procedure):

    # modified from http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
    def chunkcodons((seq, strand, reading_frame)):
        """
        Yield successive codons from seq
        """
        # get the first characters (the partial chunk before the first full codon)
        if reading_frame > 0:
            yield (0, seq[0:reading_frame], strand, reading_frame)
        for i in xrange(reading_frame, len(seq), 3):
            # progress report every million bases
            if i % 1000000 == 0:
                print "base # = {:,}".format(i)
            yield (i, seq[i:i + 3], strand, reading_frame)

I run it like so:

    reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkcodons)
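To make the three reading frames concrete, here is a minimal sketch of the same chunking logic written for Python 3, where tuple parameters, `xrange` and the `print` statement are no longer available. The name `chunk_codons`, the separate arguments and the strand label `"+"` are illustrative assumptions, not part of the original code, and the progress printing is left out.

```python
def chunk_codons(seq, strand, reading_frame):
    """Yield (start, chunk, strand, reading_frame) tuples for one reading frame."""
    # Leading partial chunk before the first full codon, if any
    if reading_frame > 0:
        yield (0, seq[0:reading_frame], strand, reading_frame)
    # Chunks of three bases; the last one may be shorter than 3
    for i in range(reading_frame, len(seq), 3):
        yield (i, seq[i:i + 3], strand, reading_frame)


# The three reading frames of the example string "abcabc"
# ("+" is a hypothetical strand label used only for this illustration)
for frame in range(3):
    chunks = [chunk for _, chunk, _, _ in chunk_codons("abcabc", "+", frame)]
    print(frame, " ".join(chunks))
```

With an RDD of `(seq, strand, reading_frame)` tuples, a function with this signature could be applied as `rdd.flatMap(lambda t: chunk_codons(*t))`.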

However, this takes a very long time on a long string of DNA, so I know something has to be wrong.

Therefore, I want to have Spark do this in a more direct fashion by breaking the sequence up by character (i.e. base) and then recombining the bases 3 at a time. The problem is that I have to combine keys that are not the same, but adjacent. Meaning that if I have (1, 'a'), (2, 'b'), (3, 'c'), ..., I want to be able to generate (1, 'abc').

I have no idea how to do this. I suspect I need to use combineByKey and have it produce output only conditionally. Do I just have it produce output that can be consumed by combineByKey only if it meets my conditions? Is that how I should do it?

Edit:

Here is my input: [(0, 'a'), (1, 'a'), (2, 'b'), (3, 'a'), (4, 'c'), ....]

I want output like this: [(0, 0, 'aab'), (0, 1, 'abx'), ...] and [(1, 0, 'a'), (1, 1, 'aba'), (1, 2, 'cxx')...].

The format is [(reading frame, first base #, sequence)]

You can try something like this:

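    seq = sc.parallelize(zip(xrange(16), "atcgatgcatgcatgc"))

    (seq
     # send each base to the three windows (keyed by start position) that contain it
     .flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3)))
     .groupByKey()
     # join the bases of each window in position order
     .mapValues(lambda x: ''.join(v for (pos, v) in sorted(x)))
     # drop incomplete windows at both ends of the sequence
     .filter(lambda (pos, codon): len(codon) == 3)
     .map(lambda (pos, codon): (pos % 3, pos, codon))
     .collect())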

And the result:

    [(0, 0, 'atc'),
     (1, 1, 'tcg'),
     (2, 2, 'cga'),
     (0, 3, 'gat'),
     (1, 4, 'atg'),
     (2, 5, 'tgc'),
     (0, 6, 'gca'),
     (1, 7, 'cat'),
     (2, 8, 'atg'),
     (0, 9, 'tgc'),
     (1, 10, 'gca'),
     (2, 11, 'cat'),
     (0, 12, 'atg'),
     (1, 13, 'tgc')]
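To see why this produces one codon per start position, the same steps can be traced without Spark. The following is only a sketch in plain Python 3 that imitates the flatMap, groupByKey, mapValues, filter and map stages with ordinary lists and a dictionary; the variable names are illustrative, and it is not how Spark executes the job.

```python
from collections import defaultdict

sequence = "atcgatgcatgcatgc"

# flatMap step: every base is sent to the three windows that contain it,
# keyed by the window's start position (keys below 0 are simply incomplete)
emitted = [(pos - i, (pos, base))
           for pos, base in enumerate(sequence)
           for i in range(3)]

# groupByKey step: collect all (pos, base) pairs sharing a start position
groups = defaultdict(list)
for start, pair in emitted:
    groups[start].append(pair)

# mapValues + filter + map steps: join bases in position order, keep only
# complete codons, and derive the reading frame from the start position
codons = [(start % 3, start, "".join(base for _, base in sorted(pairs)))
          for start, pairs in sorted(groups.items())
          if len(pairs) == 3]

for row in codons:
    print(row)
```

Note that every base is emitted three times before grouping, so the amount of data shuffled is roughly three times the input size.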

In practice I would try something else:

    from toolz.itertoolz import sliding_window, iterate, map, zip
    from itertools import product
    from numpy import uint8

    def inc(x):
        return x + uint8(1)

    # create dictionary mapping from codon to integer
    mapping = dict(zip(product('atcg', repeat=3), iterate(inc, uint8(0))))

    seq = sc.parallelize(["atcgatgcatgcatgc"])

    (seq
      # generate pairs (start-position, 3-gram)
      .flatMap(lambda s: zip(iterate(inc, 0), sliding_window(3, s)))
      # map 3-grams to respective integers
      .map(lambda (pos, seq): (pos, mapping.get(seq)))
      .collect())

The reading frame is redundant and can be obtained from the start position, so at the moment it is omitted here.
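As a small illustration of that point, the frame is just the start position modulo 3, so the sliding windows can be split into the three frames after the fact. This is a sketch in plain Python 3; the helper names `sliding_codons` and `split_by_frame` are made up for the example.

```python
def sliding_codons(s):
    """Return (start, codon) pairs for every 3-base window of s."""
    return [(i, s[i:i + 3]) for i in range(len(s) - 2)]


def split_by_frame(pairs):
    """Group codons by reading frame, computed as start position modulo 3."""
    frames = {0: [], 1: [], 2: []}
    for start, codon in pairs:
        frames[start % 3].append(codon)
    return frames


frames = split_by_frame(sliding_codons("atcgatgc"))
print(frames)
```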

A simple mapping between a codon and a small integer can save a lot of memory and traffic.
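A rough sketch of the idea using only the Python 3 standard library: with a four-letter alphabet there are 4^3 = 64 possible codons, so each one fits in a single byte. Unlike the snippet above, this version keys the dictionary by strings rather than tuples and uses `bytes` instead of numpy's `uint8`; the names `codon_to_int` and `encode` are illustrative.

```python
from itertools import product

# All 64 codons over the four-letter alphabet, numbered 0..63
codon_to_int = {"".join(c): n for n, c in enumerate(product("atcg", repeat=3))}


def encode(s):
    """Encode every 3-base window of s as one byte."""
    return bytes(codon_to_int[s[i:i + 3]] for i in range(len(s) - 2))


encoded = encode("atcgatgcatgcatgc")
print(len(codon_to_int), len(encoded), list(encoded[:3]))
```

Each window is then carried as one small integer rather than a three-character string, which is where the savings come from.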

