feat(tools): cache log player batching all the way optimization (#617)

Signed-off-by: ashotland <ashotland@gmail.com>
Author: ashotland <ashotland@gmail.com>
Date:   2022-12-29 16:01:01 +02:00 (committed via GitHub)
Commit: cee5af1baf (parent febcda7935)
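In outline, the change moves batch assembly from the workers to the reader: commands parsed from the trace are grouped by sync_id as they are read, and whole batches are queued to the pool, so workers no longer collect individual commands under a timeout. A condensed sketch of the new producer side (simplified from the diff below, which also flushes every READ_BATCH_SIZE rows):

    # Condensed sketch of the reader-side batching introduced by this commit.
    batch_by_sync_id = {}                        # sync_id -> list of Commands
    async for row in reader:                     # stands in for AsyncReader(afp)
        cmd = parser.parse(row)
        batch_by_sync_id.setdefault(cmd.sync_id, []).append(cmd)

    for sync_id, batch in batch_by_sync_id.items():
        await worker_pool.put(batch, sync_id)    # one queue item per batch, not per command
    batch_by_sync_id.clear()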

tools/cache_logs_player.py | 70 lines changed (Normal file → Executable file)

@@ -13,7 +13,7 @@ To install: pip install -r requirements.txt
 class Command:
     args = None
-    client_id = 0
+    sync_id = 0  # Commands with the same sync_id will be executed synchronously

 class TwitterCacheTraceParser:
     """
@@ -29,7 +29,7 @@ class TwitterCacheTraceParser:
         ttl = csv[6]
         cmd = Command()
-        cmd.client_id = client_id
+        cmd.sync_id = client_id
         if operation == "get":
             cmd.args = ["GET", key]
@@ -58,29 +58,20 @@ class TwitterCacheTraceParser:
 class AsyncWorker:
     QUEUE_SIZE = 100000
-    BATCH_SIZE = 20

     def __init__(self, redis_client) -> None:
         self.queue = asyncio.Queue(self.QUEUE_SIZE)
         self.redis_client = redis_client
         self.working = False

-    async def put(self, cmd: Command) -> None:
-        await self.queue.put(cmd)
+    async def put(self, batch: list) -> None:
+        await self.queue.put(batch)

     async def work(self) -> None:
         self.working = True
-        batch = []
-        while self.working or len(batch) > 0 or not self.queue.empty():
-            try:
-                cmd = await asyncio.wait_for(self.queue.get(), timeout=1.0)
-                batch.append(cmd)
-                if len(batch) >= self.BATCH_SIZE:
-                    await self.execute(batch)
-                    batch.clear()
-            except asyncio.exceptions.TimeoutError:
-                await self.execute(batch)
-                batch.clear()
+        while self.working or not self.queue.empty():
+            batch = await self.queue.get()
+            await self.execute(batch)

     async def execute(self, batch) -> None:
         async with self.redis_client.pipeline(transaction=False) as pipe:
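The body of execute() is cut off by the hunk; for context, a non-transactional pipeline buffers commands client-side and sends the whole batch in a single round trip. A minimal sketch of such a body, assuming the redis asyncio pipeline API (the file's actual implementation may differ):

    # Hedged sketch only; the real execute() body is not shown in this hunk.
    async def execute(self, batch) -> None:
        async with self.redis_client.pipeline(transaction=False) as pipe:
            for cmd in batch:
                pipe.execute_command(*cmd.args)  # buffered locally, nothing sent yet
            await pipe.execute()                 # one round trip for the whole batch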
@@ -97,18 +88,18 @@ class AsyncWorker:
 class AsyncWorkerPool:
     """
     Manages a worker pool to send commands in parallel
-    Maintains synchronous order for commands with the same client id
+    Maintains synchronous order for commands with the same sync_id
     """

     def __init__(self, redis_client, num_workers) -> None:
         self.redis_client = redis_client
         self.num_workers = num_workers
         self.workers = []
         self.tasks = []
-        self.client_id_to_worker = {}
+        self.sync_id_to_worker = {}
         self.next_worker_index = -1

-    def allocate(self, client_id) -> AsyncWorker:
-        if client_id not in self.client_id_to_worker:
+    def allocate(self, sync_id) -> AsyncWorker:
+        if sync_id not in self.sync_id_to_worker:
             self.next_worker_index = (self.next_worker_index + 1) % self.num_workers
             if len(self.workers) <= self.next_worker_index:
@@ -116,13 +107,13 @@ class AsyncWorkerPool:
                 self.workers.append(AsyncWorker(self.redis_client))
                 self.tasks.append(self.workers[self.next_worker_index].start())
-            self.client_id_to_worker[client_id] = self.workers[self.next_worker_index]
-        return self.client_id_to_worker[client_id]
+            self.sync_id_to_worker[sync_id] = self.workers[self.next_worker_index]
+        return self.sync_id_to_worker[sync_id]

-    async def put(self, cmd: Command) -> None:
-        worker = self.allocate(cmd.client_id)
-        await worker.put(cmd)
+    async def put(self, batch: list, sync_id: int) -> None:
+        worker = self.allocate(sync_id)
+        await worker.put(batch)

     async def stop(self):
         for worker in self.workers:
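allocate() pins each previously unseen sync_id to the next worker round-robin and reuses the mapping afterwards, so every batch for a given sync_id flows through a single worker's queue in FIFO order. Conceptually, with num_workers = 3 (the ids below are illustrative):

    # Illustrative mapping only; allocate() also lazily creates and starts workers,
    # which requires a running event loop in the real code.
    pool = AsyncWorkerPool(redis_client, num_workers=3)
    pool.allocate(10)   # first unseen id -> worker 0
    pool.allocate(11)   # -> worker 1
    pool.allocate(12)   # -> worker 2
    pool.allocate(13)   # wraps around -> worker 0
    pool.allocate(10)   # already pinned -> worker 0 again, order preserved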
@@ -131,18 +122,40 @@ class AsyncWorkerPool:
 class AsyncPlayer:
+    READ_BATCH_SIZE = 10 * 1000 * 1000

     def __init__(self, redis_uri, num_workers) -> None:
         self.redis_uri = redis_uri
         self.redis_client = aioredis.from_url(f"redis://{self.redis_uri}", encoding="utf-8", decode_responses=True)
         self.worker_pool = AsyncWorkerPool(self.redis_client, 100)
+        self.batch_by_sync_id = {}
+
+    async def dispatch_batches(self):
+        for sync_id in self.batch_by_sync_id:
+            await self.worker_pool.put(self.batch_by_sync_id[sync_id], sync_id)
+        self.batch_by_sync_id.clear()

     async def read_and_dispatch(self, csv_file, parser):
         print(f"dispatching from {csv_file}")
+        line_count = 0
         async with aiofiles.open(csv_file, mode="r", encoding="utf-8", newline="") as afp:
             async for row in AsyncReader(afp):
-                await self.worker_pool.put(parser.parse(row))
+                cmd = parser.parse(row)
+                if not self.batch_by_sync_id.get(cmd.sync_id):
+                    self.batch_by_sync_id[cmd.sync_id] = []
+                batch = self.batch_by_sync_id[cmd.sync_id]
+                batch.append(cmd)
+                line_count = line_count + 1
+                if line_count >= self.READ_BATCH_SIZE:
+                    await self.dispatch_batches()
+                    line_count = 0
+        # handle the remaining lines
+        await self.dispatch_batches()

-    async def reportStats(self):
+    async def report_stats(self):
         while True:
             info = await self.redis_client.execute_command("info", "stats")
             print(f"{datetime.now()}: {info}")
@@ -154,13 +167,12 @@ class AsyncPlayer:
         print(await self.redis_client.ping())

         read_dispatch_task = asyncio.create_task(self.read_and_dispatch(csv_file, parser))
-        stats_task = asyncio.create_task(self.reportStats())
+        stats_task = asyncio.create_task(self.report_stats())

         await read_dispatch_task
         print(f"finished reading {csv_file}")

         await self.worker_pool.stop()
         stats_task.cancel()

         print("all done")