refactor: optimize state hydration pipeline ● Open
Files Changed 14
Commits 8
Checks 98%
Activity 42
src/core/hydrator.ts
+127
-43
|   | 1 | import { EventEmitter } from 'events'; |
|   | 2 | import { StateSchema } from './types'; |
| - | 3 | export class Hydrator { |
| - | 4 | private buffer: unknown[] = []; |
| - | 5 | private batchSize = 10; |
| - | 6 | constructor() { |
| - | 7 | setInterval(() => this.flush(), 500); |
| - | 8 | } |
| + | 3 | export class OptimisticHydrator extends EventEmitter { |
| + | 4 | private pending: Map<string, Promise<unknown>> = new Map(); |
| + | 5 | private maxConcurrency = 4; |
| + | 6 | private activeRequests = 0; |
| + | 7 | constructor(schema: StateSchema, maxConcurrency = 4) { |
| + | 8 | super(); |
| + | 9 | this.maxConcurrency = maxConcurrency; |
| + | 10 | } |
|   | 9 | async hydrate(path: string, payload: unknown): Promise<void> { |
| - | 10 | this.buffer.push(payload); |
| + | 11 | if (this.pending.has(path)) return this.pending.get(path)!; |
| + | 12 | const promise = this.queueRequest(path, payload); |
| + | 13 | this.pending.set(path, promise); |
| + | 14 | return promise.finally(() => this.pending.delete(path)); |
Activity & Review
JD
jdoe
• 2 hours ago
Replaced the interval-based batch flush with a concurrent request queue. Deduplication via the `pending` map prevents duplicate hydration calls for the same state path, since a second call for a path that is already in flight returns the existing promise. Benchmarks show a ~40% reduction in peak RAM usage under load.
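For readers following the thread, here is a minimal sketch of the pattern described above: in-flight deduplication keyed by path, plus a cap on concurrent requests. It is not the code in this PR. `DedupingQueue`, `run`, `Task`, and `waiting` are hypothetical names, and the body of `queueRequest` is an assumption, since the diff does not show it.

```typescript
// Hypothetical sketch. Only `pending`, `maxConcurrency`, `activeRequests`,
// and the name `queueRequest` come from the diff; everything else is assumed.
type Task<T> = () => Promise<T>;

class DedupingQueue {
  private pending = new Map<string, Promise<unknown>>();
  private waiting: Array<() => void> = [];
  private activeRequests = 0;

  constructor(private readonly maxConcurrency = 4) {}

  /** Returns the in-flight promise for `key`, or schedules `task` if none exists. */
  run<T>(key: string, task: Task<T>): Promise<T> {
    const existing = this.pending.get(key);
    if (existing) return existing as Promise<T>;

    const promise = (async () => {
      try {
        return await this.queueRequest(task);
      } finally {
        // Remove the entry whether the task resolved or rejected.
        this.pending.delete(key);
      }
    })();
    this.pending.set(key, promise);
    return promise;
  }

  private async queueRequest<T>(task: Task<T>): Promise<T> {
    if (this.activeRequests >= this.maxConcurrency) {
      // At capacity: park until a finishing task hands over its slot.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.activeRequests++;
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // Slot is transferred, so activeRequests stays the same.
      } else {
        this.activeRequests--;
      }
    }
  }
}
```

Because queued work starts only when a slot frees up, nothing accumulates in a buffer waiting for a timer, which is one plausible reason for the lower peak memory compared with the interval-based flush.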
👍 12
SL
• 45 minutes ago
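Looks solid. Should we add a timeout fallback for `queueRequest`? If a request hangs on the network, its promise never settles, so its entry is never removed from the `pending` map and every later `hydrate` call for that path gets the same stuck promise. Also, consider exposing the queue size via a getter for observability dashboards.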
👍 3
JD
• 12 minutes ago
Good catch, @sarah-liu. Added `AbortController` integration with a configurable `requestTimeout` (default 5s) and exposed `getQueueStats()` for metrics. Pushed to this branch.
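As a rough illustration of the approach (a sketch, not the pushed code): the helper `withTimeout`, the class `TimedHydrator`, the `send` callback, and the `{ pending: number }` shape returned by `getQueueStats()` are all assumptions made for this example. Only `requestTimeout`, its 5s default, `AbortController`, and the `getQueueStats` name come from the comment above.

```typescript
// Hypothetical helper: aborts `work` and rejects once the timeout elapses.
async function withTimeout<T>(
  work: (signal: AbortSignal) => Promise<T>,
  requestTimeoutMs = 5000,
): Promise<T> {
  const controller = new AbortController();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      controller.abort();
      reject(new Error(`request timed out after ${requestTimeoutMs} ms`));
    }, requestTimeoutMs);
  });
  try {
    // Race so callers are released even if `work` ignores the abort signal.
    return await Promise.race([work(controller.signal), timeout]);
  } finally {
    clearTimeout(timer);
  }
}

// Hypothetical class showing how the timeout keeps `pending` from going stale.
class TimedHydrator {
  private pending = new Map<string, Promise<void>>();

  constructor(private readonly requestTimeout = 5000) {}

  hydrate(path: string, send: (signal: AbortSignal) => Promise<void>): Promise<void> {
    const existing = this.pending.get(path);
    if (existing) return existing;

    const promise = (async () => {
      try {
        await withTimeout(send, this.requestTimeout);
      } finally {
        // Runs on success, failure, and timeout, so no entry outlives its request.
        this.pending.delete(path);
      }
    })();
    this.pending.set(path, promise);
    return promise;
  }

  /** Assumed stats shape: the number of requests currently in flight. */
  getQueueStats(): { pending: number } {
    return { pending: this.pending.size };
  }
}
```

Racing against the timer, rather than relying on the abort signal alone, means a request that never observes the signal still releases its `pending` entry once the timeout fires.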
👍 5