Mathematical Foundations & Implementation Details
Building the foundation for query execution
Let \( Q = (F, T, W, S) \) be a query where \( F \) is the FROM clause, \( T \) the ordered list of TRAVERSE clauses, \( W \) the WHERE filters, and \( S \) the SELECT projection.
The initial QueryState \( \Sigma_0 \) is defined as:
$$ \Sigma_0 = \{ \text{ids}: F \mapsto \mathcal{N}_F, \text{connections}: \emptyset, \text{tables}: F \mapsto \mathcal{T}_F \} $$

where \( \mathcal{N}_F \) is the set of all node IDs in schema \( F \), and \( \mathcal{T}_F \) is the Arrow table for schema \( F \).
All IDs in the initial state must belong to the FROM schema.
function initialize_query_state(query: Query) → QueryState:
Σ ← empty QueryState
// Step 1: Resolve FROM schema
F ← resolve_schema(query.from_clause())
// Step 2: Load table and extract IDs
T_F ← database.get_table(F)
N_F ← extract_ids(T_F) // O(|T_F|)
// Step 3: Initialize state
Σ.ids[F] ← N_F
Σ.tables[F] ← T_F
Σ.aliases[F.alias] ← F.schema
Σ.connections ← ∅
Σ.incoming ← ∅
// Invariant check
assert(|Σ.ids[F]| = |T_F|)
assert(Σ.connections.empty())
return Σ
Query: FROM u:users
Initial State Σ₀:
ids = {
u: {0, 1, 2, 3, 4} // All 5 users
}
tables = {
u: Arrow::Table(5 rows × 3 columns)
┌────┬──────┬─────┐
│ id │ name │ age │
├────┼──────┼─────┤
│ 0  │ alex │ 25  │
│ 1  │ bob  │ 31  │
│ 2  │ jeff │ 33  │
│ 3  │ sam  │ 21  │
│ 4  │ matt │ 40  │
└────┴──────┴─────┘
}
connections = ∅
aliases = { u → users }
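The initialization phase above can be sketched in Python. This is a hedged, minimal sketch, not the engine's actual implementation: `QueryState` and `initialize_query_state` mirror the pseudocode names, and a plain list of dicts stands in for the Arrow table.

```python
from dataclasses import dataclass, field

@dataclass
class QueryState:
    ids: dict = field(default_factory=dict)          # alias -> set of node IDs
    tables: dict = field(default_factory=dict)       # alias -> row data (stand-in for Arrow)
    aliases: dict = field(default_factory=dict)      # alias -> schema name
    connections: dict = field(default_factory=dict)  # source alias -> {id: [connections]}
    incoming: dict = field(default_factory=dict)     # target id -> [connections]

def initialize_query_state(alias: str, schema: str, table: list) -> QueryState:
    """Phase 1: load the FROM table and seed the ID set."""
    state = QueryState()
    state.ids[alias] = {row["id"] for row in table}  # N_F, O(|T_F|)
    state.tables[alias] = table                      # T_F
    state.aliases[alias] = schema
    # Invariants: one ID per row, no connections yet
    assert len(state.ids[alias]) == len(table)
    assert not state.connections
    return state

users = [{"id": i, "name": n} for i, n in enumerate(["alex", "bob", "jeff", "sam", "matt"])]
state = initialize_query_state("u", "users", users)
# state.ids == {"u": {0, 1, 2, 3, 4}}
```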
Graph traversal with relational join semantics
For a traverse clause \( t = (s, e, d, j) \), where \( s \) is the source alias, \( e \) the edge type, \( d \) the destination alias, and \( j \) the join type:
Define the edge set:
$$ \mathcal{E}_t = \{ (u, v) \mid u \in \Sigma.\text{ids}[s] \wedge (u \xrightarrow{e} v) \in \mathcal{G} \wedge v \in \mathcal{N}_d \} $$

Matched sets:
$$ M_s = \{ u \mid \exists v : (u, v) \in \mathcal{E}_t \} \quad \text{(matched sources)} $$
$$ M_d = \{ v \mid \exists u : (u, v) \in \mathcal{E}_t \} \quad \text{(matched destinations)} $$
$$ U_s = \Sigma.\text{ids}[s] \setminus M_s \quad \text{(unmatched sources)} $$

The new state \( \Sigma' \) after applying traverse \( t \) depends on join type \( j \):
INNER JOIN:

$$ \Sigma'.\text{ids}[s] = M_s $$
$$ \Sigma'.\text{ids}[d] = \begin{cases} M_d & \text{if } \Sigma.\text{ids}[d] = \emptyset \\ \Sigma.\text{ids}[d] \cap M_d & \text{otherwise} \end{cases} $$

LEFT JOIN:

$$ \Sigma'.\text{ids}[s] = \Sigma.\text{ids}[s] \quad \text{(keep all sources)} $$
$$ \Sigma'.\text{ids}[d] = \Sigma.\text{ids}[d] \cup M_d $$

RIGHT/FULL JOIN:

$$ \Sigma'.\text{ids}[d] = \begin{cases} M_d \cup (\mathcal{N}_d \setminus M_s) & \text{if } s = d \text{ (self-join)} \\ M_d \cup (\mathcal{N}_d \setminus M_d) = \mathcal{N}_d & \text{if } s \neq d \text{ (cross-schema)} \end{cases} $$

LEFT joins never reduce the source ID set.
All connections must reference nodes in the current ID sets.
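The join rules can be sketched as a set-based Python function. This is an illustrative sketch, not the engine's code: the parameter names are assumptions (`all_dest_ids` plays the role of \( \mathcal{N}_d \)), source handling for RIGHT joins is not specified in the text and is guessed here, and matched destinations are kept alongside the unmatched remainder for self-joins.

```python
def apply_join_logic(join_type, M_s, M_d, src_ids, dest_ids, all_dest_ids, self_join):
    """Return (new_source_ids, new_dest_ids) for one traverse clause."""
    if join_type == "INNER":
        new_src = set(M_s)                                    # drop unmatched sources
        new_dst = set(M_d) if not dest_ids else dest_ids & M_d
    elif join_type == "LEFT":
        new_src = set(src_ids)                                # keep all sources
        new_dst = dest_ids | M_d
    else:  # RIGHT / FULL: add unmatched destinations
        # self-join subtracts matched sources (same ID space); cross-schema
        # subtracts matched destinations (different ID spaces)
        unmatched = all_dest_ids - (M_s if self_join else M_d)
        new_src = set(src_ids) if join_type == "FULL" else set(M_s)  # assumption for RIGHT
        new_dst = M_d | unmatched
    return new_src, new_dst

# INNER join from the running example: only alex (0) has a friend (1)
src, dst = apply_join_logic("INNER", {0}, {1}, {0, 1, 2, 3, 4}, set(), {0, 1, 2, 3, 4}, True)
# src == {0}, dst == {1}
```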
function process_traverse(traverse: Traverse, Σ: QueryState) → QueryState:
// Resolve schemas
s_schema ← resolve_schema(traverse.source, Σ)
d_schema ← resolve_schema(traverse.dest, Σ)
// Initialize tracking sets
M_s ← ∅ // Matched sources
M_d ← ∅ // Matched destinations
U_s ← ∅ // Unmatched sources
// Phase 2.1: Traverse edges [Complexity: O(|Σ.ids[s]| × E_avg)]
for each u in Σ.ids[s]:
edges ← edge_store.get_outgoing_edges(u, traverse.edge_type) // O(E_avg)
had_match ← false
for each edge in edges:
v ← edge.target_id
// Check if target exists and passes filters
node ← get_node(d_schema, v) // O(1) hash lookup
if node = null or not apply_filters(node, WHERE_clauses):
continue
// Valid edge found
had_match ← true
M_s.insert(u)
M_d.insert(v)
// Record connection [Complexity: O(1)]
conn ← GraphConnection(s, u, traverse.edge_type, d, v)
Σ.connections[s][u].append(conn)
Σ.incoming[v].append(conn)
if not had_match:
U_s.insert(u)
// Phase 2.2: Apply join logic [Complexity: O(|M_s| + |M_d|)]
Σ' ← apply_join_logic(traverse.join_type, M_s, M_d, U_s, Σ)
// Phase 2.3: Build target table [Complexity: O(|Σ'.ids[d]| × F)]
target_nodes ← [get_node(d_schema, v) for v in Σ'.ids[d]]
Σ'.tables[d] ← create_table_from_nodes(d_schema, target_nodes)
// Invariant checks
assert(all(u in Σ'.ids[s] for (u,v) in Σ'.connections))
assert(all(v in Σ'.ids[d] for (u,v) in Σ'.connections))
return Σ'
Query: FROM u:users TRAVERSE u-[friend]->f:users INNER
State Before (Σ₀):
ids[u] = {0, 1, 2, 3, 4}
Edge Traversal:
u:0 → edges = [(0, friend, 1)] → M_s.add(0), M_d.add(1)
u:1 → edges = [] → U_s.add(1)
u:2 → edges = [] → U_s.add(2)
u:3 → edges = [] → U_s.add(3)
u:4 → edges = [] → U_s.add(4)
Results:
M_s = {0} // Only alex has a friend
M_d = {1} // Only bob is a friend
U_s = {1,2,3,4} // 4 users with no friends
INNER JOIN Logic:
Remove U_s from ids[u]
ids[u]' = M_s = {0}
ids[f]' = M_d = {1}
State After (Σ₁):
ids = {
u: {0}, // Only alex (has friend)
f: {1} // Only bob (is friend)
}
connections = {
u:0 → [(0, friend, 1)]
}
incoming = {
1: [(u:0, friend, f:1)]
}
Problem: For FULL/RIGHT joins, how do we compute unmatched targets?
Case 1: Self-Join (e.g., users → users)
IDs are in the same space. A node can be both a source and a target.
$$ \text{Unmatched}_d = \mathcal{N}_d \setminus M_s $$

Reason: if a node was a matched source, it shouldn't also appear as an unmatched target (prevents duplicates).
Case 2: Cross-Schema (e.g., users → companies)
IDs are in different spaces. No collision possible.
$$ \text{Unmatched}_d = \mathcal{N}_d \setminus M_d $$

Reason: compare within the same schema to avoid false exclusions due to ID collision.
Scenario: users:0 -[works-at]-> companies:1
Per-Schema IDs:
users: {0, 1, 2, 3, 4}
companies: {0, 1, 2}
FULL JOIN (WRONG approach - using M_s):
M_s = {0} // User 0 (alex) works somewhere
Unmatched = companies - M_s = {0,1,2} - {0} = {1,2}
❌ WRONG! Company:0 (IBM) has no incoming edge yet is excluded, purely because its ID collides with User:0, and matched Company:1 is re-added as "unmatched"
FULL JOIN (CORRECT approach - using M_d):
M_d = {1} // Company 1 (google) is matched
Unmatched = companies - M_d = {0,1,2} - {1} = {0,2}
✅ CORRECT! Exactly the companies with no incoming works-at edge are included
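The two subtraction rules can be checked in a few lines of Python. This is an illustrative sketch with an assumed scenario (users:0 works at companies:1, so \( M_s = \{0\} \) and \( M_d = \{1\} \), which makes the two rules diverge); `unmatched_targets` is a hypothetical helper name.

```python
def unmatched_targets(all_dest_ids, M_s, M_d, self_join):
    # Self-join: subtract matched *sources* (same ID space, avoids duplicates);
    # cross-schema: subtract matched *destinations* (different ID spaces).
    return all_dest_ids - (M_s if self_join else M_d)

companies = {0, 1, 2}
# users:0 -[works-at]-> companies:1  =>  M_s = {0}, M_d = {1}
wrong = companies - {0}   # subtracting M_s across schemas (ID collision)
right = unmatched_targets(companies, M_s={0}, M_d={1}, self_join=False)
# wrong == {1, 2}: company 0 never matched but excluded; company 1 re-added
# right == {0, 2}: exactly the companies with no incoming edge
```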
Materializing denormalized rows through breadth-first search
Given the connection graph \( \mathcal{G}_c = (\mathcal{V}, \mathcal{E}_c) \), where \( \mathcal{V} \) is the set of (schema, node ID) pairs in the current state and \( \mathcal{E}_c \) the set of recorded connections:
A path is a sequence:
$$ \pi = \langle (s_1, v_1), (s_2, v_2), \ldots, (s_k, v_k) \rangle $$

such that \( \forall i \in [1, k-1] : ((s_i, v_i), (s_{i+1}, v_{i+1})) \in \mathcal{E}_c \).
A complete path is a path with no outgoing edges from \( (s_k, v_k) \):
$$ \text{Complete}(\pi) \iff \nexists (s', v') : ((s_k, v_k), (s', v')) \in \mathcal{E}_c $$

The result set is:

$$ \mathcal{R} = \{ \text{row}(\pi) \mid \pi \text{ is a complete path starting from } F \} $$

No node appears twice in the same path (cycle prevention).
function populate_rows_bfs(start_id: int, start_schema: Schema,
output_schema: Schema, Σ: QueryState) → List[Row]:
// Initialize state
Q ← Queue() // BFS queue
R ← [] // Result rows
G_visited ← Set() // Global visited (across all starting nodes)
row_counter ← 0
// Create initial row and queue item
r₀ ← create_empty_row(output_schema)
q₀ ← QueueItem(start_id, start_schema, depth=0, row=r₀, path=[], path_visited=∅)
Q.enqueue(q₀)
// BFS Loop [Complexity: O(|paths| × P_avg) where P_avg = avg path length]
while not Q.empty():
item ← Q.dequeue()
// Step 1: Mark as visited (cycle prevention)
packed_id ← hash(item.schema, item.node_id)
if packed_id in item.path_visited:
continue // Skip if already in current path (cycle)
G_visited.insert(packed_id)
item.path_visited.insert(packed_id)
// Step 2: Fill row with current node's fields
node ← get_node(item.schema, item.node_id)
for each field in item.schema.fields:
field_idx ← output_schema.field_index(item.schema.alias + "." + field.name)
item.row[field_idx] ← node.get_value(field)
item.path.append((item.schema, item.node_id))
// Step 3: Get outgoing connections, grouped by target schema
grouped_conns ← group_by_target_schema(Σ.connections[item.schema][item.node_id])
// Remove connections to already-visited nodes in this path
for schema, conns in grouped_conns:
filtered_conns ← [c for c in conns if hash(c.target, c.target_id) ∉ item.path_visited]
grouped_conns[schema] = filtered_conns
// Step 4: Branch or complete
if grouped_conns.empty():
// ✅ Complete path - add to results
r ← clone(item.row)
r.id ← row_counter++
r.path ← item.path
R.append(r)
else:
// 🌿 Branch: Continue traversal
for schema, conns in grouped_conns:
if |conns| = 1:
// Single connection: reuse row (optimization)
conn ← conns[0]
q_next ← QueueItem(
node_id=conn.target_id,
schema=conn.target,
depth=item.depth + 1,
row=item.row, // Reuse row
path=item.path.copy(),
path_visited=item.path_visited.copy()
)
Q.enqueue(q_next)
else:
// Multiple connections: clone row for each branch
for conn in conns:
r_new ← clone(item.row)
q_next ← QueueItem(
node_id=conn.target_id,
schema=conn.target,
depth=item.depth + 1,
row=r_new, // New row
path=item.path.copy(),
path_visited=item.path_visited.copy()
)
Q.enqueue(q_next)
// Step 5: Merge rows (handle multi-path scenarios)
tree ← RowNode(root)
for r in R:
tree.insert_row(r)
R_merged ← tree.merge_rows()
return R_merged
Query: u:users -[friend]-> f:users -[works-at]-> c:companies
Data:
u:0 (alex) -[friend]-> u:1 (bob) -[works-at]-> c:1 (google)
═══════════════════════════════════════════════════════════════
Initial State:
Q = [QueueItem(u:0, depth=0, row={}, path=[], visited=∅)]
R = []
G_visited = ∅
Iteration 1: Process u:0 (alex)
├─ Dequeue: QueueItem(u:0, depth=0, ...)
├─ Mark visited: G_visited = {u:0}
├─ Fill row: row[u.id]=0, row[u.name]="alex", row[u.age]=25
├─ Get connections: u:0 → {f:1}
├─ Not complete (has connections)
└─ Enqueue: QueueItem(f:1, depth=1, row={u.id:0, ...}, path=[(u,0)], visited={u:0})
Q = [QueueItem(f:1, depth=1, ...)]
R = []
G_visited = {u:0}
Iteration 2: Process f:1 (bob)
├─ Dequeue: QueueItem(f:1, depth=1, ...)
├─ Mark visited: G_visited = {u:0, f:1}
├─ Fill row: row[f.id]=1, row[f.name]="bob", row[f.age]=31
├─ Get connections: f:1 → {c:1}
├─ Not complete (has connections)
└─ Enqueue: QueueItem(c:1, depth=2, row={u.id:0, ..., f.id:1, ...}, path=[(u,0), (f,1)], visited={u:0, f:1})
Q = [QueueItem(c:1, depth=2, ...)]
R = []
G_visited = {u:0, f:1}
Iteration 3: Process c:1 (google)
├─ Dequeue: QueueItem(c:1, depth=2, ...)
├─ Mark visited: G_visited = {u:0, f:1, c:1}
├─ Fill row: row[c.id]=1, row[c.name]="google", row[c.size]=3000
├─ Get connections: c:1 → ∅ (no outgoing)
├─ ✅ Complete path!
└─ Add to results: R.append({u.id:0, u.name:"alex", ..., f.id:1, ..., c.id:1, ...})
Q = []
R = [Row(id=0, path=[(u,0), (f,1), (c,1)])]
G_visited = {u:0, f:1, c:1}
BFS Complete!
Return R = [Row{u.id:0, u.name:"alex", u.age:25, f.id:1, f.name:"bob", f.age:31, c.id:1, c.name:"google", c.size:3000}]
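The walkthrough above can be condensed into a runnable Python sketch. This is a hedged simplification, not the engine's code: queue-item bookkeeping, field-index resolution, and the final tree merge are omitted; `connections` maps (alias, id) to its target list, and `fields` maps each node to the column values it contributes.

```python
from collections import deque

def populate_rows_bfs(start, connections, fields):
    """Enumerate complete paths from `start`, emitting one merged row per path."""
    queue = deque([(start, {}, [], {start})])   # (node, row, path, path_visited)
    results = []
    while queue:
        node, row, path, visited = queue.popleft()
        row = {**row, **fields[node]}           # fill row with this node's fields
        path = path + [node]
        # cycle prevention: drop targets already on this path
        targets = [t for t in connections.get(node, []) if t not in visited]
        if not targets:
            results.append((row, path))         # complete path -> emit row
        else:
            for t in targets:                   # clone the row per branch
                queue.append((t, dict(row), list(path), visited | {t}))
    return results

conns = {("u", 0): [("f", 1)], ("f", 1): [("c", 1)]}
fields = {("u", 0): {"u.name": "alex"}, ("f", 1): {"f.name": "bob"},
          ("c", 1): {"c.name": "google"}}
rows = populate_rows_bfs(("u", 0), conns, fields)
# one complete path: [("u", 0), ("f", 1), ("c", 1)]
```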
| Operation | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| BFS Loop | O(P × L) | O(Q_max) | P = num paths, L = avg path length, Q_max = max queue size |
| Field Filling | O(F) | O(F) | F = num fields per node |
| Connection Lookup | O(1) | O(1) | Hash map lookup |
| Row Cloning | O(F) | O(F) | Deep copy of row data |
| Tree Merge | O(R × log R) | O(R) | R = num result rows |
| Total | O(P × L × F) | O(Q_max + R × F) | Dominated by path traversal |
Handling multi-path scenarios with tree-based deduplication
A RowNode tree \( \mathcal{T} \) represents the hierarchical structure of paths:
$$ \mathcal{T} = (V_T, E_T, \lambda) $$

where \( V_T \) is the set of tree nodes (one per path segment), \( E_T \) the parent-child edges, and \( \lambda \) maps each node to its partial row.
Merge Operation:
For sibling nodes \( n_1, n_2 \in V_T \) with the same parent and path segment:
$$ \text{merge}(n_1, n_2) = n_m \text{ where } \lambda(n_m) = \lambda(n_1) \oplus \lambda(n_2) $$

The \( \oplus \) operator combines rows field-by-field, preserving non-NULL values.
struct RowNode:
path_segment: (schema, node_id)
row: Row
children: Map[PathSegment, RowNode]
function insert_row(tree: RowNode, row: Row) → void:
"""Insert a row into the tree based on its path."""
current ← tree
for segment in row.path:
if segment not in current.children:
// Create new tree node
current.children[segment] ← RowNode(segment, empty_row, {})
current ← current.children[segment]
// At leaf: merge row data
current.row ← merge_rows(current.row, row)
function merge_rows(r1: Row, r2: Row) → Row:
"""Merge two rows field-by-field, preferring non-NULL values (r1 wins conflicts)."""
r_merged ← clone(r1)
for i in 0 to |r2.fields| - 1:
// r1's value survives whenever it is non-NULL; r2 only fills the gaps
if r_merged.fields[i] is NULL:
r_merged.fields[i] ← r2.fields[i]
return r_merged
function flatten_tree(tree: RowNode) → List[Row]:
"""Convert tree back to flat list of rows."""
result ← []
function dfs(node: RowNode, accumulated_row: Row):
// Merge current node's data
current_row ← merge_rows(accumulated_row, node.row)
if node.children.empty():
// Leaf: complete row
result.append(current_row)
else:
// Internal: recurse to children
for child in node.children.values():
dfs(child, current_row)
dfs(tree, empty_row)
return result
Scenario: One user with TWO friends
Data:
u:0 (alex) -[friend]-> u:1 (bob)
u:0 (alex) -[friend]-> u:2 (jeff)
BFS generates 2 rows:
Row₁: {u.id:0, u.name:"alex", u.age:25, f.id:1, f.name:"bob", f.age:31}
Row₂: {u.id:0, u.name:"alex", u.age:25, f.id:2, f.name:"jeff", f.age:33}
Tree Construction:
root
└─ u:0 (alex) [u.id:0, u.name:"alex", u.age:25]
├─ f:1 (bob) [f.id:1, f.name:"bob", f.age:31]
└─ f:2 (jeff) [f.id:2, f.name:"jeff", f.age:33]
Flattened Result (2 rows):
┌──────┬────────┬───────┬──────┬────────┬───────┐
│ u.id │ u.name │ u.age │ f.id │ f.name │ f.age │
├──────┼────────┼───────┼──────┼────────┼───────┤
│ 0    │ alex   │ 25    │ 1    │ bob    │ 31    │
│ 0    │ alex   │ 25    │ 2    │ jeff   │ 33    │
└──────┴────────┴───────┴──────┴────────┴───────┘
Note: alex's data (u.*) is duplicated in both rows.
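The insert-and-flatten steps can be sketched in compact Python. This is a minimal sketch under simplifying assumptions: the NULL-aware merge from `merge_rows` is reduced to `dict.update`, and the `RowNode` shape is illustrative. Rows sharing a path prefix land under the same subtree; flattening emits one output row per leaf with ancestor fields merged in.

```python
class RowNode:
    def __init__(self):
        self.row = {}        # fields contributed at this path segment
        self.children = {}   # path segment -> RowNode

def insert_row(root, path, row):
    node = root
    for segment in path:                  # walk/create the chain for this path
        node = node.children.setdefault(segment, RowNode())
    node.row.update(row)                  # merge field-by-field at the leaf

def flatten(node, acc=None):
    acc = {**(acc or {}), **node.row}     # accumulate ancestor fields
    if not node.children:
        return [acc]                      # leaf -> one complete row
    return [r for child in node.children.values() for r in flatten(child, acc)]

root = RowNode()
insert_row(root, [("u", 0), ("f", 1)], {"u.name": "alex", "f.name": "bob"})
insert_row(root, [("u", 0), ("f", 2)], {"u.name": "alex", "f.name": "jeff"})
rows = flatten(root)
# two rows; alex's fields appear in both (denormalized output)
```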
Practical considerations for real-world queries
For a query with \( n \) traverse clauses, the total complexity is:
$$ T_{\text{total}} = T_{\text{init}} + \sum_{i=1}^{n} T_{\text{traverse}_i} + T_{\text{populate}} + T_{\text{arrow}} $$

Breaking down each component:

$$ T_{\text{init}} = O(N_F) \quad \text{(load FROM table)} $$
$$ T_{\text{traverse}_i} = O(S_i \times E_{\text{avg}} + M_{s_i} + M_{d_i}) $$
$$ T_{\text{populate}} = O(S_{\text{start}} \times P_{\text{avg}} \times F) $$
$$ T_{\text{arrow}} = O(R \times F) $$

where \( N_F \) is the FROM table size, \( S_i \) the source ID count of traverse \( i \), \( E_{\text{avg}} \) the average edges per node, \( M_{s_i}, M_{d_i} \) the matched sets, \( S_{\text{start}} \) the number of starting nodes, \( P_{\text{avg}} \) the average path length, \( F \) the fields per row, and \( R \) the number of result rows.
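As an illustrative plug-in with assumed (not measured) sizes: take \( N_F = 10^5 \) FROM rows, one traverse with \( S_1 = 10^5 \) and \( E_{\text{avg}} = 4 \), then \( S_{\text{start}} = 10^4 \) surviving starts, \( P_{\text{avg}} = 3 \), \( F = 10 \), and \( R = 10^4 \):

$$ T_{\text{total}} \approx \underbrace{10^5}_{T_{\text{init}}} + \underbrace{10^5 \times 4}_{T_{\text{traverse}}} + \underbrace{10^4 \times 3 \times 10}_{T_{\text{populate}}} + \underbrace{10^4 \times 10}_{T_{\text{arrow}}} = 9 \times 10^5 $$

Traversal and row population dominate; the final Arrow materialization is comparatively cheap.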
Without optimization:

$$ T = O(S \times E_{\text{avg}}) + O(M_d \times W) $$

All edges are traversed first, then filters are applied.

With optimization:

$$ T = O(S \times E_{\text{avg}} \times W) $$

Filters are applied during traversal, pruning early.

Speedup:

$$ \text{Speedup} \approx \frac{M_d}{M_d'} \quad \text{where } M_d' \text{ is the filtered matched set} $$

// Instead of allocating new connections:
for each edge:
conn = new GraphConnection(...) // ❌ Allocation overhead
// Use object pool:
for each edge:
conn = connection_pool.get() // ✅ Reuse existing
conn.update(source, target, ...)
state.connections.append(conn)
Benefit: reduces per-edge connection allocations from \( O(E) \) to amortized \( O(1) \) once the pool is warm.
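A minimal Python sketch of the pooling idea (class and method names are illustrative, not the engine's API): released `GraphConnection` objects are recycled instead of re-allocated on every edge.

```python
class GraphConnection:
    __slots__ = ("source", "source_id", "edge_type", "target", "target_id")

    def update(self, source, source_id, edge_type, target, target_id):
        # overwrite fields in place so the same object can be reused
        self.source, self.source_id = source, source_id
        self.edge_type = edge_type
        self.target, self.target_id = target, target_id
        return self

class ConnectionPool:
    def __init__(self):
        self._free = []

    def get(self):
        # reuse a released object when available, else allocate a fresh one
        return self._free.pop() if self._free else GraphConnection()

    def release(self, conn):
        self._free.append(conn)

pool = ConnectionPool()
conn = pool.get().update("u", 0, "friend", "f", 1)
pool.release(conn)
assert pool.get() is conn   # same object reused: no new allocation
```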
For \( k \) threads and batch size \( b \):
$$ T_{\text{parallel}} = \frac{T_{\text{sequential}}}{k} + O(\text{sync}) $$
$$ \text{Optimal } b \approx \frac{N}{k \times 10} $$

Trade-off: larger \( b \) reduces overhead but increases imbalance.
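The batching heuristic can be sketched as a small helper (an illustrative sketch; `make_batches` and the oversubscription factor of 10 follow the approximation above, not a tuned constant):

```python
def make_batches(ids, k, oversubscribe=10):
    """Split work items into batches of size b ~ N / (k * oversubscribe)."""
    n = len(ids)
    b = max(1, n // (k * oversubscribe))    # optimal b ~ N / (k * 10)
    return [ids[i:i + b] for i in range(0, n, b)]

batches = make_batches(list(range(1000)), k=4)
# b = 25 -> 40 batches, ~10 per thread: enough slack to rebalance stragglers
```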