Source code for mythril.analysis.solver

"""This module contains analysis module helpers to solve path constraints."""

from typing import Dict, List, Tuple, Union, Any

import z3
import logging

from z3 import FuncInterp

from mythril.exceptions import UnsatError
from mythril.laser.ethereum.function_managers import (
    keccak_function_manager,
)

from mythril.laser.ethereum.state.constraints import Constraints
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.transaction import BaseTransaction
from mythril.laser.ethereum.transaction.transaction_models import (
    ContractCreationTransaction,
)
from mythril.laser.smt import UGE, symbol_factory
from mythril.support.model import get_model

log = logging.getLogger(__name__)
z3.set_option(
    max_args=10000000, max_lines=1000000, max_depth=10000000, max_visited=1000000
)


[docs]def pretty_print_model(model): """Pretty prints a z3 model :param model: :return: """ ret = "" for d in model.decls(): if type(model[d]) == FuncInterp: condition = model[d].as_list() ret += "%s: %s\n" % (d.name(), condition) continue try: condition = "0x%x" % model[d].as_long() except: condition = str(z3.simplify(model[d])) ret += "%s: %s\n" % (d.name(), condition) return ret
[docs]def get_transaction_sequence( global_state: GlobalState, constraints: Constraints ) -> Dict[str, Any]: """Generate concrete transaction sequence. Note: This function only considers the constraints in constraint argument, which in some cases is expected to differ from global_state's constraints :param global_state: GlobalState to generate transaction sequence for :param constraints: list of constraints used to generate transaction sequence """ transaction_sequence = global_state.world_state.transaction_sequence concrete_transactions = [] tx_constraints, minimize = _set_minimisation_constraints( transaction_sequence, constraints.copy(), [], 5000, global_state.world_state ) try: model = get_model(tx_constraints, minimize=minimize) except UnsatError: raise UnsatError if isinstance(transaction_sequence[0], ContractCreationTransaction): initial_world_state = transaction_sequence[0].prev_world_state else: initial_world_state = transaction_sequence[0].world_state initial_accounts = initial_world_state.accounts for transaction in transaction_sequence: concrete_transaction = _get_concrete_transaction(model, transaction) concrete_transactions.append(concrete_transaction) min_price_dict: Dict[str, int] = {} for address in initial_accounts.keys(): min_price_dict[address] = model.eval( initial_world_state.starting_balances[ symbol_factory.BitVecVal(address, 256) ].raw, model_completion=True, ).as_long() concrete_initial_state = _get_concrete_state(initial_accounts, min_price_dict) if isinstance(transaction_sequence[0], ContractCreationTransaction): code = transaction_sequence[0].code _replace_with_actual_sha(concrete_transactions, model, code) else: _replace_with_actual_sha(concrete_transactions, model) _add_calldata_placeholder(concrete_transactions, transaction_sequence) steps = {"initialState": concrete_initial_state, "steps": concrete_transactions} return steps
def _add_calldata_placeholder( concrete_transactions: List[Dict[str, str]], transaction_sequence: List[BaseTransaction], ): """ Add's a calldata placeholder into the concrete transactions :param concrete_transactions: :param transaction_sequence: :return: """ for tx in concrete_transactions: tx["calldata"] = tx["input"] if not isinstance(transaction_sequence[0], ContractCreationTransaction): return if type(transaction_sequence[0].code.bytecode) == tuple: code_len = len(transaction_sequence[0].code.bytecode) * 2 else: code_len = len(transaction_sequence[0].code.bytecode) concrete_transactions[0]["calldata"] = concrete_transactions[0]["input"][ code_len + 2 : ] def _replace_with_actual_sha( concrete_transactions: List[Dict[str, str]], model: z3.Model, code=None ): concrete_hashes = keccak_function_manager.get_concrete_hash_data(model) for tx in concrete_transactions: if keccak_function_manager.hash_matcher not in tx["input"]: continue if code is not None and code.bytecode in tx["input"]: s_index = len(code.bytecode) + 2 else: s_index = 10 for i in range(s_index, len(tx["input"])): data_slice = tx["input"][i : i + 64] if ( keccak_function_manager.hash_matcher not in data_slice or len(data_slice) != 64 ): continue find_input = symbol_factory.BitVecVal(int(data_slice, 16), 256) input_ = None for size in concrete_hashes: _, inverse = keccak_function_manager.store_function[size] if find_input.value not in concrete_hashes[size]: continue input_ = symbol_factory.BitVecVal( model.eval(inverse(find_input).raw).as_long(), size ) if input_ is None: continue keccak = keccak_function_manager.find_concrete_keccak(input_) hex_keccak = hex(keccak.value)[2:] if len(hex_keccak) != 64: hex_keccak = "0" * (64 - len(hex_keccak)) + hex_keccak tx["input"] = tx["input"][:s_index] + tx["input"][s_index:].replace( tx["input"][i : 64 + i], hex_keccak ) def _get_concrete_state( initial_accounts: Dict, min_price_dict: Dict[str, int] ) -> Dict[str, Dict]: """Gets a concrete state""" accounts = {} for address, account in initial_accounts.items(): # Skip empty default account data: Dict[str, Union[int, str]] = {} data["nonce"] = account.nonce data["code"] = account.serialised_code() data["storage"] = str(account.storage) data["balance"] = hex(min_price_dict.get(address, 0)) accounts[hex(address)] = data return {"accounts": accounts} def _get_concrete_transaction(model: z3.Model, transaction: BaseTransaction): """Gets a concrete transaction from a transaction and z3 model""" # Get concrete values from transaction address = hex(transaction.callee_account.address.value) value = model.eval(transaction.call_value.raw, model_completion=True).as_long() caller = "0x" + ( "%x" % model.eval(transaction.caller.raw, model_completion=True).as_long() ).zfill(40) input_ = "" if isinstance(transaction, ContractCreationTransaction): address = "" input_ += transaction.code.bytecode input_ += "".join( [ hex(b)[2:] if len(hex(b)) % 2 == 0 else "0" + hex(b)[2:] for b in transaction.call_data.concrete(model) ] ) # Create concrete transaction dict concrete_transaction = dict() # type: Dict[str, str] concrete_transaction["input"] = "0x" + input_ concrete_transaction["value"] = "0x%x" % value # Fixme: base origin assignment on origin symbol concrete_transaction["origin"] = caller concrete_transaction["address"] = "%s" % address return concrete_transaction def _set_minimisation_constraints( transaction_sequence, constraints, minimize, max_size, world_state ) -> Tuple[Constraints, tuple]: """Set constraints that minimise key transaction values Constraints generated: - Upper bound on calldata size - Minimisation of call value's and calldata sizes :param transaction_sequence: Transaction for which the constraints should be applied :param constraints: The constraints array which should contain any added constraints :param minimize: The minimisation array which should contain any variables that should be minimised :param max_size: The max size of the calldata array :return: updated constraints, minimize """ for transaction in transaction_sequence: # Set upper bound on calldata size max_calldata_size = symbol_factory.BitVecVal(max_size, 256) constraints.append(UGE(max_calldata_size, transaction.call_data.calldatasize)) # Minimize minimize.append(transaction.call_data.calldatasize) minimize.append(transaction.call_value) constraints.append( UGE( symbol_factory.BitVecVal(1000000000000000000000, 256), world_state.starting_balances[transaction.caller], ) ) for account in world_state.accounts.values(): # Lazy way to prevent overflows and to ensure "reasonable" balances # Each account starts with less than 100 ETH constraints.append( UGE( symbol_factory.BitVecVal(100000000000000000000, 256), world_state.starting_balances[account.address], ) ) return constraints, tuple(minimize)