Merge remote-tracking branch 'upstream/master'
cipig committed Feb 27, 2024
2 parents e593774 + 470c76d commit 8b46f47
Showing 10 changed files with 230 additions and 33 deletions.
2 changes: 1 addition & 1 deletion contrib/Dockerfile
@@ -1,5 +1,5 @@
 # example of Dockerfile that installs spesmilo electrumx 1.16.0
-# ENV variables can be overriden on the `docker run` command
+# ENV variables can be overridden on the `docker run` command
 
 FROM python:3.9.16-bullseye AS builder

2 changes: 0 additions & 2 deletions docs/HOWTO.rst
@@ -19,7 +19,6 @@ Python3          ElectrumX uses asyncio.  Python version >= 3.8 is
                  **required**.
 `aiohttp`_       Python library for asynchronous HTTP.  Version >=
                  2.0 required.
-`pylru`_         Python LRU cache package.
 DB Engine        A database engine package is required; two are
                  supported (see `Database Engine`_ below).
 ================ ========================
@@ -443,7 +442,6 @@ You can then set the port as follows and advertise the service externally on the
 .. _`daemontools`: http://cr.yp.to/daemontools.html
 .. _`runit`: http://smarden.org/runit/index.html
 .. _`aiohttp`: https://pypi.python.org/pypi/aiohttp
-.. _`pylru`: https://pypi.python.org/pypi/pylru
 .. _`dash_hash`: https://pypi.python.org/pypi/dash_hash
 .. _`contrib/raspberrypi3/install_electrumx.sh`: https://github.com/spesmilo/electrumx/blob/master/contrib/raspberrypi3/install_electrumx.sh
 .. _`contrib/raspberrypi3/run_electrumx.sh`: https://github.com/spesmilo/electrumx/blob/master/contrib/raspberrypi3/run_electrumx.sh
181 changes: 181 additions & 0 deletions electrumx/lib/lrucache.py
@@ -0,0 +1,181 @@
# The MIT License (MIT)
#
# Copyright (c) 2014-2022 Thomas Kemmer
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# -----
#
# This is a stripped down LRU-cache from the "cachetools" library.
# https://github.com/tkem/cachetools/blob/d991ac71b4eb6394be5ec572b835434081393215/src/cachetools/__init__.py

import collections
import collections.abc


class _DefaultSize:

    __slots__ = ()

    def __getitem__(self, _):
        return 1

    def __setitem__(self, _, value):
        assert value == 1

    def pop(self, _):
        return 1


class Cache(collections.abc.MutableMapping):
    """Mutable mapping to serve as a simple cache or cache base class."""

    __marker = object()

    __size = _DefaultSize()

    def __init__(self, maxsize, getsizeof=None):
        if getsizeof:
            self.getsizeof = getsizeof
        if self.getsizeof is not Cache.getsizeof:
            self.__size = dict()
        self.__data = dict()
        self.__currsize = 0
        self.__maxsize = maxsize

    def __repr__(self):
        return "%s(%s, maxsize=%r, currsize=%r)" % (
            self.__class__.__name__,
            repr(self.__data),
            self.__maxsize,
            self.__currsize,
        )

    def __getitem__(self, key):
        try:
            return self.__data[key]
        except KeyError:
            return self.__missing__(key)

    def __setitem__(self, key, value):
        maxsize = self.__maxsize
        size = self.getsizeof(value)
        if size > maxsize:
            raise ValueError("value too large")
        if key not in self.__data or self.__size[key] < size:
            while self.__currsize + size > maxsize:
                self.popitem()
        if key in self.__data:
            diffsize = size - self.__size[key]
        else:
            diffsize = size
        self.__data[key] = value
        self.__size[key] = size
        self.__currsize += diffsize

    def __delitem__(self, key):
        size = self.__size.pop(key)
        del self.__data[key]
        self.__currsize -= size

    def __contains__(self, key):
        return key in self.__data

    def __missing__(self, key):
        raise KeyError(key)

    def __iter__(self):
        return iter(self.__data)

    def __len__(self):
        return len(self.__data)

    def get(self, key, default=None):
        if key in self:
            return self[key]
        else:
            return default

    def pop(self, key, default=__marker):
        if key in self:
            value = self[key]
            del self[key]
        elif default is self.__marker:
            raise KeyError(key)
        else:
            value = default
        return value

    def setdefault(self, key, default=None):
        if key in self:
            value = self[key]
        else:
            self[key] = value = default
        return value

    @property
    def maxsize(self):
        """The maximum size of the cache."""
        return self.__maxsize

    @property
    def currsize(self):
        """The current size of the cache."""
        return self.__currsize

    @staticmethod
    def getsizeof(value):
        """Return the size of a cache element's value."""
        return 1


class LRUCache(Cache):
    """Least Recently Used (LRU) cache implementation."""

    def __init__(self, maxsize, getsizeof=None):
        Cache.__init__(self, maxsize, getsizeof)
        self.__order = collections.OrderedDict()

    def __getitem__(self, key, cache_getitem=Cache.__getitem__):
        value = cache_getitem(self, key)
        if key in self:  # __missing__ may not store item
            self.__update(key)
        return value

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        cache_setitem(self, key, value)
        self.__update(key)

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        cache_delitem(self, key)
        del self.__order[key]

    def popitem(self):
        """Remove and return the `(key, value)` pair least recently used."""
        try:
            key = next(iter(self.__order))
        except StopIteration:
            raise KeyError("%s is empty" % type(self).__name__) from None
        else:
            return (key, self.pop(key))

    def __update(self, key):
        try:
            self.__order.move_to_end(key)
        except KeyError:
            self.__order[key] = None
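
For orientation, a minimal usage sketch of the vendored class (illustrative keys and values, not part of the commit; the import path follows the new file's location). With the default getsizeof every entry counts as 1, so maxsize caps the number of entries; passing a callable turns maxsize into a weighted budget:

    from electrumx.lib.lrucache import LRUCache

    cache = LRUCache(maxsize=2)
    cache['a'] = 1
    cache['b'] = 2
    _ = cache['a']       # touching 'a' leaves 'b' least recently used
    cache['c'] = 3       # over maxsize: evicts 'b', the LRU entry
    assert 'b' not in cache and cache.currsize == 2

    # Weighted variant: maxsize bounds the sum of getsizeof(value).
    bytecache = LRUCache(maxsize=1024, getsizeof=len)
    bytecache[b'tx'] = b'\x00' * 100   # consumes 100 of the 1024 budget
    assert bytecache.currsize == 100

The OrderedDict tracks recency: __update moves a touched key to the end, so the key at the front of __order is always the least recently used one, and that is what popitem evicts.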
2 changes: 1 addition & 1 deletion electrumx/lib/tx.py
@@ -1009,7 +1009,7 @@ def _is_anon_input(self):
                 self.script[1] == self.OP_ANON_MARKER)
 
     def is_generation(self):
-        # Transactions comming in from stealth addresses are seen by
+        # Transactions coming in from stealth addresses are seen by
         # the blockchain as newly minted coins. The reverse, where coins
         # are sent TO a stealth address, are seen by the blockchain as
         # a coin burn.
2 changes: 1 addition & 1 deletion electrumx/server/daemon.py
@@ -134,7 +134,7 @@ async def _send_data(self, data):
     async def _send(self, payload, processor):
         '''Send a payload to be converted to JSON.
 
-        Handles temporary connection issues.  Daemon reponse errors
+        Handles temporary connection issues.  Daemon response errors
         are raise through DaemonError.
         '''
         def log_error(error):
36 changes: 21 additions & 15 deletions electrumx/server/mempool.py
@@ -12,7 +12,7 @@
 from abc import ABC, abstractmethod
 from asyncio import Lock
 from collections import defaultdict
-from typing import Sequence, Tuple, TYPE_CHECKING, Type, Dict
+from typing import Sequence, Tuple, TYPE_CHECKING, Type, Dict, Optional, Set
 import math
 
 import attr
@@ -31,10 +31,10 @@
 class MemPoolTx:
     prevouts = attr.ib()  # type: Sequence[Tuple[bytes, int]]
     # A pair is a (hashX, value) tuple
-    in_pairs = attr.ib()
-    out_pairs = attr.ib()
-    fee = attr.ib()
-    size = attr.ib()
+    in_pairs = attr.ib()  # type: Optional[Sequence[Tuple[bytes, int]]]
+    out_pairs = attr.ib()  # type: Sequence[Tuple[bytes, int]]
+    fee = attr.ib()  # type: int
+    size = attr.ib()  # type: int
 
 
 @attr.s(slots=True)
@@ -119,7 +119,7 @@ def __init__(
         self.coin = coin
         self.api = api
         self.logger = class_logger(__name__, self.__class__.__name__)
-        self.txs = {}
+        self.txs = {}  # type: Dict[bytes, MemPoolTx]
         self.hashXs = defaultdict(set)  # None can be a key
         self.cached_compact_histogram = []
         self.refresh_secs = refresh_secs
@@ -205,7 +205,7 @@ def _compress_histogram(
             prev_fee_rate = fee_rate
         return compact
 
-    def _accept_transactions(self, tx_map, utxo_map, touched):
+    def _accept_transactions(self, tx_map: Dict[bytes, MemPoolTx], utxo_map, touched):
         '''Accept transactions in tx_map to the mempool if all their inputs
         can be found in the existing mempool or a utxo_map from the
         DB.
@@ -223,7 +223,7 @@ def _accept_transactions(self, tx_map, utxo_map, touched):
             try:
                 for prevout in tx.prevouts:
                     utxo = utxo_map.get(prevout)
-                    if not utxo:
+                    if not utxo:  # i.e. parent also unconfirmed
                         prev_hash, prev_index = prevout
                         # Raises KeyError if prev_hash is not in txs
                         utxo = txs[prev_hash].out_pairs[prev_index]
@@ -274,7 +274,7 @@ async def _refresh_hashes(self, synchronized_event):
             touched = set()
             await sleep(self.refresh_secs)
 
-    async def _process_mempool(self, all_hashes, touched, mempool_height):
+    async def _process_mempool(self, all_hashes: Set[bytes], touched, mempool_height):
         # Re-sync with the new set of hashes
         txs = self.txs
         hashXs = self.hashXs
@@ -321,16 +321,17 @@ async def _process_mempool(self, all_hashes, touched, mempool_height):
 
         return touched
 
-    async def _fetch_and_accept(self, hashes, all_hashes, touched):
+    async def _fetch_and_accept(self, hashes: Sequence[bytes], all_hashes: Set[bytes], touched):
         '''Fetch a list of mempool transactions.'''
         hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
         raw_txs = await self.api.raw_transactions(hex_hashes_iter)
 
-        def deserialize_txs():  # This function is pure
+        def deserialize_txs() -> Dict[bytes, MemPoolTx]:
+            """This function is pure"""
             to_hashX = self.coin.hashX_from_script
             deserializer = self.coin.DESERIALIZER
 
-            txs = {}
+            txs = {}  # type: Dict[bytes, MemPoolTx]
             for hash, raw_tx in zip(hashes, raw_txs):
                 # The daemon may have evicted the tx from its
                 # mempool or it may have gotten in a block
@@ -348,12 +349,17 @@ def deserialize_txs():  # This function is pure
                                    if not txin.is_generation())
                 txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
                                     for txout in tx.outputs)
-                txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
-                                      0, tx_size)
+                txs[hash] = MemPoolTx(
+                    prevouts=txin_pairs,
+                    in_pairs=None,
+                    out_pairs=txout_pairs,
+                    fee=0,
+                    size=tx_size,
+                )
             return txs
 
         # Thread this potentially slow operation so as not to block
-        tx_map = await run_in_thread(deserialize_txs)
+        tx_map = await run_in_thread(deserialize_txs)  # type: Dict[bytes, MemPoolTx]
 
         # Determine all prevouts not in the mempool, and fetch the
         # UTXO information from the database.  Failed prevout lookups
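
One note on the newly commented branch in _accept_transactions: when a prevout is missing from the DB's utxo_map, the spend must refer to a transaction that is itself still unconfirmed, so the output pair is looked up in the mempool's own txs. A toy sketch of that lookup (hypothetical hashes and values, not part of the commit):

    txs = {b'parent_txid': MemPoolTx(prevouts=(), in_pairs=None,
                                     out_pairs=((b'hashX1', 50000),),
                                     fee=0, size=200)}
    utxo_map = {}                    # DB lookup found nothing
    prevout = (b'parent_txid', 0)    # child spends parent's output 0
    utxo = utxo_map.get(prevout)
    if not utxo:                     # i.e. parent also unconfirmed
        prev_hash, prev_index = prevout
        utxo = txs[prev_hash].out_pairs[prev_index]
    assert utxo == (b'hashX1', 50000)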
23 changes: 18 additions & 5 deletions electrumx/server/peers.py
@@ -431,7 +431,9 @@ async def _send_server_features(self, session, peer):
         message = 'server.features'
         features = await session.send_request(message)
         assert_good(message, features, dict)
-        hosts = [host.lower() for host in features.get('hosts', {})]
+        features_hosts = features.get('hosts', {})
+        assert_good(message, features_hosts, dict)
+        hosts = [host.lower() for host in features_hosts]
         if self.env.coin.GENESIS_HASH != features.get('genesis_hash'):
             raise BadPeerError('incorrect genesis hash')
         if peer.host.lower() in hosts:
@@ -474,10 +476,21 @@ async def discover_peers(self):
         self.logger.info(f'my clearnet self: {self._my_clearnet_peer()}')
         self.logger.info(f'force use of proxy: {self.env.force_proxy}')
         self.logger.info(f'beginning peer discovery...')
-        async with self.group as group:
-            await group.spawn(self._refresh_blacklist())
-            await group.spawn(self._detect_proxy())
-            await group.spawn(self._import_peers())
+        try:
+            async with self.group as group:
+                await group.spawn(self._refresh_blacklist())
+                await group.spawn(self._detect_proxy())
+                await group.spawn(self._import_peers())
+        except Exception:
+            self.logger.exception(
+                "PeerManager.group died! Please open a bug report about this, "
+                "and include at least this traceback and maybe a bit more of "
+                "the log (if it looks relevant).")
+            while True:
+                await sleep(3600)
+                self.logger.warning(
+                    "Peer discovery is not working as PeerManager.group died. "
+                    "This is not normal. You should inspect the log.")
 
     def info(self):
         '''The number of peers.'''
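
The added assert_good on features_hosts guards against a peer sending the wrong type for 'hosts'. A short illustration of the failure mode it closes (hypothetical peer response, not part of the commit): iterating a string yields single characters, so the old code would silently build nonsense host names instead of rejecting the peer:

    features = {'hosts': 'badpeer.example.com'}   # malformed: should be a dict
    hosts = [host.lower() for host in features.get('hosts', {})]
    print(hosts[:4])   # ['b', 'a', 'd', 'p'] -- accepted without complaint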
(Diffs for the remaining 3 changed files not shown.)