diff --git a/.flake8 b/.flake8 index 4ae521ab8..29cb32392 100644 --- a/.flake8 +++ b/.flake8 @@ -8,7 +8,7 @@ # W504 - Line break occurred after a binary operator ignore = E265,E402,E999,W293,W504 max-line-length = 120 -exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/* +exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/*,*/build/* # F401 - Unused imports -- this is the only way to have a file-wide rule exception per-file-ignores = diff --git a/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/Dockerfile index 18b3088fd..d0406d4b2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/Dockerfile +++ b/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/Dockerfile @@ -3,6 +3,7 @@ FROM amazonlinux:2023 ENV PIP_ROOT_USER_ACTION ignore ENV LANG C.UTF-8 +# procps-ng used for enabling 'watch' command on console RUN dnf install -y --setopt=install_weak_deps=False \ curl-minimal \ diffutils \ @@ -26,6 +27,7 @@ RUN dnf install -y --setopt=install_weak_deps=False \ vim \ wget \ zlib-devel \ + procps-ng \ && \ dnf clean all && \ rm -rf /var/cache/dnf diff --git a/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/testDocumentGenerator.py b/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/testDocumentGenerator.py index 7186838dd..19cbacd96 100755 --- a/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/testDocumentGenerator.py +++ b/TrafficCapture/dockerSolution/src/main/docker/elasticsearchTestConsole/testDocumentGenerator.py @@ -44,15 +44,29 @@ def send_request(session, index_suffix, url_base, auth, headers, no_refresh): return None, str(e), None +def send_multi_type_request(session, index_name, type_name, payload, url_base, auth, headers, no_refresh): + """Send a request to the specified URL with the given payload.""" + timestamp = datetime.now().isoformat() + refresh_param = "false" if no_refresh else "true" + url = f"{url_base}/{index_name}/{type_name}/{timestamp}?refresh={refresh_param}" + try: + response = session.put(url, json=payload, auth=auth, headers=headers, verify=False, timeout=0.5) + return response.status_code, timestamp, response.json() + except requests.RequestException as e: + return None, str(e), None + + def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser() parser.add_argument("--endpoint", help="Cluster endpoint e.g. http://test.elb.us-west-2.amazonaws.com:9200.") parser.add_argument("--username", help="Cluster username.") parser.add_argument("--password", help="Cluster password.") + parser.add_argument("--enable_multi_type", action='store_true', + help="Flag to enable sending documents to a multi-type index.") parser.add_argument("--no-clear-output", action='store_true', help="Flag to not clear the output before each run. " + - "Helpful for piping to a file or other utility.") + "Helpful for piping to a file or other utility.") parser.add_argument("--requests-per-sec", type=float, default=10.0, help="Target requests per second to be sent.") parser.add_argument("--no-refresh", action='store_true', help="Flag to disable refresh after each request.") return parser.parse_args() @@ -97,13 +111,13 @@ def calculate_sleep_time(request_timestamps, target_requests_per_sec): """ if not request_timestamps: return 0 - + target_time_per_iteration = 1.0 / target_requests_per_sec average_time_per_iteration = (datetime.now() - request_timestamps[0]).total_seconds() / (len(request_timestamps) + 1) - + sleep_time = (target_time_per_iteration - average_time_per_iteration) * len(request_timestamps) - + return max(0, sleep_time) @@ -119,14 +133,29 @@ def main(): total_counts = {'2xx': 0, '4xx': 0, '5xx': 0, 'error': 0} start_time = time.time() request_timestamps = deque() + total_requests = 0 while True: + total_requests = total_requests + 1 request_timestamps.append(datetime.now()) current_index = get_current_date_index() - response_code, request_timestamp_or_error, response_json = send_request( - session, current_index, url_base, auth, keep_alive_headers, args.no_refresh - ) + # Alternate between sending multi-type requests of 'type1' and 'type2' + if args.enable_multi_type: + if total_requests % 2 != 0: + type_name = "type1" + payload = {"title": "This is title of type 1"} + else: + type_name = "type2" + payload = {"content": "This is content of type 2", "contents": "This is contents of type 2"} + response_code, request_timestamp_or_error, response_json = send_multi_type_request( + session, "multi_type_index", type_name, payload, url_base, auth, keep_alive_headers, args.no_refresh + ) + # Send simple document request + else: + response_code, request_timestamp_or_error, response_json = send_request( + session, current_index, url_base, auth, keep_alive_headers, args.no_refresh + ) update_counts(response_code, total_counts) if response_code is not None: @@ -137,7 +166,7 @@ def main(): response_pretty = "Response: N/A" throughput = calculate_throughput(request_timestamps) - + summary_message = ( f"Summary: 2xx responses = {total_counts['2xx']}, 4xx responses = {total_counts['4xx']}, " f"5xx responses = {total_counts['5xx']}, Error requests = {total_counts['error']}" @@ -151,7 +180,7 @@ def main(): f"{response_pretty}\n" + f"{summary_message}\n" + f"{throughput_message}") - + sleep_time = calculate_sleep_time(request_timestamps, args.requests_per_sec) # Flush the stdout buffer to ensure the log messages are displayed immediately and in sync @@ -159,12 +188,12 @@ def main(): if (sleep_time > 0): time.sleep(sleep_time) - + if time.time() - start_time >= 5: session.close() session = requests.Session() start_time = time.time() - + # Remove timestamps older than 5 seconds while request_timestamps and (datetime.now() - request_timestamps[0]).total_seconds() > 5: request_timestamps.popleft() diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile index 775069ed9..71a7b9934 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile @@ -60,7 +60,8 @@ RUN dnf install -y bash-completion --setopt=install_weak_deps=False && \ RUN echo '. /etc/profile.d/bash_completion.sh' >> ~/.bashrc && \ echo '. /etc/profile.d/venv.sh' >> ~/.bashrc && \ echo 'echo Welcome to the Migration Assistant Console' >> ~/.bashrc && \ - echo 'eval "$(register-python-argcomplete cluster_tools)"' >> ~/.bashrc + echo 'eval "$(register-python-argcomplete cluster_tools)"' >> ~/.bashrc && \ + echo 'PS1="(\t) \[\e[92m\]migration-console \[\e[0m\](\w) -> "' >> ~/.bashrc # Set ENV to control startup script in /bin/sh mode ENV ENV=/root/.bashrc diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts index 3ca4837f5..55d8ef629 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts @@ -338,8 +338,8 @@ export class MigrationConsoleStack extends MigrationServiceCore { taskRole: serviceTaskRole, taskRolePolicies: servicePolicies, cpuArchitecture: props.fargateCpuArch, - taskCpuUnits: 1024, - taskMemoryLimitMiB: 2048, + taskCpuUnits: 2048, + taskMemoryLimitMiB: 4096, ...props }); }