from graph import Graph
graph = Graph('DiscoSnp/test/large_test/discoRes_k_31_c_auto.h5') # chr1 with simulated variants
graph
help(graph)
Iterate over branching nodes:
for i, node in enumerate(graph):
print('{}: {!r}'.format(i, node))
if i > 10: break
Graph is a factory for Nodes:
kmer = b'AACGAGCACCAAAGACTTAGCATGAAAACCC'
node = graph[kmer] # Either a real graph node or one of their neighbors
node
help(node)
bytes(node) # Conversion to bytestring encoded kmer
assert node.reversed == node
node.reversed
Query neighbors and degrees:
print(node.succs, node.out_degree)
print(node.preds, node.in_degree)
Query neighbors by manually doing the extension:
node_kmer = bytes(node)
for ext in b'ATGC':# NB: iterating over bytes produces character codes
ext = bytes((ext,)) # So this line reconstruct a single character bytes object
ext_kmer = node_kmer[1:] + ext
ext_node = graph[ext_kmer] # Construct the Node from the bytes encoded kmer
if ext_node in graph: # Checks if the node belong to the graph
print(ext_node)
Simple paths from neighbors are obtained as list of (path, end node, end reason):
node.paths
Both paths ends at the same k-mer : this is a bubble induced by A/T on the first nucleotide in the path.
This code tests the forward paths results for all branching nodes and their RCs:
def check_paths(origin_node):
origin_node_kmer = bytes(origin_node)
for path, end_node, end_reason in origin_node.paths:
assert (origin_node_kmer + path).endswith(bytes(end_node))
if end_reason == 2: # In-branching end reason prioritized over out-branching
assert end_node.in_degree > 1
elif end_reason == 1:
assert end_node.out_degree > 1 # Out-branching
elif end_reason == 3: # Dead-end
assert end_node.out_degree == 0
for origin_node in graph:
check_paths(origin_node)
origin_node.reverse() # in-place reverse
check_paths(origin_node)
A graph (branching node, simple path) is constructed by a bounded breadth first seach over forward simple paths.
The start node is the running example (AACGAGCACCAAAGACTTAGCATGAAAACCC) of a node preceding a bubble.
from math import log1p
import collections
import networkx as nx
def breadth_first_search(root, max_depth=10000, max_nodes=1000):
"max_depth in nucleotides, max_nodes in branching nodes encountered"
G = nx.MultiDiGraph()
G.add_node(root, er=0, depth=0)
queue = collections.deque([(root, 0)])
while queue:
origin_node, origin_depth = queue.popleft()
for path, succ_node, succ_er in origin_node.paths:
if succ_er == 3:
continue # drops ending simplePaths
if succ_node not in G.node:
succ_depth = origin_depth + len(path)
if succ_depth >= max_depth:
succ_er |= 4 # Mark nodes as not fully explored
else:
queue.append((succ_node, succ_depth))
G.add_node(succ_node, er=succ_er, depth=succ_depth)
G.add_edge(origin_node, succ_node,
weight=1/log1p(len(path)),
path=path)
if G.number_of_nodes() >= max_nodes:
for origin_node, origin_depth in queue:
G.node[origin_node]['er'] |= 4 # Mark nodes as not fully explored
break
return G
G = breadth_first_search(node, max_nodes=150)
The edges (paths) visited by the BFS are then displayed with matplotlib :
The numbers on the edges are the simple path lengths.
# Sorry, networkx's drawing routines are unusable here, I define my own, not perfect either...
import matplotlib.pyplot as plt
%matplotlib inline
from matplotlib.patches import FancyArrowPatch, Circle
import numpy as np
def draw_network(G, pos, ax):
circles = {}
for node, node_data in G.nodes(data=True):
er = node_data['er']
node_pos = pos[node]
if node.in_degree > 1:
if node.out_degree > 1:
color = '#00ffaa' # Cyan: in and out-branching
else:
color = '#55ff55' # Green: in-branching
else:
assert node.out_degree > 1
color = '#5555ff' # Blue: out-branching
c = Circle(pos[node],
radius=0.005, alpha=0.5, facecolor=color,
lw=1, edgecolor='r' if er & 4 else color) # Red edge: exploration pruned
ax.add_patch(c)
circles[node] = c
k = len(node) # kmer size
for pred, succ2edges in G.succ.items():
circle_pred = circles[pred]
x_pred, y_pred = circle_pred.center
for succ, edges in succ2edges.items():
circle_succ = circles[succ]
x_succ, y_succ = circle_succ.center
paths = [edge_data['path'] for edge_data in edges.values()]
for i, path in enumerate(paths):
rad = 0.2*(1+i//2)*(-1)**i # edge radius: 0.1, -0.1, 0.2, -0.2, ...
e = FancyArrowPatch(circle_pred.center, circle_succ.center,
patchA=circle_pred, patchB=circle_succ,
arrowstyle='-|>', mutation_scale=5,
connectionstyle='arc3,rad=%.2f'%rad,
lw=1, alpha=0.5, color='k')
ax.add_patch(e)
# Edge labels
if len(paths) > 1: # bubble case
s = '%s+' % path[:-k].decode('ascii')
elif len(path) < 4: # dense branching
s = path.decode('ascii')
else:
s = str(len(path))
x = (x_pred + x_succ)/2 - (y_pred - y_succ)*rad/2
y = (y_pred + y_succ)/2 + (x_pred - x_succ)*rad/2
ax.text(x, y, s,
fontsize=8,
horizontalalignment='left' if (y_pred - y_succ)*rad < 0 else 'right',
verticalalignment='top' if (x_pred - x_succ)*rad < 0 else 'bottom')
f = plt.figure(figsize=(17,17))
ax = f.gca()
ax.grid(False)
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
pos = nx.layout.spectral_layout(G, weight='weight')
pos = {n: p + np.random.randn(2)*0.1 for n,p in pos.items()}
pos = nx.layout.spring_layout(G, pos=pos, weight='weight', iterations=2000, k=0.018)
draw_network(G, pos, ax=f.gca());
ax.set_aspect('equal', 'datalim')
ax.autoscale_view()