Zeyuan (Faradawn) Yang

R-Tree

R-Tree in Python

from __future__ import annotations

from functools import cached_property
from typing import Iterable

from shapely.geometry.base import BaseGeometry


class Node:
    def __init__(self, min_degree: int, parent: InternalNode | None = None):
        self.min_degree = min_degree
        self.parent = parent
        self.bbox = BaseGeometry()

    def update_ancestors(self):
        """Update the bounding boxes of each ancestor to reflect changes to self.bbox"""
        if(self.parent == None or self.parent == self):
            return
        self.parent.bbox = self.parent.bbox.union(self.bbox).envelope
        self.parent.update_ancestors()

    @property
    def max_degree(self) -> int:
        return 2 * self.min_degree

    @property
    def is_leaf(self) -> bool:
        raise NotImplementedError

    @property
    def is_full(self) -> bool:
        raise NotImplementedError

    def split(self, payload):
        raise NotImplementedError


class LeafNode(Node):
    """A leaf node stores some geometries."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.geometries: list[BaseGeometry] = []

    @cached_property
    def is_leaf(self) -> bool:
        return True

    @property
    def is_full(self) -> bool:
        return len(self.geometries) >= self.max_degree

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({', '.join(str(geom) for geom in self.geometries)})"

    def add(self, payload: BaseGeometry):
        """Adds the given geometry to this leaf node."""
        if self.is_full:
            self.split(payload)
        else:
            self.geometries.append(payload)
        self.bbox = self.bbox.union(payload).envelope
        self.update_ancestors()

    def split(self, payload: BaseGeometry):
        """Splits this leaf node into two new leaf nodes."""
        p = self.parent
        l = self.geometries
        print(len(l))
        if self.parent != None and self.parent != self:
            print("appending")
            l.append(payload)
        min_dist = -1.0
        i1 = 0
        i2 = 0
        for i in range(len(l)):
            for j in range(i + 1, len(l)):
                dist = l[i].distance(l[j])
                if(min_dist < -0.5 or dist < min_dist):
                    print(i, j)
                    min_dist = dist
                    i1 = i
                    i2 = j
        print(l[i1], l[i2], i1, i2)
        l1 = [l[i1]]
        l2 = [l[i2]]
        print("BBBBBBBB")
        print(l[i1], l[i2])
        l.pop(i2)
        l.pop(i1)

        p.remove(self)
        node1 = LeafNode(min_degree = self.min_degree)
        node2 = LeafNode(min_degree = self.min_degree)
        node1.geometries = l1
        node2.geometries = l2
        node1.bbox = l1[0].envelope
        node2.bbox = l2[0].envelope
        node1.parent = p
        node2.parent = p
        while(len(l1) < self.min_degree and len(l2) < self.min_degree):
            node1_enlarged = node1.bbox.union(l[0]).envelope
            node2_enlarged = node2.bbox.union(l[0]).envelope
            diff1 = node1_enlarged.area - node1.bbox.area
            diff2 = node2_enlarged.area - node2.bbox.area
            if(diff1 < diff2 - 1e-8 or (abs(diff1 - diff2) < 1e-8 and (node1.bbox.area < node2.bbox.area - 1e-8 or (abs(node1.bbox.area - node2.bbox.area) < 1e-8 and len(l1) < len(l2))))):
                l1.append(l[0])
                node1.bbox = node1.bbox.union(l[0]).envelope
            else:
                l2.append(l[0])
                node2.bbox = node2.bbox.union(l[0]).envelope
            l.pop(0)
        while(len(l1) < self.min_degree):
            l1.append(l[0])
            node1.bbox = node1.bbox.union(l[0]).envelope
            l.pop(0)
        while(len(l2) < self.min_degree):
            l2.append(l[0])
            node2.bbox = node2.bbox.union(l[0]).envelope
            l.pop(0)
        if(len(l) > 0):
            node1_enlarged = node1.bbox.union(l[0]).envelope
            node2_enlarged = node2.bbox.union(l[0]).envelope
            diff1 = node1_enlarged.area - node1.bbox.area
            diff2 = node2_enlarged.area - node2.bbox.area
            if(diff1 < diff2 - 1e-8 or (abs(diff1 - diff2) < 1e-8 and (node1.bbox.area < node2.bbox.area - 1e-8 or (abs(node1.bbox.area - node2.bbox.area) < 1e-8 and len(l1) < len(l2))))):
                l1.append(l[0])
                node1.bbox = node1.bbox.union(l[0]).envelope
            else:
                l2.append(l[0])
                node2.bbox = node2.bbox.union(l[0]).envelope
            l.pop(0)
        print(len(node1.geometries))
        print(len(node2.geometries))
        p.add(node1)
        p.add(node2)

        #if(p.is_full):
        #    p.split(self)

        # TODO
        # Find the two geometries which are farthest apart (including the payload).
        # Create two new leaf nodes, seeded with these geometries.
        # Split the remaining geometries between these two nodes.
        # Assign each to the node requiring the minimum enlargement to accommodate it.
        # Ensure that both new nodes have at least self.min_degree geometries.
        # Remove old node from parent and add new nodes.


class InternalNode(Node):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.children: list[Node] = []

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(bbox={self.bbox}, children={len(self.children)})"

    @cached_property
    def is_leaf(self) -> bool:
        return False

    @property
    def is_full(self) -> bool:
        return len(self.children) >= self.max_degree

    def add(self, payload: Node):
        """Adds the given node to this internal node."""
        if self.is_full:
            self.split(payload)
        else:
            self.children.append(payload)
        self.bbox = self.bbox.union(payload.bbox).envelope
        self.update_ancestors()

    def remove(self, child: Node):
        self.children.remove(child)

    def split(self, payload: Node):
        """Splits this internal node into two new internal nodes."""
        p = self.parent
        l = self.children
        #l.append(payload)
        min_dist = -1.0
        i1 = 0
        i2 = 0
        for i in range(len(l)):
            for j in range(i + 1, len(l)):
                dist = l[i].bbox.distance(l[j].bbox)
                if(dist < min_dist or min_dist < -0.5):
                    min_dist = dist
                    i1 = i
                    i2 = j
        l1 = [l[i1]]
        l2 = [l[i2]]
        print("AAAAAAAA")
        print(l[i1], l[i2])
        l.pop(i2)
        l.pop(i1)

        p.remove(self)
        node1 = InternalNode(min_degree = self.min_degree)
        node2 = InternalNode(min_degree = self.min_degree)
        node1.children = l1
        node2.children = l2
        node1.parent = p
        node2.parent = p

        node1.bbox = l1[0].bbox
        node2.bbox = l2[0].bbox

        while(len(l1) < self.min_degree and len(l2) < self.min_degree):
            node1_enlarged = node1.bbox.union(l[0].bbox).envelope
            node2_enlarged = node2.bbox.union(l[0].bbox).envelope
            diff1 = node1_enlarged.area - node1.bbox.area
            diff2 = node2_enlarged.area - node2.bbox.area
            if(diff1 < diff2 - 1e-8 or (abs(diff1 - diff2) < 1e-8 and (node1.bbox.area < node2.bbox.area - 1e-8 or (abs(node1.bbox.area - node2.bbox.area) < 1e-8 and len(l1) < len(l2))))):
                l1.append(l[0])
                node1.bbox = node1.bbox.union(l[0].bbox).envelope
            else:
                l2.append(l[0])
                node2.bbox = node2.bbox.union(l[0].bbox).envelope
            l.pop(0)
        while(len(l1) < self.min_degree):
            l1.append(l[0])
            node1.bbox = node1.bbox.union(l[0].bbox).envelope
            l.pop(0)
        while(len(l2) < self.min_degree):
            l2.append(l[0])
            node2.bbox = node2.bbox.union(l[0].bbox).envelope
            l.pop(0)
        if(len(l) > 0):
            node1_enlarged = node1.bbox.union(l[0].bbox).envelope
            node2_enlarged = node2.bbox.union(l[0].bbox).envelope
            diff1 = node1_enlarged.area - node1.bbox.area
            diff2 = node2_enlarged.area - node2.bbox.area
            if(diff1 < diff2 - 1e-8 or (abs(diff1 - diff2) < 1e-8 and (node1.bbox.area < node2.bbox.area - 1e-8 or (abs(node1.bbox.area - node2.bbox.area) < 1e-8 and len(l1) < len(l2))))):
                l1.append(l[0])
                node1.bbox = node1.bbox.union(l[0].bbox).envelope
            else:
                l2.append(l[0])
                node2.bbox = node2.bbox.union(l[0].bbox).envelope
            l.pop(0)

        p.add(node1)
        p.add(node2)
        #if(p.is_full):
        #    self.split(p)
        # TODO
        # Find the two nodes which are farthest apart (including the payload).
        # Create two new internal nodes, seeded with these children.
        # Split the remaining children between these two nodes.
        # Assign each child to the node requiring the minimum enlargement to accommodate it.
        # Ensure that both new nodes have at least self.min_degree children.
        # Remove old node from parent and add new nodes.


class RTree:
    def __init__(self, min_degree: int):
        self.min_degree = min_degree
        self.root: Node = LeafNode(min_degree=min_degree)

    def insert_find_leaf(self, payload: BaseGeometry, node):
        if node.is_leaf:
            return node
        min_area_enlarge = -1.0
        min_area = -1.0
        child = None
        for c in node.children:
            bbox_unioned = c.bbox.union(payload).envelope
            area = bbox_unioned.area
            area_enlarge = area - c.bbox.area
            if min_area < -0.5 or (area_enlarge < min_area_enlarge - 1e-8 or (area_enlarge < min_area_enlarge + 1e-8 and area_enlarge > min_area_enlarge - 1e-8 and area < min_area - 1e-8)):
                min_area = area
                min_area_enlarge = area_enlarge
                child = c
        return self.insert_find_leaf(payload, child)

    def insert(self, payload: BaseGeometry):
        """Inserts the given geometry into the tree."""

        leaf = self.insert_find_leaf(payload, self.root)
        p = leaf.parent
        if(leaf != self.root):
            leaf.add(payload)
        else:
            self.root.geometries.append(payload)
            self.root.bbox = self.root.bbox.union(payload).envelope

        '''
        while(p != self.root and p != None):
            if(p.is_full):
                p.split(leaf)
                leaf = p
                p = leaf.parent
            else:
                break
        '''

        node = self.root
        if node.is_full:
            if(node.is_leaf):
                node.geometries.pop(-1)
            new_root = InternalNode(min_degree = self.min_degree)
            new_root.children.append(self.root)
            self.root.parent = new_root
            # node is being set to the node containing the ranges we want for payload insertion.
            #self.root.geometries.pop(-1)
            #self.display()
            node.split(payload)
            self.root = new_root
        # If root is full, create a new root
        # Add the old root as a child
        # Split the old root using the payload (wrapped in a new leaf node if needed)

        # Otherwise, descend until reaching a leaf
        # At each level, select the child that requires the minimum enlargement to cover the payload
        # Once you reach a leaf, add the payload geometry
    def search_helper(self, query: BaseGeometry, node):
        l = []
        if not node.is_leaf:
            for c in node.children:
                if c.bbox.intersects(query):
                    l += self.search_helper(query, c)
        else:
            for g in node.geometries:
                if(query.contains(g)):
                    l.append(g)
        return l

    def search(self, query: BaseGeometry, start_node: Node | None = None) -> Iterable[BaseGeometry]:
        """Searches the tree for geometries contained by the query geometry."""
        if start_node is None:
            start_node = self.root
        return self.search_helper(query, start_node)
        # TODO: Search recursively. Only check subtrees which intersect the query region.
        # When reaching a leaf, yield any geometry contained in the query region.

    def display(self, level=0, node: Node | None = None):
        """Prints the tree structure to the command line."""
        if node is None:
            node = self.root

        print(f"{'|   ' * level}|-- {node}")
        if not node.is_leaf:
            for child in node.children:
                self.display(level=level + 1, node=child)


if __name__ == "__main__":
    import random

    from shapely.geometry import Point, Polygon

    random.seed(6)


    points = [
        Point(0.79, 0.82),  # top right
        Point(0.49, 0.26),  # bottom left
        Point(0.01, 0.66),  # top left
        Point(0.47, 0.76),  # top left
        Point(0.37, 0.77),  # top left
        Point(0.27, 0.80),  # top left
        Point(0.73, 0.41),  # bottom right
        Point(0.54, 0.68),  # top right
        Point(0.19, 0.55),  # top left
        Point(0.80, 0.69),  # top right
        Point(0.84, 0.34),  # bottom right
    ]
    rtree = RTree(min_degree=2)
    for point in points:
        rtree.insert(point)
        rtree.display()
        print()
        print()

    polygon = Polygon([(1, 1), (1, 0.5), (0.5, 0.5), (0.5, 1)])
    results = list(rtree.search(polygon))
    assert all(polygon.contains(result) for result in results)
    #import pprint
    #pprint.pprint([str(res) for res in results])