mirror of
https://github.com/rybbit-io/rybbit.git
synced 2025-05-11 12:25:36 +02:00
Refactor event generation logic for improved memory efficiency and performance
- Changed the event storage approach from a single large array to batch arrays to manage memory usage better. - Introduced a new constant for batch size to optimize event handling during generation. - Updated the insertion process to handle events in smaller, more manageable batches, enhancing throughput. - Ensured the final event array is constructed efficiently by flattening the batch arrays based on the actual event count.
This commit is contained in:
parent
dbe66cf5c8
commit
19d219a669
1 changed files with 42 additions and 26 deletions
|
@ -503,7 +503,10 @@ function formatTime(seconds) {
|
|||
|
||||
// Function to generate events for a specific day
|
||||
async function generateEventsForDay(date, targetEventsCount) {
|
||||
const events = [];
|
||||
// Instead of one large array, use a collection of batch arrays
|
||||
const eventBatches = [];
|
||||
const BATCH_SIZE = 100000; // Store events in manageable chunks
|
||||
let currentBatch = [];
|
||||
|
||||
// Active sessions map to track ongoing sessions
|
||||
const activeSessions = new Map();
|
||||
|
@ -520,9 +523,6 @@ async function generateEventsForDay(date, targetEventsCount) {
|
|||
|
||||
// Generate 4-10x more events than sessions (averaging 5-6 events per session)
|
||||
const eventsPerSession = 6;
|
||||
// Increase initial allocation for better performance
|
||||
const initialCapacity = Math.min(actualEventsCount, 2000000);
|
||||
events.length = initialCapacity;
|
||||
let eventCount = 0;
|
||||
|
||||
const sessionsToGenerate = Math.ceil(actualEventsCount / eventsPerSession);
|
||||
|
@ -692,21 +692,20 @@ async function generateEventsForDay(date, targetEventsCount) {
|
|||
sessionData
|
||||
);
|
||||
|
||||
// Add these events to our events array
|
||||
// Add these events to our collection using batch arrays
|
||||
const newEventCount = sessionEvents.length;
|
||||
|
||||
// Ensure we have capacity
|
||||
if (eventCount + newEventCount > events.length) {
|
||||
// Need more room, increase array size
|
||||
events.length = Math.max(
|
||||
events.length * 1.5,
|
||||
eventCount + newEventCount
|
||||
);
|
||||
}
|
||||
|
||||
// Copy events
|
||||
// Add events to the current batch, creating new batches as needed
|
||||
for (let i = 0; i < newEventCount; i++) {
|
||||
events[eventCount++] = sessionEvents[i];
|
||||
// If current batch is full, add it to batches and create a new one
|
||||
if (currentBatch.length >= BATCH_SIZE) {
|
||||
eventBatches.push(currentBatch);
|
||||
currentBatch = [];
|
||||
}
|
||||
|
||||
// Add event to current batch
|
||||
currentBatch.push(sessionEvents[i]);
|
||||
eventCount++;
|
||||
}
|
||||
|
||||
// Check if we've reached our target event count
|
||||
|
@ -723,8 +722,23 @@ async function generateEventsForDay(date, targetEventsCount) {
|
|||
}
|
||||
}
|
||||
|
||||
// Trim to the actual event count
|
||||
const finalEvents = events.slice(0, Math.min(eventCount, actualEventsCount));
|
||||
// Don't forget to add the last batch if it has any events
|
||||
if (currentBatch.length > 0) {
|
||||
eventBatches.push(currentBatch);
|
||||
}
|
||||
|
||||
// Flatten all batches into a single array, but only up to the actualEventsCount
|
||||
// This is more memory-efficient than pre-allocating a huge array
|
||||
let finalEvents = [];
|
||||
let remainingEvents = Math.min(eventCount, actualEventsCount);
|
||||
|
||||
for (const batch of eventBatches) {
|
||||
if (remainingEvents <= 0) break;
|
||||
|
||||
const eventsToTake = Math.min(batch.length, remainingEvents);
|
||||
finalEvents = finalEvents.concat(batch.slice(0, eventsToTake));
|
||||
remainingEvents -= eventsToTake;
|
||||
}
|
||||
|
||||
const generationEndTime = Date.now();
|
||||
const generationSeconds = (
|
||||
|
@ -743,25 +757,25 @@ async function generateEventsForDay(date, targetEventsCount) {
|
|||
let totalInserted = 0;
|
||||
|
||||
// Increase batch size for higher throughput
|
||||
const BATCH_SIZE = 50000; // Increased from 10000 to 50000
|
||||
const INSERTION_BATCH_SIZE = 50000; // Increased from 10000 to 50000
|
||||
|
||||
// Create batches
|
||||
const batches = [];
|
||||
for (let i = 0; i < finalEvents.length; i += BATCH_SIZE) {
|
||||
batches.push(finalEvents.slice(i, i + BATCH_SIZE));
|
||||
const insertionBatches = [];
|
||||
for (let i = 0; i < finalEvents.length; i += INSERTION_BATCH_SIZE) {
|
||||
insertionBatches.push(finalEvents.slice(i, i + INSERTION_BATCH_SIZE));
|
||||
}
|
||||
|
||||
// Maximum number of parallel inserts (based on CPU cores)
|
||||
const MAX_PARALLEL = 3; // Using 3 of the 4 cores for parallelism
|
||||
|
||||
// Process batches with controlled parallelism
|
||||
for (let i = 0; i < batches.length; i += MAX_PARALLEL) {
|
||||
for (let i = 0; i < insertionBatches.length; i += MAX_PARALLEL) {
|
||||
const batchPromises = [];
|
||||
|
||||
// Create a set of promises for parallel execution
|
||||
for (let j = 0; j < MAX_PARALLEL && i + j < batches.length; j++) {
|
||||
for (let j = 0; j < MAX_PARALLEL && i + j < insertionBatches.length; j++) {
|
||||
const batchIndex = i + j;
|
||||
const batch = batches[batchIndex];
|
||||
const batch = insertionBatches[batchIndex];
|
||||
|
||||
batchPromises.push(
|
||||
(async () => {
|
||||
|
@ -805,7 +819,9 @@ async function generateEventsForDay(date, targetEventsCount) {
|
|||
const averageSpeed = Math.round(totalInserted / totalElapsed);
|
||||
|
||||
console.log(
|
||||
`Inserted batch ${result.batchIndex + 1} of ${batches.length} | ` +
|
||||
`Inserted batch ${result.batchIndex + 1} of ${
|
||||
insertionBatches.length
|
||||
} | ` +
|
||||
`Batch speed: ${result.batchSpeed.toLocaleString()} events/sec | ` +
|
||||
`Avg speed: ${averageSpeed.toLocaleString()} events/sec`
|
||||
);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue