-
Notifications
You must be signed in to change notification settings - Fork 10
/
conftest.py
408 lines (340 loc) · 13.8 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
import logging
import os
from pathlib import Path
import mock
import pytest
import vcr
from shared.config import ConfigHelper
from shared.storage.memory import MemoryStorageService
from shared.torngit import Github as GithubHandler
from sqlalchemy import event
from sqlalchemy.orm import Session
from sqlalchemy_utils import database_exists
from celery_config import initialize_logging
from database.base import Base
from database.engine import json_dumps
from helpers.environment import _get_cached_current_env
# @pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """
    Perform initial configuration for the test run.

    This hook is called for every plugin and initial conftest file after
    command line options have been parsed. It pins the process environment
    to a local/dev setup, clears the cache on ``_get_cached_current_env``
    (presumably so the values set here are the ones that get read), and
    initializes logging.
    """
    os.environ.update({"CURRENT_ENVIRONMENT": "local", "RUN_ENV": "DEV"})
    _get_cached_current_env.cache_clear()
    initialize_logging()
def pytest_itemcollected(item):
    """Collection-time hook: mark every test that uses ``codecov_vcr`` as 'integration'."""
    if "codecov_vcr" not in item.fixturenames:
        return
    # Tests with codecov_vcr fixtures are automatically 'integration'
    item.add_marker("integration")
@pytest.fixture(scope="session")
def engine(request, sqlalchemy_db, sqlalchemy_connect_url, app_config):
    """Session-scoped SQLAlchemy engine for the test database.

    See http://docs.sqlalchemy.org/en/latest/core/engines.html
    for more details.

    :sqlalchemy_db: Unused directly; requested so that fixture runs before
        the engine is built.
    :sqlalchemy_connect_url: Connection URL to the database. E.g
        postgresql://scott:tiger@localhost:5432/mydatabase
    :app_config: Path to a ini config file containing the sqlalchemy.url
        config variable in the DEFAULT section.
    :returns: Engine instance (yielded; disposed when the session ends)
    :raises RuntimeError: if neither ``app_config`` nor
        ``sqlalchemy_connect_url`` is provided, or if the database does not
        exist.
    """
    # Imported up front: the xdist override below needs ``create_engine`` no
    # matter which branch built the first engine (it used to be bound only in
    # the ``sqlalchemy_connect_url`` branch).
    from sqlalchemy import engine_from_config
    from sqlalchemy.engine import create_engine

    # Extra keyword arguments that must survive an engine re-creation.
    engine_kwargs = {}
    if app_config:
        engine = engine_from_config(app_config)
    elif sqlalchemy_connect_url:
        engine_kwargs["json_serializer"] = json_dumps
        engine = create_engine(sqlalchemy_connect_url, **engine_kwargs)
    else:
        raise RuntimeError("Can not establish a connection to the database")

    # Put a suffix like _gw0, _gw1 etc on xdist processes
    # NOTE(review): newer pytest-xdist releases expose ``workerinput`` /
    # ``workerid`` rather than ``slaveinput`` / ``slaveid`` - confirm which
    # one applies to the pinned version before relying on this suffix.
    xdist_suffix = getattr(request.config, "slaveinput", {}).get("slaveid")
    if engine.url.database != ":memory:" and xdist_suffix is not None:
        # NOTE(review): assigning to ``url.database`` relies on a mutable URL
        # object (SQLAlchemy < 1.4) - confirm against the pinned version.
        engine.url.database = "{}_{}".format(engine.url.database, xdist_suffix)
        # override engine, keeping the same serializer settings as the first one
        engine = create_engine(engine.url, **engine_kwargs)

    # Check that the DB exist and migrate the unmigrated SQLALchemy models as a stop-gap
    # Fall back to the engine's own URL when only ``app_config`` was given.
    database_url = sqlalchemy_connect_url or engine.url
    if not database_exists(database_url):
        raise RuntimeError(f"SQLAlchemy cannot connect to DB at {database_url}")

    unmigrated_tables = (
        "profiling_profilingcommit",
        "profiling_profilingupload",
        "timeseries_measurement",
        "timeseries_dataset",
        "compare_commitcomparison",
        "compare_flagcomparison",
        "compare_componentcomparison",
        "labelanalysis_labelanalysisrequest",
        "labelanalysis_labelanalysisprocessingerror",
        "staticanalysis_staticanalysissuite",
        "staticanalysis_staticanalysissinglefilesnapshot",
        "staticanalysis_staticanalysissuitefilepath",
    )
    for table_name in unmigrated_tables:
        # checkfirst=True makes this a no-op for tables that already exist.
        Base.metadata.tables[table_name].create(bind=engine, checkfirst=True)

    yield engine

    print("Disposing engine")
    engine.dispose()
@pytest.fixture(scope="session")
def sqlalchemy_db(request: pytest.FixtureRequest, django_db_blocker, django_db_setup):
    """Create the database used by the SQLAlchemy tests, and tear it down afterwards.

    The database is bootstrapped with Django's own test-database machinery:
    the default database settings are temporarily pointed at the SQLAlchemy
    database names, ``setup_databases`` is run, and the settings are then
    restored (also when setup or teardown fails).

    :request: pytest request, used for the ``reuse_db`` / ``create_db``
        options, the verbosity level and for emitting teardown warnings.
    :django_db_blocker: used to temporarily allow database access.
    :django_db_setup: requested so the Django test database has already been
        created and migrated before this fixture runs.
    """
    # Bootstrap the DB by running the Django bootstrap version.
    from django.conf import settings
    from django.db import connections
    from django.test.utils import setup_databases, teardown_databases

    keepdb = request.config.getvalue("reuse_db", False) and not request.config.getvalue(
        "create_db", False
    )

    with django_db_blocker.unblock():
        # Temporarily reset the database to the SQLAlchemy DBs to run the migrations.
        original_db_name = settings.DATABASES["default"]["NAME"]
        original_test_name = settings.DATABASES["default"]["TEST"]["NAME"]
        settings.DATABASES["default"]["NAME"] = "sqlalchemy"
        settings.DATABASES["default"]["TEST"]["NAME"] = "test_postgres_sqlalchemy"
        try:
            db_cfg = setup_databases(
                verbosity=request.config.option.verbose,
                interactive=False,
                keepdb=keepdb,
            )
        finally:
            # Restore the Django names even if the bootstrap fails, so the
            # shared settings object is never left pointing at the SQLAlchemy DB.
            settings.DATABASES["default"]["NAME"] = original_db_name
            settings.DATABASES["default"]["TEST"]["NAME"] = original_test_name

        # Hack to get the default connection for the test database to _actually_ be the
        # Django database that the django_db should actually use. It was set to the SQLAlchemy database,
        # but this makes sure that the default Django DB connection goes to the Django database.
        # Since the database was already created and migrated in the django_db_setup fixture,
        # we set keepdb=True to avoid recreating the database and rerunning the migrations.
        connections.configure_settings(settings.DATABASES)
        connections["default"].creation.create_test_db(
            verbosity=request.config.option.verbose,
            autoclobber=True,
            keepdb=True,
        )

    yield

    if not keepdb:
        try:
            with django_db_blocker.unblock():
                # Need to set `test_postgres_sqlalchemy` as the main db name to tear down properly.
                settings.DATABASES["default"]["NAME"] = "test_postgres_sqlalchemy"
                try:
                    teardown_databases(db_cfg, verbosity=request.config.option.verbose)
                finally:
                    settings.DATABASES["default"]["NAME"] = original_db_name
        except Exception as exc:  # noqa: BLE001
            # Teardown is best-effort: report the failure as a warning rather
            # than failing the session.
            request.node.warn(
                pytest.PytestWarning(
                    f"Error when trying to teardown test databases: {exc!r}"
                )
            )
@pytest.fixture
def dbsession(sqlalchemy_db, engine):
    """Yield a SQLAlchemy session whose outer transaction is rolled back on teardown."""
    conn = engine.connect()
    outer_transaction = conn.begin()

    # The session is bound to this one connection, so everything it does
    # happens inside ``outer_transaction``.
    db_session = Session(bind=conn)

    # Run the test inside a SAVEPOINT ...
    db_session.begin_nested()

    def _reopen_savepoint(sess, trans):
        """Open a fresh SAVEPOINT whenever the previous top-level one ends."""
        if trans.nested and not trans._parent.nested:
            # ensure that state is expired the way
            # session.commit() at the top level normally does
            # (optional step)
            sess.expire_all()
            sess.begin_nested()

    # ... and re-establish it each time it ends.
    event.listen(db_session, "after_transaction_end", _reopen_savepoint)

    yield db_session

    db_session.close()
    outer_transaction.rollback()
    conn.close()
@pytest.fixture
def mock_configuration(mocker):
    """Patch ``shared.config._get_config_instance`` to return a ``ConfigHelper`` preloaded with test settings.

    Returns the ``ConfigHelper`` so tests can adjust it further.
    """
    patched_getter = mocker.patch("shared.config._get_config_instance")
    config = ConfigHelper()
    patched_getter.return_value = config

    minio_settings = {
        "access_key_id": "codecov-default-key",
        "bucket": "archive",
        "hash_key": "88f572f4726e4971827415efa8867978",
        "periodic_callback_ms": False,
        "secret_access_key": "codecov-default-secret",
        "verify_ssl": False,
    }
    smtp_settings = {
        "host": "mailhog",
        "port": 1025,
        "username": "username",
        "password": "password",
    }
    setup_settings = {
        "codecov_url": "https://codecov.io",
        "encryption_secret": "zp^P9*i8aR3",
        "telemetry": {
            "endpoint_override": "abcde",
        },
    }
    config.set_params(
        {
            "bitbucket": {"bot": {"username": "codecov-io"}},
            "services": {"minio": minio_settings, "smtp": smtp_settings},
            "setup": setup_settings,
        }
    )
    return config
@pytest.fixture
def empty_configuration(mocker):
    """Patch ``shared.config._get_config_instance`` to return a fresh ``ConfigHelper`` with no params set."""
    patched_getter = mocker.patch("shared.config._get_config_instance")
    patched_getter.return_value = blank_config = ConfigHelper()
    return blank_config
@pytest.fixture
def codecov_vcr(request):
    """Run the test inside a VCR cassette stored next to the test module.

    The cassette lives at
    ``<test dir>/cassetes/<module name>[/<class name>]/<test name>.yaml``.
    """
    # Only surface errors from the vcr logger.
    logging.getLogger("vcr").setLevel(logging.ERROR)

    test_file = Path(request.node.fspath)
    cassette_dir = test_file.parent / "cassetes" / test_file.name.replace(".py", "")
    if request.node.cls:
        # Tests defined on a class get their own sub-directory.
        cassette_dir = cassette_dir / request.node.cls.__name__
    cassette_file = str(cassette_dir / f"{request.node.name}.yaml")

    recorder = vcr.use_cassette(
        cassette_file,
        record_mode="once",
        filter_headers=["authorization"],
        match_on=["method", "scheme", "host", "port", "path"],
    )
    with recorder as cassette:
        yield cassette
@pytest.fixture
def mock_redis(mocker):
    """Patch ``services.redis._get_redis_instance_from_url`` to return a ``MagicMock``, and yield that mock."""
    fake_redis = mocker.MagicMock()
    mocker.patch(
        "services.redis._get_redis_instance_from_url", return_value=fake_redis
    )
    yield fake_redis
@pytest.fixture
def mock_storage(mocker):
    """Patch ``services.storage._cached_get_storage_client`` to return a ``MemoryStorageService``, and yield it."""
    patched_factory = mocker.patch("services.storage._cached_get_storage_client")
    patched_factory.return_value = in_memory_storage = MemoryStorageService({})
    yield in_memory_storage
@pytest.fixture
def mock_smtp(mocker):
    """Patch ``services.smtp.SMTPService`` so instantiating it returns a ``MagicMock``, and yield that mock."""
    fake_smtp = mocker.MagicMock()
    mocker.patch("services.smtp.SMTPService", return_value=fake_smtp)
    yield fake_smtp
@pytest.fixture
def mock_repo_provider(mocker):
    """Patch the repository provider factory to return a ``GithubHandler``-spec'd mock, and yield it.

    The mock comes with an empty ``data`` dict and async stubs for
    ``get_commit_diff`` and ``get_distance_in_commits``.
    """
    patched_factory = mocker.patch(
        "services.repository._get_repo_provider_service_instance"
    )
    async_stubs = {
        "get_commit_diff": mock.AsyncMock(return_value={}),
        "get_distance_in_commits": mock.AsyncMock(
            return_value={"behind_by": 0, "behind_by_commit": None}
        ),
    }
    fake_provider = mocker.MagicMock(GithubHandler, data={}, **async_stubs)
    patched_factory.return_value = fake_provider
    yield fake_provider
@pytest.fixture
def mock_owner_provider(mocker):
    """Patch the owner provider factory to hand back a ``GithubHandler``-spec'd mock, and yield it.

    Each call to the patched factory stores the keyword arguments it received
    on the mock's ``data`` attribute.
    """
    fake_provider = mocker.MagicMock(GithubHandler)

    def _record_kwargs(*args, **kwargs):
        # Copy so later changes to the caller's kwargs don't leak into the mock.
        fake_provider.data = dict(kwargs)
        return fake_provider

    mocker.patch(
        "services.owner._get_owner_provider_service_instance",
        side_effect=_record_kwargs,
    )
    yield fake_provider
@pytest.fixture
def with_sql_functions(dbsession):
    """Create the SQL helper functions some tests rely on, using the test session.

    Defines ``array_append_unique``, ``try_to_auto_activate`` and
    ``get_gitlab_root_group`` (in that order), then flushes the session.
    """
    array_append_unique_sql = """CREATE OR REPLACE FUNCTION array_append_unique(anyarray, anyelement) RETURNS anyarray
LANGUAGE sql IMMUTABLE
AS $_$
select case when $2 is null
then $1
else array_remove($1, $2) || array[$2]
end;
$_$;"""

    # Uses array_append_unique, so it is created after it.
    try_to_auto_activate_sql = """create or replace function try_to_auto_activate(int, int) returns boolean as $$
update owners
set plan_activated_users = (
case when coalesce(array_length(plan_activated_users, 1), 0) < plan_user_count -- we have credits
then array_append_unique(plan_activated_users, $2) -- add user
else plan_activated_users
end)
where ownerid=$1
returning (plan_activated_users @> array[$2]);
$$ language sql volatile strict;"""

    get_gitlab_root_group_sql = """create or replace function get_gitlab_root_group(int) returns jsonb as $$
/* get root group by following parent_service_id to highest level */
with recursive tree as (
select o.service_id,
o.parent_service_id,
o.ownerid,
1 as depth
from owners o
where o.ownerid = $1
and o.service = 'gitlab'
and o.parent_service_id is not null
union all
select o.service_id,
o.parent_service_id,
o.ownerid,
depth + 1 as depth
from tree t
join owners o
on o.service_id = t.parent_service_id
/* avoid infinite loop in case of cycling (2 > 5 > 3 > 2 > 5...) up to Gitlab max subgroup depth of 20 */
where depth <= 20
), data as (
select t.ownerid,
t.service_id
from tree t
where t.parent_service_id is null
)
select to_jsonb(data) from data limit 1;
$$ language sql stable strict;"""

    for statement in (
        array_append_unique_sql,
        try_to_auto_activate_sql,
        get_gitlab_root_group_sql,
    ):
        dbsession.execute(statement)
    dbsession.flush()
# We don't want any tests submitting checkpoint logs to Sentry for real
@pytest.fixture(autouse=True)
def mock_checkpoint_submit(mocker, request):
    """Replace ``CheckpointLogger.submit_subflow`` with a do-nothing mock for every test.

    Tests marked ``real_checkpoint_logger`` are left alone. Otherwise the
    mock that replaces ``submit_subflow`` is returned.
    """
    # We mock sentry differently in the tests for CheckpointLogger
    if request.node.get_closest_marker("real_checkpoint_logger"):
        return

    from helpers.checkpoint_logger import CheckpointLogger

    def _discard_submission(metric, start, end):
        """Accept the same arguments as the real submit call and do nothing."""

    submit_stub = mocker.Mock(side_effect=_discard_submission)
    return mocker.patch.object(CheckpointLogger, "submit_subflow", submit_stub)
@pytest.fixture(autouse=True)
def mock_feature(mocker, request):
    """Make ``Feature.check_value`` return its ``default`` argument for every test.

    Tests marked ``real_feature`` keep the real implementation.
    """
    wants_real_feature = request.node.get_closest_marker("real_feature")
    if wants_real_feature:
        return

    from shared.rollouts import Feature

    def check_value(self, identifier, default=False):
        # Ignore the identifier entirely and echo back the caller's default.
        return default

    return mocker.patch.object(Feature, "check_value", check_value)