Created
January 29, 2026 09:54
-
-
Save RaczeQ/0b08ef50c3a47e433a3874d831c113fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# this is a code snippet, whole logic with checks is in the final notebook
subgraph_edges = ox.graph_to_gdfs(subgraph, nodes=False, edges=True)

# Find every edge that starts at a node of the clipped subgraph and ends
# outside of it, and record how much of `distance_meters` is still unused
# when that edge is entered. The edge is later clipped at exactly that length.
# NOTE(review): both `center_node` and `center_node_id` are used here -
# confirm they refer to the same node.
edges_to_clip = {}
for node in set(subgraph.nodes).union([center_node]):
    # The remaining length depends only on the start node, so it is computed
    # lazily and at most once per node instead of once per outgoing edge.
    length_left = None
    # iterate all edges starting from this node
    for u, v in G.edges(node):
        # if whole edge is inside clipped subgraph - skip it
        if v in subgraph:
            continue
        if length_left is None:
            # find a shortest path from the center node to the start node
            path = ox.shortest_path(subgraph, center_node_id, u, weight="time")
            # total length of that path; when several parallel edges connect
            # two consecutive nodes, the shortest one is counted
            length = sum(
                min(
                    edge_data["length"]
                    for edge_data in subgraph.get_edge_data(_u, _v).values()
                )
                for _u, _v in pairwise(path)
            )
            # calculate missing length
            length_left = distance_meters - length
        if length_left > 0:
            edges_to_clip[(u, v)] = length_left
def subgraph_from_edge_pairs(G: nx.MultiDiGraph, edge_pairs: list[tuple[int, int]]):
    """Return a MultiDiGraph containing only edges whose endpoints match edge_pairs.

    Args:
        G: Original MultiDiGraph (osmnx graph). It is not modified.
        edge_pairs: (u, v) node-id tuples. Pairs are directed: (u, v) does
            not select an edge stored as (v, u).

    Returns:
        A new MultiDiGraph with the graph-level attributes of ``G``, every
        endpoint of ``edge_pairs`` that exists in ``G`` (with its node
        attributes), and all parallel edges of ``G`` between matching pairs
        (with their keys and attributes). The attribute values themselves
        are shared with ``G``, not deep-copied.
    """
    G_out = nx.MultiDiGraph()
    G_out.graph.update(G.graph)

    edge_set = set(edge_pairs)

    # Copy every requested endpoint that exists in G, including endpoints of
    # pairs that turn out to have no edge between them.
    nodes_to_add = {n for pair in edge_set for n in pair if n in G}
    for n in nodes_to_add:
        G_out.add_node(n, **G.nodes[n])

    # Copy matching edges (preserve keys and attributes). Both endpoints of a
    # matching edge are members of G and of edge_set, so they were already
    # added above and need no further existence check.
    for u, v, key, data in G.edges(keys=True, data=True):
        if (u, v) in edge_set:
            G_out.add_edge(u, v, key=key, **data)

    return G_out
# Build a GeoDataFrame holding only the edges that have to be cut, so their
# geometries and lengths can be looked up by (u, v).
# NOTE(review): this reads `graph` while the loop collecting edges_to_clip
# iterates `G` - confirm both names refer to the same graph.
pruned_edges = ox.graph_to_gdfs(
    subgraph_from_edge_pairs(graph, list(edges_to_clip)),
    edges=True,
    nodes=False,
)
def cut_linestring(line: LineString, distance: float) -> list[LineString]:
    """Cut a linestring in two at a normalised distance along its length.

    Args:
        line: Linestring to cut.
        distance: Position of the cut as a fraction of the line's length,
            measured from its first vertex.

    Returns:
        ``[head, tail]`` when ``0 < distance < 1``: ``head`` runs from the
        start of the line to the cut point, ``tail`` from the cut point to
        the end. ``[line]`` unchanged when ``distance <= 0`` or
        ``distance >= 1``.

    Raises:
        RuntimeError: If no cut position inside the line could be determined
            (for example for a degenerate, zero-length line).
    """
    import math

    if distance <= 0.0 or distance >= 1.0:
        return [line]

    coords = list(line.coords)

    # Position of every vertex along the line, obtained by accumulating
    # planar segment lengths. Projecting each vertex back onto the line
    # would snap to the nearest location instead, which is the wrong one
    # wherever the line touches or crosses itself (e.g. a closed ring, whose
    # last vertex projects to position 0).
    positions = [0.0]
    for start, end in zip(coords, coords[1:]):
        positions.append(positions[-1] + math.dist(start[:2], end[:2]))

    target = distance * positions[-1]
    last = len(coords) - 1

    for i in range(1, last + 1):
        travelled = positions[i]
        # if an interior vertex is exactly at the required distance,
        # split the line at that vertex
        if travelled == target and i < last:
            return [LineString(coords[: i + 1]), LineString(coords[i:])]
        # if the vertex is farther than the required distance,
        # create a new point and clip the line there
        if travelled > target:
            cp = line.interpolate(distance, normalized=True)
            return [
                LineString(coords[:i] + [(cp.x, cp.y)]),
                LineString([(cp.x, cp.y)] + coords[i:]),
            ]

    raise RuntimeError(
        f"Could not find a cut position at normalised distance {distance}"
    )
# Clip every boundary edge to the part that still fits in the distance budget.
clipped_edges_geometries = []
for (u, v), length_clip in edges_to_clip.items():
    # NOTE(review): when several parallel edges connect u and v this takes
    # the first one returned, which is not necessarily the shortest.
    edge = pruned_edges.loc[(u, v)].iloc[0]
    edge_length = edge["length"]
    edge_linestring = edge["geometry"]
    # find out where to clip the linestring based on its length
    # and expected length to clip
    interpolation_ratio = length_clip / edge_length
    # keep only the leading part, i.e. the piece attached to the subgraph
    clipped_edge = cut_linestring(edge_linestring, interpolation_ratio)[0]
    clipped_edges_geometries.append(clipped_edge)

# The clipped geometries are pieces of `pruned_edges` geometries, so they
# share its CRS; a hard-coded EPSG:4326 would mislabel a projected graph.
clipped_edges_gdf = gpd.GeoDataFrame(
    geometry=gpd.GeoSeries(clipped_edges_geometries, crs=pruned_edges.crs),
)

# mark which edges were clipped
clipped_edges_gdf["clipped"] = True
subgraph_edges["clipped"] = False

# concatenate two GeoDataFrames
# NOTE(review): `gpd.pd` reaches pandas through geopandas' namespace -
# presumably not a public API; consider importing pandas directly.
all_edges_gdf = gpd.pd.concat(
    [
        subgraph_edges[["geometry", "clipped"]],
        clipped_edges_gdf,
    ],
    ignore_index=True,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment