Source code for mythril.mythril.mythril_disassembler

import logging
import re
import solc
import sys
import os

from semantic_version import Version, NpmSpec
from typing import List, Tuple, Optional

from mythril.support.support_utils import sha3, zpad
from mythril.ethereum import util
from mythril.ethereum.interface.rpc.client import EthJsonRpc
from mythril.exceptions import CriticalError, CompilerError, NoContractFoundError
from mythril.support import signatures
from mythril.support.support_utils import rzpad
from mythril.support.support_args import args
from mythril.ethereum.evmcontract import EVMContract
from mythril.ethereum.interface.rpc.exceptions import ConnectionError
from mythril.solidity.soliditycontract import SolidityContract, get_contracts_from_file
from eth_utils import int_to_big_endian


log = logging.getLogger(__name__)


[docs]class MythrilDisassembler: """ The Mythril Disassembler class Responsible for generating disassembly of smart contracts: - Compiles solc code from file/onchain - Can also be used to access onchain storage data """ def __init__( self, eth: Optional[EthJsonRpc] = None, solc_version: str = None, solc_settings_json: str = None, enable_online_lookup: bool = False, ) -> None: self.solc_binary = self._init_solc_binary(solc_version) self.solc_settings_json = solc_settings_json self.eth = eth self.enable_online_lookup = enable_online_lookup self.sigs = signatures.SignatureDB(enable_online_lookup=enable_online_lookup) self.contracts = [] # type: List[EVMContract] @staticmethod def _init_solc_binary(version: str) -> Optional[str]: """ Only proper versions are supported. No nightlies, commits etc (such as available in remix). :param version: Version of the solc binary required :return: The solc binary of the corresponding version """ if not version: return None # tried converting input to semver, seemed not necessary so just slicing for now try: main_version = solc.get_solc_version_string() except: main_version = "" # allow missing solc will download instead main_version_number = re.match(r"\d+.\d+.\d+", main_version) if version.startswith("v"): version = version[1:] if version and NpmSpec("^0.8.0").match(Version(version)): args.use_integer_module = False if version == main_version_number: log.info("Given version matches installed version") solc_binary = os.environ.get("SOLC") or "solc" else: solc_binary = util.solc_exists(version) if solc_binary is None: raise CriticalError( "The version of solc that is needed cannot be installed automatically" ) else: log.info("Setting the compiler to %s", solc_binary) return solc_binary
[docs] def load_from_bytecode( self, code: str, bin_runtime: bool = False, address: Optional[str] = None ) -> Tuple[str, EVMContract]: """ Returns the address and the contract class for the given bytecode :param code: Bytecode :param bin_runtime: Whether the code is runtime code or creation code :param address: address of contract :return: tuple(address, Contract class) """ if address is None: address = util.get_indexed_address(0) if bin_runtime: self.contracts.append( EVMContract( code=code, name="MAIN", enable_online_lookup=self.enable_online_lookup, ) ) else: self.contracts.append( EVMContract( creation_code=code, name="MAIN", enable_online_lookup=self.enable_online_lookup, ) ) return address, self.contracts[-1] # return address and contract object
[docs] def load_from_address(self, address: str) -> Tuple[str, EVMContract]: """ Returns the contract given it's on chain address :param address: The on chain address of a contract :return: tuple(address, contract) """ if not re.match(r"0x[a-fA-F0-9]{40}", address): raise CriticalError("Invalid contract address. Expected format is '0x...'.") if self.eth is None: raise CriticalError( "Please check whether the Infura key is set or use a different RPC method." ) try: code = self.eth.eth_getCode(address) except FileNotFoundError as e: raise CriticalError("IPC error: " + str(e)) except ConnectionError: raise CriticalError( "Could not connect to RPC server. Make sure that your node is running and that RPC parameters are set correctly." ) except Exception as e: raise CriticalError("IPC / RPC error: " + str(e)) if code == "0x" or code == "0x0": raise CriticalError( "Received an empty response from eth_getCode. Check the contract address and verify that you are on the correct chain." ) else: self.contracts.append( EVMContract( code, name=address, enable_online_lookup=self.enable_online_lookup ) ) return address, self.contracts[-1] # return address and contract object
[docs] def load_from_solidity( self, solidity_files: List[str] ) -> Tuple[str, List[SolidityContract]]: """ :param solidity_files: List of solidity_files :return: tuple of address, contract class list """ address = util.get_indexed_address(0) contracts = [] for file in solidity_files: if ":" in file: file, contract_name = file.split(":") else: contract_name = None file = os.path.expanduser(file) solc_binary = self.solc_binary or util.extract_binary(file) try: # import signatures from solidity source self.sigs.import_solidity_file( file, solc_binary=solc_binary, solc_settings_json=self.solc_settings_json, ) if contract_name is not None: contract = SolidityContract( input_file=file, name=contract_name, solc_settings_json=self.solc_settings_json, solc_binary=solc_binary, ) self.contracts.append(contract) contracts.append(contract) else: for contract in get_contracts_from_file( input_file=file, solc_settings_json=self.solc_settings_json, solc_binary=solc_binary, ): self.contracts.append(contract) contracts.append(contract) except FileNotFoundError as e: raise CriticalError(f"Input file not found {e}") except CompilerError as e: error_msg = str(e) # Check if error is related to solidity version mismatch if ( "Error: Source file requires different compiler version" in error_msg ): # Grab relevant line "pragma solidity <solv>...", excluding any comments solv_pragma_line = error_msg.split("\n")[-3].split("//")[0] # Grab solidity version from relevant line solv_match = re.findall(r"[0-9]+\.[0-9]+\.[0-9]+", solv_pragma_line) error_suggestion = ( "<version_number>" if len(solv_match) != 1 else solv_match[0] ) error_msg = ( error_msg + '\nSolidityVersionMismatch: Try adding the option "--solv ' + error_suggestion + '"\n' ) raise CriticalError(error_msg) except NoContractFoundError: log.error( "The file " + file + " does not contain a compilable contract." ) return address, contracts
[docs] @staticmethod def hash_for_function_signature(func: str) -> str: """ Return function nadmes corresponding signature hash :param func: function name :return: Its hash signature """ print(sha3(func)) return "0x%s" % sha3(func)[:4].hex()
[docs] def get_state_variable_from_storage( self, address: str, params: Optional[List[str]] = None ) -> str: """ Get variables from the storage :param address: The contract address :param params: The list of parameters param types: [position, length] or ["mapping", position, key1, key2, ... ] or [position, length, array] :return: The corresponding storage slot and its value """ params = params or [] (position, length, mappings) = (0, 1, []) try: if params[0] == "mapping": if len(params) < 3: raise CriticalError("Invalid number of parameters.") position = int(params[1]) position_formatted = zpad(int_to_big_endian(position), 32) for i in range(2, len(params)): key = bytes(params[i], "utf8") key_formatted = rzpad(key, 32) mappings.append( int.from_bytes( sha3(key_formatted + position_formatted), byteorder="big" ) ) length = len(mappings) if length == 1: position = mappings[0] else: if len(params) >= 4: raise CriticalError("Invalid number of parameters.") if len(params) >= 1: position = int(params[0]) if len(params) >= 2: length = int(params[1]) if len(params) == 3 and params[2] == "array": position_formatted = zpad(int_to_big_endian(position), 32) position = int.from_bytes(sha3(position_formatted), byteorder="big") except ValueError: raise CriticalError( "Invalid storage index. Please provide a numeric value." ) outtxt = [] try: if length == 1: outtxt.append( "{}: {}".format( position, self.eth.eth_getStorageAt(address, position) ) ) else: if len(mappings) > 0: for i in range(0, len(mappings)): position = mappings[i] outtxt.append( "{}: {}".format( hex(position), self.eth.eth_getStorageAt(address, position), ) ) else: for i in range(position, position + length): outtxt.append( "{}: {}".format( hex(i), self.eth.eth_getStorageAt(address, i) ) ) except FileNotFoundError as e: raise CriticalError("IPC error: " + str(e)) except ConnectionError: raise CriticalError( "Could not connect to RPC server. " "Make sure that your node is running and that RPC parameters are set correctly." ) return "\n".join(outtxt)