-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathtest_perf.py
More file actions
314 lines (259 loc) · 11.2 KB
/
test_perf.py
File metadata and controls
314 lines (259 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
import contextvars
from dataclasses import dataclass
import io
import multiprocessing
import os
from pathlib import Path
import subprocess
import threading
import time
from typing import Optional
import uuid
from concurrent.futures import ThreadPoolExecutor
from mypy_boto3_s3 import S3Client
import pytest
import requests
from minio_fixture import calculate_sha256, minio, MinioRunner
from hs5_fixture import hs5_perf, Hs5Runner
from rustfs_fixture import rustfs, RustfsRunner
from garage_fixture import garage, GarageRunner, garage_sqlite, garage_sqlite_full
from pytest_benchmark.fixture import BenchmarkFixture
from multiprocessing import Pool
def prep_warp():
    """Download the Warp S3 benchmark client into ./warp if not already present.

    Picks the binary matching the current CPU architecture, verifies its
    SHA-256 checksum, and only then renames it into place — so a partial or
    corrupt download is never left as the final ``warp`` binary.

    Raises:
        ValueError: if the downloaded file's SHA-256 does not match the
            pinned checksum (the bad download is removed first).
    """
    def _warp_url() -> tuple[str, str]:
        # Return (download URL, expected SHA-256) for this architecture.
        if os.uname().machine == "aarch64":
            return ("https://dl.min.io/aistor/warp/release/linux-arm64/archive/warp.v1.4.1",
                    "06ca770dd68f7a9fa736ebaeffd120ebd80210fd80cfff3c4972e232ad4ec6af")
        else:
            return ("https://dl.min.io/aistor/warp/release/linux-amd64/archive/warp.v1.4.1",
                    "62b59ad40e609462ec769bc2d86a8552590dcdebd0e73dbeca5b8813a166f937")
    url, expected_sha256 = _warp_url()
    loc = Path("warp")
    loc_new = Path("warp.new")
    if not loc.exists():
        print("Downloading Warp client...")
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(loc_new, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        sha256 = calculate_sha256(loc_new)
        if sha256 != expected_sha256:
            # Remove the corrupt download so the next run retries cleanly
            # instead of leaving a bad warp.new lying around.
            loc_new.unlink()
            raise ValueError(f"Downloaded Warp client has invalid SHA-256 checksum: expected {expected_sha256}, got {sha256}")
        os.chmod(loc_new, 0o755)
        # Rename only after the checksum check, so ./warp is always a
        # fully verified binary.
        loc_new.rename(loc)
@dataclass
class WarpConfig:
    """Parameters for one Warp benchmark run, serialized into warp's YAML config."""
    # Warp benchmark name, e.g. "mixed".
    benchmark: str
    # Total run duration in warp's duration syntax, e.g. "2m".
    duration: str
    # Number of concurrent worker connections warp uses.
    concurrent: int
    # Number of objects the benchmark operates on.
    objects: int
def run_warp(server_url: str, access_key: str, secret_key: str, bucketname: str, out_fn: str, warp_config: WarpConfig):
    """Write warp_config.yaml from *warp_config* and execute ``./warp run`` on it.

    Args:
        server_url: S3 endpoint; a leading ``http://`` scheme is stripped
            because warp expects a bare host[:port].
        access_key: root/access key passed through to warp.
        secret_key: secret key passed through to warp.
        bucketname: bucket warp uses for its benchmark objects.
        out_fn: path warp writes its raw benchmark data to (``benchdata``).
        warp_config: benchmark name, duration, concurrency and object count.

    Raises:
        subprocess.CalledProcessError: if the warp process exits non-zero.
    """
    if server_url.startswith("http://"):
        server_url = server_url[len("http://"):]
    # NOTE(review): the YAML template's nesting was lost in transit and is
    # reproduced flush-left here — confirm against a known-good warp config
    # which keys (analyze/dur/skip-duration/benchdata) must be indented.
    cfg = f"""warp:
api: v1
benchmark: {warp_config.benchmark}
host: {server_url}
access-key: {access_key}
secret-key: {secret_key}
bucket: {bucketname}
duration: {warp_config.duration}
concurrent: {warp_config.concurrent}
objects: {warp_config.objects}
analyze:
dur: 30s
verbose: true
skip-duration: 30s
benchdata: {out_fn}
"""
    with open("warp_config.yaml", "w") as f:
        f.write(cfg)
    # shell=False with an argv list: no shell injection via config values.
    subprocess.run(["./warp", "run", "warp_config.yaml"], check=True, shell=False)
def warp_cmp(benchmark: str, before_fn: Path, after_fn: Path, out_fn: Path):
    """Run ``./warp cmp`` on two result files and prepend an identifying header.

    The raw comparison output is captured into *out_fn*, then the file is
    rewritten with a header naming the benchmark and the before/after files.
    """
    with open(out_fn, "w") as cmp_out:
        subprocess.run(["./warp", "cmp", str(before_fn), str(after_fn)],
                       check=True, shell=False, stdout=cmp_out)
    body = out_fn.read_text()
    header = f"Warp comparison benchmark={benchmark} between {before_fn} (before) and {after_fn} (after)\n\n"
    out_fn.write_text(header + body)
# Cross-parametrization state for test_perf_warp_mixed: result file and target
# name of the previous run, so consecutive runs can be compared with warp cmp.
warp_prev_result_fn : Optional[Path] = None
warp_prev_target : Optional[str] = None
@pytest.mark.parametrize("target", ["hs5", "minio"])
def test_perf_warp_mixed(benchmark: BenchmarkFixture, target: str, hs5_perf: Hs5Runner, minio: MinioRunner, tmp_path: Path):
    """
    Run the Warp "mixed" benchmark against HS5 and MinIO, recording each run
    in pytest-benchmark. Once both parameterizations have run, a warp cmp
    report comparing them is written via module-level state.
    """
    prep_warp()
    bench = "mixed"
    warp_config = WarpConfig(benchmark=bench, duration="2m", concurrent=30, objects=100)
    # Pick the server under test for this parametrization.
    server = hs5_perf if target == "hs5" else minio
    out_path = tmp_path / f"warp-{target}"
    out_path_final = out_path.with_suffix(".json.zst")
    benchmark.extra_info["warp_target"] = target
    benchmark.extra_info["warp_output"] = str(out_path)
    # Exactly one measured warp run: warp manages its own duration/warmup.
    benchmark.pedantic(
        run_warp,
        args=(
            server.get_url(),
            server.get_root_user(),
            server.get_root_key(),
            server.testbucketname(),
            str(out_path),
            warp_config,
        ),
        rounds=1,
        iterations=1,
        warmup_rounds=0,
    )
    time.sleep(1)
    global warp_prev_result_fn, warp_prev_target
    if warp_prev_result_fn:
        warp_cmp(bench, warp_prev_result_fn, out_path_final,
                 Path(f"{bench}_{warp_prev_target}_vs_{target}.txt"))
    warp_prev_result_fn = out_path_final
    warp_prev_target = target
@pytest.mark.skipif(os.getenv("MINIO_ENABLED") != "1", reason="MINIO_ENABLED environment variable not set to 1")
def test_put_get_minio(tmp_path: Path, minio: MinioRunner):
    """
    Smoke-test the minio fixture: upload a 29 MiB object, then verify its
    size via HEAD and its first 10 bytes via a ranged GET.
    """
    payload = os.urandom(29 * 1024 * 1024)
    src = tmp_path / "upload.txt"
    src.write_bytes(payload)
    client = minio.get_s3_client()
    bucket = minio.testbucketname()
    with io.FileIO(src, "rb") as body:
        client.put_object(Bucket=bucket, Key="upload.txt", Body=body)
    head = client.head_object(Bucket=bucket, Key="upload.txt")
    assert head["ContentLength"] == len(payload)
    ranged = client.get_object(Bucket=bucket, Key="upload.txt", Range="bytes=0-9")
    first_bytes = ranged["Body"].read()
    assert len(first_bytes) == 10
    assert first_bytes == payload[:10]
@pytest.mark.skipif(os.getenv("RUSTFS_ENABLED") != "1", reason="RUSTFS_ENABLED environment variable not set to 1")
def test_put_get_rustfs(tmp_path: Path, rustfs: RustfsRunner):
    """
    Smoke-test the rustfs fixture: upload a 29 MiB object, then verify its
    size via HEAD and its first 10 bytes via a ranged GET.
    """
    payload = os.urandom(29 * 1024 * 1024)
    src = tmp_path / "upload.txt"
    src.write_bytes(payload)
    client = rustfs.get_s3_client()
    bucket = rustfs.testbucketname()
    with io.FileIO(src, "rb") as body:
        client.put_object(Bucket=bucket, Key="upload.txt", Body=body)
    head = client.head_object(Bucket=bucket, Key="upload.txt")
    assert head["ContentLength"] == len(payload)
    ranged = client.get_object(Bucket=bucket, Key="upload.txt", Range="bytes=0-9")
    first_bytes = ranged["Body"].read()
    assert len(first_bytes) == 10
    assert first_bytes == payload[:10]
@pytest.mark.skip(reason="Garage is slow to start up")
def test_put_get_garage(tmp_path: Path, garage: GarageRunner):
    """
    Smoke-test the garage fixture: upload a 29 MiB object, then verify its
    size via HEAD and its first 10 bytes via a ranged GET.
    """
    payload = os.urandom(29 * 1024 * 1024)
    src = tmp_path / "upload.txt"
    src.write_bytes(payload)
    client = garage.get_s3_client()
    bucket = garage.testbucketname()
    with io.FileIO(src, "rb") as body:
        client.put_object(Bucket=bucket, Key="upload.txt", Body=body)
    head = client.head_object(Bucket=bucket, Key="upload.txt")
    assert head["ContentLength"] == len(payload)
    ranged = client.get_object(Bucket=bucket, Key="upload.txt", Range="bytes=0-9")
    first_bytes = ranged["Body"].read()
    assert len(first_bytes) == 10
    assert first_bytes == payload[:10]
@dataclass
class FileUpload:
    """Per-worker context for uploading files from a multiprocessing Pool."""
    # Scratch directory where the temporary files are created.
    tmp_path: Path
    # Worker-local S3 client (one per pool process; see init_file_upload).
    s3_client: S3Client
    # Name of the destination bucket.
    bname: str
# Per-worker upload context; populated inside each Pool worker process by the
# pool initializer (see upload_many_files), never in the parent.
file_upload : Optional[FileUpload] = None
def upload_file(file_idx: int):
    """Create a 1 KiB file named after *file_idx*, upload it, then delete it."""
    assert file_upload is not None
    path = file_upload.tmp_path / f"test_file_{file_idx}.txt"
    path.write_bytes(b"a" * 1024)
    file_upload.s3_client.upload_file(str(path), file_upload.bname, path.name)
    path.unlink()
def upload_many_files(get_s3_client, tmp_path: Path):
    """
    Upload 10,000 small files into a freshly created bucket using a process pool.

    Args:
        get_s3_client: zero-arg factory returning an S3 client; called once in
            the parent (to create the bucket) and once per pool worker —
            presumably because clients are not shared across fork boundaries
            (TODO confirm).
        tmp_path: scratch directory used by the workers for temporary files.
    """
    bname = f"test-{uuid.uuid4()}"
    s3_client : S3Client = get_s3_client()
    s3_client.create_bucket(Bucket=bname)
    num_files = 10000
    # Pool.map requires chunksize >= 1; integer division could yield 0 on a
    # machine with more CPUs than files, so clamp it.
    chunk_size = max(1, num_files // multiprocessing.cpu_count())
    def init_file_upload():
        # Runs once in each worker process: give it its own S3 client.
        global file_upload
        file_upload = FileUpload(tmp_path, get_s3_client(), bname)
    with Pool(multiprocessing.cpu_count(), initializer=init_file_upload) as pool:
        pool.map(upload_file, range(0, num_files), chunk_size)
def test_many_range_downloads(hs5_perf: Hs5Runner, tmp_path: Path):
    """
    Issue 1000 1 KiB ranged GETs against a single 50 MiB object on HS5,
    fanned out over a small thread pool.
    """
    client = hs5_perf.get_s3_client()
    bucket = hs5_perf.testbucketname()
    total_size = 50 * 1024 * 1024
    payload = os.urandom(total_size)
    large_path = tmp_path / "large_file.txt"
    large_path.write_bytes(payload)
    with io.FileIO(large_path, "rb") as body:
        client.put_object(Bucket=bucket, Key="large_file.txt", Body=body)

    def fetch_range(range_header: str) -> bytes:
        resp = client.get_object(Bucket=bucket, Key="large_file.txt", Range=range_header)
        return resp["Body"].read()

    num_downloads = 1000
    headers = []
    for idx in range(num_downloads):
        # 1 KiB windows stepping through the object, wrapping at the end.
        start = (idx * 1024) % total_size
        headers.append(f"bytes={start}-{start + 1023}")
    # At most 10 concurrent requests; .result() re-raises any failure.
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [pool.submit(fetch_range, hdr) for hdr in headers]
        for fut in pending:
            fut.result()
def test_perf_upload_many_files_hs5(benchmark: BenchmarkFixture, hs5_perf: Hs5Runner, tmp_path: Path):
    """
    Benchmark uploading 10,000 files to HS5, reporting server RSS before,
    peak during, and after the run.
    """
    before_mb = hs5_perf.get_rss_mb()
    with hs5_perf.track_rss(interval=0.1) as rss_tracker:
        benchmark(upload_many_files, hs5_perf.get_s3_client, tmp_path)
    after_mb = hs5_perf.get_rss_mb()
    print(
        f"\nHS5 RAM usage: before={before_mb:.1f} MB, peak={rss_tracker.peak_mb:.1f} MB,"
        f" after={after_mb:.1f} MB, delta={after_mb - before_mb:.1f} MB"
    )
def test_perf_upload_many_files_minio(benchmark: BenchmarkFixture, minio: MinioRunner, tmp_path: Path):
    """
    Benchmark uploading 10,000 files to MinIO.
    """
    # Pass the bound method directly; upload_many_files calls it per worker.
    benchmark(upload_many_files, minio.get_s3_client, tmp_path)
@pytest.mark.skipif(os.getenv("RUSTFS_ENABLED") != "1", reason="RUSTFS_ENABLED environment variable not set to 1")
def test_perf_upload_many_files_rustfs(benchmark: BenchmarkFixture, rustfs: RustfsRunner, tmp_path: Path):
    """
    Benchmark uploading 10,000 files to RustFS.
    """
    # Pass the bound method directly; upload_many_files calls it per worker.
    benchmark(upload_many_files, rustfs.get_s3_client, tmp_path)
def test_perf_upload_many_files_garage_sqlite(benchmark: BenchmarkFixture, garage_sqlite: GarageRunner, tmp_path: Path):
    """
    Benchmark uploading 10,000 files to Garage backed by SQLite.
    """
    # Pass the bound method directly; upload_many_files calls it per worker.
    benchmark(upload_many_files, garage_sqlite.get_s3_client, tmp_path)
@pytest.mark.skip(reason="Garage needs to be re-compiled with sqlite synchronous set to FULL for this test")
def test_perf_upload_many_files_garage_sqlite_full(benchmark: BenchmarkFixture, garage_sqlite_full: GarageRunner, tmp_path: Path):
    """
    Benchmark uploading 10,000 files to Garage backed by SQLite in
    synchronous=FULL mode.
    """
    # Pass the bound method directly; upload_many_files calls it per worker.
    benchmark(upload_many_files, garage_sqlite_full.get_s3_client, tmp_path)