Source code for ucca.normalization

from ucca import layer0, layer1
from ucca.layer0 import NodeTags as L0Tags
from ucca.layer1 import EdgeTags as ETags, NodeTags as L1Tags

# Edge categories that split_coordinated_main_rel copies only into the first scene it creates;
# its inline comment describes them as categories that do not allow multiple parents.
NO_MULTIPLE_INCOMING_CATEGORIES = {ETags.Function, ETags.ParallelScene, ETags.Linker, ETags.LinkRelation,
                                   ETags.Connector, ETags.Punctuation, ETags.Terminal}
# When all tags of a non-Center edge under a coordinated main relation are in this set,
# split_coordinated_main_rel re-attaches the edge to the top node instead of to the first new scene.
TOP_CATEGORIES = {ETags.ParallelScene, ETags.Linker, ETags.Function, ETags.Ground, ETags.Punctuation,
                  ETags.LinkRelation, ETags.LinkArgument, ETags.Connector}
# Edge attribute key; an edge with a truthy value for it triggers split_coordinated_main_rel.
COORDINATED_MAIN_REL = "Coordinated_Main_Rel."

def traverse_up_centers(node):
    """
    Climb from ``node`` along non-remote incoming edges tagged Center.

    :param node: node to start from
    :return: the first node reached that has no non-remote incoming Center edge
    """
    climbed = True
    while climbed:
        climbed = False
        # The edges scanned belong to the node this pass started from; if several
        # of them qualify, the parent of the last qualifying edge is the one kept.
        for incoming_edge in node.incoming:
            if incoming_edge.attrib.get("remote"):
                continue
            if layer1.EdgeTags.Center in incoming_edge.tags:
                node = incoming_edge.parent
                climbed = True
    return node
def fparent(node_or_edge):
    """
    Return the parent of a node or an edge, whichever attribute it exposes.

    The attributes ``fparent`` and ``parent`` are tried in that order; if neither
    exists, the first element of ``parents`` is used.

    :param node_or_edge: object exposing ``fparent``, ``parent`` or ``parents``
    :return: the parent found, or None if ``parents`` is empty
    :raise AttributeError: if the object has none of the three attributes
    """
    for attribute in ("fparent", "parent"):
        try:
            return getattr(node_or_edge, attribute)
        except AttributeError:
            continue
    if node_or_edge.parents:
        return node_or_edge.parents[0]
    return None
def remove_unmarked_implicits(node):
    """
    Destroy ``node`` and then its ancestors, for as long as each one is childless
    and its ``attrib`` has no truthy "implicit" entry.

    The walk stops, without destroying the current node, as soon as that node has
    no parent according to fparent.

    :param node: node to start from; None is accepted and does nothing
    """
    current = node
    while current is not None:
        if current.children or current.attrib.get("implicit"):
            return
        above = fparent(current)
        if above is None:
            return
        current.destroy()
        current = above
def remove(parent, child):
    """
    Remove ``child`` from ``parent``, then clean up ``parent`` with remove_unmarked_implicits.

    :param parent: object whose ``remove`` method is called; nothing happens if it is None
    :param child: argument passed to ``parent.remove``
    """
    if parent is None:
        return
    parent.remove(child)
    remove_unmarked_implicits(parent)
def destroy(node_or_edge):
    """
    Destroy a node or an edge and clean up the parent it leaves behind.

    The parent is looked up with fparent before anything is removed. If calling
    ``destroy()`` on the object raises AttributeError, the object is removed
    through ``parent.remove`` instead.

    :param node_or_edge: object to destroy
    :return: the parent found before destruction (may be None)
    """
    owner = fparent(node_or_edge)
    try:
        node_or_edge.destroy()
    except AttributeError:
        # NOTE(review): this fallback itself raises AttributeError when owner is None.
        owner.remove(node_or_edge)
    if owner is None:
        return None
    remove_unmarked_implicits(owner)
    return owner
def copy_edge(edge, parent=None, child=None, tag=None, attrib=None):
    """
    Add a copy of ``edge`` to the graph, optionally overriding its endpoints, tag or attributes.

    :param edge: edge to copy
    :param parent: parent for the new edge; defaults to ``edge.parent``
    :param child: child for the new edge; defaults to ``edge.child``
    :param tag: if truthy, the single category tag of the new edge; otherwise every
                category of ``edge`` is copied as (tag, slot, layer, parent)
    :param attrib: attributes for the new edge; defaults to ``edge.attrib``
    :return: True if the edge was added, False if it was skipped because ``parent``
             is found in ``child.iter()`` (adding it would create a cycle)
    """
    source = edge.parent if parent is None else parent
    target = edge.child if child is None else child
    if tag:
        categories = [(tag,)]
    else:
        categories = [(category.tag, category.slot, category.layer, category.parent)
                      for category in edge.categories]
    edge_attrib = edge.attrib if attrib is None else attrib
    if source in target.iter():
        # A cycle is reported to the caller through the return value, not by raising.
        return False
    source.add_multiple(categories, target, edge_attrib=edge_attrib)
    return True
def replace_center(edge):
    """
    Choose the tag that should replace Center on ``edge``.

    :param edge: edge whose parent unit is inspected
    :return: ParallelScene if the parent has length 1 and no parents of its own;
             Process if the parent has participants but is not a scene;
             otherwise the current tag of ``edge``
    """
    unit = edge.parent
    if len(unit) == 1 and not unit.parents:
        return ETags.ParallelScene
    if unit.participants and not unit.is_scene():
        # TODO should be state if the word is a copula
        return ETags.Process
    return edge.tag
def replace_edge_tags(node):
    """
    Rewrite, in place, the tags of the outgoing edges of ``node``.

    A non-remote Center edge gets the tag chosen by replace_center. Other edges are
    handled by the first matching case: under a node with parallel scenes, Connector
    becomes Linker; otherwise Linker becomes Connector; under a scene, Elaborator
    becomes Adverbial; otherwise Adverbial becomes Elaborator.

    :param node: node whose outgoing edges are re-tagged
    """
    # NOTE(review): the nesting of this if/elif chain was reconstructed from a flattened
    # listing; confirm against the upstream file which `if` each `elif` belongs to.
    for edge in node:
        if not edge.attrib.get("remote") and edge.tag == ETags.Center:
            edge.tag = replace_center(edge)
        elif node.parallel_scenes:
            if edge.tag == ETags.Connector:
                edge.tag = ETags.Linker
        elif edge.tag == ETags.Linker:
            edge.tag = ETags.Connector
        elif node.is_scene():
            if edge.tag == ETags.Elaborator:
                edge.tag = ETags.Adverbial
        elif edge.tag == ETags.Adverbial:
            edge.tag = ETags.Elaborator
def move_elements(node, tags, parent_tags, forward=True):
    """
    Move selected child units of ``node`` under their nearest sibling unit.

    An outgoing edge qualifies if its child is a Foundational node and its tag is one
    of ``tags``. Its nearest sibling is the other Foundational child of ``node`` with
    the smallest positional gap; the edge is moved only if that sibling's edge tag is
    one of ``parent_tags`` and copy_edge succeeds.

    :param node: node whose outgoing edges are examined
    :param tags: tag, or collection of tags, of the edges to move
    :param parent_tags: tag, or collection of tags, the nearest sibling's edge must have
    :param forward: if true, the gap is measured from the end of the moved unit to the
                    start of the sibling; otherwise from the end of the sibling to the
                    start of the moved unit
    """
    movable_tags = (tags,) if isinstance(tags, str) else tags
    host_tags = (parent_tags,) if isinstance(parent_tags, str) else parent_tags

    for edge in node:
        moved = edge.child
        if moved.tag != L1Tags.Foundational or edge.tag not in movable_tags:
            continue

        def gap(other_edge):
            sibling = other_edge.child
            backward_gap = moved.start_position - sibling.end_position
            forward_gap = sibling.start_position - moved.end_position
            return abs((backward_gap, forward_gap)[forward])

        siblings = (e for e in node if e != edge and e.child.tag == L1Tags.Foundational)
        try:
            nearest = min(siblings, key=gap)
        except ValueError:
            # min() of an empty sequence: there is no other Foundational child.
            continue
        if nearest.tag in host_tags and copy_edge(edge, parent=nearest.child):
            remove(node, edge)
def move_scene_elements(node):
    """
    Under a node that has parallel scenes, move Relator, Elaborator and Center
    children into the nearest following sibling if that sibling is a ParallelScene.

    :param node: node to process; left untouched if it has no parallel scenes
    """
    if not node.parallel_scenes:
        return
    scene_level_tags = (ETags.Relator, ETags.Elaborator, ETags.Center)
    move_elements(node, tags=scene_level_tags, parent_tags=ETags.ParallelScene)
def move_sub_scene_elements(node):
    """
    Under a scene node, move Elaborator and Center children into the nearest
    preceding sibling if that sibling is a Participant.

    :param node: node to process; left untouched if it is not a scene
    """
    if not node.is_scene():
        return
    sub_scene_tags = (ETags.Elaborator, ETags.Center)
    move_elements(node, tags=sub_scene_tags, parent_tags=ETags.Participant, forward=False)
def separate_scenes(node, l1, top_level=False):
    """
    Wrap the scene-internal children of ``node`` in a new ParallelScene unit.

    Applies only when ``node`` is a scene or has participants, and either
    ``top_level`` is set or ``node`` already has parallel scenes. Edges tagged
    ParallelScene, Punctuation, Linker or Ground stay where they are.

    :param node: node to process
    :param l1: layer object used to create the new unit via ``add_fnode``
    :param top_level: whether ``node`` should be treated as a top-level unit
    """
    if not (node.is_scene() or node.participants):
        return
    if not (top_level or node.parallel_scenes):
        return
    # Snapshot the edges first, so the edge to the new scene is not among those moved.
    original_edges = list(node)
    new_scene = l1.add_fnode(node, ETags.ParallelScene)
    stays_in_place = (ETags.ParallelScene, ETags.Punctuation, ETags.Linker, ETags.Ground)
    for edge in original_edges:
        if edge.tag in stays_in_place:
            continue
        if copy_edge(edge, parent=new_scene):
            remove(node, edge)
def lowest_common_ancestor(*nodes):
    """
    Search upwards from the first node, level by level, for a Foundational node
    whose ``iter()`` contains all the other nodes.

    When only one node is given, a candidate that has terminals is skipped.

    :param nodes: nodes to find a common ancestor for
    :return: the first matching node, or None if no nodes are given or none matches
    """
    if not nodes:
        return None
    others = nodes[1:]
    level = [nodes[0]]
    while level:
        for candidate in level:
            if candidate.tag != L1Tags.Foundational:
                continue
            if candidate.terminals and not others:
                continue
            if all(other in candidate.iter() for other in others):
                return candidate
        level = [grandparent for member in level for grandparent in member.parents]
    return None
def nearest_word(l0, position, step):
    """
    Find the closest Word terminal, moving from ``position`` in increments of ``step``.

    :param l0: layer object queried with ``by_position``
    :param position: position to start from (not itself examined)
    :param step: increment added to the position before each lookup
    :return: the first terminal found whose tag is Word, or None once
             ``by_position`` raises IndexError
    """
    cursor = position + step
    while True:
        try:
            candidate = l0.by_position(cursor)
        except IndexError:
            return None
        if candidate.tag == L0Tags.Word:
            return candidate
        cursor += step
def nearest_parent(l0, *terminals):
    """
    Find a unit to attach ``terminals`` to, based on the words surrounding them.

    :param l0: layer object passed on to nearest_word
    :param terminals: terminals, of which the first and last are used
    :return: lowest_common_ancestor of the nearest word before the first terminal
             and the nearest word after the last one, ignoring whichever is missing
    """
    word_before = nearest_word(l0, terminals[0].position, -1)
    word_after = nearest_word(l0, terminals[-1].position, 1)
    neighbours = [word for word in (word_before, word_after) if word]
    return lowest_common_ancestor(*neighbours)
def reattach_punct(l0, l1):
    """
    Detach all punctuation nodes and then attach punctuation terminals again.

    :param l0: layer object passed to attach_punct
    :param l1: layer object passed to detach_punct and attach_punct
    """
    detach_punct(l1)
    attach_punct(l0, l1)
def attach_punct(l0, l1):
    """
    Attach every punctuation terminal that has no incoming edge.

    Each such terminal is added with ``l1.add_punct`` under the unit returned by
    nearest_parent for it.

    :param l0: layer object whose ``all`` is scanned for terminals
    :param l1: layer object used to add the punctuation
    """
    for terminal in l0.all:
        if not layer0.is_punct(terminal):
            continue
        if terminal.incoming:
            continue
        anchor = nearest_parent(l0, terminal)
        l1.add_punct(anchor, terminal)
def detach_punct(l1):
    """
    Destroy every node in ``l1.all`` whose tag is Punctuation.

    :param l1: layer object whose nodes are scanned
    """
    for candidate in l1.all:
        if candidate.tag != L1Tags.Punctuation:
            continue
        destroy(candidate)
def reattach_terminals(l0, l1):
    """
    Attach orphan terminals, then give terminals with non-terminal siblings a unit of their own.

    After attach_terminals has run, each incoming edge of each terminal is checked:
    if its parent has any outgoing edge not tagged Terminal, a new Center unit is
    added under that parent and the terminal edge is moved into it.

    :param l0: layer object whose ``all`` is scanned for terminals
    :param l1: layer object used to create new units via ``add_fnode``
    """
    attach_terminals(l0, l1)
    for terminal in l0.all:
        for edge in terminal.incoming:
            has_non_terminal_sibling = any(sibling.tag != ETags.Terminal for sibling in edge.parent)
            if not has_non_terminal_sibling:
                continue
            wrapper = l1.add_fnode(edge.parent, ETags.Center)
            if copy_edge(edge, parent=wrapper):
                remove(edge.parent, edge)
def attach_terminals(l0, l1):
    """
    Attach every terminal that has no incoming edge.

    A new Function unit is created under the unit returned by nearest_parent, and
    the terminal is added to it with a Terminal edge.

    :param l0: layer object whose ``all`` is scanned for terminals
    :param l1: layer object used to create new units via ``add_fnode``
    """
    for terminal in l0.all:
        if terminal.incoming:
            continue
        anchor = nearest_parent(l0, terminal)
        function_unit = l1.add_fnode(anchor, ETags.Function)
        function_unit.add(ETags.Terminal, terminal)
def flatten_centers(node):
    """
    Collapse redundant Center units.

    Only Foundational nodes with exactly one center are considered. If such a node
    is itself a Center and its parent has exactly one center, its children are moved
    up to the parent. If it has exactly one child, its incoming edges are redirected
    to that center. In both cases the node is then destroyed.

    :param node: node to process
    :return: ``node`` if nothing was flattened, otherwise the result of destroy(node)
    """
    if node.tag != L1Tags.Foundational or len(node.centers) != 1:
        return node

    if node.ftag == ETags.Center and len(fparent(node).centers) == 1:
        # Center inside center: remote incoming edges now point at the inner center,
        # and all children move up to the parent.
        for incoming_edge in node.incoming:
            if incoming_edge.attrib.get("remote"):
                copy_edge(incoming_edge, child=node.centers[0])
        for outgoing_edge in node:
            copy_edge(outgoing_edge, parent=fparent(node))
        return destroy(node)

    if len(node.children) == 1:
        # Center as only child: every incoming edge is redirected to it.
        for incoming_edge in node.incoming:
            attrib = incoming_edge.attrib
            if node.outgoing[0].attrib.get("remote"):
                # Sets the flag on the attribute object of the existing edge itself.
                attrib["remote"] = True
            copy_edge(incoming_edge, child=node.centers[0], attrib=attrib)
        return destroy(node)

    return node
def flatten_functions(node):
    """
    Collapse redundant Function units.

    Only Foundational nodes with at least one incoming edge are considered. A
    Function child with more children than terminals has its edges copied up to
    ``node`` (Center edges become Function edges) and is destroyed. If a single
    Function is then the only child of ``node``, the incoming edges of ``node`` are
    redirected to it and ``node`` is destroyed.

    :param node: node to process
    :return: ``node`` if it was kept, otherwise the result of destroy(node)
    """
    # Nodes without incoming edges are skipped to avoid creating a root->terminal edge.
    if node.tag != L1Tags.Foundational or not node.incoming:
        return node

    for function_unit in node.functions:
        if len(function_unit.children) <= len(function_unit.terminals):
            continue
        for edge in function_unit:
            new_tag = ETags.Function if edge.tag == ETags.Center else edge.tag
            copy_edge(edge, parent=node, tag=new_tag)
        destroy(function_unit)

    if len(node.functions) == len(node.children) == 1:
        for incoming_edge in node.incoming:
            copy_edge(incoming_edge, child=node.functions[0])
        return destroy(node)

    return node
def flatten_participants(node):
    """
    Collapse redundant Participant units.

    If a Foundational node has a single participant as its only child, and that
    participant has exactly one ftag, the incoming edges of ``node`` are redirected
    to the participant and ``node`` is destroyed. Otherwise, if ``node`` has
    participants but is not a scene, its implicit participants are destroyed.

    :param node: node to process
    :return: ``node`` if it was kept, otherwise the result of destroy(node)
    """
    if node.tag != L1Tags.Foundational:
        return node

    participants = node.participants
    if len(participants) == len(node.children) == 1 and len(participants[0].ftags) == 1:
        for incoming_edge in node.incoming:
            copy_edge(incoming_edge, child=participants[0])
        return destroy(node)

    if participants and not node.is_scene():
        for participant in participants:
            if participant.attrib.get("implicit"):
                destroy(participant)

    return node
def split_coordinated_main_rel(node, l1):
    """
    Split a scene whose main relation is marked as coordinated into one scene per center.

    For every outgoing edge of ``node`` with a truthy COORDINATED_MAIN_REL attribute,
    the main relation unit is destroyed and a new ParallelScene is created for each of
    its centers. The other children of ``node`` are copied into the new scenes and the
    original edges are destroyed.

    :param node: node to process
    :param l1: layer object used to create the new scenes via ``add_fnode``
    :return: ``node`` (also when it was destroyed because it had no incoming edges left)
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing; confirm
    # against the upstream file, in particular where the final cleanup and return sit.
    for edge in node:
        # Work on a copy, so popping the marker does not touch the edge's own attributes.
        attrib = edge.attrib.copy()
        if attrib.pop(COORDINATED_MAIN_REL, None):
            assert {ETags.Process, ETags.State}.intersection(edge.tags), \
                "%s node without main relation: %s" % (COORDINATED_MAIN_REL, node)
            main_rel = edge.child
            centers = main_rel.centers
            assert centers, "%s node without centers: %s" % (COORDINATED_MAIN_REL, main_rel)
            # Collect everything needed from the main relation before destroying it.
            main_rel_non_centers = [e for e in main_rel.outgoing if ETags.Center not in e.tags]
            main_rel_incoming = list(main_rel.incoming)
            main_rel.destroy()
            # New scenes go under the parent of node if node is itself a ParallelScene
            # (node is then detached from that parent), and under node otherwise.
            top = fparent(node)
            if ETags.ParallelScene in node.ftags:
                top.remove(node)
            else:
                top = node
            outgoing = list(node.outgoing)
            scenes = []
            for center in centers:
                new_scene = l1.add_fnode(top, ETags.ParallelScene)
                # Each center becomes the main relation of its own scene, without the marker.
                copy_edge(edge, parent=new_scene, child=center, attrib=attrib)
                for scene_edge in outgoing:
                    if scene_edge.ID != edge.ID and not (
                            scenes and NO_MULTIPLE_INCOMING_CATEGORIES.intersection(scene_edge.tags)):
                        # Not the CMR edge itself, and not a category that does not allow multiple parents
                        # The first scene gets the edge with its own attributes, later scenes get it as remote.
                        copy_edge(scene_edge, parent=new_scene, attrib={"remote": True} if scenes else None)
                scenes.append(new_scene)
            for main_rel_edge in main_rel_non_centers:
                tags = main_rel_edge.tags
                # Non-center children go to top if all their tags are in TOP_CATEGORIES,
                # otherwise to the first scene; Connector edges are re-tagged as Linker.
                copy_edge(main_rel_edge, parent=top if TOP_CATEGORIES.issuperset(tags) else scenes[0],
                          tag=ETags.Linker if ETags.Connector in main_rel_edge.tags else None)
            for scene_edge in outgoing:
                if scene_edge.ID != edge.ID:
                    destroy(scene_edge)
            # Remote edges that pointed at the main relation now point at the first center.
            for remote_edge in main_rel_incoming:
                if remote_edge.attrib.get("remote"):
                    copy_edge(remote_edge, child=centers[0])
            if not node.incoming:
                node.destroy()
    # NOTE(review): node is returned even after node.destroy() above; confirm callers expect that.
    return node
def normalize_node(node, l1, extra):
    """
    Apply the normalization steps to a single node; non-Foundational nodes are skipped.

    :param node: node to normalize
    :param l1: layer object passed to the steps that create new units
    :param extra: whether to also run the tag-replacement and element-moving steps
    :return: None in all cases
    """
    if node.tag == L1Tags.Foundational:
        # NOTE(review): the extent of this `if extra:` body was reconstructed from a
        # flattened listing; confirm against the upstream file.
        if extra:
            replace_edge_tags(node)
            move_scene_elements(node)
            move_sub_scene_elements(node)
            separate_scenes(node, l1, top_level=node in l1.heads)
        # Each step returns the node to continue with; processing stops once that is None.
        node = split_coordinated_main_rel(node, l1)
        if node is None:
            return None
        node = flatten_centers(node)
        if node is None:
            return
        node = flatten_functions(node)
        if node is None:
            return
        flatten_participants(node)
def normalize(passage, extra=False):
    """
    Normalize ``passage`` in place.

    Punctuation is re-attached, every node reachable from the heads of layer 1 is
    visited depth-first and passed to normalize_node, and punctuation is re-attached
    once more. Edges leading back to a node on the current path are destroyed.

    :param passage: object providing ``layer``, queried with the layer 0 and layer 1 IDs
    :param extra: passed on to normalize_node; if true, reattach_terminals also runs at the end
    """
    l0 = passage.layer(layer0.LAYER_ID)
    l1 = passage.layer(layer1.LAYER_ID)
    reattach_punct(l0, l1)
    heads = list(l1.heads)
    # Each stack entry is an iterable: the list of heads at the bottom, nodes above it.
    stack = [heads]
    visited = set()
    # path / path_set hold the nodes on the current depth-first branch.
    path = []
    path_set = set()
    while stack:
        # The top of the stack is iterated from its start on every pass;
        # `visited` is what makes already-processed children get skipped.
        for edge in stack[-1]:
            try:
                node = edge.child
            except AttributeError:
                # No `child` attribute: the item is used as the node itself.
                node = edge
            if node in path_set:
                # The target is already on the current path: drop the edge to break the cycle.
                destroy(edge)
            elif node not in visited:
                visited.add(node)
                path.append(node)
                path_set.add(node)
                stack.append(node)
                normalize_node(node, l1, extra)
                # Descend into the node just pushed.
                break
        else:
            # Loop ended without descending: this level is finished, so backtrack.
            if path:
                path_set.remove(path.pop())
            stack.pop()
    reattach_punct(l0, l1)
    if extra:
        reattach_terminals(l0, l1)