diff --git a/bcreg-aca/src/app.py b/bcreg-aca/src/app.py index 1fd444a..2c94979 100644 --- a/bcreg-aca/src/app.py +++ b/bcreg-aca/src/app.py @@ -97,6 +97,32 @@ def submit_credential(): return response +@app.route('/issue-credential-v20', methods=['POST']) +def submit_credential_v20(): + """ + Exposed method to proxy credential issuance requests. + """ + if not issuer.tob_connection_synced(): + abort(503, "Connection not yet synced") + + start_time = time.perf_counter() + method = 'submit_credential_v20.batch' + + if not request.json: + end_time = time.perf_counter() + issuer.log_timing_method(method, start_time, end_time, False) + abort(400) + + cred_input = request.json + + response = issuer.handle_send_credential_v20(cred_input) + + end_time = time.perf_counter() + issuer.log_timing_method(method, start_time, end_time, True) + + return response + + @app.route('/api/agentcb/topic//', methods=['POST']) def agent_callback(topic): """ @@ -131,6 +157,13 @@ def agent_callback(topic): else: response = jsonify({}) + elif topic == issuer.TOPIC_CREDENTIALS_V20 or topic == issuer.TOPIC_CREDENTIALS_V20_INDY: + if "state" in message: + method = method + '.' + message["state"] + response = issuer.handle_credentials_v20(message["state"], message) + else: + response = jsonify({}) + elif topic == issuer.TOPIC_PRESENTATIONS: if "state" in message: method = method + '.' + message["state"] @@ -138,6 +171,13 @@ def agent_callback(topic): else: response = jsonify({}) + elif topic == issuer.TOPIC_PRESENTATIONS_V20: + if "state" in message: + method = method + '.' + message["state"] + response = issuer.handle_presentations_v20(message["state"], message) + else: + response = jsonify({}) + elif topic == issuer.TOPIC_GET_ACTIVE_MENU: response = issuer.handle_get_active_menu(message) diff --git a/bcreg-aca/src/issuer.py b/bcreg-aca/src/issuer.py index 36be183..5946bcc 100644 --- a/bcreg-aca/src/issuer.py +++ b/bcreg-aca/src/issuer.py @@ -637,7 +637,10 @@ def get_credential_response(cred_exch_id): TOPIC_CONNECTIONS = "connections" TOPIC_CONNECTIONS_ACTIVITY = "connections_actvity" TOPIC_CREDENTIALS = "issue_credential" -TOPIC_PRESENTATIONS = "presentations" +TOPIC_CREDENTIALS_V20 = "issue_credential_v2_0" +TOPIC_CREDENTIALS_V20_INDY = "issue_credential_v2_0_indy" +TOPIC_PRESENTATIONS = "handle_present_proof" +TOPIC_PRESENTATIONS_V20 = "present_proof_v2_0" TOPIC_GET_ACTIVE_MENU = "get-active-menu" TOPIC_PERFORM_MENU_ACTION = "perform-menu-action" TOPIC_ISSUER_REGISTRATION = "issuer_registration" @@ -664,7 +667,7 @@ def handle_credentials(state, message): ) else: pass - if state == "credential_acked": + if state == "credential_acked" or state == "done": # raise 10% errors #do_error = random.randint(1, 100) #if do_error <= 10: @@ -679,11 +682,42 @@ def handle_credentials(state, message): return jsonify({"message": state}) +def handle_credentials_v20(state, message): + start_time = time.perf_counter() + method = "Handle callback:" + state + log_timing_event(method, message, start_time, None, False) + + if "thread_id" in message: + set_credential_thread_id( + message["cred_ex_id"], message["thread_id"] + ) + else: + pass + if state == "credential_acked" or state == "done": + # raise 10% errors + #do_error = random.randint(1, 100) + #if do_error <= 10: + # raise Exception("Fake exception to test error handling: " + message["thread_id"]) + response = {"success": True, "result": message["cred_ex_id"]} + add_credential_response(message["cred_ex_id"], response) + + end_time = time.perf_counter() + processing_time = end_time - start_time + log_timing_event(method, message, start_time, end_time, True, outcome=str(state)) + + return jsonify({"message": "state"}) + + def handle_presentations(state, message): # TODO auto-respond to proof requests return jsonify({"message": state}) +def handle_presentations_v20(state, message): + # TODO auto-respond to proof requests + return jsonify({"message": state}) + + def handle_get_active_menu(message): # TODO add/update issuer info? return jsonify({}) @@ -775,28 +809,35 @@ def run(self): ) response.raise_for_status() cred_data = response.json() + credential_exchange_id = None if "credential_exchange_id" in cred_data: + credential_exchange_id = cred_data["credential_exchange_id"] result_available = add_credential_request( cred_data["credential_exchange_id"] ) + elif "cred_ex_id" in cred_data: + credential_exchange_id = cred_data["cred_ex_id"] + result_available = add_credential_request( + cred_data["cred_ex_id"] + ) else: raise Exception(json.dumps(cred_data)) # wait for confirmation from the agent, which will include the credential exchange id if result_available and not result_available.wait(MAX_CRED_RESPONSE_TIMEOUT): # no response received so we'll add our own "timeout" response - add_credential_timeout_report(cred_data["credential_exchange_id"], cred_data["thread_id"]) + add_credential_timeout_report(credential_exchange_id, cred_data["thread_id"]) LOGGER.error( "Got credential TIMEOUT: %s %s %s", cred_data["thread_id"], - cred_data["credential_exchange_id"], + credential_exchange_id, cred_data["connection_id"], ) end_time = time.perf_counter() log_timing_method(method, start_time, end_time, False, data={ 'thread_id':cred_data["thread_id"], - 'credential_exchange_id':cred_data["credential_exchange_id"], + 'credential_exchange_id':credential_exchange_id, 'Error': 'Timeout', 'elapsed_time': (end_time-start_time) } @@ -812,7 +853,7 @@ def run(self): # there should be some form of response available self.cred_response = get_credential_response( - cred_data["credential_exchange_id"] + credential_exchange_id ) except Exception as exc: @@ -823,11 +864,11 @@ def run(self): outcome = str(exc) if cred_data: add_credential_exception_report( - cred_data["credential_exchange_id"], exc + credential_exchange_id, exc ) data={ 'thread_id':cred_data["thread_id"], - 'credential_exchange_id':cred_data["credential_exchange_id"], + 'credential_exchange_id':credential_exchange_id, 'Error': str(exc), 'elapsed_time': (end_time-start_time) } @@ -939,3 +980,100 @@ def handle_send_credential(cred_input): print(" ", processing_time/processed_count, "seconds per credential") return jsonify(cred_responses) + + +def handle_send_credential_v20(cred_input): + """ + # other sample data + sample_credentials = [ + { + "schema": "ian-registration.ian-ville", + "version": "1.0.0", + "attributes": { + "corp_num": "ABC12345", + "registration_date": "2018-01-01", + "entity_name": "Ima Permit", + "entity_name_effective": "2018-01-01", + "entity_status": "ACT", + "entity_status_effective": "2019-01-01", + "entity_type": "ABC", + "registered_jurisdiction": "BC", + "effective_date": "2019-01-01", + "expiry_date": "" + } + }, + { + "schema": "ian-permit.ian-ville", + "version": "1.0.0", + "attributes": { + "permit_id": str(uuid.uuid4()), + "entity_name": "Ima Permit", + "corp_num": "ABC12345", + "permit_issued_date": "2018-01-01", + "permit_type": "ABC", + "permit_status": "OK", + "effective_date": "2019-01-01" + } + } + ] + """ + # construct and send the credential + global app_config + + agent_admin_url = app_config["AGENT_ADMIN_URL"] + + start_time = time.perf_counter() + processing_time = 0 + processed_count = 0 + + # let's send a credential! + cred_responses = [] + for credential in cred_input: + cred_def_key = "CRED_DEF_" + credential["schema"] + "_" + credential["version"] + credential_definition_id = app_config["schemas"][cred_def_key] + + credential_attributes = [] + for attribute in credential["attributes"]: + credential_attributes.append({ + "name": attribute, + "value": credential["attributes"][attribute] + }) + cred_offer = { + "credential_preview": { + "@type": "issue-credential/2.0/credential-preview", + "attributes": credential_attributes + }, + "filter": { + "indy": { + "schema_id": app_config["schemas"][ + "SCHEMA_" + credential["schema"] + "_" + credential["version"] + ], + "schema_name": credential["schema"], + "issuer_did": app_config["DID"], + "schema_version": credential["version"], + "schema_issuer_did": app_config["DID"], + "cred_def_id": credential_definition_id, + } + }, + "comment": "", + "connection_id": app_config["TOB_CONNECTION"], + } + do_trace = random.randint(1, 100) + if do_trace <= TRACE_MSG_PCT: + cred_offer["trace"] = True + thread = SendCredentialThread( + credential_definition_id, + cred_offer, + agent_admin_url + "/issue-credential-2.0/send", + ADMIN_REQUEST_HEADERS, + ) + thread.start() + thread.join() + cred_responses.append(thread.cred_response) + processed_count = processed_count + 1 + + processing_time = time.perf_counter() - start_time + print(">>> Processed", processed_count, "credentials in", processing_time) + print(" ", processing_time/processed_count, "seconds per credential") + + return jsonify(cred_responses) diff --git a/data-pipeline/bcreg/bcreg_core.py b/data-pipeline/bcreg/bcreg_core.py index 153967a..6481212 100644 --- a/data-pipeline/bcreg/bcreg_core.py +++ b/data-pipeline/bcreg/bcreg_core.py @@ -61,7 +61,6 @@ def __init__(self, cache=False): self.conn.set_session(readonly=True) #self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) - self.conn.autocommit = True # Register the adapter sqlite3.register_adapter(decimal.Decimal, adapt_decimal) @@ -76,7 +75,6 @@ def __init__(self, cache=False): self.sec_conn.set_session(readonly=True) #self.sec_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) self.sec_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) - self.sec_conn.autocommit = True except (Exception) as error: LOGGER.error(error) LOGGER.error(traceback.print_exc()) diff --git a/data-pipeline/bcreg/credssubmitter.py b/data-pipeline/bcreg/credssubmitter.py index 3d009cd..2b5e31f 100644 --- a/data-pipeline/bcreg/credssubmitter.py +++ b/data-pipeline/bcreg/credssubmitter.py @@ -39,6 +39,10 @@ CONTROLLER_URL = os.environ.get('CONTROLLER_URL', 'http://localhost:5002') NOTIFY_OF_CREDENTIAL_POSTING_ERRORS = os.environ.get('NOTIFY_OF_CREDENTIAL_POSTING_ERRORS', 'false') +ISSUE_CRED_VERSION = os.getenv('ISSUE_CRED_VERSION', 'V10') +if not ISSUE_CRED_VERSION in ['V10', 'V20']: + raise Exception(f"Unsupported Issue Credential version: {ISSUE_CRED_VERSION}") + CREDS_BATCH_SIZE = int(os.getenv('CREDS_BATCH_SIZE', '3000')) CREDS_REQUEST_SIZE = int(os.getenv('CREDS_REQUEST_SIZE', '5')) MAX_CREDS_REQUESTS = int(os.getenv('MAX_CREDS_REQUESTS', '32')) @@ -138,24 +142,28 @@ async def submit_cred_batch(creds): print(traceback.format_exc()) raise -async def submit_cred(attrs, schema, version): +@backoff.on_exception(backoff.expo, + ( + aiohttp.ClientError, + ), + max_tries=5) +async def submit_cred_batch_v20(creds): try: async with aiohttp.ClientSession() as local_http_client: response = await local_http_client.post( - '{}/issue-credential'.format(CONTROLLER_URL), - params={'schema': schema, 'version': version}, - json=attrs + '{}/issue-credential-v20'.format(CONTROLLER_URL), + json=creds ) if response.status != 200: raise RuntimeError( - 'Credential could not be processed: {}'.format(await response.text()) + 'Credentials could not be processed: {}'.format(await response.text()) ) result_json = await response.json() return result_json except Exception as exc: print(exc) print(traceback.format_exc()) - raise + raise # add reason code to the submitted credential def inject_reason(attributes, reason): @@ -183,7 +191,10 @@ async def post_credentials(conn, credentials): cur2 = None try: results = None - results = await submit_cred_batch(post_creds) + if ISSUE_CRED_VERSION == "V20": + results = await submit_cred_batch_v20(post_creds) + else: + results = await submit_cred_batch(post_creds) except (Exception) as error: # posting to the controller failed :-( diff --git a/data-pipeline/bcreg/find-test-corps.py b/data-pipeline/bcreg/find-test-corps.py index 933776a..41f2a1c 100644 --- a/data-pipeline/bcreg/find-test-corps.py +++ b/data-pipeline/bcreg/find-test-corps.py @@ -99,6 +99,27 @@ 'C1384725', 'C1392032', 'FI0000018', +'A0104970', +'0593892', +'0721528', +'0778929', +'1384068', +'1384850', +'1389207', +'1389207', +'1389236', +'1389250', +'1390824', +'1390824', +'1392202', +'1392202', +'1392202', +'1392202', +'1392202', +'1392202', +'1449881', +'C1384725', +'C1392032', ] specific_corps_2 = [