diff --git a/tools/cache_testing.py b/tools/cache_testing.py old mode 100644 new mode 100755 index 828706196..d71cefbbf --- a/tools/cache_testing.py +++ b/tools/cache_testing.py @@ -1,8 +1,9 @@ #!/usr/bin/env python import redis +import aioredis +import asyncio import argparse -from urllib.parse import urlparse import numpy as np ''' @@ -16,7 +17,8 @@ between 0 and 1 being representative of real-life cache load scenarios) ''' -def rand_zipf_generator(n, alpha, count, pipeline): + +def rand_zipf_generator(alpha: float, upper: int, batch: int): """ n: The upper bound of the values to generate a zipfian distribution over (n = 30 would generate a distribution of given alpha from values 1 to 30) @@ -26,38 +28,24 @@ def rand_zipf_generator(n, alpha, count, pipeline): """ # Calculate Zeta values from 1 to n: - tmp = np.power( np.arange(1, n+1), -alpha ) + tmp = np.power(np.arange(1, upper+1), -alpha) zeta = np.r_[0.0, np.cumsum(tmp)] # Store the translation map: distMap = [x / zeta[-1] for x in zeta] - if pipeline == 0: + while True: # Generate an array of uniform 0-1 pseudo-random values: - u = np.random.random(count) + u = np.random.random(batch) # bisect them with distMap v = np.searchsorted(distMap, u) samples = [t-1 for t in v] + yield samples - for sample in samples: - yield sample - else: - current_count = 0 - while current_count < count: - # Generate an array of uniform 0-1 pseudo-random values, of the pipeline length: - u = np.random.random(pipeline) - # bisect them with distMap - v = np.searchsorted(distMap, u) - - samples = [t-1 for t in v] - yield samples - - current_count += len(samples) - -def update_stats(r, hits, misses, value_index, total_count): +def update_stats(hits, misses, value_index, total_count): """ A void function that uses terminal control sequences to update hit/miss ratio stats for the user @@ -69,48 +57,71 @@ def update_stats(r, hits, misses, value_index, total_count): print("\r", end="") # Print the loading bar and current hit rate - print("[{}{}] {:.0f}%, current hit rate: {:.6f}%".format("#" * int(percent_complete * 20), " " * int(20 - percent_complete * 20), percent_complete * 100, (hits / (hits + misses)) * 100), end="") + print("[{}{}] {:.0f}%, current hit rate: {:.6f}%".format("#" * int(percent_complete * 20), " " * + int(20 - percent_complete * 20), percent_complete * 100, (hits / (hits + misses)) * 100), end="") -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Cache Benchmark', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('-c', '--count', type=int, default=100000, help='total number of incrby operations') - parser.add_argument('-u', '--uri', type=str, default='redis://localhost:6379', help='Redis server URI') - parser.add_argument('-a', '--alpha', type=int, default=1.0, help='alpha value being used for the Zipf distribution') - parser.add_argument('-n', '--number', type=int, default=30, help='the number of values to be used in the distribution') - parser.add_argument('-d', '--length', type=int, default=10, help='the length of the values to be used in the distribution') - parser.add_argument('-p', '--pipeline', type=int, default=0, help='pipeline size') - - args = parser.parse_args() - uri = urlparse(args.uri) - - r = redis.StrictRedis(host=uri.hostname, port=uri.port) +async def run_single_conn(redis_client, keys_gen, args) -> None: misses = 0 hits = 0 + val = 'x' * args.length + items_sent = 0 + last_stat = 0 + for keys in keys_gen: + if len(keys) == 1: + result = await redis_client.set(str(keys[0]), val, nx=True) + responses = [result] + else: + p = redis_client.pipeline(transaction=False) + for key in keys: + p.set(str(key), val, nx=True) + responses = await p.execute() - distribution_keys_generator = rand_zipf_generator(args.number, args.alpha, args.count, args.pipeline) - - if args.pipeline == 0: - for idx, key in enumerate(distribution_keys_generator): - result = r.set(str(key), 'x' * args.length, nx=True) - if result: + for resp in responses: + if resp: misses += 1 else: hits += 1 - if idx % 50 == 0: - update_stats(r, hits, misses, idx, args.count) - else: - total_count = 0 - for idx, keys in enumerate(distribution_keys_generator): - total_count += len(keys) - p = r.pipeline(transaction=False) - for key in keys: - p.set(str(key), 'x' * args.length, nx=True) - responses = p.execute() - for resp in responses: - if resp: - misses += 1 - else: - hits += 1 - if idx % 20 == 0: - update_stats(r, hits, misses, total_count, args.count) + items_sent += len(keys) + if items_sent // 100 != last_stat: + last_stat = items_sent // 100 + update_stats(hits, misses, items_sent, args.count) + if items_sent >= args.count: + break + print() + + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Cache Benchmark', formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('-c', '--count', type=int, default=100000, + help='total number of operations') + parser.add_argument('-u', '--uri', type=str, + default='localhost:6379', help='Redis server URI') + parser.add_argument('-a', '--alpha', type=float, default=1.0, + help='alpha value being used for the Zipf distribution') + parser.add_argument('--upper_bound', type=int, default=1000, + help='the number of values to be used in the distribution') + parser.add_argument('-d', '--length', type=int, default=10, + help='the length of the values to be used in the distribution') + parser.add_argument('-p', '--pipeline', type=int, + default=1, help='pipeline size') + parser.add_argument('-t', '--test', action='store_true') + + args = parser.parse_args() + if args.test: + for idx, items in enumerate(rand_zipf_generator(args.alpha, args.upper_bound, 1)): + assert len(items) == 1 + print(items[0]) + if idx == args.count: + break + exit(0) + + r = aioredis.from_url( + f"redis://{args.uri}", encoding="utf-8", decode_responses=True) + + distribution_keys_generator = rand_zipf_generator( + args.alpha, args.upper_bound, args.pipeline) + + asyncio.run(run_single_conn(r, distribution_keys_generator, args))