from __future__ import print_function
_usermap = {}
def add_mapping(watermark_value, pattern_value):
_usermap[watermark_value] = pattern_value
class Mapping(dict):
"""Mapping tracks equivalences from watermark values to
pattern values."""
def decode(self, watermark, unmapped_value):
"""Decode watermark values into pattern values."""
best_pv = None
best_wv = None
best_errs = 0
for start in watermark.decode_start_indices():
pv, wv, errs = self._decode(watermark, start,
unmapped_value)
if best_pv is None or errs < best_errs:
best_pv = pv
best_wv = wv
best_errs = errs
return best_pv, best_wv, best_errs
def _decode(self, watermark, start, unmapped_value):
pattern_values = []
watermark_values = watermark.values(start)
errs = 0
for wv in watermark_values:
try:
pv = self[wv]
except KeyError:
try:
pv = _usermap[wv]
except KeyError:
errs += 1
pv = unmapped_value
pattern_values.append(pv)
return pattern_values, watermark_values, errs
class MappingSet(list):
"""MappingSet is a set of Mapping instances. Mapping instances
added to MappingSet's should not be modified subsequently."""
def add(self, m):
for om in self:
if m == om:
return
self.append(m)
def reconcile(initial, additional):
"""Reconcile to sets of mappings.
Do an all-by-all check to see whether two mappings define
different key-value pairs. If so, they are incompatible
and no new mapping is generated; otherwise, combine the
key-value pairs from the two mappings into a single mapping
and add to new set."""
if not initial:
# If initial set is empty, then just return additions
return additional
new_mappings = MappingSet()
for im in initial:
for am in additional:
new = _reconcile(im, am)
if new:
new_mappings.add(new)
if not new_mappings:
print("initial", len(initial))
print("additional", len(additional))
raise ValueError("incompatible mapping sets")
return new_mappings
def _reconcile(im, am):
m = Mapping()
for key in set(im.keys() + am.keys()):
try:
iv = im[key]
except KeyError:
m[key] = am[key]
continue
try:
av = am[key]
except KeyError:
m[key] = im[key]
continue
if iv != av:
return None
m[key] = iv
return m
if __name__ == "__main__":
import unittest
class TestMapping(unittest.TestCase):
def setUp(self):
self.m1 = Mapping({
"GTT": "A",
"CGA": "B",
"ATA": "C",
})
self.m2 = Mapping({
"ATA": "C",
"TTT": "D",
})
self.m3 = Mapping({
"ATA": "X",
"TTT": "D",
})
self.m4 = Mapping({
"ATA": "C",
"AAA": "E",
})
from watermark import DNAWatermark
self.w = DNAWatermark("GTTATACGACCCGTT")
def test_decode(self):
pv, wv, errs = self.m1.decode(self.w, '?')
self.assertEqual(''.join(pv), "ACB?A")
self.assertEqual(errs, 1)
def test_internal_reconcile(self):
m = _reconcile(self.m1, self.m2)
answer = Mapping({
"GTT": "A",
"CGA": "B",
"ATA": "C",
"TTT": "D",
})
self.assertEqual(m, answer)
m = _reconcile(self.m1, self.m3)
self.assertIsNone(m)
def test_reconcile(self):
m = reconcile(MappingSet([self.m1]),
MappingSet([self.m2, self.m3, self.m4]))
self.assertEqual(len(m), 2)
unittest.main()