diff --git a/tests/unit_tests/test_async_subtensor.py b/tests/unit_tests/test_async_subtensor.py index ff7446aee..bc51c0de5 100644 --- a/tests/unit_tests/test_async_subtensor.py +++ b/tests/unit_tests/test_async_subtensor.py @@ -1083,7 +1083,7 @@ async def test_get_delegated_no_block_hash_no_reuse(subtensor, mocker): fake_coldkey_ss58 = "fake_ss58_address" mocked_ss58_to_vec_u8 = mocker.Mock(return_value=b"encoded_coldkey") - mocker.patch("bittensor.core.async_subtensor.ss58_to_vec_u8", mocked_ss58_to_vec_u8) + mocker.patch.object(async_subtensor, "ss58_to_vec_u8", mocked_ss58_to_vec_u8) mocked_rpc_request = mocker.AsyncMock(return_value={"result": b"mocked_result"}) subtensor.substrate.rpc_request = mocked_rpc_request @@ -1113,7 +1113,7 @@ async def test_get_delegated_with_block_hash(subtensor, mocker): fake_block_hash = "fake_block_hash" mocked_ss58_to_vec_u8 = mocker.Mock(return_value=b"encoded_coldkey") - mocker.patch("bittensor.core.async_subtensor.ss58_to_vec_u8", mocked_ss58_to_vec_u8) + mocker.patch.object(async_subtensor, "ss58_to_vec_u8", mocked_ss58_to_vec_u8) mocked_rpc_request = mocker.AsyncMock(return_value={"result": b"mocked_result"}) subtensor.substrate.rpc_request = mocked_rpc_request @@ -1145,7 +1145,7 @@ async def test_get_delegated_with_reuse_block(subtensor, mocker): subtensor.substrate.last_block_hash = "last_block_hash" mocked_ss58_to_vec_u8 = mocker.Mock(return_value=b"encoded_coldkey") - mocker.patch("bittensor.core.async_subtensor.ss58_to_vec_u8", mocked_ss58_to_vec_u8) + mocker.patch.object(async_subtensor, "ss58_to_vec_u8", mocked_ss58_to_vec_u8) mocked_rpc_request = mocker.AsyncMock(return_value={"result": b"mocked_result"}) subtensor.substrate.rpc_request = mocked_rpc_request @@ -1177,7 +1177,7 @@ async def test_get_delegated_with_empty_result(subtensor, mocker): fake_coldkey_ss58 = "fake_ss58_address" mocked_ss58_to_vec_u8 = mocker.Mock(return_value=b"encoded_coldkey") - mocker.patch("bittensor.core.async_subtensor.ss58_to_vec_u8", mocked_ss58_to_vec_u8) + mocker.patch.object(async_subtensor, "ss58_to_vec_u8", mocked_ss58_to_vec_u8) mocked_rpc_request = mocker.AsyncMock(return_value={}) subtensor.substrate.rpc_request = mocked_rpc_request @@ -1204,8 +1204,9 @@ async def test_query_identity_successful(subtensor, mocker): mocked_query = mocker.AsyncMock(return_value=fake_identity_info) subtensor.substrate.query = mocked_query - mocker.patch( - "bittensor.core.async_subtensor._decode_hex_identity_dict", + mocker.patch.object( + async_subtensor, + "_decode_hex_identity_dict", return_value={"stake": "01 02"}, ) @@ -1256,8 +1257,9 @@ async def test_query_identity_type_error(subtensor, mocker): mocked_query = mocker.AsyncMock(return_value=fake_identity_info) subtensor.substrate.query = mocked_query - mocker.patch( - "bittensor.core.async_subtensor._decode_hex_identity_dict", + mocker.patch.object( + async_subtensor, + "_decode_hex_identity_dict", side_effect=TypeError, ) @@ -1273,3 +1275,226 @@ async def test_query_identity_type_error(subtensor, mocker): reuse_block_hash=False, ) assert result == {} + + +@pytest.mark.asyncio +async def test_weights_successful(subtensor, mocker): + """Tests weights method with successful weight distribution retrieval.""" + # Preps + fake_netuid = 1 + fake_block_hash = "block_hash" + fake_weights = [ + (0, [(1, 10), (2, 20)]), + (1, [(0, 15), (2, 25)]), + ] + + async def mock_query_map(**_): + for uid, w in fake_weights: + yield uid, w + + mocker.patch.object(subtensor.substrate, "query_map", side_effect=mock_query_map) + + # Call + result = await subtensor.weights(netuid=fake_netuid, block_hash=fake_block_hash) + + # Asserts + subtensor.substrate.query_map.assert_called_once_with( + module="SubtensorModule", + storage_function="Weights", + params=[fake_netuid], + block_hash=fake_block_hash, + ) + assert result == fake_weights + + +@pytest.mark.asyncio +async def test_bonds(subtensor, mocker): + """Tests bonds method with successful bond distribution retrieval.""" + # Preps + fake_netuid = 1 + fake_block_hash = "block_hash" + fake_bonds = [ + (0, [(1, 100), (2, 200)]), + (1, [(0, 150), (2, 250)]), + ] + + async def mock_query_map(**_): + for uid, b in fake_bonds: + yield uid, b + + mocker.patch.object(subtensor.substrate, "query_map", side_effect=mock_query_map) + + # Call + result = await subtensor.bonds(netuid=fake_netuid, block_hash=fake_block_hash) + + # Asserts + subtensor.substrate.query_map.assert_called_once_with( + module="SubtensorModule", + storage_function="Bonds", + params=[fake_netuid], + block_hash=fake_block_hash, + ) + assert result == fake_bonds + + +@pytest.mark.asyncio +async def test_does_hotkey_exist_true(subtensor, mocker): + """Tests does_hotkey_exist method when the hotkey exists and is valid.""" + # Preps + fake_hotkey_ss58 = "valid_hotkey" + fake_block_hash = "block_hash" + fake_query_result = ["decoded_account_id"] + + mocked_query = mocker.AsyncMock(return_value=fake_query_result) + subtensor.substrate.query = mocked_query + + mocked_decode_account_id = mocker.Mock(return_value="another_account_id") + mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id) + + # Call + result = await subtensor.does_hotkey_exist( + hotkey_ss58=fake_hotkey_ss58, block_hash=fake_block_hash + ) + + # Asserts + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="Owner", + params=[fake_hotkey_ss58], + block_hash=fake_block_hash, + reuse_block_hash=False, + ) + mocked_decode_account_id.assert_called_once_with(fake_query_result[0]) + assert result is True + + +@pytest.mark.asyncio +async def test_does_hotkey_exist_false_for_specific_account(subtensor, mocker): + """Tests does_hotkey_exist method when the hotkey exists but matches the specific account ID to ignore.""" + # Preps + fake_hotkey_ss58 = "ignored_hotkey" + fake_query_result = ["ignored_account_id"] + + mocked_query = mocker.AsyncMock(return_value=fake_query_result) + subtensor.substrate.query = mocked_query + + # Mock the decode_account_id function to return the specific account ID that should be ignored + mocked_decode_account_id = mocker.Mock( + return_value="5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM" + ) + mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id) + + # Call + result = await subtensor.does_hotkey_exist(hotkey_ss58=fake_hotkey_ss58) + + # Asserts + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="Owner", + params=[fake_hotkey_ss58], + block_hash=None, + reuse_block_hash=False, + ) + mocked_decode_account_id.assert_called_once_with(fake_query_result[0]) + assert result is False + + +@pytest.mark.asyncio +async def test_get_hotkey_owner_successful(subtensor, mocker): + """Tests get_hotkey_owner method when the hotkey exists and has an owner.""" + # Preps + fake_hotkey_ss58 = "valid_hotkey" + fake_block_hash = "block_hash" + fake_owner_account_id = "owner_account_id" + + mocked_query = mocker.AsyncMock(return_value=[fake_owner_account_id]) + subtensor.substrate.query = mocked_query + + mocked_decode_account_id = mocker.Mock(return_value="decoded_owner_account_id") + mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id) + + mocked_does_hotkey_exist = mocker.AsyncMock(return_value=True) + subtensor.does_hotkey_exist = mocked_does_hotkey_exist + + # Call + result = await subtensor.get_hotkey_owner( + hotkey_ss58=fake_hotkey_ss58, block_hash=fake_block_hash + ) + + # Asserts + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="Owner", + params=[fake_hotkey_ss58], + block_hash=fake_block_hash, + ) + mocked_decode_account_id.assert_called_once_with(fake_owner_account_id) + mocked_does_hotkey_exist.assert_awaited_once_with( + fake_hotkey_ss58, block_hash=fake_block_hash + ) + assert result == "decoded_owner_account_id" + + +@pytest.mark.asyncio +async def test_get_hotkey_owner_non_existent_hotkey(subtensor, mocker): + """Tests get_hotkey_owner method when the hotkey does not exist in the query result.""" + # Preps + fake_hotkey_ss58 = "non_existent_hotkey" + fake_block_hash = "block_hash" + + mocked_query = mocker.AsyncMock(return_value=[None]) + subtensor.substrate.query = mocked_query + + mocked_decode_account_id = mocker.Mock(return_value=None) + mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id) + + # Call + result = await subtensor.get_hotkey_owner( + hotkey_ss58=fake_hotkey_ss58, block_hash=fake_block_hash + ) + + # Asserts + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="Owner", + params=[fake_hotkey_ss58], + block_hash=fake_block_hash, + ) + mocked_decode_account_id.assert_called_once_with(None) + assert result is None + + +@pytest.mark.asyncio +async def test_get_hotkey_owner_exists_but_does_not_exist_flag_false(subtensor, mocker): + """Tests get_hotkey_owner method when decode_account_id returns a value but does_hotkey_exist returns False.""" + # Preps + fake_hotkey_ss58 = "valid_hotkey" + fake_block_hash = "block_hash" + fake_owner_account_id = "owner_account_id" + + mocked_query = mocker.AsyncMock(return_value=[fake_owner_account_id]) + subtensor.substrate.query = mocked_query + + mocked_decode_account_id = mocker.Mock(return_value="decoded_owner_account_id") + mocker.patch.object(async_subtensor, "decode_account_id", mocked_decode_account_id) + + mocked_does_hotkey_exist = mocker.AsyncMock(return_value=False) + subtensor.does_hotkey_exist = mocked_does_hotkey_exist + + # Call + result = await subtensor.get_hotkey_owner( + hotkey_ss58=fake_hotkey_ss58, block_hash=fake_block_hash + ) + + # Asserts + mocked_query.assert_called_once_with( + module="SubtensorModule", + storage_function="Owner", + params=[fake_hotkey_ss58], + block_hash=fake_block_hash, + ) + mocked_decode_account_id.assert_called_once_with(fake_owner_account_id) + mocked_does_hotkey_exist.assert_awaited_once_with( + fake_hotkey_ss58, block_hash=fake_block_hash + ) + assert result is None