diff --git a/.gitignore b/.gitignore
index aa61b7a..61c6b93 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,6 @@ venv/*
*.json
__pycache__
*.drawio
-*.yml
\ No newline at end of file
+*.yml
+grafana_test/
+testlabsv2/
\ No newline at end of file
diff --git a/clab2drawio.py b/clab2drawio.py
index b75a3ba..a40ac81 100644
--- a/clab2drawio.py
+++ b/clab2drawio.py
@@ -1,722 +1,155 @@
-from N2G import drawio_diagram
+#from N2G import drawio_diagram
+from lib.CustomDrawioDiagram import CustomDrawioDiagram
+from lib.Link import Link
+from lib.Node import Node
+from lib.Grafana import GrafanaDashboard
from collections import defaultdict
-import yaml, argparse, os, re, json
-import xml.etree.ElementTree as ET
-
-class CustomDrawioDiagram(drawio_diagram):
- # Overriding the drawio_diagram_xml with shadow=0
- drawio_diagram_xml = """
-
-
-
-
-
-
-
-
- """
-
- def __init__(self, styles, node_duplicates="skip", link_duplicates="skip"):
-
-
- background = styles['background']
- shadow = styles['shadow']
- grid = styles['grid']
- pagew = styles['pagew']
- pageh = styles['pageh']
-
- self.drawio_diagram_xml = f"""
-
-
-
-
-
-
-
-
- """
-
- super().__init__(node_duplicates, link_duplicates, )
-
- def calculate_new_group_positions(self, obj_pos_old, group_pos):
- # Adjust object positions relative to the new group's position
- obj_pos_new = (obj_pos_old[0] - group_pos[0], obj_pos_old[1] - group_pos[1])
- return obj_pos_new
-
- def group_nodes(self, member_objects, group_id, style=""):
- # Initialize bounding box coordinates
- min_x = min_y = float('inf')
- max_x = max_y = float('-inf')
-
- object_positions = [] # To store all object positions
-
- # Process each member object to update the bounding box
- for obj_id in member_objects:
- obj_mxcell = self.current_root.find(f".//object[@id='{obj_id}']/mxCell")
- if obj_mxcell is not None:
- geometry = obj_mxcell.find("./mxGeometry")
- if geometry is not None:
- x, y = float(geometry.get('x', '0')), float(geometry.get('y', '0'))
- width, height = float(geometry.get('width', '0')), float(geometry.get('height', '0'))
-
- # Store object positions and update bounding box
- object_positions.append((obj_id, x, y, width, height))
- min_x, min_y = min(min_x, x), min(min_y, y)
- max_x, max_y = max(max_x, x + width), max(max_y, y + height)
-
- # Define the group's position and size based on the bounding box
- group_x, group_y = min_x, min_y
- group_width, group_height = max_x - min_x, max_y - min_y
-
- # Create the group cell in the XML structure
- group_cell_xml = f"""
-
-
-
- """
- group_cell = ET.fromstring(group_cell_xml)
- self.current_root.append(group_cell)
-
- # Update positions of all objects within the group
- for obj_id, x, y, _, _ in object_positions:
- obj_pos_old = (x, y)
- obj_pos_new = self.calculate_new_group_positions(obj_pos_old, (group_x, group_y))
-
- obj_mxcell = self.current_root.find(f".//object[@id='{obj_id}']/mxCell")
- if obj_mxcell is not None:
- geometry = obj_mxcell.find("./mxGeometry")
- if geometry is not None:
- geometry.set('x', str(obj_pos_new[0]))
- geometry.set('y', str(obj_pos_new[1]))
- obj_mxcell.set("parent", group_id) # Set the object's parent to the new group
-
-def assign_graphlevels(nodes, links, verbose=False):
- """
- Assigns hierarchical graph levels to nodes based on connections or optional labels
- Returns a sorted list of nodes, their graph levels, and connection details.
- """
- node_graphlevels = {}
- for node, node_info in nodes.items():
- # Check if 'labels' is a dictionary
- labels = node_info.get('labels', {})
- if isinstance(labels, dict):
- graph_level = labels.get('graph-level', -1)
- graphlevel = labels.get('graphlevel', -1)
- node_graphlevels[node] = graph_level if graph_level != -1 else graphlevel
- else:
- node_graphlevels[node] = -1
-
- # Initialize the connections dictionary
- connections = {node: {'upstream': set(), 'downstream': set()} for node in nodes}
- for link in links:
- source, target = link['source'], link['target']
- connections[source]['downstream'].add(target)
- connections[target]['upstream'].add(source)
-
- # Helper function to assign graphlevel by recursively checking connections
- def set_graphlevel(node, current_graphlevel, verbose=False):
- if node_graphlevels[node] != -1 and node_graphlevels[node] < current_graphlevel:
- # Skip setting graphlevel if it is manually set and higher than the current graphlevel
- return
- node_graphlevels[node] = max(node_graphlevels[node], current_graphlevel)
- for downstream_node in connections[node]['downstream']:
- set_graphlevel(downstream_node, current_graphlevel + 1)
-
- # Start by setting the graphlevel of nodes with no upstream connections or with a manually set graphlevel
- for node in nodes:
- if node_graphlevels[node] == -1 and not connections[node]['upstream']:
- set_graphlevel(node, 0)
- elif node_graphlevels[node] != -1:
- # Manually set the graphlevel for nodes with a specified graphlevel
- set_graphlevel(node, node_graphlevels[node])
-
- # Dynamic approach to infer graphlevels from names
- prefix_map = {}
- for node in [n for n, graphlevel in node_graphlevels.items() if graphlevel == -1]:
- # Extract prefix (alphabetic part of the name)
- prefix = ''.join(filter(str.isalpha, node))
- if prefix not in prefix_map:
- prefix_map[prefix] = []
- prefix_map[prefix].append(node)
-
- # Attempt to assign graphlevels based on these groupings
- graphlevel_counter = max(node_graphlevels.values()) + 1
- for prefix, nodes in prefix_map.items():
- for node in nodes:
- node_graphlevels[node] = graphlevel_counter
- graphlevel_counter += 1
-
- sorted_nodes = sorted(node_graphlevels, key=lambda n: (node_graphlevels[n], n))
- return sorted_nodes, node_graphlevels, connections
-
-def center_align_nodes(nodes_by_graphlevel, positions, layout='vertical', verbose=False):
- """
- Center align nodes within each graphlevel based on the layout layout and ensure
- they are nicely distributed to align with the graphlevel above.
- """
-
- if layout == 'vertical':
- prev_graphlevel_center = None
- for graphlevel, nodes in sorted(nodes_by_graphlevel.items()):
- if prev_graphlevel_center is None:
- # For the first graphlevel, calculate its center and use it as the previous center for the next level
- graphlevel_min_x = min(positions[node][0] for node in nodes)
- graphlevel_max_x = max(positions[node][0] for node in nodes)
- prev_graphlevel_center = (graphlevel_min_x + graphlevel_max_x) / 2
- else:
- # Calculate current graphlevel's width and its center
- graphlevel_width = max(positions[node][0] for node in nodes) - min(positions[node][0] for node in nodes)
- graphlevel_center = sum(positions[node][0] for node in nodes) / len(nodes)
-
- # Calculate offset to align current graphlevel's center with the previous graphlevel's center
- offset = prev_graphlevel_center - graphlevel_center
-
- # Apply offset to each node in the current graphlevel
- for node in nodes:
- positions[node] = (positions[node][0] + offset, positions[node][1])
-
- # Update prev_graphlevel_center for the next level
- prev_graphlevel_center = sum(positions[node][0] for node in nodes) / len(nodes)
- else: # Horizontal layout
- prev_graphlevel_center = None
- for graphlevel, nodes in sorted(nodes_by_graphlevel.items()):
- if prev_graphlevel_center is None:
- # For the first graphlevel, calculate its center and use it as the previous center for the next level
- graphlevel_min_y = min(positions[node][1] for node in nodes)
- graphlevel_max_y = max(positions[node][1] for node in nodes)
- prev_graphlevel_center = (graphlevel_min_y + graphlevel_max_y) / 2
- else:
- # Calculate current graphlevel's height and its center
- graphlevel_height = max(positions[node][1] for node in nodes) - min(positions[node][1] for node in nodes)
- graphlevel_center = sum(positions[node][1] for node in nodes) / len(nodes)
-
- # Calculate offset to align current graphlevel's center with the previous graphlevel's center
- offset = prev_graphlevel_center - graphlevel_center
-
- # Apply offset to each node in the current graphlevel
- for node in nodes:
- positions[node] = (positions[node][0], positions[node][1] + offset)
-
- # Update prev_graphlevel_center for the next level
- prev_graphlevel_center = sum(positions[node][1] for node in nodes) / len(nodes)
-
-
-def adjust_intermediary_nodes_same_level(nodes_by_graphlevel, connections, positions, layout, verbose=False):
- """
- Identifies and adjusts positions of intermediary nodes on the same level to improve graph readability.
- Intermediary nodes directly connected to their preceding and following nodes are repositioned based on the layout.
- Returns the list of adjusted intermediary nodes and their new positions.
- """
-
- intermediaries = []
- if verbose:
- print("\nIdentifying intermediary nodes on the same level:")
-
- # Adjustment amount
- adjustment_amount = 100 # Adjust this value as needed
-
- # Iterate through each level and its nodes
- for level, nodes in nodes_by_graphlevel.items():
- # Determine the sorting key based on layout
- sort_key = lambda node: positions[node][1] if layout == 'horizontal' else positions[node][0]
-
- # Sort nodes based on their position
- sorted_nodes = sorted(nodes, key=sort_key)
-
- # Check connectivity and position to identify intermediaries
- for i in range(1, len(sorted_nodes) - 1): # Skip the first and last nodes
- prev_node, current_node, next_node = sorted_nodes[i-1], sorted_nodes[i], sorted_nodes[i+1]
-
- # Ensure prev_node and next_node are directly connected
- if next_node in connections[prev_node].get('downstream', []) or prev_node in connections[next_node].get('upstream', []):
- # Further check if current_node is directly connected to both prev_node and next_node
- if prev_node in connections[current_node].get('upstream', []) and next_node in connections[current_node].get('downstream', []):
- intermediaries.append(current_node)
- if verbose:
- print(f"{current_node} is an intermediary between {prev_node} and {next_node} on level {level}")
-
- # Adjust the position of the intermediary node based on the layout
- if layout == 'horizontal':
- # Move left for horizontal layout
- positions[current_node] = (positions[current_node][0] - adjustment_amount, positions[current_node][1])
- else:
- # Move down for vertical layout
- positions[current_node] = (positions[current_node][0], positions[current_node][1] + adjustment_amount)
- if verbose:
- print(f"Position of {current_node} adjusted to {positions[current_node]}")
+import yaml, argparse, os, re, random
- return intermediaries, positions
+def add_ports(diagram, styles, verbose=True):
+ nodes = diagram.nodes
+ # Calculate and set port positions for all nodes
+ for node in nodes.values():
+ links = node.get_all_links()
-def adjust_intermediary_nodes(nodes_by_graphlevel, connections, positions, layout, verbose=False):
- """
- Adjusts positions of intermediary nodes in a graph to avoid alignment issues between non-adjacent levels.
- It identifies nodes with indirect connections spanning multiple levels and repositions them to enhance clarity.
- Returns a set of nodes that were adjusted.
- """
-
- node_to_graphlevel = {node: level for level, nodes in nodes_by_graphlevel.items() for node in nodes}
- adjusted_nodes = set() # Set to track adjusted nodes
- upstream_positions = {}
-
- # Get all connections between non-adjacent levels
- non_adjacent_connections = []
- all_intermediary_nodes = set()
- for node, links in connections.items():
- node_level = node_to_graphlevel[node]
- for upstream in links['upstream']:
- upstream_level = node_to_graphlevel[upstream]
-
- # Check if the level is non-adjacent
- if abs(upstream_level - node_level) >= 2:
- # Check for the level between if it the nodes has adjacent connections to a node in this level
- intermediary_level = upstream_level + 1 if upstream_level < node_level else upstream_level - 1
- has_adjacent_connection = any(node_to_graphlevel[n] == intermediary_level for n in connections[upstream]['downstream']) or \
- any(node_to_graphlevel[n] == intermediary_level for n in connections[node]['upstream'])
-
- if has_adjacent_connection:
- if verbose:
- print(f"Adjacent connection to intermediary level: {upstream} -> {node} -> {intermediary_level}")
- intermediary_nodes_at_level = [n for n in connections[upstream]['downstream'] if node_to_graphlevel[n] == intermediary_level] + \
- [n for n in connections[node]['upstream'] if node_to_graphlevel[n] == intermediary_level]
-
- if verbose:
- print(f"Nodes at intermediary level {intermediary_level}: {', '.join(intermediary_nodes_at_level)}")
-
- for intermediary_node in intermediary_nodes_at_level:
- print(f"{intermediary_node} is between {upstream} and {node}")
-
- print(f"Nodes at intermediary level {intermediary_level}: {', '.join(intermediary_nodes_at_level)}")
-
- all_intermediary_nodes.update(intermediary_nodes_at_level)
-
- for intermediary_node in intermediary_nodes_at_level:
- # Store the position of the upstream node for each intermediary node
- upstream_positions[intermediary_node] = (upstream, positions[upstream])
-
+ # Group links by their direction
+ direction_groups = {}
+ for link in links:
+ direction = link.direction
+ if direction not in direction_groups:
+ direction_groups[direction] = []
+ direction_groups[direction].append(link)
+
+ for direction, group in direction_groups.items():
+ if diagram.layout == 'vertical':
+ if direction == 'downstream':
+ # Sort downstream links by x position of source and target
+ sorted_links = sorted(group, key=lambda link: (link.source.pos_x, link.target.pos_x))
+ num_links = len(sorted_links)
+ spacing = styles['node_width'] / (num_links + 1)
+ for i, link in enumerate(sorted_links):
+ port_x = node.pos_x + (i + 1) * spacing - styles['connector_width'] / 2
+ port_y = node.pos_y + styles['node_height'] - styles['connector_height'] / 2
+ link.port_pos = (port_x, port_y)
+ elif direction == 'upstream':
+ # Sort upstream links by x position of source and target
+ sorted_links = sorted(group, key=lambda link: (link.source.pos_x, link.target.pos_x))
+ num_links = len(sorted_links)
+ spacing = styles['node_width'] / (num_links + 1)
+ for i, link in enumerate(sorted_links):
+ port_x = node.pos_x + (i + 1) * spacing - styles['connector_width'] / 2
+ port_y = node.pos_y - styles['connector_height'] / 2
+ link.port_pos = (port_x, port_y)
else:
- for downstream in links['downstream']:
- downstream_level = node_to_graphlevel[downstream]
- if abs(downstream_level - node_level) >= 2:
- non_adjacent_connections.append((upstream, node, downstream))
- all_intermediary_nodes.add(node)
-
- # Group intermediary nodes by their levels
- intermediary_nodes_by_level = {}
- for node in all_intermediary_nodes:
- level = node_to_graphlevel[node]
- if level not in intermediary_nodes_by_level:
- intermediary_nodes_by_level[level] = []
- intermediary_nodes_by_level[level].append(node)
-
- # Print the intermediary nodes by level
- if verbose:
- print("\nIntermediary nodes by level:", intermediary_nodes_by_level)
-
- # Select a group of intermediary nodes by level
- if intermediary_nodes_by_level != {}:
- selected_level = max(intermediary_nodes_by_level.keys(), key=lambda lvl: len(intermediary_nodes_by_level[lvl]))
- selected_group = intermediary_nodes_by_level[selected_level]
-
- # Sort the selected group by their position to find the top and bottom nodes
- # The sorting key changes based on the layout
- if layout == 'horizontal':
- sorted_group = sorted(selected_group, key=lambda node: positions[node][1])
- else: # 'vertical'
- sorted_group = sorted(selected_group, key=lambda node: positions[node][0])
-
- top_node = sorted_group[0]
- bottom_node = sorted_group[-1]
-
- # Check if there's only one intermediary node and multiple levels
- if len(sorted_group) == 1 and len(intermediary_nodes_by_level) > 1:
- node = sorted_group[0]
- # Adjust position based on layout and axis alignment
- if layout == 'horizontal' and positions[node][1] == positions[upstream][1]:
- if verbose:
- print(f"Node {node} (before): {positions[node]}")
- positions[node] = (positions[node][0], positions[node][1] - 150)
- if verbose:
- print(f"Node {node} (adjusted): {positions[node]}")
- adjusted_nodes.add(node)
- elif layout == 'vertical' and positions[node][0] == positions[upstream][0]:
- if verbose:
- print(f"Node {node} (before): {positions[node]}")
- positions[node] = (positions[node][0] - 150, positions[node][1])
- if verbose:
- print(f"Node {node} (adjusted): {positions[node]}")
- adjusted_nodes.add(node)
-
- # Check if there are top and bottom nodes to adjust and more than one level
- elif len(sorted_group) > 1:
- # Print positions before adjustment
- if verbose:
- print(f"Top Node (before): {top_node} at position {positions[top_node]}")
- print(f"Bottom Node (before): {bottom_node} at position {positions[bottom_node]}")
-
- if layout == 'horizontal':
- # Check Y-axis alignment for top_node using upstream position
- if positions[top_node][1] == upstream_positions[top_node][1][1]: # [1][1] to access the Y position
- if verbose:
- print(f"{top_node} is aligned with its upstream {upstream_positions[top_node][0]} on the Y-axis")
- positions[top_node] = (positions[top_node][0], positions[top_node][1] - 100)
- adjusted_nodes.add(top_node)
- # Repeat for bottom_node
- if positions[bottom_node][1] == upstream_positions[bottom_node][1][1]:
- if verbose:
- print(f"{bottom_node} is aligned with its upstream {upstream_positions[bottom_node][0]} on the Y-axis")
- positions[bottom_node] = (positions[bottom_node][0], positions[bottom_node][1] + 100)
- adjusted_nodes.add(bottom_node)
- elif layout == 'vertical':
- # Check X-axis alignment for top_node using upstream position
- if positions[top_node][0] == upstream_positions[top_node][1][0]: # [1][0] to access the X position
- if verbose:
- print(f"{top_node} is aligned with its upstream {upstream_positions[top_node][0]} on the X-axis")
- positions[top_node] = (positions[top_node][0] - 100, positions[top_node][1])
- adjusted_nodes.add(top_node)
- # Repeat for bottom_node
- if positions[bottom_node][0] == upstream_positions[bottom_node][1][0]:
- if verbose:
- print(f"{bottom_node} is aligned with its upstream {upstream_positions[bottom_node][0]} on the X-axis")
- positions[bottom_node] = (positions[bottom_node][0] + 100, positions[bottom_node][1])
- adjusted_nodes.add(bottom_node)
-
- # Print positions after adjustment
- if verbose:
- print(f"Top Node (adjusted): {top_node} at position {positions[top_node]}")
- print(f"Bottom Node (adjusted): {bottom_node} at position {positions[bottom_node]}")
-
- return adjusted_nodes
-
-
-def calculate_positions(sorted_nodes, links, node_graphlevels, connections, layout='vertical', verbose=False):
- """
- Calculates and assigns positions to nodes for graph visualization based on their hierarchical levels and connectivity.
- Organizes nodes by graph level, applies prioritization within levels based on connectivity, and adjusts positions to enhance readability.
- Aligns and adjusts intermediary nodes to address alignment issues and improve visual clarity.
- Returns a dictionary mapping each node to its calculated position.
- """
-
- x_start, y_start = 100, 100
- padding_x, padding_y = 200, 200
- positions = {}
- adjacency = defaultdict(set)
-
- if verbose:
- print("Sorted nodes before calculate_positions:", sorted_nodes)
-
- # Build adjacency list
- for link in links:
- src, dst = link['source'], link['target']
- adjacency[src].add(dst)
- adjacency[dst].add(src)
-
- def prioritize_placement(nodes, adjacency, node_graphlevels, layout, verbose=False):
- # Calculate connection counts within the same level
- connection_counts_within_level = {}
- for node in nodes:
- level = node_graphlevels[node]
- connections_within_level = [n for n in adjacency[node] if n in nodes and node_graphlevels[n] == level]
- connection_counts_within_level[node] = len(connections_within_level)
-
- # Determine if sorting is needed by checking if any node has more than one connection within the level
- needs_sorting = any(count > 1 for count in connection_counts_within_level.values())
-
- if not needs_sorting:
- # If no sorting is needed, return the nodes in their original order
- return nodes
-
- # Separate nodes by their connection count within the level
- multi_connection_nodes = [node for node, count in connection_counts_within_level.items() if count > 1]
- single_connection_nodes = [node for node, count in connection_counts_within_level.items() if count == 1]
-
- # Sort nodes with multiple connections
- multi_connection_nodes_sorted = sorted(multi_connection_nodes, key=lambda node: (-len(adjacency[node]), node))
-
- # Sort single connection nodes
- single_connection_nodes_sorted = sorted(single_connection_nodes, key=lambda node: (len(adjacency[node]), node))
-
- # Merge single and multi-connection nodes, placing single-connection nodes at the ends
- ordered_nodes = single_connection_nodes_sorted[:len(single_connection_nodes_sorted)//2] + \
- multi_connection_nodes_sorted + \
- single_connection_nodes_sorted[len(single_connection_nodes_sorted)//2:]
-
- return ordered_nodes
-
- # Organize nodes by graphlevel and order within each graphlevel
- nodes_by_graphlevel = defaultdict(list)
- for node in sorted_nodes:
- nodes_by_graphlevel[node_graphlevels[node]].append(node)
-
- for graphlevel, graphlevel_nodes in nodes_by_graphlevel.items():
- ordered_nodes = prioritize_placement(graphlevel_nodes, adjacency, node_graphlevels, layout, verbose=verbose)
-
- for i, node in enumerate(ordered_nodes):
- if layout == 'vertical':
- positions[node] = (x_start + i * padding_x, y_start + graphlevel * padding_y)
- else:
- positions[node] = (x_start + graphlevel * padding_x, y_start + i * padding_y)
- # First, ensure all nodes are represented in node_graphlevels, even if missing from the adjacency calculations
- missing_nodes = set(sorted_nodes) - set(positions.keys())
- for node in missing_nodes:
- if node not in node_graphlevels:
- # Assign a default graphlevel if somehow missing
- node_graphlevels[node] = max(node_graphlevels.values()) + 1
-
- # Reorganize nodes by graphlevel after including missing nodes
- nodes_by_graphlevel = defaultdict(list)
- for node in sorted_nodes:
- graphlevel = node_graphlevels[node]
- nodes_by_graphlevel[graphlevel].append(node)
-
- for graphlevel, graphlevel_nodes in nodes_by_graphlevel.items():
- # Sort nodes within the graphlevel to ensure missing nodes are placed at the end
- graphlevel_nodes_sorted = sorted(graphlevel_nodes, key=lambda node: (node not in positions, node))
-
- for i, node in enumerate(graphlevel_nodes_sorted):
- if node in positions:
- continue # Skip nodes that already have positions
- # Assign position to missing nodes at the end of their graphlevel
- if layout == 'vertical':
- positions[node] = (x_start + i * padding_x, y_start + graphlevel * padding_y)
- else:
- positions[node] = (x_start + graphlevel * padding_x, y_start + i * padding_y)
-
- # Call the center_align_nodes function to align graphlevels relative to the widest/tallest graphlevel
- center_align_nodes(nodes_by_graphlevel, positions, layout=layout)
-
- adjust_intermediary_nodes(nodes_by_graphlevel, connections, positions, layout, verbose=verbose)
- adjust_intermediary_nodes_same_level(nodes_by_graphlevel, connections, positions, layout, verbose=verbose)
-
- return positions
-
-def create_links(base_style, positions, source, target, source_graphlevel, target_graphlevel, adjacency, layout='vertical', link_index=0, total_links=1, verbose=False):
- """
- Constructs a link style string for a graph visualization, considering the positions and graph levels of source and target nodes.
- Adjusts the link's entry and exit points based on the layout and whether nodes are on the same or different graph levels.
- Supports multiple links between the same nodes by adjusting the positioning to avoid overlaps.
- Returns a style string with parameters defining the link's appearance and positioning.
- """
-
- source_x, source_y = positions[source]
- target_x, target_y = positions[target]
- # Determine directionality
- left_to_right = source_x < target_x
- above_to_below = source_y < target_y
-
- # Calculate step for multiple links
- step = 0.5 if total_links == 1 else 0.25 + 0.5 * (link_index / (total_links - 1))
-
- if layout == 'horizontal':
- # Different graph levels
- if source_graphlevel != target_graphlevel:
- entryX, exitX = (0, 1) if left_to_right else (1, 0)
- entryY = exitY = step
- # Same graph level
- else:
- if above_to_below:
- entryY, exitY = (0, 1)
- else:
- entryY, exitY = (1, 0)
- entryX = exitX = step
-
- elif layout == 'vertical':
- # Different graph levels
- if source_graphlevel != target_graphlevel:
- entryY, exitY = (0, 1) if above_to_below else (1, 0)
- entryX = exitX = step
- # Same graph level
- else:
- if left_to_right:
- entryX, exitX = (0, 1)
- else:
- entryX, exitX = (1, 0)
- entryY = exitY = step
-
- links = f"{base_style}entryY={entryY};exitY={exitY};entryX={entryX};exitX={exitX};"
- return links
-
-def check_node_alignment(source_node_position, target_node_position):
- # Horizontal alignment if y-coordinates are equal
- if source_node_position[1] == target_node_position[1]:
- return 'horizontal'
- # Vertical alignment if x-coordinates are equal
- elif source_node_position[0] == target_node_position[0]:
- return 'vertical'
- return 'none'
-
-def sort_connector_positions(link_connector_positions):
-
- for link_id, link_info in link_connector_positions.items():
- source_node_position = link_info['source_node_position']
- target_node_position = link_info['target_node_position']
+ # Sort lateral links by y position of source and target
+ sorted_links = sorted(group, key=lambda link: (link.source.pos_y, link.target.pos_y))
+ num_links = len(sorted_links)
+ spacing = styles['node_height'] / (num_links + 1)
+ for i, link in enumerate(sorted_links):
+ if link.target.pos_x > link.source.pos_x:
+ # Lateral link to the right
+ port_x = node.pos_x + styles['node_width']
+ else:
+ # Lateral link to the left
+ port_x = node.pos_x
+ port_y = node.pos_y + (i + 1) * spacing
+ link.port_pos = (port_x, port_y)
+ elif diagram.layout == 'horizontal':
+ if direction == 'downstream':
+ # Sort downstream links by y position of source and target
+ sorted_links = sorted(group, key=lambda link: (link.source.pos_y, link.target.pos_y))
+ num_links = len(sorted_links)
+ spacing = styles['node_height'] / (num_links + 1)
+ for i, link in enumerate(sorted_links):
+ port_x = node.pos_x + styles['node_width']
+ port_y = node.pos_y + (i + 1) * spacing
+ link.port_pos = (port_x, port_y)
+ elif direction == 'upstream':
+ # Sort upstream links by y position of source and target
+ sorted_links = sorted(group, key=lambda link: (link.source.pos_y, link.target.pos_y))
+ num_links = len(sorted_links)
+ spacing = styles['node_height'] / (num_links + 1)
+ for i, link in enumerate(sorted_links):
+ port_x = node.pos_x
+ port_y = node.pos_y + (i + 1) * spacing
+ link.port_pos = (port_x, port_y)
+ else:
+ # Sort lateral links by x position of source and target
+ sorted_links = sorted(group, key=lambda link: (link.source.pos_x, link.target.pos_x))
+ num_links = len(sorted_links)
+ spacing = styles['node_width'] / (num_links + 1)
+ for i, link in enumerate(sorted_links):
+ if link.target.pos_y > link.source.pos_y:
+ # Lateral link to the bottom
+ port_y = node.pos_y + styles['node_height']
+ else:
+ # Lateral link to the top
+ port_y = node.pos_y
+ port_x = node.pos_x + (i + 1) * spacing
+ link.port_pos = (port_x, port_y)
+
+ connector_dict = {}
+ # Create connectors and links using the calculated port positions
+ for node in nodes.values():
+
+ downstream_links = node.get_downstream_links()
+ lateral_links = node.get_lateral_links()
+
+ links = downstream_links + lateral_links
- # Check if nodes for this link are aligned (horizontal or vertical)
- node_alignment = check_node_alignment(source_node_position, target_node_position)
-
- # Proceed to check connector alignment only if nodes are aligned
- if node_alignment != 'none':
- source_connector_pos = link_info['source_connector_position']
- target_connector_pos = link_info['target_connector_position']
- # Check connector alignment based on node alignment direction
- if node_alignment == 'horizontal' and source_connector_pos[1] == target_connector_pos[1]:
- #print(f"Connectors for link {link_id} are nicely aligned horizontally.")
- pass
- elif node_alignment == 'vertical' and source_connector_pos[0] == target_connector_pos[0]:
- pass
- #print(f"Connectors for link {link_id} are nicely aligned vertically.")
- else:
- print(f"Connectors for link {link_id} are not nicely aligned {node_alignment}.")
- #TODO: Adjust them
-
-def add_connector_nodes(diagram, nodes, links, positions, styles, verbose=False):
- # Set connector and node dimensions
- connector_width, connector_height = 8, 8
- node_width, node_height = 75, 75
- # Initialize dictionaries for connector directions and positions
- connector_directions = {node: {'up': 0, 'right': 0, 'down': 0, 'left': 0} for node in nodes}
- connector_positions = {node: {'up': [], 'right': [], 'down': [], 'left': []} for node in nodes}
- link_connector_positions = {}
+ for link in links:
+ # source connector
+ source_cID = f"{link.source.name}:{link.source_intf}:{link.target.name}:{link.target_intf}"
+ source_label = re.findall(r'\d+', link.source_intf)[-1]
+ source_connector_pos = link.port_pos
+ connector_width = styles['connector_width']
+ connector_height = styles['connector_height']
+
+ # Add the source connector ID to the source connector dictionary
+ if link.source.name not in connector_dict:
+ connector_dict[link.source.name] = []
+ connector_dict[link.source.name].append(source_cID)
+
+ # target connector
+ target_cID = f"{link.target.name}:{link.target_intf}:{link.source.name}:{link.source_intf}"
+ target_link = diagram.get_target_link(link)
+ target_connector_pos = target_link.port_pos
+ target_label = re.findall(r'\d+', target_link.source_intf)[-1]
+
+ if link.target.name not in connector_dict:
+ connector_dict[link.target.name] = []
+ connector_dict[link.target.name].append(target_cID)
+
+ # Adjust port positions if source and target have different numbers of links
+ source_downstream_links = link.source.get_downstream_links()
+ target_upstream_links = link.target.get_upstream_links()
+ if diagram.layout == 'vertical':
+ if link.source.pos_x == link.target.pos_x:
+ if len(source_downstream_links) != len(target_upstream_links):
+ if len(source_downstream_links) < len(target_upstream_links):
+ # Adjust source port position to align with the corresponding target port
+ adjusted_x = target_connector_pos[0]
+ source_connector_pos = (adjusted_x, source_connector_pos[1])
+ else:
+ # Adjust target port position to align with the corresponding source port
+ adjusted_x = source_connector_pos[0]
+ target_connector_pos = (adjusted_x, target_connector_pos[1])
+ elif diagram.layout == 'horizontal':
+ if link.source.pos_y == link.target.pos_y:
+ pass
+ if len(source_downstream_links) != len(target_upstream_links):
+ if len(source_downstream_links) < len(target_upstream_links):
+ # Adjust source port position to align with the corresponding target port
+ adjusted_y = target_connector_pos[1]
+ source_connector_pos = (source_connector_pos[0], adjusted_y)
+ else:
+ # Adjust target port position to align with the corresponding source port
+ adjusted_y = source_connector_pos[1]
+ target_connector_pos = (target_connector_pos[0], adjusted_y)
- if verbose:
- print(f"Total number of links: {len(links)}")
- print(f"Expected number of connectors: {len(links) * 2}")
- # Go through each link to determine the direction for both the source and target nodes
- for link in links:
- source = link['source']
- target = link['target']
-
- # Parse the unique link style parameters
- style_params = dict(param.split('=') for param in link['unique_link_style'].split(';') if '=' in param)
- exitY = float(style_params.get('exitY', '0.5'))
- exitX = float(style_params.get('exitX', '0.5'))
- entryY = float(style_params.get('entryY', '0.5'))
- entryX = float(style_params.get('entryX', '0.5'))
-
- # Determine the direction based on exit positions for the source node
- if exitY == 0:
- connector_directions[source]['up'] += 1
- elif exitY == 1:
- connector_directions[source]['down'] += 1
-
- if exitX == 0:
- connector_directions[source]['left'] += 1
- elif exitX == 1:
- connector_directions[source]['right'] += 1
-
- # Determine the direction based on entry positions for the target node
- if entryY == 0:
- connector_directions[target]['up'] += 1
- elif entryY == 1:
- connector_directions[target]['down'] += 1
-
- if entryX == 0:
- connector_directions[target]['left'] += 1
- elif entryX == 1:
- connector_directions[target]['right'] += 1
-
- # Calculate the connector positions based on the directions
- for node, directions in connector_directions.items():
- for direction, total_connectors in directions.items():
- for count in range(total_connectors):
- spacing = (node_width if direction in ['up', 'down'] else node_height) / (total_connectors + 1)
- position = spacing * (count + 1)
-
- if direction == 'up':
- connector_pos = (position - connector_width / 2, -connector_height / 2)
- elif direction == 'down':
- connector_pos = (position - connector_width / 2, node_height - connector_height / 2)
- elif direction == 'left':
- connector_pos = (-connector_width / 2, position - connector_height / 2)
- elif direction == 'right':
- connector_pos = (node_width - connector_width / 2, position - connector_height / 2)
-
- # Translate local connector position to global coordinates
- global_connector_pos = (
- positions[node][0] + connector_pos[0],
- positions[node][1] + connector_pos[1]
- )
- connector_positions[node][direction].append(global_connector_pos)
-
- # First loop: Populate link_connector_positions
- for link in links:
- source = link['source']
- target = link['target']
- source_intf = link['source_intf']
- target_intf = link['target_intf']
-
- # Parse the unique link style parameters
- style_params = dict(param.split('=') for param in link['unique_link_style'].split(';') if '=' in param)
- exitY = float(style_params.get('exitY', '0.5'))
- exitX = float(style_params.get('exitX', '0.5'))
- entryY = float(style_params.get('entryY', '0.5'))
- entryX = float(style_params.get('entryX', '0.5'))
-
- # Determine the direction based on exit positions for the source node
- source_direction = None
- if exitY == 0:
- source_direction = 'up'
- elif exitY == 1:
- source_direction = 'down'
- elif exitX == 0:
- source_direction = 'left'
- elif exitX == 1:
- source_direction = 'right'
-
- # Determine the direction based on entry positions for the target node
- target_direction = None
- if entryY == 0:
- target_direction = 'up'
- elif entryY == 1:
- target_direction = 'down'
- elif entryX == 0:
- target_direction = 'left'
- elif entryX == 1:
- target_direction = 'right'
-
- # Get the connector positions for the source and target nodes
- source_connector_pos = connector_positions[source][source_direction].pop(0) if connector_positions[source][source_direction] else None
- target_connector_pos = connector_positions[target][target_direction].pop(0) if connector_positions[target][target_direction] else None
-
- if source_connector_pos and target_connector_pos:
- link_id = f"{source}:{link['source_intf']}:{target}:{link['target_intf']}"
- link_connector_positions[link_id] = {
- 'source': source,
- 'target': target,
- 'target_intf': link['target_intf'],
- 'source_intf': link['source_intf'],
- 'source_node_position': positions[source],
- 'target_node_position': positions[target],
- 'source_connector_position': source_connector_pos,
- 'target_connector_position': target_connector_pos
- }
-
- # Sort connector positions
- _sorted_connector_positions = sort_connector_positions(link_connector_positions)
-
- # Second loop: Add connector nodes to the diagram and create connector links
- connector_links = []
- node_groups = {} # Dictionary to store groups by node
-
- for link_id, link_info in link_connector_positions.items():
- source = link_info['source']
- target = link_info['target']
- source_intf = link_info['source_intf']
- target_intf = link_info['target_intf']
- source_connector_pos = link_info['source_connector_position']
- target_connector_pos = link_info['target_connector_position']
- source_cID = f"{source}:{source_intf}:{target}:{target_intf}"
- target_cID = f"{target}:{target_intf}:{source}:{source_intf}"
-
- # Extract the numeric part from the interface names for the labels
- source_label = re.findall(r'\d+', source_intf)[-1]
- target_label = re.findall(r'\d+', target_intf)[-1]
-
- if source_connector_pos:
- if verbose:
- print(f"Adding connector for {source} with ID {source_cID} at position {source_connector_pos} with label {source_label}")
diagram.add_node(
id=source_cID,
label=source_label,
@@ -727,9 +160,6 @@ def add_connector_nodes(diagram, nodes, links, positions, styles, verbose=False)
style=styles['port_style']
)
- if target_connector_pos:
- if verbose:
- print(f"Adding connector for {target} with ID {target_cID} at position {target_connector_pos} with label {target_label}")
diagram.add_node(
id=target_cID,
label=target_label,
@@ -740,17 +170,6 @@ def add_connector_nodes(diagram, nodes, links, positions, styles, verbose=False)
style=styles['port_style']
)
- if source not in node_groups:
- node_groups[source] = []
- node_groups[source].append(source_cID)
-
- if target not in node_groups:
- node_groups[target] = []
- node_groups[target].append(target_cID)
-
- # Assuming each link has one source and one target connector, pair them to form a connector link
- if source_connector_pos and target_connector_pos:
-
# Calculate center positions
source_center = (source_connector_pos[0] + connector_width / 2, source_connector_pos[1] + connector_height / 2)
target_center = (target_connector_pos[0] + connector_width / 2, target_connector_pos[1] + connector_height / 2)
@@ -759,18 +178,33 @@ def add_connector_nodes(diagram, nodes, links, positions, styles, verbose=False)
midpoint_center_x = (source_center[0] + target_center[0]) / 2
midpoint_center_y = (source_center[1] + target_center[1]) / 2
- midpoint_top_left_x = midpoint_center_x - 2
- midpoint_top_left_y = midpoint_center_y - 2
+ # Generate a random offset within the range of ±10
+ random_offset = random.choice([random.uniform(-20, -10), random.uniform(10, 20)])
+
+ # Determine the direction of the link
+ dx = target_center[0] - source_center[0]
+ dy = target_center[1] - source_center[1]
- # Calculate the real middle between the centers
- midpoint_x = (source_center[0] + target_center[0]) / 2
- midpoint_y = (source_center[1] + target_center[1]) / 2
+ # Calculate the normalized direction vector for the line
+ magnitude = (dx**2 + dy**2)**0.5
+ if magnitude != 0:
+ direction_dx = dx / magnitude
+ direction_dy = dy / magnitude
+ else:
+ # If the magnitude is zero, the source and target are at the same position
+ # In this case, we don't need to move the midpoint
+ direction_dx = 0
+ direction_dy = 0
- midpoint_id = f"mid:{source}:{source_intf}:{target}:{target_intf}" # Adjusted ID format
- if verbose:
- print(f"Creating midpoint connector {midpoint_id} between source {source} and target {target} at position ({midpoint_x}, {midpoint_y})")
+ # Apply the offset
+ midpoint_center_x += direction_dx * random_offset
+ midpoint_center_y += direction_dy * random_offset
- # Add the midpoint connector node
+ midpoint_top_left_x = midpoint_center_x - 2
+ midpoint_top_left_y = midpoint_center_y - 2
+
+ # Create midpoint connector between source and target ports
+ midpoint_id = f"mid:{link.source.name}:{link.source_intf}:{link.target.name}:{link.target_intf}"
diagram.add_node(
id=midpoint_id,
label='\u200B',
@@ -781,109 +215,396 @@ def add_connector_nodes(diagram, nodes, links, positions, styles, verbose=False)
style=styles['connector_style']
)
- # Adjust connector_links to include the midpoint connector
- connector_links.append({'source': source_cID, 'target': midpoint_id, 'link_id' : f"{source_cID}-src"})
- connector_links.append({'source': target_cID, 'target': midpoint_id, 'link_id' : f"{target_cID}-trgt"})
+ diagram.add_link(source=source_cID, target=midpoint_id, style=styles["link_style"], label='rate', link_id=f"{source_cID}_src")
+ diagram.add_link(source=target_cID, target=midpoint_id, style=styles["link_style"], label='rate', link_id=f"{target_cID}_trgt")
- # Now, create groups for each node's connectors
- for node, connector_ids in node_groups.items():
- group_id = f"group-{node}"
-
- # write node node and group id into one array
- connector_ids.append(node)
- # Create a group for the connectors
- diagram.group_nodes(member_objects=connector_ids, group_id=group_id, style='group')
+ # Create groups for each node and its connectors
+ for node_name, connector_ids in connector_dict.items():
+ group_id = f"group-{node_name}"
+ member_objects = connector_ids + [node_name]
+ diagram.group_nodes(member_objects=member_objects, group_id=group_id, style='group')
+def add_links(diagram, styles):
+ nodes = diagram.nodes
- if verbose:
- # Calculate the total number of connectors, including midpoints
- total_connector_count = len(connector_links) # Each link now includes a midpoint, hence total is directly from connector_links
- print(f"Total number of connectors created: {total_connector_count}")
-
- # Expected connectors is now triple the number of links, since each link generates three connectors (source to midpoint, midpoint to target)
- expected_connector_count = len(links) * 3
-
- if total_connector_count != expected_connector_count:
- print("Warning: The number of connectors created does not match the expected count.")
- else:
- print("All connectors created successfully.")
+ for node in nodes.values():
+ downstream_links = node.get_downstream_links()
+ lateral_links = node.get_lateral_links()
- return connector_links
+ links = downstream_links + lateral_links
+ # Group links by their target
+ target_groups = {}
+ for link in links:
+ target = link.target
+ if target not in target_groups:
+ target_groups[target] = []
+ target_groups[target].append(link)
+
+ for target, group in target_groups.items():
+ for i, link in enumerate(group):
+ source_x, source_y = link.source.pos_x, link.source.pos_y
+ target_x, target_y = link.target.pos_x, link.target.pos_y
+
+ # Determine directionality
+ left_to_right = source_x < target_x
+ above_to_below = source_y < target_y
+
+ # Calculate step for multiple links with the same target
+ step = 0.5 if len(group) == 1 else 0.25 + 0.5 * (i / (len(group) - 1))
+
+ if diagram.layout == 'horizontal':
+ if link.level_diff > 0:
+ entryX, exitX = (0, 1) if left_to_right else (1, 0)
+ entryY = exitY = step
+ else:
+ if above_to_below:
+ entryY, exitY = (0, 1)
+ else:
+ entryY, exitY = (1, 0)
+ entryX = exitX = step
+ elif diagram.layout == 'vertical':
+ if link.level_diff > 0:
+ entryY, exitY = (0, 1) if above_to_below else (1, 0)
+ entryX = exitX = step
+ # Same graph level
+ else:
+ if left_to_right:
+ entryX, exitX = (0, 1)
+ else:
+ entryX, exitX = (1, 0)
+ entryY = exitY = step
+ style = f"{styles['link_style']}entryY={entryY};exitY={exitY};entryX={entryX};exitX={exitX};"
-def add_links_with_connectors(diagram, connector_links, link_style=None, verbose=False):
- for link in connector_links:
- diagram.add_link(source=link['source'], target=link['target'], style=link_style, label='rate', link_id=link['link_id'])
+ diagram.add_link(source=link.source.name, target=link.target.name, src_label=link.source_intf, trgt_label=link.target_intf, src_label_style=styles['src_label_style'], trgt_label_style=styles['trgt_label_style'], style=style)
-def add_nodes(diagram, nodes, positions, node_graphlevels, styles):
+def add_nodes(diagram, nodes, styles):
base_style = styles['base_style']
custom_styles = styles['custom_styles']
icon_to_group_mapping = styles['icon_to_group_mapping']
- for node_name, node_info in nodes.items():
- # Check for 'graph-icon' label and map it to the corresponding group
- labels = node_info.get('labels') or {}
- icon_label = labels.get('graph-icon', 'default')
- if icon_label in icon_to_group_mapping:
- group = icon_to_group_mapping[icon_label]
+ for node in nodes.values():
+ # Check for 'graph_icon' attribute and map it to the corresponding group
+ if node.graph_icon in icon_to_group_mapping:
+ group = icon_to_group_mapping[node.graph_icon]
else:
- # Determine the group based on the node's name if 'graph-icon' is not specified
- if "client" in node_name:
+ # Determine the group based on the node's name if 'graph_icon' is not specified
+ if "client" in node.name:
group = "server"
- elif "leaf" in node_name:
+ elif "leaf" in node.name:
group = "leaf"
- elif "spine" in node_name:
+ elif "spine" in node.name:
group = "spine"
- elif "dcgw" in node_name:
+ elif "dcgw" in node.name:
group = "dcgw"
else:
group = "default" # Fallback to 'default' if none of the conditions are met
style = custom_styles.get(group, base_style)
- x_pos, y_pos = positions[node_name]
+ x_pos, y_pos = node.pos_x, node.pos_y
# Add each node to the diagram with the given x and y position.
- diagram.add_node(id=node_name, label=node_name, x_pos=x_pos, y_pos=y_pos, style=style, width=75, height=75)
+ diagram.add_node(id=node.name, label=node.name, x_pos=x_pos, y_pos=y_pos, style=style, width=node.width, height=node.height)
-def add_links(diagram, links, positions, node_graphlevels, styles, no_links=False, layout='vertical', verbose=False):
-
+def adjust_intermediary_nodes(intermediaries, layout, verbose=False):
+
+ if not intermediaries:
+ return
+
+ #group the intermediaries by their graph level
+ intermediaries_by_level = defaultdict(list)
+ for node in intermediaries:
+ intermediaries_by_level[node.graph_level].append(node)
+
+ selected_level = max(intermediaries_by_level.keys(), key=lambda lvl: len(intermediaries_by_level[lvl]))
+ selected_group = intermediaries_by_level[selected_level]
+
+ if len(selected_group) == 1:
+ node = selected_group[0]
+ if layout == 'vertical':
+ node.pos_x = node.pos_x - 100
+ else:
+ node.pos_y = node.pos_y - 100
+ else:
+ for i, node in enumerate(selected_group):
+ if layout == 'vertical':
+ node.pos_x = node.pos_x - 100 + i * 200
+ else:
+ node.pos_y = node.pos_y - 100 + i * 200
+
+ pass
+
+def center_align_nodes(nodes_by_graphlevel, layout='vertical', verbose=False):
+ """
+ Center align nodes within each graphlevel based on the layout layout and ensure
+ they are nicely distributed to align with the graphlevel above.
+ """
- # Initialize a counter for links between the same nodes
- link_counter = defaultdict(lambda: 0)
- total_links_between_nodes = defaultdict(int)
- adjacency = defaultdict(set)
+ attr_x, attr_y = ('pos_x', 'pos_y') if layout == 'vertical' else ('pos_y', 'pos_x')
+
+ prev_graphlevel_center = None
+ for graphlevel, nodes in sorted(nodes_by_graphlevel.items()):
+ graphlevel_centers = [getattr(node, attr_x) for node in nodes]
+
+ if prev_graphlevel_center is None:
+ # For the first graphlevel, calculate its center and use it as the previous center for the next level
+ prev_graphlevel_center = (min(graphlevel_centers) + max(graphlevel_centers)) / 2
+ else:
+ # Calculate current graphlevel's center
+ graphlevel_center = sum(graphlevel_centers) / len(nodes)
+
+ # Calculate offset to align current graphlevel's center with the previous graphlevel's center
+ offset = prev_graphlevel_center - graphlevel_center
+
+ # Apply offset to each node in the current graphlevel
+ for node in nodes:
+ setattr(node, attr_x, getattr(node, attr_x) + offset)
+
+ # Update prev_graphlevel_center for the next level
+ prev_graphlevel_center = sum(getattr(node, attr_x) for node in nodes) / len(nodes)
- # Construct adjacency list once
- for link in links:
- src, dst = link['source'], link['target']
- adjacency[src].add(dst)
- adjacency[dst].add(src)
+def calculate_positions(diagram, layout='vertical', verbose=False):
+ """
+ Calculates and assigns positions to nodes for graph visualization based on their hierarchical levels and connectivity.
+ Organizes nodes by graph level, applies prioritization within levels based on connectivity, and adjusts positions to enhance readability.
+ Aligns and adjusts intermediary nodes to address alignment issues and improve visual clarity.
+ """
- # Prepare link counter and total links
- for link in links:
- source, target = link['source'], link['target']
- link_key = tuple(sorted([source, target]))
- total_links_between_nodes[link_key] += 1
+ nodes = diagram.nodes
+ nodes = sorted(nodes.values(), key=lambda node: (node.graph_level, node.name))
+
+ x_start, y_start = 100, 100
+ padding_x, padding_y = 200, 200
+ min_margin = 200
+
+ if verbose:
+ print("Nodes before calculate_positions:", nodes)
+
+ def prioritize_placement(nodes, level, verbose=False):
+ if level == diagram.get_max_level():
+ # If it's the maximum level, simply sort nodes by name
+ ordered_nodes = sorted(nodes, key=lambda node: node.name)
+ else:
+ # Separate nodes by their connection count within the level
+ multi_connection_nodes = [node for node in nodes if node.get_connection_count_within_level() > 1]
+ single_connection_nodes = [node for node in nodes if node.get_connection_count_within_level() == 1]
+ zero_connection_nodes = [node for node in nodes if node.get_connection_count_within_level() == 0]
+
+ # Separate multi-connection nodes with lateral links
+ multi_connection_nodes_with_lateral = []
+ multi_connection_nodes_without_lateral = []
+ for node in multi_connection_nodes:
+ if any(link.target in multi_connection_nodes for link in node.get_lateral_links()):
+ multi_connection_nodes_with_lateral.append(node)
+ else:
+ multi_connection_nodes_without_lateral.append(node)
+
+ # Sort multi-connection nodes with lateral links wisely
+ sorted_multi_connection_nodes_with_lateral = []
+ while multi_connection_nodes_with_lateral:
+ node = multi_connection_nodes_with_lateral.pop(0)
+ sorted_multi_connection_nodes_with_lateral.append(node)
+ for link in node.get_lateral_links():
+ if link.target in multi_connection_nodes_with_lateral:
+ multi_connection_nodes_with_lateral.remove(link.target)
+ sorted_multi_connection_nodes_with_lateral.append(link.target)
+
+ # sort by name
+ multi_connection_nodes_without_lateral = sorted(multi_connection_nodes_without_lateral, key=lambda node: node.name)
+ sorted_multi_connection_nodes_with_lateral = sorted(sorted_multi_connection_nodes_with_lateral, key=lambda node: node.name)
+ single_connection_nodes = sorted(single_connection_nodes, key=lambda node: node.name)
+
+
+ # Merge single, multi-connection (with and without lateral), and zero-connection nodes
+ ordered_nodes = single_connection_nodes[:len(single_connection_nodes)//2] + \
+ multi_connection_nodes_without_lateral + \
+ sorted_multi_connection_nodes_with_lateral + \
+ single_connection_nodes[len(single_connection_nodes)//2:] + \
+ zero_connection_nodes
+
+ return ordered_nodes
+
+ # Organize nodes by graphlevel and order within each graphlevel
+ nodes_by_graphlevel = defaultdict(list)
+ for node in nodes:
+ nodes_by_graphlevel[node.graph_level].append(node)
+
+ for graphlevel, graphlevel_nodes in nodes_by_graphlevel.items():
+ ordered_nodes = prioritize_placement(graphlevel_nodes, graphlevel, verbose=verbose)
+
+ for i, node in enumerate(ordered_nodes):
+ if layout == 'vertical':
+ node.pos_x = x_start + i * padding_x
+ node.pos_y = y_start + graphlevel * padding_y
+ else:
+ node.pos_x = x_start + graphlevel * padding_x
+ node.pos_y = y_start + i * padding_y
+
+ center_align_nodes(nodes_by_graphlevel, layout=layout, verbose=verbose)
+
+ intermediaries_x, intermediaries_y = diagram.get_nodes_between_interconnected()
+ if diagram.layout == "vertical":
+ adjust_intermediary_nodes(intermediaries_x, layout=diagram.layout, verbose=verbose)
+ else:
+ adjust_intermediary_nodes(intermediaries_y, layout=diagram.layout, verbose=verbose)
+
+
+
+
+def adjust_node_levels(diagram):
+ used_levels = diagram.get_used_levels()
+ max_level = diagram.get_max_level()
+ min_level = diagram.get_min_level()
+ print(f"Initial used levels: {used_levels}")
+ if len(used_levels) <= 1:
+ print("Only one level present, no adjustment needed.")
+ return # Only one level present, no adjustment needed
+
+ current_level = min_level
+ while current_level < max_level + 1:
+ #if level is the first used level or the last used level, skip it
+ if current_level == min_level:
+ print(f"Skip Level: {current_level} because it is the first or last level")
+ current_level += 1
+ continue
+
+ nodes_at_current_level = diagram.get_nodes_by_level(current_level)
+ nodes_at_next_level = diagram.get_nodes_by_level(current_level + 1)
+ print(f"Processing level {current_level}:")
+ print(f"Nodes at current level: {{current_level}} {[node.name for node in nodes_at_current_level.values()]}")
+ next_level = current_level + 1
+ before_level = current_level - 1
+ nodes_to_move = []
+ # if nodes_at_next_level:
+
+ if len(nodes_at_current_level.items()) == 1:
+ print(f"Only one node found at level {current_level}. No adjustment needed.")
+ current_level += 1
+ continue
+ for node_name , node in nodes_at_current_level.items():
+ has_upstream_connection = any(node.get_upstream_links_towards_level(before_level))
+
+
+ if not has_upstream_connection:
+ nodes_to_move.append(node)
+ else:
+ print(f"Node {node_name} has {len(node.get_upstream_links_towards_level(before_level))} upstream links against Level {before_level} No adjustment needed.")
+
+ if (len(nodes_to_move) == len(nodes_at_current_level) ):
+ print(f"Nothing to move here")
+ current_level += 1
+ continue
+ else:
+ for node in nodes_to_move:
+ print(f"!Node {node.name} does not have an upstream connection to level {before_level}. Marked for movement.")
+
+
+ if nodes_to_move:
+ print(f"Because we need to move, we are increasing all node_graphlevels from the next Levels Nodes by one level")
+
+ for level in range(max_level, current_level, -1):
+ nodes_at_level = diagram.get_nodes_by_level(level)
+ for node in nodes_at_level.values():
+ node.graph_level += 1
+ print(f" Moving node {node.name} from level {level} to level {level + 1}.")
+
+ # Move the nodes marked for movement to the next level
+ for node in nodes_to_move:
+ node.graph_level += 1
+ print(f" Moving node {node.name} from level {current_level} to level {next_level}")
+
+ print(f"Moved nodes at level {current_level} to level {next_level}.")
+ update_links(diagram.get_links_from_nodes())
+ max_level = diagram.get_max_level()
+
+ max_level = diagram.get_max_level()
+ current_level += 1
+
+ # Check all levels starting from the last level
+ for level in range(max_level, min_level - 1, -1):
+ nodes_at_level = diagram.get_nodes_by_level(level)
+ for node in nodes_at_level.values():
+ upstream_links = node.get_upstream_links()
+ can_move = True
+ for link in upstream_links:
+ level_diff = node.graph_level - link.target.graph_level
+ if level_diff == 1:
+ can_move = False
+ break # Stop checking if any upstream link has a level difference of 1
+
+ if can_move:
+ for link in upstream_links:
+ level_diff = node.graph_level - link.target.graph_level
+ if level_diff > 1:
+ node.graph_level -= 1
+ print(f" Moving node {node.name} from level {level} to level {level - 1} due to upstream link with level difference > 1")
+ update_links(diagram.get_links_from_nodes())
+ max_level = diagram.get_max_level()
+ break # Stop moving the node after adjusting its level once
+
+def update_links(links):
for link in links:
- source, target = link['source'], link['target']
- source_intf, target_intf = link['source_intf'], link['target_intf']
- source_graphlevel = node_graphlevels.get(source, -1)
- target_graphlevel = node_graphlevels.get(target, -1)
- link_key = tuple(sorted([source, target]))
- link_index = link_counter[link_key]
+ source_level = link.source.graph_level
+ target_level = link.target.graph_level
+ link.level_diff = target_level - source_level
+ if link.level_diff > 0:
+ link.direction = 'downstream'
+ elif link.level_diff < 0:
+ link.direction = 'upstream'
+ else:
+ link.direction = 'lateral'
+
+def assign_graphlevels(diagram, verbose=False):
+ """
+ Assigns hierarchical graph levels to nodes based on connections or optional labels
+ Returns a sorted list of nodes and their graph levels.
+ """
+ nodes = diagram.get_nodes()
+
+ # Check if all nodes already have a graphlevel != -1
+ if all(node.graph_level != -1 for node in nodes.values()):
+ already_set = True
+ else:
+ already_set = False
- link_counter[link_key] += 1
- total_links = total_links_between_nodes[link_key]
+ # Helper function to assign graphlevel by recursively checking connections
+ def set_graphlevel(node, current_graphlevel, visited=None):
+ if visited is None:
+ visited = set()
+ if node.name in visited:
+ return
+ visited.add(node.name)
+
+ if node.graph_level < current_graphlevel:
+ node.graph_level = current_graphlevel
+ for link in node.get_downstream_links():
+ target_node = nodes[link.target.name]
+ set_graphlevel(target_node, current_graphlevel + 1, visited)
+
+ # Start by setting the graphlevel to -1 if they don't already have a graphlevel
+ for node in nodes.values():
+ if node.graph_level != -1:
+ continue
+ # Setting the graphlevel of nodes with no upstream connections
+ elif not node.get_upstream_links():
+ set_graphlevel(node, 0)
+ else:
+ set_graphlevel(node, node.graph_level)
- unique_link_style = create_links(base_style=styles['link_style'], positions=positions, source=source, target=target, source_graphlevel=source_graphlevel, target_graphlevel=target_graphlevel, adjacency=adjacency, link_index=link_index, total_links=total_links, layout=layout)
- link['unique_link_style'] = unique_link_style
- link_id=f"{source}:{source_intf}:{target}:{target_intf}"
- link['link_id'] = f"link_id:{link_id}"
+ # Update the links of each node
+ for node in nodes.values():
+ node.update_links()
+
+ if not already_set:
+ adjust_node_levels(diagram)
+ for node in nodes.values():
+ node.update_links()
+
+ sorted_nodes = sorted(nodes.values(), key=lambda node: (node.graph_level, node.name))
+ return sorted_nodes
- if not no_links:
- diagram.add_link(source=source, target=target, src_label=source_intf, trgt_label=target_intf, src_label_style=styles['src_label_style'], trgt_label_style=styles['trgt_label_style'], style=unique_link_style, link_id=link_id)
def load_styles_from_config(config_path):
try:
@@ -900,10 +621,10 @@ def load_styles_from_config(config_path):
# Initialize the styles dictionary with defaults and override with config values
styles = {
- 'base_style': config['base_style'],
- 'link_style': config['link_style'],
- 'src_label_style': config['src_label_style'],
- 'trgt_label_style': config['trgt_label_style'],
+ 'base_style': config.get('base_style', ''),
+ 'link_style': config.get('link_style', ''),
+ 'src_label_style': config.get('src_label_style', ''),
+ 'trgt_label_style': config.get('trgt_label_style', ''),
'port_style': config.get('port_style', ''),
'connector_style': config.get('connector_style', ''),
'background': config.get('background', "#FFFFFF"),
@@ -911,541 +632,18 @@ def load_styles_from_config(config_path):
'pagew': config.get('pagew', "827"),
'pageh': config.get('pageh', "1169"),
'grid': config.get('grid', "1"),
- # Prepend base_style to each custom style
- 'custom_styles': {key: config['base_style'] + value for key, value in config['custom_styles'].items()},
- 'icon_to_group_mapping': config['icon_to_group_mapping'],
+ 'custom_styles': {key: config.get('base_style', '') + value for key, value in config.get('custom_styles', {}).items()},
+ 'icon_to_group_mapping': config.get('icon_to_group_mapping', {}),
}
- return styles
-
-
-def create_grafana_dashboard(diagram=None,dashboard_filename=None,link_list=[]):
- """
- Creates a Grafana JSON Dashboard using the FlowChart Plugin
- Requires as an input the Drawio Object to embed the XML
- The Link List obtained by add_nodes_and_links function and
- the file name
- Metrics format defaults:
- Ingress: node:itf:in
- Egress: node:itf:out
- OperState: oper_state:node:itf
- Where `node` comes from the clab node and `itf` from the interface name
- """
-
- # We just need the subtree objects from mxGraphModel.Single page drawings only
- xmlTree = ET.fromstring(diagram.dump_xml())
- subXmlTree=xmlTree.findall('.//mxGraphModel')[0]
-
- # Define Query rules for the Panel, rule_expr needs to match the collector metric name
- # Legend format needs to match the format expected by the metric
- panelQueryList = {
- "IngressTraffic" : {
- "rule_expr" : "gnmic_in_bps",
- "legend_format" : '{{source}}:{{interface_name}}:in',
- },
- "EgressTraffic" : {
- "rule_expr" : "gnmic_out_bps",
- "legend_format" : '{{source}}:{{interface_name}}:out',
- },
- "ItfOperState" : {
- "rule_expr" : "gnmic_oper_state",
- "legend_format" : 'oper_state:{{source}}:{{interface_name}}',
- },
- }
- # Create a targets list to embed in the JSON object, we add all the other default JSON attributes to the list
- targetsList = []
- for query in panelQueryList:
- targetsList.append(gf_dashboard_datasource_target(rule_expr=panelQueryList[query]["rule_expr"],legend_format=panelQueryList[query]["legend_format"], refId=query))
-
- # Create the Rules Data
- rulesData = []
- i=0
- for link in link_list:
- rule = link.split(":")
- if "-src" in link:
- #src port ingress
- rulesData.append(gf_flowchart_rule_traffic(ruleName=f"{rule[1]}:{rule[2]}:in", metric=f"{rule[1]}:{rule[2]}:in",link_id=link,order=i))
- #src port:
- # split this link which is link_id:spine1:e1-1:leaf1:e1-49-src into spine1:e1-1:leaf1:e1-49-src
- link_id = link.replace('link_id:', '').replace('-src', '')
- rulesData.append(gf_flowchart_rule_operstate(ruleName=f"oper_state:{rule[1]}:{rule[2]}",metric=f"oper_state:{rule[1]}:{rule[2]}",link_id=link_id,order=i+2))
- i=i+2
- elif "-trgt" in link:
- #src port egress, we can also change this for the ingress of remote port but there would not be an end
- rule2 = rule[2].replace('-trgt', '')
- rulesData.append(gf_flowchart_rule_traffic(ruleName=f"{rule[1]}:{rule2}:out",metric=f"{rule[1]}:{rule2}:out",link_id=link,order=i+1))
- #dest port:
- link_id = link.replace('link_id:', '').replace('-trgt', '')
-
- rulesData.append(gf_flowchart_rule_operstate(ruleName=f"oper_state:{rule[1]}:{rule2}",metric=f"oper_state:{rule[1]}:{rule2}",link_id=link_id,order=i+3))
- i=i+2
- # Create the Panel
- flowchart_panel=gf_flowchart_panel_template(xml=ET.tostring(subXmlTree, encoding="unicode"),
- rulesData=rulesData,
- panelTitle="Network Telemetry",
- targetsList=targetsList)
- #Create a dashboard from the panel
- dashboard_json=json.dumps(gf_dashboard_template(panels=flowchart_panel,dashboard_name=os.path.splitext(dashboard_filename)[0]),indent=4)
- with open(dashboard_filename,'w') as f:
- f.write(dashboard_json)
- print("Saved Grafana dashboard file to:", dashboard_filename)
-
-def gf_flowchart_rule_traffic(ruleName="traffic:inOrOut",metric=None,link_id=None,order=1):
- """
- Dictionary containg information relevant to the traffic Rules
- """
- rule = {
- "aggregation": "current",
- "alias": ruleName,
- "column": "Time",
- "dateColumn": "Time",
- "dateFormat": "YYYY-MM-DD HH:mm:ss",
- "dateTHData": [
- {
- "color": "rgba(245, 54, 54, 0.9)",
- "comparator": "ge",
- "level": 0,
- "value": "0d"
- },
- {
- "color": "rgba(237, 129, 40, 0.89)",
- "comparator": "ge",
- "level": 0,
- "value": "-1d"
- },
- {
- "color": "rgba(50, 172, 45, 0.97)",
- "comparator": "ge",
- "level": 0,
- "value": "-1w"
- }
- ],
- "decimals": 1,
- "gradient": False,
- "hidden": False,
- "invert": False,
- "mappingType": 1,
- "mapsDat": {
- "events": {
- "dataList": [],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- },
- "links": {
- "dataList": [],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- },
- "shapes": {
- "dataList": [
- {
- "colorOn": "a",
- "hidden": False,
- "pattern": link_id,
- "style": "strokeColor"
- }
- ],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- },
- "texts": {
- "dataList": [
- {
- "hidden": False,
- "pattern": link_id,
- "textOn": "wmd",
- "textPattern": "/.*/",
- "textReplace": "content"
- }
- ],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- }
- },
- "metricType": "serie",
- "newRule": False,
- "numberTHData": [
- {
- "color": "rgba(171, 187, 187, 1)",
- "comparator": "ge",
- "level": 0
- },
- {
- "color": "rgba(75, 221, 51, 1)",
- "comparator": "ge",
- "level": 0,
- "value": 500000
- },
- {
- "color": "rgba(255, 128, 0, 1)",
- "comparator": "gt",
- "level": 0,
- "value": 2000000
- },
- {
- "color": "rgba(245, 54, 54, 0.9)",
- "comparator": "ge",
- "level": 0,
- "value": 5000000
- }
- ],
- "order": order,
- "overlayIcon": False,
- "pattern": metric,
- "rangeData": [],
- "reduce": True,
- "refId": "A",
- "sanitize": False,
- "stringTHData": [
- {
- "color": "rgba(245, 54, 54, 0.9)",
- "comparator": "eq",
- "level": 0,
- "value": "/.*/"
- },
- {
- "color": "rgba(237, 129, 40, 0.89)",
- "comparator": "eq",
- "level": 0,
- "value": "/.*warning.*/"
- },
- {
- "color": "rgba(50, 172, 45, 0.97)",
- "comparator": "eq",
- "level": 0,
- "value": "/.*(success|ok).*/"
- }
- ],
- "tooltip": True,
- "tooltipColors": False,
- "tooltipLabel": "",
- "tooltipOn": "a",
- "tpDirection": "v",
- "tpGraph": True,
- "tpGraphScale": "linear",
- "tpGraphSize": "100%",
- "tpGraphType": "line",
- "tpMetadata": False,
- "type": "number",
- "unit": "bps",
- "valueData": []
- }
- return rule
+ # Read all other configuration values
+ for key, value in config.items():
+ if key not in styles:
+ styles[key] = value
-def gf_flowchart_rule_operstate(ruleName="oper_state",metric=None,link_id=None,order=1):
- """
- Dictionary containg information relevant to the Operational State Rules
- """
- rule = {
- "aggregation": "current",
- "alias": ruleName,
- "column": "Time",
- "dateColumn": "Time",
- "dateFormat": "YYYY-MM-DD HH:mm:ss",
- "dateTHData": [
- {
- "color": "rgba(245, 54, 54, 0.9)",
- "comparator": "ge",
- "level": 0,
- "value": "0d"
- },
- {
- "color": "rgba(237, 129, 40, 0.89)",
- "comparator": "ge",
- "level": 0,
- "value": "-1d"
- },
- {
- "color": "rgba(50, 172, 45, 0.97)",
- "comparator": "ge",
- "level": 0,
- "value": "-1w"
- }
- ],
- "decimals": 0,
- "gradient": False,
- "hidden": False,
- "invert": False,
- "mappingType": 1,
- "mapsDat": {
- "events": {
- "dataList": [],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- },
- "links": {
- "dataList": [],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- },
- "shapes": {
- "dataList": [
- {
- "colorOn": "a",
- "hidden": False,
- "pattern": link_id,
- "style": "labelBackgroundColor"
- }
- ],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- },
- "texts": {
- "dataList": [],
- "options": {
- "enableRegEx": True,
- "identByProp": "id",
- "metadata": ""
- }
- }
- },
- "metricType": "serie",
- "newRule": False,
- "numberTHData": [
- {
- "color": "rgba(245, 54, 54, 0.9)",
- "comparator": "ge",
- "level": 0
- },
- {
- "color": "rgba(50, 172, 45, 0.97)",
- "comparator": "ge",
- "level": 0,
- "value": 1
- }
- ],
- "order": order,
- "overlayIcon": False,
- "pattern": metric,
- "rangeData": [],
- "reduce": True,
- "refId": "A",
- "sanitize": False,
- "stringTHData": [
- {
- "color": "rgba(245, 54, 54, 0.9)",
- "comparator": "eq",
- "level": 0,
- "value": "/.*/"
- },
- {
- "color": "rgba(237, 129, 40, 0.89)",
- "comparator": "eq",
- "level": 0,
- "value": "/.*warning.*/"
- },
- {
- "color": "rgba(50, 172, 45, 0.97)",
- "comparator": "eq",
- "level": 0,
- "value": "/.*(success|ok).*/"
- }
- ],
- "tooltip": False,
- "tooltipColors": False,
- "tooltipLabel": "",
- "tooltipOn": "a",
- "tpDirection": "v",
- "tpGraph": False,
- "tpGraphScale": "linear",
- "tpGraphSize": "100%",
- "tpGraphType": "line",
- "tpMetadata": False,
- "type": "number",
- "unit": "short",
- "valueData": []
- }
- return rule
-
-def gf_flowchart_panel_template(xml=None,rulesData=None,targetsList=None,panelTitle="Network Topology"):
- """
- Dictionary containg information relevant to the Panels Section in the JSON Dashboard
- Embeding of the XML diagram, the Rules and the Targets
- """
- panels = [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "flowchartsData": {
- "allowDrawio": True,
- "editorTheme": "kennedy",
- "editorUrl": "https://embed.diagrams.net/",
- "flowcharts": [
- {
- "center": True,
- "csv": "",
- "download": False,
- "enableAnim": True,
- "grid": False,
- "lock": True,
- "name": "Main",
- "scale": True,
- "tooltip": True,
- "type": "xml",
- "url": "http:///",
- "xml": xml,
- "zoom": "100%"
- }
- ]
- },
- "format": "short",
- "graphId": "flowchart_1",
- "gridPos": {
- "h": 20,
- "w": 17,
- "x": 0,
- "y": 0
- },
- "id": 1,
- "rulesData": {
- "rulesData": rulesData,
- },
- "targets": targetsList,
- "title": panelTitle,
- "type": "agenty-flowcharting-panel",
- "valueName": "current",
- "version": "1.0.0e"
- }
- ]
- return panels
-
-def gf_dashboard_datasource_target(rule_expr="promql_query",legend_format=None, refId="Query1"):
- """
- Dictionary containg information relevant to the Targets queried
- """
- target = {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "editorMode": "code",
- "expr": rule_expr,
- "instant": False,
- "legendFormat": legend_format,
- "range": True,
- "refId": refId,
- }
- return target
-
-def gf_dashboard_template(panels=None,dashboard_name="lab-telemetry"):
- """
- Dictionary containg information relevant to the Grafana Dashboard Root JSON object
- """
- dashboard = {
- "__inputs": [
- {
- "name": "DS_PROMETHEUS",
- "label": "Prometheus",
- "description": "Autogenerated by clab2grafana.py",
- "type": "datasource",
- "pluginId": "prometheus",
- "pluginName": "Prometheus"
- }
- ],
- "__elements": {},
- "__requires": [
- {
- "type": "panel",
- "id": "agenty-flowcharting-panel",
- "name": "FlowCharting",
- "version": "1.0.0e"
- },
- {
- "type": "grafana",
- "id": "grafana",
- "name": "Grafana",
- "version": "10.3.3"
- },
- {
- "type": "datasource",
- "id": "prometheus",
- "name": "Prometheus",
- "version": "1.0.0"
- }
- ],
- "annotations": {
- "list": [
- {
- "builtIn": 1,
- "datasource": {
- "type": "grafana",
- "uid": "-- Grafana --"
- },
- "enable": True,
- "hide": True,
- "iconColor": "rgba(0, 211, 255, 1)",
- "name": "Annotations & Alerts",
- "type": "dashboard"
- }
- ]
- },
- "editable": True,
- "fiscalYearStartMonth": 0,
- "graphTooltip": 0,
- "id": None,
- "links": [],
- "liveNow": False,
- "panels": panels,
- "refresh": "5s",
- "schemaVersion": 39,
- "tags": [],
- "templating": {
- "list": []
- },
- "time": {
- "from": "now-6h",
- "to": "now"
- },
- "timepicker": {},
- "timezone": "",
- "title": dashboard_name,
- "uid": "",
- "version": 1,
- "weekStart": ""
- }
- return dashboard
-
-
- # link_ids = add_nodes_and_links(diagram, nodes, positions, links, node_graphlevels, no_links=no_links, layout=layout, verbose=verbose, base_style=base_style, link_style=link_style, custom_styles=custom_styles, icon_to_group_mapping=icon_to_group_mapping)
-
- # # If output_file is not provided, generate it from input_file
- # if not output_file:
- # output_file = os.path.splitext(input_file)[0] + ".drawio"
- # gf_file= os.path.splitext(input_file)[0] + ".grafana.json"
-
- # output_folder = os.path.dirname(output_file) or "."
- # output_filename = os.path.basename(output_file)
- # output_gf_filename = os.path.basename(gf_file)
- # os.makedirs(output_folder, exist_ok=True)
-
- # diagram.dump_file(filename=output_filename, folder=output_folder)
- # print("Saved file to:", output_file)
- # create_grafana_dashboard(diagram,dashboard_filename=output_gf_filename,link_list=link_ids)
+ return styles
-def main(input_file, output_file, theme, include_unlinked_nodes=False, no_links=False, layout='vertical', verbose=False, gf_dashboard=False):
+def main(input_file, output_file, grafana, theme, include_unlinked_nodes=False, no_links=False, layout='vertical', verbose=False):
"""
Generates a diagram from a given topology definition file, organizing and displaying nodes and links.
@@ -1477,63 +675,143 @@ def main(input_file, output_file, theme, include_unlinked_nodes=False, no_links=
# Load styles
styles = load_styles_from_config(config_path)
- # Nodes remain the same
- nodes = containerlab_data['topology']['nodes']
+ diagram = CustomDrawioDiagram()
+ diagram.layout = layout
+
+ nodes_from_clab = containerlab_data['topology']['nodes']
+
+ nodes = {}
+ for node_name, node_data in nodes_from_clab.items():
+ node = Node(
+ name=node_name,
+ kind=node_data.get('kind', ''),
+ mgmt_ipv4=node_data.get('mgmt_ipv4', ''),
+ graph_level=node_data.get('labels', {}).get('graph-level', None),
+ graph_icon=node_data.get('labels', {}).get('graph-icon', None),
+ base_style=styles.get('base_style', ''),
+ custom_style=styles.get(node_data.get('kind', ''), ''),
+ pos_x=node_data.get('pos_x', ''),
+ pos_y=node_data.get('pos_y', ''),
+ width=styles.get('node_width', 75),
+ height=styles.get('node_height', 75),
+ group=node_data.get('group', '')
+ )
+ nodes[node_name] = node
+
+ diagram.nodes = nodes
# Prepare the links list by extracting source and target from each link's 'endpoints'
- links = []
+ links_from_clab = []
for link in containerlab_data['topology'].get('links', []):
endpoints = link.get('endpoints')
if endpoints:
source_node, source_intf = endpoints[0].split(":")
target_node, target_intf = endpoints[1].split(":")
# Add link only if both source and target nodes exist
- if source_node in nodes and target_node in nodes:
- links.append({'source': source_node, 'target': target_node, 'source_intf': source_intf, 'target_intf': target_intf})
+ if source_node in nodes_from_clab and target_node in nodes_from_clab:
+ links_from_clab.append({'source': source_node, 'target': target_node, 'source_intf': source_intf, 'target_intf': target_intf})
+ # Create Link instances and attach them to nodes
+ links = []
+ for link_data in links_from_clab:
+ source_node = nodes.get(link_data['source'])
+ target_node = nodes.get(link_data['target'])
+ if source_node and target_node:
+ # Create two links, one for downstream and one for upstream
+ downstream_link = Link(
+ source=source_node,
+ target=target_node,
+ source_intf=link_data.get('source_intf', ''),
+ target_intf=link_data.get('target_intf', ''),
+ base_style=styles.get('base_style', ''),
+ link_style=styles.get('link_style', ''),
+ src_label_style=styles.get('src_label_style', ''),
+ trgt_label_style=styles.get('trgt_label_style', ''),
+ entryY=link_data.get('entryY', 0),
+ exitY=link_data.get('exitY', 0),
+ entryX=link_data.get('entryX', 0),
+ exitX=link_data.get('exitX', 0),
+ direction='downstream' # Set the direction to downstream
+ )
+ upstream_link = Link(
+ source=target_node,
+ target=source_node,
+ source_intf=link_data.get('target_intf', ''),
+ target_intf=link_data.get('source_intf', ''),
+ base_style=styles.get('base_style', ''),
+ link_style=styles.get('link_style', ''),
+ src_label_style=styles.get('src_label_style', ''),
+ trgt_label_style=styles.get('trgt_label_style', ''),
+ entryY=link_data.get('entryY', 0),
+ exitY=link_data.get('exitY', 0),
+ entryX=link_data.get('entryX', 0),
+ exitX=link_data.get('exitX', 0),
+ direction='upstream' # Set the direction to upstream
+ )
+ links.append(downstream_link)
+ links.append(upstream_link)
+
+ # Add the links to the source and target nodes
+ source_node.add_link(downstream_link)
+ target_node.add_link(upstream_link)
+
if not include_unlinked_nodes:
- linked_nodes = set()
- for link in links:
- linked_nodes.add(link['source'])
- linked_nodes.add(link['target'])
- nodes = {node: info for node, info in nodes.items() if node in linked_nodes}
+ connected_nodes = {name: node for name, node in nodes.items() if node.links}
+ diagram.nodes = connected_nodes
+ nodes = diagram.nodes
+
+ assign_graphlevels(diagram, verbose=False)
+ calculate_positions(diagram, layout=layout, verbose=verbose)
- sorted_nodes, node_graphlevels, connections = assign_graphlevels(nodes, links, verbose=verbose)
- positions = calculate_positions(sorted_nodes, links, node_graphlevels, connections, layout=layout, verbose=verbose)
+ # Calculate the diagram size based on the positions of the nodes
+ min_x = min(node.pos_x for node in nodes.values())
+ min_y = min(node.pos_y for node in nodes.values())
+ max_x = max(node.pos_x for node in nodes.values())
+ max_y = max(node.pos_y for node in nodes.values())
- #Calculate the diagram size based on the positions of the nodes
- min_x = min(position[0] for position in positions.values())
- min_y = min(position[1] for position in positions.values())
- max_x = max(position[0] for position in positions.values())
- max_y = max(position[1] for position in positions.values())
+ # Determine the necessary adjustments
+ adjust_x = -min_x + 100 # Adjust so the minimum x is at least 100
+ adjust_y = -min_y + 100 # Adjust so the minimum y is at least 100
- max_size_x = max_x - min_x + 2 * 150
- max_size_y = max_y - min_y + 2 * 150
+ # Apply adjustments to each node's position
+ for node in nodes.values():
+ node.pos_x += adjust_x
+ node.pos_y += adjust_y
+
+ # Recalculate diagram size if necessary, after adjustment
+ max_x = max(node.pos_x for node in nodes.values())
+ max_y = max(node.pos_y for node in nodes.values())
+
+ max_size_x = max_x + 100 # Adding a margin to the right side
+ max_size_y = max_y + 100 # Adding a margin to the bottom
if styles['pagew'] == "auto":
styles['pagew'] = max_size_x
if styles['pageh'] == "auto":
styles['pageh'] = max_size_y
- # Adjust positions to ensure the smallest x and y are at least 0
- positions = {node: (x - min_x + 100, y - min_y + 100) for node, (x, y) in positions.items()}
-
- # Create a draw.io diagram instance
- diagram = CustomDrawioDiagram(styles=styles)
+ diagram.update_style(styles)
- # Add a diagram page
diagram.add_diagram("Network Topology")
- # Add nodes to the diagram
- add_nodes(diagram, nodes, positions, node_graphlevels, styles=styles)
-
- # Add links to the diagram
- add_links(diagram, links, positions, node_graphlevels, styles=styles, no_links=no_links, layout=layout, verbose=verbose)
- # Add connector nodes for each link
- if 'grafana' in theme.lower():
- connector_links = add_connector_nodes(diagram, nodes, links, positions, styles=styles, verbose=verbose)
- add_links_with_connectors(diagram, connector_links, link_style=styles['link_style'], verbose=verbose)
- gf_dashboard = True
+ add_nodes(diagram, diagram.nodes, styles)
+
+ if grafana:
+ add_ports(diagram, styles)
+ if not output_file:
+ grafana_output_file = os.path.splitext(input_file)[0] + ".grafana.json"
+ output_folder = os.path.dirname(grafana_output_file) or "."
+ output_filename = os.path.basename(grafana_output_file)
+ diagram.grafana_dashboard_file = grafana_output_file
+ os.makedirs(output_folder, exist_ok=True)
+ grafana = GrafanaDashboard(diagram)
+ grafana_json = grafana.create_dashboard()
+ # dump the json to the file
+ with open(grafana_output_file, 'w') as f:
+ f.write(grafana_json)
+ print("Saved file to:", grafana_output_file)
+ else:
+ add_links(diagram, styles)
# If output_file is not provided, generate it from input_file
if not output_file:
@@ -1546,21 +824,12 @@ def main(input_file, output_file, theme, include_unlinked_nodes=False, no_links=
diagram.dump_file(filename=output_filename, folder=output_folder)
print("Saved file to:", output_file)
- if gf_dashboard:
- output_gf_filename = os.path.basename(os.path.splitext(input_file)[0] + ".grafana.json")
- if verbose:
- print(connector_links)
- link_id_list = []
- for link in connector_links:
- link_id_list.append(f"link_id:{link['link_id']}")
- create_grafana_dashboard(diagram,dashboard_filename=output_gf_filename,link_list=link_id_list)
-
def parse_arguments():
parser = argparse.ArgumentParser(description='Generate a topology diagram from a containerlab YAML or draw.io XML file.')
parser.add_argument('-i', '--input', required=True, help='The filename of the input file (containerlab YAML for diagram generation).')
parser.add_argument('-o', '--output', required=False, help='The output file path for the generated diagram (draw.io format).')
- parser.add_argument('-g', '--gf_dashboard',default=False, required=False, help='Generate Grafana Dashboard Flag.')
+ parser.add_argument('-g', '--gf_dashboard', action='store_true', required=False, help='Generate Grafana Dashboard Flag.')
parser.add_argument('--include-unlinked-nodes', action='store_true', help='Include nodes without any links in the topology diagram')
parser.add_argument('--no-links', action='store_true', help='Do not draw links between nodes in the topology diagram')
parser.add_argument('--layout', type=str, default='vertical', choices=['vertical', 'horizontal'], help='Specify the layout of the topology diagram (vertical or horizontal)')
@@ -1573,6 +842,4 @@ def parse_arguments():
script_dir = os.path.dirname(__file__)
- main(args.input, args.output, args.theme, args.include_unlinked_nodes, args.no_links, args.layout, args.verbose, args.gf_dashboard)
-
-
+ main(args.input, args.output, args.gf_dashboard, args.theme, args.include_unlinked_nodes, args.no_links, args.layout, args.verbose)
\ No newline at end of file
diff --git a/clab2drawio2.py b/clab2drawio2.py
deleted file mode 100644
index b701883..0000000
--- a/clab2drawio2.py
+++ /dev/null
@@ -1,845 +0,0 @@
-#from N2G import drawio_diagram
-from lib.CustomDrawioDiagram import CustomDrawioDiagram
-from lib.Link import Link
-from lib.Node import Node
-from lib.Grafana import GrafanaDashboard
-from collections import defaultdict
-import yaml, argparse, os, re, random, math
-
-def add_ports(diagram, styles, verbose=True):
- nodes = diagram.nodes
-
- # Calculate and set port positions for all nodes
- for node in nodes.values():
- links = node.get_all_links()
-
- # Group links by their direction
- direction_groups = {}
- for link in links:
- direction = link.direction
- if direction not in direction_groups:
- direction_groups[direction] = []
- direction_groups[direction].append(link)
-
- for direction, group in direction_groups.items():
- if diagram.layout == 'vertical':
- if direction == 'downstream':
- # Sort downstream links by x position of source and target
- sorted_links = sorted(group, key=lambda link: (link.source.pos_x, link.target.pos_x))
- num_links = len(sorted_links)
- spacing = styles['node_width'] / (num_links + 1)
- for i, link in enumerate(sorted_links):
- port_x = node.pos_x + (i + 1) * spacing - styles['connector_width'] / 2
- port_y = node.pos_y + styles['node_height'] - styles['connector_height'] / 2
- link.port_pos = (port_x, port_y)
- elif direction == 'upstream':
- # Sort upstream links by x position of source and target
- sorted_links = sorted(group, key=lambda link: (link.source.pos_x, link.target.pos_x))
- num_links = len(sorted_links)
- spacing = styles['node_width'] / (num_links + 1)
- for i, link in enumerate(sorted_links):
- port_x = node.pos_x + (i + 1) * spacing - styles['connector_width'] / 2
- port_y = node.pos_y - styles['connector_height'] / 2
- link.port_pos = (port_x, port_y)
- else:
- # Sort lateral links by y position of source and target
- sorted_links = sorted(group, key=lambda link: (link.source.pos_y, link.target.pos_y))
- num_links = len(sorted_links)
- spacing = styles['node_height'] / (num_links + 1)
- for i, link in enumerate(sorted_links):
- if link.target.pos_x > link.source.pos_x:
- # Lateral link to the right
- port_x = node.pos_x + styles['node_width']
- else:
- # Lateral link to the left
- port_x = node.pos_x
- port_y = node.pos_y + (i + 1) * spacing
- link.port_pos = (port_x, port_y)
- elif diagram.layout == 'horizontal':
- if direction == 'downstream':
- # Sort downstream links by y position of source and target
- sorted_links = sorted(group, key=lambda link: (link.source.pos_y, link.target.pos_y))
- num_links = len(sorted_links)
- spacing = styles['node_height'] / (num_links + 1)
- for i, link in enumerate(sorted_links):
- port_x = node.pos_x + styles['node_width']
- port_y = node.pos_y + (i + 1) * spacing
- link.port_pos = (port_x, port_y)
- elif direction == 'upstream':
- # Sort upstream links by y position of source and target
- sorted_links = sorted(group, key=lambda link: (link.source.pos_y, link.target.pos_y))
- num_links = len(sorted_links)
- spacing = styles['node_height'] / (num_links + 1)
- for i, link in enumerate(sorted_links):
- port_x = node.pos_x
- port_y = node.pos_y + (i + 1) * spacing
- link.port_pos = (port_x, port_y)
- else:
- # Sort lateral links by x position of source and target
- sorted_links = sorted(group, key=lambda link: (link.source.pos_x, link.target.pos_x))
- num_links = len(sorted_links)
- spacing = styles['node_width'] / (num_links + 1)
- for i, link in enumerate(sorted_links):
- if link.target.pos_y > link.source.pos_y:
- # Lateral link to the bottom
- port_y = node.pos_y + styles['node_height']
- else:
- # Lateral link to the top
- port_y = node.pos_y
- port_x = node.pos_x + (i + 1) * spacing
- link.port_pos = (port_x, port_y)
-
- connector_dict = {}
- # Create connectors and links using the calculated port positions
- for node in nodes.values():
-
- downstream_links = node.get_downstream_links()
- lateral_links = node.get_lateral_links()
-
- links = downstream_links + lateral_links
-
-
- for link in links:
- # source connector
- source_cID = f"{link.source.name}:{link.source_intf}:{link.target.name}:{link.target_intf}"
- source_label = re.findall(r'\d+', link.source_intf)[-1]
- source_connector_pos = link.port_pos
- connector_width = styles['connector_width']
- connector_height = styles['connector_height']
-
- # Add the source connector ID to the source connector dictionary
- if link.source.name not in connector_dict:
- connector_dict[link.source.name] = []
- connector_dict[link.source.name].append(source_cID)
-
- # target connector
- target_cID = f"{link.target.name}:{link.target_intf}:{link.source.name}:{link.source_intf}"
- target_link = diagram.get_target_link(link)
- target_connector_pos = target_link.port_pos
- target_label = re.findall(r'\d+', target_link.source_intf)[-1]
-
- if link.target.name not in connector_dict:
- connector_dict[link.target.name] = []
- connector_dict[link.target.name].append(target_cID)
-
- # Adjust port positions if source and target have different numbers of links
- source_downstream_links = link.source.get_downstream_links()
- target_upstream_links = link.target.get_upstream_links()
- if diagram.layout == 'vertical':
- if link.source.pos_x == link.target.pos_x:
- if len(source_downstream_links) != len(target_upstream_links):
- if len(source_downstream_links) < len(target_upstream_links):
- # Adjust source port position to align with the corresponding target port
- adjusted_x = target_connector_pos[0]
- source_connector_pos = (adjusted_x, source_connector_pos[1])
- else:
- # Adjust target port position to align with the corresponding source port
- adjusted_x = source_connector_pos[0]
- target_connector_pos = (adjusted_x, target_connector_pos[1])
- elif diagram.layout == 'horizontal':
- if link.source.pos_y == link.target.pos_y:
- pass
- if len(source_downstream_links) != len(target_upstream_links):
- if len(source_downstream_links) < len(target_upstream_links):
- # Adjust source port position to align with the corresponding target port
- adjusted_y = target_connector_pos[1]
- source_connector_pos = (source_connector_pos[0], adjusted_y)
- else:
- # Adjust target port position to align with the corresponding source port
- adjusted_y = source_connector_pos[1]
- target_connector_pos = (target_connector_pos[0], adjusted_y)
-
-
- diagram.add_node(
- id=source_cID,
- label=source_label,
- x_pos=source_connector_pos[0],
- y_pos=source_connector_pos[1],
- width=connector_width,
- height=connector_height,
- style=styles['port_style']
- )
-
- diagram.add_node(
- id=target_cID,
- label=target_label,
- x_pos=target_connector_pos[0],
- y_pos=target_connector_pos[1],
- width=connector_width,
- height=connector_height,
- style=styles['port_style']
- )
-
- # Calculate center positions
- source_center = (source_connector_pos[0] + connector_width / 2, source_connector_pos[1] + connector_height / 2)
- target_center = (target_connector_pos[0] + connector_width / 2, target_connector_pos[1] + connector_height / 2)
-
- # Calculate the real middle between the centers for the midpoint connector
- midpoint_center_x = (source_center[0] + target_center[0]) / 2
- midpoint_center_y = (source_center[1] + target_center[1]) / 2
-
- # Generate a random offset within the range of ±10
- random_offset = random.choice([random.uniform(-20, -10), random.uniform(10, 20)])
-
- # Determine the direction of the link
- dx = target_center[0] - source_center[0]
- dy = target_center[1] - source_center[1]
-
- # Calculate the normalized direction vector for the line
- magnitude = (dx**2 + dy**2)**0.5
- if magnitude != 0:
- direction_dx = dx / magnitude
- direction_dy = dy / magnitude
- else:
- # If the magnitude is zero, the source and target are at the same position
- # In this case, we don't need to move the midpoint
- direction_dx = 0
- direction_dy = 0
-
- # Apply the offset
- midpoint_center_x += direction_dx * random_offset
- midpoint_center_y += direction_dy * random_offset
-
- midpoint_top_left_x = midpoint_center_x - 2
- midpoint_top_left_y = midpoint_center_y - 2
-
- # Create midpoint connector between source and target ports
- midpoint_id = f"mid:{link.source.name}:{link.source_intf}:{link.target.name}:{link.target_intf}"
- diagram.add_node(
- id=midpoint_id,
- label='\u200B',
- x_pos=midpoint_top_left_x,
- y_pos=midpoint_top_left_y,
- width=4,
- height=4,
- style=styles['connector_style']
- )
-
- diagram.add_link(source=source_cID, target=midpoint_id, style=styles["link_style"], label='rate', link_id=f"{source_cID}_src")
- diagram.add_link(source=target_cID, target=midpoint_id, style=styles["link_style"], label='rate', link_id=f"{target_cID}_trgt")
-
-
- # Create groups for each node and its connectors
- for node_name, connector_ids in connector_dict.items():
- group_id = f"group-{node_name}"
- member_objects = connector_ids + [node_name]
- print(member_objects)
- diagram.group_nodes(member_objects=member_objects, group_id=group_id, style='group')
-
-
-def add_links(diagram, styles):
- nodes = diagram.nodes
-
- for node in nodes.values():
- downstream_links = node.get_downstream_links()
- lateral_links = node.get_lateral_links()
-
- links = downstream_links + lateral_links
-
- # Group links by their target
- target_groups = {}
- for link in links:
- target = link.target
- if target not in target_groups:
- target_groups[target] = []
- target_groups[target].append(link)
-
- for target, group in target_groups.items():
- for i, link in enumerate(group):
- source_x, source_y = link.source.pos_x, link.source.pos_y
- target_x, target_y = link.target.pos_x, link.target.pos_y
-
- # Determine directionality
- left_to_right = source_x < target_x
- above_to_below = source_y < target_y
-
- # Calculate step for multiple links with the same target
- step = 0.5 if len(group) == 1 else 0.25 + 0.5 * (i / (len(group) - 1))
-
- if diagram.layout == 'horizontal':
- if link.level_diff > 0:
- entryX, exitX = (0, 1) if left_to_right else (1, 0)
- entryY = exitY = step
- else:
- if above_to_below:
- entryY, exitY = (0, 1)
- else:
- entryY, exitY = (1, 0)
- entryX = exitX = step
- elif diagram.layout == 'vertical':
- if link.level_diff > 0:
- entryY, exitY = (0, 1) if above_to_below else (1, 0)
- entryX = exitX = step
- # Same graph level
- else:
- if left_to_right:
- entryX, exitX = (0, 1)
- else:
- entryX, exitX = (1, 0)
- entryY = exitY = step
- style = f"{styles['link_style']}entryY={entryY};exitY={exitY};entryX={entryX};exitX={exitX};"
-
- diagram.add_link(source=link.source.name, target=link.target.name, src_label=link.source_intf, trgt_label=link.target_intf, src_label_style=styles['src_label_style'], trgt_label_style=styles['trgt_label_style'], style=style)
-
-
-def add_nodes(diagram, nodes, styles):
- base_style = styles['base_style']
- custom_styles = styles['custom_styles']
- icon_to_group_mapping = styles['icon_to_group_mapping']
-
- for node in nodes.values():
- # Check for 'graph_icon' attribute and map it to the corresponding group
- if node.graph_icon in icon_to_group_mapping:
- group = icon_to_group_mapping[node.graph_icon]
- else:
- # Determine the group based on the node's name if 'graph_icon' is not specified
- if "client" in node.name:
- group = "server"
- elif "leaf" in node.name:
- group = "leaf"
- elif "spine" in node.name:
- group = "spine"
- elif "dcgw" in node.name:
- group = "dcgw"
- else:
- group = "default" # Fallback to 'default' if none of the conditions are met
-
- style = custom_styles.get(group, base_style)
- x_pos, y_pos = node.pos_x, node.pos_y
- # Add each node to the diagram with the given x and y position.
- diagram.add_node(id=node.name, label=node.name, x_pos=x_pos, y_pos=y_pos, style=style, width=node.width, height=node.height)
-
-def adjust_intermediary_nodes(intermediaries, layout, verbose=False):
-
- if not intermediaries:
- return
-
- #group the intermediaries by their graph level
- intermediaries_by_level = defaultdict(list)
- for node in intermediaries:
- intermediaries_by_level[node.graph_level].append(node)
-
- selected_level = max(intermediaries_by_level.keys(), key=lambda lvl: len(intermediaries_by_level[lvl]))
- selected_group = intermediaries_by_level[selected_level]
-
- if len(selected_group) == 1:
- node = selected_group[0]
- if layout == 'vertical':
- node.pos_x = node.pos_x - 100
- else:
- node.pos_y = node.pos_y - 100
- else:
- for i, node in enumerate(selected_group):
- if layout == 'vertical':
- node.pos_x = node.pos_x - 100 + i * 200
- else:
- node.pos_y = node.pos_y - 100 + i * 200
-
- pass
-
-def center_align_nodes(nodes_by_graphlevel, layout='vertical', verbose=False):
- """
- Center align nodes within each graphlevel based on the layout layout and ensure
- they are nicely distributed to align with the graphlevel above.
- """
-
- attr_x, attr_y = ('pos_x', 'pos_y') if layout == 'vertical' else ('pos_y', 'pos_x')
-
- prev_graphlevel_center = None
- for graphlevel, nodes in sorted(nodes_by_graphlevel.items()):
- graphlevel_centers = [getattr(node, attr_x) for node in nodes]
-
- if prev_graphlevel_center is None:
- # For the first graphlevel, calculate its center and use it as the previous center for the next level
- prev_graphlevel_center = (min(graphlevel_centers) + max(graphlevel_centers)) / 2
- else:
- # Calculate current graphlevel's center
- graphlevel_center = sum(graphlevel_centers) / len(nodes)
-
- # Calculate offset to align current graphlevel's center with the previous graphlevel's center
- offset = prev_graphlevel_center - graphlevel_center
-
- # Apply offset to each node in the current graphlevel
- for node in nodes:
- setattr(node, attr_x, getattr(node, attr_x) + offset)
-
- # Update prev_graphlevel_center for the next level
- prev_graphlevel_center = sum(getattr(node, attr_x) for node in nodes) / len(nodes)
-
-def calculate_positions(diagram, layout='vertical', verbose=False):
- """
- Calculates and assigns positions to nodes for graph visualization based on their hierarchical levels and connectivity.
- Organizes nodes by graph level, applies prioritization within levels based on connectivity, and adjusts positions to enhance readability.
- Aligns and adjusts intermediary nodes to address alignment issues and improve visual clarity.
- """
-
- nodes = diagram.nodes
- nodes = sorted(nodes.values(), key=lambda node: (node.graph_level, node.name))
-
- x_start, y_start = 100, 100
- padding_x, padding_y = 200, 200
- min_margin = 200
-
- if verbose:
- print("Nodes before calculate_positions:", nodes)
-
- def prioritize_placement(nodes, level, verbose=False):
- if level == diagram.get_max_level():
- # If it's the maximum level, simply sort nodes by name
- ordered_nodes = sorted(nodes, key=lambda node: node.name)
- else:
- # Separate nodes by their connection count within the level
- multi_connection_nodes = [node for node in nodes if node.get_connection_count_within_level() > 1]
- single_connection_nodes = [node for node in nodes if node.get_connection_count_within_level() == 1]
- zero_connection_nodes = [node for node in nodes if node.get_connection_count_within_level() == 0]
-
- # Separate multi-connection nodes with lateral links
- multi_connection_nodes_with_lateral = []
- multi_connection_nodes_without_lateral = []
- for node in multi_connection_nodes:
- if any(link.target in multi_connection_nodes for link in node.get_lateral_links()):
- multi_connection_nodes_with_lateral.append(node)
- else:
- multi_connection_nodes_without_lateral.append(node)
-
- # Sort multi-connection nodes with lateral links wisely
- sorted_multi_connection_nodes_with_lateral = []
- while multi_connection_nodes_with_lateral:
- node = multi_connection_nodes_with_lateral.pop(0)
- sorted_multi_connection_nodes_with_lateral.append(node)
- for link in node.get_lateral_links():
- if link.target in multi_connection_nodes_with_lateral:
- multi_connection_nodes_with_lateral.remove(link.target)
- sorted_multi_connection_nodes_with_lateral.append(link.target)
-
- # sort by name
- multi_connection_nodes_without_lateral = sorted(multi_connection_nodes_without_lateral, key=lambda node: node.name)
- sorted_multi_connection_nodes_with_lateral = sorted(sorted_multi_connection_nodes_with_lateral, key=lambda node: node.name)
- single_connection_nodes = sorted(single_connection_nodes, key=lambda node: node.name)
-
-
- # Merge single, multi-connection (with and without lateral), and zero-connection nodes
- ordered_nodes = single_connection_nodes[:len(single_connection_nodes)//2] + \
- multi_connection_nodes_without_lateral + \
- sorted_multi_connection_nodes_with_lateral + \
- single_connection_nodes[len(single_connection_nodes)//2:] + \
- zero_connection_nodes
-
- return ordered_nodes
-
- # Organize nodes by graphlevel and order within each graphlevel
- nodes_by_graphlevel = defaultdict(list)
- for node in nodes:
- nodes_by_graphlevel[node.graph_level].append(node)
-
- for graphlevel, graphlevel_nodes in nodes_by_graphlevel.items():
- ordered_nodes = prioritize_placement(graphlevel_nodes, graphlevel, verbose=verbose)
-
- for i, node in enumerate(ordered_nodes):
- if layout == 'vertical':
- node.pos_x = x_start + i * padding_x
- node.pos_y = y_start + graphlevel * padding_y
- else:
- node.pos_x = x_start + graphlevel * padding_x
- node.pos_y = y_start + i * padding_y
-
- center_align_nodes(nodes_by_graphlevel, layout=layout, verbose=verbose)
-
- intermediaries_x, intermediaries_y = diagram.get_nodes_between_interconnected()
-
- if diagram.layout == "vertical":
- adjust_intermediary_nodes(intermediaries_x, layout=diagram.layout, verbose=verbose)
- else:
- adjust_intermediary_nodes(intermediaries_y, layout=diagram.layout, verbose=verbose)
-
-
-
-
-def adjust_node_levels(diagram):
- used_levels = diagram.get_used_levels()
- max_level = diagram.get_max_level()
- min_level = diagram.get_min_level()
- print(f"Initial used levels: {used_levels}")
- if len(used_levels) <= 1:
- print("Only one level present, no adjustment needed.")
- return # Only one level present, no adjustment needed
-
- current_level = min_level
- while current_level < max_level + 1:
- #if level is the first used level or the last used level, skip it
- if current_level == min_level:
- print(f"Skip Level: {current_level} because it is the first or last level")
- current_level += 1
- continue
-
- nodes_at_current_level = diagram.get_nodes_by_level(current_level)
- nodes_at_next_level = diagram.get_nodes_by_level(current_level + 1)
- print(f"Processing level {current_level}:")
- print(f"Nodes at current level: {{current_level}} {[node.name for node in nodes_at_current_level.values()]}")
- next_level = current_level + 1
- before_level = current_level - 1
- nodes_to_move = []
- # if nodes_at_next_level:
-
- if len(nodes_at_current_level.items()) == 1:
- print(f"Only one node found at level {current_level}. No adjustment needed.")
- current_level += 1
- continue
- for node_name , node in nodes_at_current_level.items():
- has_upstream_connection = any(node.get_upstream_links_towards_level(before_level))
-
-
- if not has_upstream_connection:
- nodes_to_move.append(node)
- else:
- print(f"Node {node_name} has {len(node.get_upstream_links_towards_level(before_level))} upstream links against Level {before_level} No adjustment needed.")
-
- if (len(nodes_to_move) == len(nodes_at_current_level) ):
- print(f"Nothing to move here")
- current_level += 1
- continue
- else:
- for node in nodes_to_move:
- print(f"!Node {node.name} does not have an upstream connection to level {before_level}. Marked for movement.")
-
-
- if nodes_to_move:
- print(f"Because we need to move, we are increasing all node_graphlevels from the next Levels Nodes by one level")
-
- for level in range(max_level, current_level, -1):
- nodes_at_level = diagram.get_nodes_by_level(level)
- for node in nodes_at_level.values():
- node.graph_level += 1
- print(f" Moving node {node.name} from level {level} to level {level + 1}.")
-
- # Move the nodes marked for movement to the next level
- for node in nodes_to_move:
- node.graph_level += 1
- print(f" Moving node {node.name} from level {current_level} to level {next_level}")
-
- print(f"Moved nodes at level {current_level} to level {next_level}.")
- update_links(diagram.get_links_from_nodes())
- max_level = diagram.get_max_level()
-
- max_level = diagram.get_max_level()
- current_level += 1
-
- # Check all levels starting from the last level
- for level in range(max_level, min_level - 1, -1):
- nodes_at_level = diagram.get_nodes_by_level(level)
- for node in nodes_at_level.values():
- upstream_links = node.get_upstream_links()
- can_move = True
- for link in upstream_links:
- level_diff = node.graph_level - link.target.graph_level
- if level_diff == 1:
- can_move = False
- break # Stop checking if any upstream link has a level difference of 1
-
- if can_move:
- for link in upstream_links:
- level_diff = node.graph_level - link.target.graph_level
- if level_diff > 1:
- node.graph_level -= 1
- print(f" Moving node {node.name} from level {level} to level {level - 1} due to upstream link with level difference > 1")
- update_links(diagram.get_links_from_nodes())
- max_level = diagram.get_max_level()
- break # Stop moving the node after adjusting its level once
-
-def update_links(links):
- for link in links:
- source_level = link.source.graph_level
- target_level = link.target.graph_level
- link.level_diff = target_level - source_level
- if link.level_diff > 0:
- link.direction = 'downstream'
- elif link.level_diff < 0:
- link.direction = 'upstream'
- else:
- link.direction = 'lateral'
-
-def assign_graphlevels(diagram, verbose=False):
- """
- Assigns hierarchical graph levels to nodes based on connections or optional labels
- Returns a sorted list of nodes and their graph levels.
- """
- nodes = diagram.get_nodes()
-
- # Check if all nodes already have a graphlevel != -1
- if all(node.graph_level != -1 for node in nodes.values()):
- already_set = True
- else:
- already_set = False
-
- # Helper function to assign graphlevel by recursively checking connections
- def set_graphlevel(node, current_graphlevel, visited=None):
- if visited is None:
- visited = set()
- if node.name in visited:
- return
- visited.add(node.name)
-
- if node.graph_level < current_graphlevel:
- node.graph_level = current_graphlevel
- for link in node.get_downstream_links():
- target_node = nodes[link.target.name]
- set_graphlevel(target_node, current_graphlevel + 1, visited)
-
- # Start by setting the graphlevel to -1 if they don't already have a graphlevel
- for node in nodes.values():
- if node.graph_level != -1:
- continue
- # Setting the graphlevel of nodes with no upstream connections
- elif not node.get_upstream_links():
- set_graphlevel(node, 0)
- else:
- set_graphlevel(node, node.graph_level)
-
- # Update the links of each node
- for node in nodes.values():
- node.update_links()
-
- if not already_set:
- adjust_node_levels(diagram)
- for node in nodes.values():
- node.update_links()
-
- sorted_nodes = sorted(nodes.values(), key=lambda node: (node.graph_level, node.name))
- return sorted_nodes
-
-
-def load_styles_from_config(config_path):
- try:
- with open(config_path, 'r') as file:
- config = yaml.safe_load(file)
- except FileNotFoundError:
- error_message = f"Error: The specified config file '{config_path}' does not exist."
- print(error_message)
- exit()
- except Exception as e:
- error_message = f"An error occurred while loading the config: {e}"
- print(error_message)
- exit()
-
- # Initialize the styles dictionary with defaults and override with config values
- styles = {
- 'base_style': config.get('base_style', ''),
- 'link_style': config.get('link_style', ''),
- 'src_label_style': config.get('src_label_style', ''),
- 'trgt_label_style': config.get('trgt_label_style', ''),
- 'port_style': config.get('port_style', ''),
- 'connector_style': config.get('connector_style', ''),
- 'background': config.get('background', "#FFFFFF"),
- 'shadow': config.get('shadow', "1"),
- 'pagew': config.get('pagew', "827"),
- 'pageh': config.get('pageh', "1169"),
- 'grid': config.get('grid', "1"),
- 'custom_styles': {key: config.get('base_style', '') + value for key, value in config.get('custom_styles', {}).items()},
- 'icon_to_group_mapping': config.get('icon_to_group_mapping', {}),
- }
-
- # Read all other configuration values
- for key, value in config.items():
- if key not in styles:
- styles[key] = value
-
- return styles
-
-def main(input_file, output_file, grafana, theme, include_unlinked_nodes=False, no_links=False, layout='vertical', verbose=False):
- """
- Generates a diagram from a given topology definition file, organizing and displaying nodes and links.
-
- Processes an input YAML file containing node and link definitions, extracts relevant information,
- and applies logic to determine node positions and connectivity. The function supports filtering out unlinked nodes,
- optionally excluding links, choosing the layout orientation, and toggling verbose output for detailed processing logs.
- """
- try:
- with open(input_file, 'r') as file:
- containerlab_data = yaml.safe_load(file)
- except FileNotFoundError:
- error_message = f"Error: The specified clab file '{input_file}' does not exist."
- print(error_message)
- exit()
- except Exception as e:
- error_message = f"An error occurred while loading the config: {e}"
- print(error_message)
- exit()
-
- if 'grafana' in theme.lower():
- no_links = True
-
- if theme in ['nokia_bright', 'nokia_dark', 'grafana_dark']:
- config_path = os.path.join(script_dir, f'styles/{theme}.yaml')
- else:
- # Assume the user has provided a custom path
- config_path = theme
-
- # Load styles
- styles = load_styles_from_config(config_path)
-
- diagram = CustomDrawioDiagram()
- diagram.layout = layout
-
- nodes_from_clab = containerlab_data['topology']['nodes']
-
- nodes = {}
- for node_name, node_data in nodes_from_clab.items():
- node = Node(
- name=node_name,
- kind=node_data.get('kind', ''),
- mgmt_ipv4=node_data.get('mgmt_ipv4', ''),
- graph_level=node_data.get('labels', {}).get('graph-level', None),
- graph_icon=node_data.get('labels', {}).get('graph-icon', None),
- base_style=styles.get('base_style', ''),
- custom_style=styles.get(node_data.get('kind', ''), ''),
- pos_x=node_data.get('pos_x', ''),
- pos_y=node_data.get('pos_y', ''),
- width=styles.get('node_width', 75),
- height=styles.get('node_height', 75),
- group=node_data.get('group', '')
- )
- nodes[node_name] = node
-
- diagram.nodes = nodes
-
- # Prepare the links list by extracting source and target from each link's 'endpoints'
- links_from_clab = []
- for link in containerlab_data['topology'].get('links', []):
- endpoints = link.get('endpoints')
- if endpoints:
- source_node, source_intf = endpoints[0].split(":")
- target_node, target_intf = endpoints[1].split(":")
- # Add link only if both source and target nodes exist
- if source_node in nodes_from_clab and target_node in nodes_from_clab:
- links_from_clab.append({'source': source_node, 'target': target_node, 'source_intf': source_intf, 'target_intf': target_intf})
-
- # Create Link instances and attach them to nodes
- links = []
- for link_data in links_from_clab:
- source_node = nodes.get(link_data['source'])
- target_node = nodes.get(link_data['target'])
- if source_node and target_node:
- # Create two links, one for downstream and one for upstream
- downstream_link = Link(
- source=source_node,
- target=target_node,
- source_intf=link_data.get('source_intf', ''),
- target_intf=link_data.get('target_intf', ''),
- base_style=styles.get('base_style', ''),
- link_style=styles.get('link_style', ''),
- src_label_style=styles.get('src_label_style', ''),
- trgt_label_style=styles.get('trgt_label_style', ''),
- entryY=link_data.get('entryY', 0),
- exitY=link_data.get('exitY', 0),
- entryX=link_data.get('entryX', 0),
- exitX=link_data.get('exitX', 0),
- direction='downstream' # Set the direction to downstream
- )
- upstream_link = Link(
- source=target_node,
- target=source_node,
- source_intf=link_data.get('target_intf', ''),
- target_intf=link_data.get('source_intf', ''),
- base_style=styles.get('base_style', ''),
- link_style=styles.get('link_style', ''),
- src_label_style=styles.get('src_label_style', ''),
- trgt_label_style=styles.get('trgt_label_style', ''),
- entryY=link_data.get('entryY', 0),
- exitY=link_data.get('exitY', 0),
- entryX=link_data.get('entryX', 0),
- exitX=link_data.get('exitX', 0),
- direction='upstream' # Set the direction to upstream
- )
- links.append(downstream_link)
- links.append(upstream_link)
-
- # Add the links to the source and target nodes
- source_node.add_link(downstream_link)
- target_node.add_link(upstream_link)
-
- if not include_unlinked_nodes:
- connected_nodes = {name: node for name, node in nodes.items() if node.links}
- diagram.nodes = connected_nodes
- nodes = diagram.nodes
-
- assign_graphlevels(diagram, verbose=False)
- calculate_positions(diagram, layout=layout, verbose=verbose)
-
- # Calculate the diagram size based on the positions of the nodes
- min_x = min(node.pos_x for node in nodes.values())
- min_y = min(node.pos_y for node in nodes.values())
- max_x = max(node.pos_x for node in nodes.values())
- max_y = max(node.pos_y for node in nodes.values())
-
- # Determine the necessary adjustments
- adjust_x = -min_x + 100 # Adjust so the minimum x is at least 100
- adjust_y = -min_y + 100 # Adjust so the minimum y is at least 100
-
- # Apply adjustments to each node's position
- for node in nodes.values():
- node.pos_x += adjust_x
- node.pos_y += adjust_y
-
- # Recalculate diagram size if necessary, after adjustment
- max_x = max(node.pos_x for node in nodes.values())
- max_y = max(node.pos_y for node in nodes.values())
-
- max_size_x = max_x + 100 # Adding a margin to the right side
- max_size_y = max_y + 100 # Adding a margin to the bottom
-
- if styles['pagew'] == "auto":
- styles['pagew'] = max_size_x
- if styles['pageh'] == "auto":
- styles['pageh'] = max_size_y
-
- diagram.update_style(styles)
-
- diagram.add_diagram("Network Topology")
-
- add_nodes(diagram, diagram.nodes, styles)
-
- if grafana:
- add_ports(diagram, styles)
- if not output_file:
- output_file = os.path.splitext(input_file)[0] + ".grafana.json"
- output_folder = os.path.dirname(output_file) or "."
- output_filename = os.path.basename(output_file)
- diagram.grafana_dashboard_file = output_filename
- os.makedirs(output_folder, exist_ok=True)
- grafana = GrafanaDashboard(diagram)
- grafana.create_dashboard()
- else:
- add_links(diagram, styles)
-
- # If output_file is not provided, generate it from input_file
- if not output_file:
- output_file = os.path.splitext(input_file)[0] + ".drawio"
-
- output_folder = os.path.dirname(output_file) or "."
- output_filename = os.path.basename(output_file)
- os.makedirs(output_folder, exist_ok=True)
-
- diagram.dump_file(filename=output_filename, folder=output_folder)
-
- print("Saved file to:", output_file)
-
-def parse_arguments():
- parser = argparse.ArgumentParser(description='Generate a topology diagram from a containerlab YAML or draw.io XML file.')
- parser.add_argument('-i', '--input', required=True, help='The filename of the input file (containerlab YAML for diagram generation).')
- parser.add_argument('-o', '--output', required=False, help='The output file path for the generated diagram (draw.io format).')
- parser.add_argument('-g', '--gf_dashboard', action='store_true', required=False, help='Generate Grafana Dashboard Flag.')
- parser.add_argument('--include-unlinked-nodes', action='store_true', help='Include nodes without any links in the topology diagram')
- parser.add_argument('--no-links', action='store_true', help='Do not draw links between nodes in the topology diagram')
- parser.add_argument('--layout', type=str, default='vertical', choices=['vertical', 'horizontal'], help='Specify the layout of the topology diagram (vertical or horizontal)')
- parser.add_argument('--theme', default='nokia_bright', help='Specify the theme for the diagram (nokia_bright, nokia_dark, grafana_dark) or the path to a custom style config file.')
- parser.add_argument('--verbose', action='store_true', help='Enable verbose output for debugging purposes')
- return parser.parse_args()
-
-if __name__ == "__main__":
- args = parse_arguments()
-
- script_dir = os.path.dirname(__file__)
-
- main(args.input, args.output, args.gf_dashboard, args.theme, args.include_unlinked_nodes, args.no_links, args.layout, args.verbose)
-
-
diff --git a/lib/Grafana.py b/lib/Grafana.py
index 521b6e3..e18c1b7 100644
--- a/lib/Grafana.py
+++ b/lib/Grafana.py
@@ -18,15 +18,15 @@ def create_dashboard(self):
# Legend format needs to match the format expected by the metric
panelQueryList = {
"IngressTraffic": {
- "rule_expr": "gnmic_in_bps",
+ "rule_expr": "interface_traffic_rate_in_bps",
"legend_format": '{{source}}:{{interface_name}}:in',
},
"EgressTraffic": {
- "rule_expr": "gnmic_out_bps",
+ "rule_expr": "interface_traffic_rate_out_bps",
"legend_format": '{{source}}:{{interface_name}}:out',
},
"ItfOperState": {
- "rule_expr": "gnmic_oper_state",
+ "rule_expr": "interface_oper_state",
"legend_format": 'oper_state:{{source}}:{{interface_name}}',
},
}
@@ -47,11 +47,11 @@ def create_dashboard(self):
for link in self.links:
link_id = f"{link.source.name}:{link.source_intf}:{link.target.name}:{link.target_intf}"
- # Traffic in
+ # Traffic out
rulesData.append(
self.gf_flowchart_rule_traffic(
- ruleName=f"{link.source.name}:{link.source_intf}:in",
- metric=f"{link.source.name}:{link.source_intf}:in",
+ ruleName=f"{link.source.name}:{link.source_intf}:out",
+ metric=f"{link.source.name}:{link.source_intf}:out",
link_id=link_id,
order=i,
)
@@ -86,9 +86,7 @@ def create_dashboard(self):
),
indent=4,
)
- with open(self.dashboard_filename, 'w') as f:
- f.write(dashboard_json)
- print("Saved Grafana dashboard file to:", self.dashboard_filename)
+ return dashboard_json
def gf_dashboard_datasource_target(self, rule_expr="promql_query", legend_format=None, refId="Query1"):
"""