"""This module contains a representation class for EVM instructions and
transitions between them."""
import logging
from copy import copy, deepcopy
from typing import cast, Callable, List, Union, Tuple
from mythril.exceptions import UnsatError
from mythril.laser.smt import (
Extract,
Expression,
UDiv,
simplify,
Concat,
ULT,
UGT,
BitVec,
is_false,
URem,
SRem,
If,
Bool,
Not,
LShR,
UGE,
)
from mythril.laser.smt import symbol_factory
from mythril.disassembler.disassembly import Disassembly
from mythril.laser.ethereum.state.calldata import ConcreteCalldata, SymbolicCalldata
import mythril.laser.ethereum.util as helper
from mythril.laser.ethereum import util
from mythril.laser.ethereum.function_managers import (
keccak_function_manager,
exponent_function_manager,
)
from mythril.laser.ethereum.call import (
get_call_parameters,
native_call,
get_call_data,
SYMBOLIC_CALLDATA_SIZE,
)
from mythril.laser.ethereum.evm_exceptions import (
VmException,
StackUnderflowException,
InvalidJumpDestination,
InvalidInstruction,
OutOfGasException,
WriteProtection,
)
from mythril.laser.ethereum.instruction_data import get_opcode_gas, calculate_sha3_gas
from mythril.laser.ethereum.state.global_state import GlobalState
from mythril.laser.ethereum.state.return_data import ReturnData
from mythril.laser.ethereum.transaction import (
MessageCallTransaction,
TransactionStartSignal,
ContractCreationTransaction,
tx_id_manager,
)
from mythril.support.model import get_model
from mythril.support.support_utils import get_code_hash
from mythril.support.loader import DynLoader
log = logging.getLogger(__name__)
TT256 = symbol_factory.BitVecVal(0, 256)
TT256M1 = symbol_factory.BitVecVal(2**256 - 1, 256)
[docs]def transfer_ether(
global_state: GlobalState,
sender: BitVec,
receiver: BitVec,
value: Union[int, BitVec],
):
"""
Perform an Ether transfer between two accounts
:param global_state: The global state in which the Ether transfer occurs
:param sender: The sender of the Ether
:param receiver: The recipient of the Ether
:param value: The amount of Ether to send
:return:
"""
value = value if isinstance(value, BitVec) else symbol_factory.BitVecVal(value, 256)
global_state.world_state.constraints.append(
UGE(global_state.world_state.balances[sender], value)
)
global_state.world_state.balances[receiver] += value
global_state.world_state.balances[sender] -= value
[docs]class StateTransition(object):
"""Decorator that handles global state copy and original return.
This decorator calls the decorated instruction mutator function on a
copy of the state that is passed to it. After the call, the
resulting new states' program counter is automatically incremented
if `increment_pc=True`.
"""
def __init__(
self, increment_pc=True, enable_gas=True, is_state_mutation_instruction=False
):
"""
:param increment_pc:
:param enable_gas:
:param is_state_mutation_instruction: The function mutates state
"""
self.increment_pc = increment_pc
self.enable_gas = enable_gas
self.is_state_mutation_instruction = is_state_mutation_instruction
[docs] @staticmethod
def call_on_state_copy(func: Callable, func_obj: "Instruction", state: GlobalState):
"""
:param func:
:param func_obj:
:param state:
:return:
"""
global_state_copy = copy(state)
return func(func_obj, global_state_copy)
[docs] def increment_states_pc(self, states: List[GlobalState]) -> List[GlobalState]:
"""
:param states:
:return:
"""
if self.increment_pc:
for state in states:
state.mstate.pc += 1
return states
[docs] @staticmethod
def check_gas_usage_limit(global_state: GlobalState):
"""
:param global_state:
:return:
"""
global_state.mstate.check_gas()
if isinstance(global_state.current_transaction.gas_limit, BitVec):
value = global_state.current_transaction.gas_limit.value
if value is None:
return
global_state.current_transaction.gas_limit = value
if (
global_state.mstate.min_gas_used
>= global_state.current_transaction.gas_limit
):
raise OutOfGasException()
[docs] def accumulate_gas(self, global_state: GlobalState):
"""
:param global_state:
:return:
"""
if not self.enable_gas:
return global_state
opcode = global_state.instruction["opcode"]
min_gas, max_gas = get_opcode_gas(opcode)
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
self.check_gas_usage_limit(global_state)
return global_state
def __call__(self, func: Callable) -> Callable:
def wrapper(
func_obj: "Instruction", global_state: GlobalState
) -> List[GlobalState]:
"""
:param func_obj:
:param global_state:
:return:
"""
if self.is_state_mutation_instruction and global_state.environment.static:
raise WriteProtection(
"The function {} cannot be executed in a static call".format(
func.__name__[:-1]
)
)
new_global_states = self.call_on_state_copy(func, func_obj, global_state)
new_global_states = [
self.accumulate_gas(state) for state in new_global_states
]
return self.increment_states_pc(new_global_states)
return wrapper
[docs]class Instruction:
"""Instruction class is used to mutate a state according to the current
instruction."""
def __init__(
self,
op_code: str,
dynamic_loader: DynLoader,
pre_hooks: List[Callable] = None,
post_hooks: List[Callable] = None,
) -> None:
"""
:param op_code:
:param dynamic_loader:
:param iprof:
"""
self.dynamic_loader = dynamic_loader
self.op_code = op_code.upper()
self.pre_hook = pre_hooks if pre_hooks else []
self.post_hook = post_hooks if post_hooks else []
def _execute_pre_hooks(self, global_state: GlobalState):
for hook in self.pre_hook:
hook(global_state)
def _execute_post_hooks(self, global_state: GlobalState):
for hook in self.post_hook:
hook(global_state)
[docs] def evaluate(self, global_state: GlobalState, post=False) -> List[GlobalState]:
"""Performs the mutation for this instruction.
:param global_state:
:param post:
:return:
"""
# Generalize some ops
log.debug("Evaluating %s at %i", self.op_code, global_state.mstate.pc)
op = self.op_code.lower()
if self.op_code.startswith("PUSH"):
op = "push"
elif self.op_code.startswith("DUP"):
op = "dup"
elif self.op_code.startswith("SWAP"):
op = "swap"
elif self.op_code.startswith("LOG"):
op = "log"
instruction_mutator = (
getattr(self, op + "_", None)
if not post
else getattr(self, op + "_" + "post", None)
)
if instruction_mutator is None:
raise NotImplementedError
self._execute_pre_hooks(global_state)
result = instruction_mutator(global_state)
self._execute_post_hooks(global_state)
return result
[docs] @StateTransition()
def jumpdest_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
return [global_state]
[docs] @StateTransition()
def push_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
push_instruction = global_state.get_current_instruction()
push_value = push_instruction.get("argument", 0)
try:
length_of_value = 2 * int(push_instruction["opcode"][4:])
except ValueError:
raise VmException("Invalid Push instruction")
if length_of_value == 0:
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
elif isinstance(push_value, tuple):
if isinstance(push_value[0], int):
new_value = symbol_factory.BitVecVal(push_value[0], 8)
else:
new_value = push_value[0]
if len(push_value) > 1:
for val in push_value[1:]:
if isinstance(val, int):
new_value = Concat(new_value, symbol_factory.BitVecVal(val, 8))
else:
new_value = Concat(new_value, val)
pad_length = length_of_value // 2 - len(push_value)
if pad_length > 0:
new_value = Concat(new_value, symbol_factory.BitVecVal(0, pad_length))
if new_value.size() < 256:
new_value = Concat(
symbol_factory.BitVecVal(0, 256 - new_value.size()), new_value
)
global_state.mstate.stack.append(new_value)
else:
push_value += "0" * max(length_of_value - (len(push_value) - 2), 0)
global_state.mstate.stack.append(
symbol_factory.BitVecVal(int(push_value, 16), 256)
)
return [global_state]
[docs] @StateTransition()
def dup_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
value = int(global_state.get_current_instruction()["opcode"][3:], 10)
global_state.mstate.stack.append(global_state.mstate.stack[-value])
return [global_state]
[docs] @StateTransition()
def swap_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
depth = int(self.op_code[4:])
stack = global_state.mstate.stack
stack[-depth - 1], stack[-1] = stack[-1], stack[-depth - 1]
return [global_state]
[docs] @StateTransition()
def pop_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.pop()
return [global_state]
[docs] @StateTransition()
def and_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
stack = global_state.mstate.stack
op1, op2 = stack.pop(), stack.pop()
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if not isinstance(op1, Expression):
op1 = symbol_factory.BitVecVal(op1, 256)
if not isinstance(op2, Expression):
op2 = symbol_factory.BitVecVal(op2, 256)
stack.append(op1 & op2)
return [global_state]
[docs] @StateTransition()
def or_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
stack = global_state.mstate.stack
op1, op2 = stack.pop(), stack.pop()
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
stack.append(op1 | op2)
return [global_state]
[docs] @StateTransition()
def xor_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
mstate.stack.append(mstate.stack.pop() ^ mstate.stack.pop())
return [global_state]
[docs] @StateTransition()
def not_(self, global_state: GlobalState):
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
mstate.stack.append(TT256M1 - mstate.stack.pop())
return [global_state]
[docs] @StateTransition()
def byte_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
op0, op1 = mstate.stack.pop(), mstate.stack.pop()
if not isinstance(op1, Expression):
op1 = symbol_factory.BitVecVal(op1, 256)
try:
index = util.get_concrete_int(op0)
offset = (31 - index) * 8
if offset >= 0:
result: Union[int, Expression] = simplify(
Concat(
symbol_factory.BitVecVal(0, 248),
Extract(offset + 7, offset, op1),
)
)
else:
result = 0
except TypeError:
log.debug("BYTE: Unsupported symbolic byte offset")
result = global_state.new_bitvec(
str(simplify(op1)) + "[" + str(simplify(op0)) + "]", 256
)
mstate.stack.append(result)
return [global_state]
# Arithmetic
[docs] @StateTransition()
def add_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
(
helper.pop_bitvec(global_state.mstate)
+ helper.pop_bitvec(global_state.mstate)
)
)
return [global_state]
[docs] @StateTransition()
def sub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
(
helper.pop_bitvec(global_state.mstate)
- helper.pop_bitvec(global_state.mstate)
)
)
return [global_state]
[docs] @StateTransition()
def mul_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
(
helper.pop_bitvec(global_state.mstate)
* helper.pop_bitvec(global_state.mstate)
)
)
return [global_state]
[docs] @StateTransition()
def div_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
op0, op1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
if op1 == 0:
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
else:
global_state.mstate.stack.append(UDiv(op0, op1))
return [global_state]
[docs] @StateTransition()
def sdiv_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
if s1 == 0:
global_state.mstate.stack.append(symbol_factory.BitVecVal(0, 256))
else:
global_state.mstate.stack.append(s0 / s1)
return [global_state]
[docs] @StateTransition()
def mod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(0 if s1 == 0 else URem(s0, s1))
return [global_state]
[docs] @StateTransition()
def shl_(self, global_state: GlobalState) -> List[GlobalState]:
shift, value = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(value << shift)
return [global_state]
[docs] @StateTransition()
def shr_(self, global_state: GlobalState) -> List[GlobalState]:
shift, value = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(LShR(value, shift))
return [global_state]
[docs] @StateTransition()
def sar_(self, global_state: GlobalState) -> List[GlobalState]:
shift, value = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(value >> shift)
return [global_state]
[docs] @StateTransition()
def smod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(0 if s1 == 0 else SRem(s0, s1))
return [global_state]
[docs] @StateTransition()
def addmod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1, s2 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(URem(URem(s0, s2) + URem(s1, s2), s2))
return [global_state]
[docs] @StateTransition()
def mulmod_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
s0, s1, s2 = (
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
util.pop_bitvec(global_state.mstate),
)
global_state.mstate.stack.append(URem(URem(s0, s2) * URem(s1, s2), s2))
return [global_state]
[docs] @StateTransition()
def exp_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
base, exponent = util.pop_bitvec(state), util.pop_bitvec(state)
exponentiation, constraint = exponent_function_manager.create_condition(
base, exponent
)
state.stack.append(exponentiation)
global_state.world_state.constraints.append(constraint)
return [global_state]
[docs] @StateTransition()
def signextend_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
mstate = global_state.mstate
s0, s1 = mstate.stack.pop(), mstate.stack.pop()
testbit = s0 * symbol_factory.BitVecVal(8, 256) + symbol_factory.BitVecVal(
7, 256
)
set_testbit = symbol_factory.BitVecVal(1, 256) << testbit
sign_bit_set = simplify(Not(s1 & set_testbit == 0))
mstate.stack.append(
simplify(
If(
s0 <= 31,
If(
sign_bit_set, s1 | (TT256 - set_testbit), s1 & (set_testbit - 1)
),
s1,
)
)
)
return [global_state]
# Comparisons
[docs] @StateTransition()
def lt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
exp = ULT(util.pop_bitvec(state), util.pop_bitvec(state))
state.stack.append(exp)
return [global_state]
[docs] @StateTransition()
def gt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op1, op2 = util.pop_bitvec(state), util.pop_bitvec(state)
exp = UGT(op1, op2)
state.stack.append(exp)
return [global_state]
[docs] @StateTransition()
def slt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
exp = util.pop_bitvec(state) < util.pop_bitvec(state)
state.stack.append(exp)
return [global_state]
[docs] @StateTransition()
def sgt_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
exp = util.pop_bitvec(state) > util.pop_bitvec(state)
state.stack.append(exp)
return [global_state]
[docs] @StateTransition()
def eq_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op1 = state.stack.pop()
op2 = state.stack.pop()
if isinstance(op1, Bool):
op1 = If(
op1, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
if isinstance(op2, Bool):
op2 = If(
op2, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
exp = op1 == op2
state.stack.append(exp)
return [global_state]
[docs] @StateTransition()
def iszero_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
val = state.stack.pop()
exp = Not(val) if isinstance(val, Bool) else val == 0
exp = If(
exp, symbol_factory.BitVecVal(1, 256), symbol_factory.BitVecVal(0, 256)
)
state.stack.append(simplify(exp))
return [global_state]
# Call data
[docs] @StateTransition()
def callvalue_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.callvalue)
return [global_state]
[docs] @StateTransition()
def calldataload_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
op0 = state.stack.pop()
value = environment.calldata.get_word_at(op0)
state.stack.append(value)
return [global_state]
[docs] @StateTransition()
def calldatasize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.calldata.calldatasize)
return [global_state]
@staticmethod
def _calldata_copy_helper(global_state, mstate, mstart, dstart, size):
environment = global_state.environment
try:
mstart = util.get_concrete_int(mstart)
except TypeError:
log.debug("Unsupported symbolic memory offset in CALLDATACOPY")
return [global_state]
try:
dstart: Union[int, BitVec] = util.get_concrete_int(dstart)
except TypeError:
log.debug("Unsupported symbolic calldata offset in CALLDATACOPY")
dstart = simplify(dstart)
try:
size: Union[int, BitVec] = util.get_concrete_int(size)
except TypeError:
log.debug("Unsupported symbolic size in CALLDATACOPY")
size = SYMBOLIC_CALLDATA_SIZE # The excess size will get overwritten
size = cast(int, size)
if size > 0:
try:
mstate.mem_extend(mstart, size)
except TypeError as e:
log.debug("Memory allocation error: {}".format(e))
mstate.mem_extend(mstart, 1)
mstate.memory[mstart] = global_state.new_bitvec(
"calldata_"
+ str(environment.active_account.contract_name)
+ "["
+ str(dstart)
+ ": + "
+ str(size)
+ "]",
8,
)
return [global_state]
try:
i_data = dstart
new_memory = []
for i in range(size):
value = environment.calldata[i_data]
new_memory.append(value)
i_data = (
i_data + 1
if isinstance(i_data, int)
else simplify(cast(BitVec, i_data) + 1)
)
for i in range(len(new_memory)):
mstate.memory[i + mstart] = new_memory[i]
except IndexError:
log.debug("Exception copying calldata to memory")
mstate.memory[mstart] = global_state.new_bitvec(
"calldata_"
+ str(environment.active_account.contract_name)
+ "["
+ str(dstart)
+ ": + "
+ str(size)
+ "]",
8,
)
return [global_state]
[docs] @StateTransition()
def calldatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
op0, op1, op2 = state.stack.pop(), state.stack.pop(), state.stack.pop()
if isinstance(global_state.current_transaction, ContractCreationTransaction):
log.debug("Attempt to use CALLDATACOPY in creation transaction")
return [global_state]
return self._calldata_copy_helper(global_state, state, op0, op1, op2)
# Environment
[docs] @StateTransition()
def address_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.address)
return [global_state]
[docs] @StateTransition()
def balance_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
address = state.stack.pop()
onchain_access = True
if address.symbolic is False:
try:
balance = global_state.world_state.accounts_exist_or_load(
address.value, self.dynamic_loader
).balance()
except ValueError:
onchain_access = False
else:
onchain_access = False
if onchain_access is False:
balance = symbol_factory.BitVecVal(0, 256)
for account in global_state.world_state.accounts.values():
balance = If(address == account.address, account.balance(), balance)
state.stack.append(balance)
return [global_state]
[docs] @StateTransition()
def origin_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.origin)
return [global_state]
[docs] @StateTransition()
def caller_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
state.stack.append(environment.sender)
return [global_state]
[docs] @StateTransition()
def chainid_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.chainid)
return [global_state]
[docs] @StateTransition()
def selfbalance_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
balance = global_state.environment.active_account.balance()
global_state.mstate.stack.append(balance)
return [global_state]
[docs] @StateTransition()
def codesize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
environment = global_state.environment
disassembly = environment.code
calldata = global_state.environment.calldata
if isinstance(global_state.current_transaction, ContractCreationTransaction):
# Hacky way to ensure constructor arguments work - Pick some reasonably large size.
no_of_bytes = len(disassembly.bytecode) // 2
if isinstance(calldata, ConcreteCalldata):
no_of_bytes += calldata.size
else:
no_of_bytes += 0x200 # space for 16 32-byte arguments
global_state.world_state.constraints.append(
global_state.environment.calldata.size == no_of_bytes
)
else:
no_of_bytes = len(disassembly.bytecode) // 2
state.stack.append(no_of_bytes)
return [global_state]
@staticmethod
def _sha3_gas_helper(global_state, length):
min_gas, max_gas = calculate_sha3_gas(length)
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
StateTransition.check_gas_usage_limit(global_state)
return global_state
[docs] @StateTransition(enable_gas=False)
def sha3_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
index, op1 = state.stack.pop(), state.stack.pop()
try:
length = util.get_concrete_int(op1)
except TypeError:
# Can't access symbolic memory offsets
length = 64
global_state.world_state.constraints.append(op1 == length)
Instruction._sha3_gas_helper(global_state, length)
state.mem_extend(index, length)
data_list = [
b if isinstance(b, BitVec) else symbol_factory.BitVecVal(b, 8)
for b in state.memory[index : index + length]
]
if len(data_list) > 1:
data = simplify(Concat(data_list))
elif len(data_list) == 1:
data = data_list[0]
else:
# TODO: handle finding x where func(x)==func("")
result = keccak_function_manager.get_empty_keccak_hash()
state.stack.append(result)
return [global_state]
result = keccak_function_manager.create_keccak(data)
state.stack.append(result)
return [global_state]
[docs] @StateTransition()
def gasprice_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.gasprice)
return [global_state]
[docs] @StateTransition()
def basefee_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.basefee)
return [global_state]
[docs] @StateTransition()
def codecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
memory_offset, code_offset, size = (
global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(),
global_state.mstate.stack.pop(),
)
code = global_state.environment.code.bytecode
if code.startswith("0x"):
code = code[2:]
code_size = len(code) // 2
if isinstance(global_state.current_transaction, ContractCreationTransaction):
# Treat creation code after the expected disassembly as calldata.
# This is a slightly hacky way to ensure that symbolic constructor
# arguments work correctly.
mstate = global_state.mstate
offset = code_offset - code_size
log.debug("Copying from code offset: {} with size: {}".format(offset, size))
if isinstance(global_state.environment.calldata, SymbolicCalldata):
if code_offset >= code_size:
return self._calldata_copy_helper(
global_state, mstate, memory_offset, offset, size
)
else:
# Copy from both code and calldata appropriately.
concrete_code_offset = helper.get_concrete_int(code_offset)
concrete_size = helper.get_concrete_int(size)
code_copy_offset = concrete_code_offset
code_copy_size = (
concrete_size
if concrete_code_offset + concrete_size <= code_size
else code_size - concrete_code_offset
)
code_copy_size = code_copy_size if code_copy_size >= 0 else 0
calldata_copy_offset = (
concrete_code_offset - code_size
if concrete_code_offset - code_size > 0
else 0
)
calldata_copy_size = concrete_code_offset + concrete_size - code_size
calldata_copy_size = (
calldata_copy_size if calldata_copy_size >= 0 else 0
)
[global_state] = self._code_copy_helper(
code=global_state.environment.code.bytecode,
memory_offset=memory_offset,
code_offset=code_copy_offset,
size=code_copy_size,
op="CODECOPY",
global_state=global_state,
)
return self._calldata_copy_helper(
global_state=global_state,
mstate=mstate,
mstart=memory_offset + code_copy_size,
dstart=calldata_copy_offset,
size=calldata_copy_size,
)
return self._code_copy_helper(
code=global_state.environment.code.bytecode,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="CODECOPY",
global_state=global_state,
)
[docs] @StateTransition()
def extcodesize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr = state.stack.pop()
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODESIZE")
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
try:
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
).code.bytecode
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
state.stack.append(global_state.new_bitvec("extcodesize_" + str(addr), 256))
return [global_state]
state.stack.append(len(code) // 2)
return [global_state]
@staticmethod
def _code_copy_helper(
code: Union[str, Tuple],
memory_offset: Union[int, BitVec],
code_offset: Union[int, BitVec],
size: Union[int, BitVec],
op: str,
global_state: GlobalState,
) -> List[GlobalState]:
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
log.debug("Unsupported symbolic memory offset in {}".format(op))
return [global_state]
try:
concrete_size = helper.get_concrete_int(size)
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
except TypeError:
# except both attribute error and Exception
global_state.mstate.mem_extend(concrete_memory_offset, 1)
global_state.mstate.memory[
concrete_memory_offset
] = global_state.new_bitvec(
"code({})".format(
global_state.environment.active_account.contract_name
),
8,
)
return [global_state]
try:
concrete_code_offset = helper.get_concrete_int(code_offset)
except TypeError:
log.debug("Unsupported symbolic code offset in {}".format(op))
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
for i in range(concrete_size):
global_state.mstate.memory[
concrete_memory_offset + i
] = global_state.new_bitvec(
"code({})".format(
global_state.environment.active_account.contract_name
),
8,
)
return [global_state]
if isinstance(code, str) and code.startswith("0x"):
code = code[2:]
for i in range(concrete_size):
if isinstance(code, str):
if 2 * (concrete_code_offset + i + 1) > len(code):
break
global_state.mstate.memory[concrete_memory_offset + i] = int(
code[
2
* (concrete_code_offset + i) : 2
* (concrete_code_offset + i + 1)
],
16,
)
else:
if (concrete_code_offset + i + 1) > len(code):
break
global_state.mstate.memory[concrete_memory_offset + i] = code[
concrete_code_offset + i
]
return [global_state]
[docs] @StateTransition()
def extcodecopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
addr, memory_offset, code_offset, size = (
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
)
try:
addr = hex(helper.get_concrete_int(addr))
except TypeError:
log.debug("unsupported symbolic address for EXTCODECOPY")
return [global_state]
try:
code = global_state.world_state.accounts_exist_or_load(
addr, self.dynamic_loader
).code.bytecode
except (ValueError, AttributeError) as e:
log.debug("error accessing contract storage due to: " + str(e))
return [global_state]
return self._code_copy_helper(
code=code,
memory_offset=memory_offset,
code_offset=code_offset,
size=size,
op="EXTCODECOPY",
global_state=global_state,
)
[docs] @StateTransition()
def extcodehash_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return: List of global states possible, list of size 1 in this case
"""
world_state = global_state.world_state
stack = global_state.mstate.stack
address = Extract(159, 0, stack.pop())
if address.symbolic:
code_hash = symbol_factory.BitVecVal(int(get_code_hash(""), 16), 256)
elif address.value not in world_state.accounts:
code_hash = symbol_factory.BitVecVal(0, 256)
else:
addr = "0" * (40 - len(hex(address.value)[2:])) + hex(address.value)[2:]
code = world_state.accounts_exist_or_load(
addr, self.dynamic_loader
).code.bytecode
code_hash = symbol_factory.BitVecVal(int(get_code_hash(code), 16), 256)
stack.append(code_hash)
return [global_state]
[docs] @StateTransition()
def returndatacopy_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
memory_offset, return_offset, size = (
state.stack.pop(),
state.stack.pop(),
state.stack.pop(),
)
try:
concrete_memory_offset = helper.get_concrete_int(memory_offset)
except TypeError:
log.debug("Unsupported symbolic memory offset in RETURNDATACOPY")
return [global_state]
try:
concrete_return_offset = helper.get_concrete_int(return_offset)
except TypeError:
log.debug("Unsupported symbolic return offset in RETURNDATACOPY")
return [global_state]
try:
concrete_size = helper.get_concrete_int(size)
except TypeError:
log.debug("Unsupported symbolic max_length offset in RETURNDATACOPY")
return [global_state]
if global_state.last_return_data is None:
return [global_state]
global_state.mstate.mem_extend(concrete_memory_offset, concrete_size)
for i in range(concrete_size):
global_state.mstate.memory[concrete_memory_offset + i] = (
global_state.last_return_data[concrete_return_offset + i]
if concrete_return_offset + i < global_state.last_return_data.size
else 0
)
return [global_state]
[docs] @StateTransition()
def returndatasize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
if global_state.last_return_data:
global_state.mstate.stack.append(global_state.last_return_data.size)
else:
global_state.mstate.stack.append(0)
return [global_state]
[docs] @StateTransition()
def blockhash_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
blocknumber = state.stack.pop()
state.stack.append(
global_state.new_bitvec("blockhash_block_" + str(blocknumber), 256)
)
return [global_state]
[docs] @StateTransition()
def coinbase_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.new_bitvec("coinbase", 256))
return [global_state]
[docs] @StateTransition()
def timestamp_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.new_bitvec("timestamp", 256))
return [global_state]
[docs] @StateTransition()
def number_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.environment.block_number)
return [global_state]
[docs] @StateTransition()
def difficulty_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(
global_state.new_bitvec("block_difficulty", 256)
)
return [global_state]
[docs] @StateTransition()
def gaslimit_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.mstate.gas_limit)
return [global_state]
# Memory operations
[docs] @StateTransition()
def mload_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
offset = state.stack.pop()
state.mem_extend(offset, 32)
data = state.memory.get_word_at(offset)
state.stack.append(data)
return [global_state]
[docs] @StateTransition()
def mstore_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
mstart, value = state.stack.pop(), state.stack.pop()
try:
state.mem_extend(mstart, 32)
except Exception:
log.debug("Error extending memory")
state.memory.write_word_at(mstart, value)
return [global_state]
[docs] @StateTransition()
def mstore8_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
offset, value = state.stack.pop(), state.stack.pop()
state.mem_extend(offset, 1)
try:
value_to_write: Union[int, BitVec] = util.get_concrete_int(value) % 256
except TypeError: # BitVec
value_to_write = Extract(7, 0, value)
state.memory[offset] = value_to_write
return [global_state]
[docs] @StateTransition()
def sload_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
index = state.stack.pop()
state.stack.append(global_state.environment.active_account.storage[index])
return [global_state]
[docs] @StateTransition(is_state_mutation_instruction=True)
def sstore_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
index, value = state.stack.pop(), state.stack.pop()
global_state.environment.active_account.storage[index] = value
return [global_state]
[docs] @StateTransition(increment_pc=False, enable_gas=False)
def jump_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
disassembly = global_state.environment.code
try:
jump_addr = util.get_concrete_int(state.stack.pop())
except TypeError:
raise InvalidJumpDestination("Invalid jump argument (symbolic address)")
except IndexError:
raise StackUnderflowException()
index = util.get_instruction_index(disassembly.instruction_list, jump_addr)
if index is None:
raise InvalidJumpDestination("JUMP to invalid address")
op_code = disassembly.instruction_list[index]["opcode"]
if op_code != "JUMPDEST":
raise InvalidJumpDestination(
"Skipping JUMP to invalid destination (not JUMPDEST): " + str(jump_addr)
)
new_state = copy(global_state)
# add JUMP gas cost
min_gas, max_gas = get_opcode_gas("JUMP")
new_state.mstate.min_gas_used += min_gas
new_state.mstate.max_gas_used += max_gas
# manually set PC to destination
new_state.mstate.pc = index
return [new_state]
[docs] @StateTransition(increment_pc=False, enable_gas=False)
def jumpi_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
state = global_state.mstate
disassembly = global_state.environment.code
min_gas, max_gas = get_opcode_gas("JUMPI")
states = []
op0, condition = state.stack.pop(), state.stack.pop()
try:
jump_addr = util.get_concrete_int(op0)
except TypeError:
log.debug("Skipping JUMPI to invalid destination.")
global_state.mstate.pc += 1
global_state.mstate.min_gas_used += min_gas
global_state.mstate.max_gas_used += max_gas
return [global_state]
# False case
negated = (
simplify(Not(condition)) if isinstance(condition, Bool) else condition == 0
)
negated.simplify()
# True case
condi = simplify(condition) if isinstance(condition, Bool) else condition != 0
condi.simplify()
negated_cond = (isinstance(negated, bool) and negated) or (
isinstance(negated, Bool) and not is_false(negated)
)
positive_cond = (isinstance(condi, bool) and condi) or (
isinstance(condi, Bool) and not is_false(condi)
)
if negated_cond:
# States have to be deep copied during a fork as summaries assume independence across states.
new_state = deepcopy(global_state)
# add JUMPI gas cost
new_state.mstate.min_gas_used += min_gas
new_state.mstate.max_gas_used += max_gas
# manually increment PC
new_state.mstate.depth += 1
new_state.mstate.pc += 1
new_state.world_state.constraints.append(negated)
states.append(new_state)
else:
log.debug("Pruned unreachable states.")
# Get jump destination
index = util.get_instruction_index(disassembly.instruction_list, jump_addr)
if index is None:
log.debug("Invalid jump destination: " + str(jump_addr))
return states
instr = disassembly.instruction_list[index]
if instr["opcode"] == "JUMPDEST":
if positive_cond:
new_state = deepcopy(global_state)
# add JUMPI gas cost
new_state.mstate.min_gas_used += min_gas
new_state.mstate.max_gas_used += max_gas
# manually set PC to destination
new_state.mstate.pc = index
new_state.mstate.depth += 1
new_state.world_state.constraints.append(condi)
states.append(new_state)
else:
log.debug("Pruned unreachable states.")
return states
[docs] @StateTransition()
def beginsub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
This opcode depicts the start of the subroutine
"""
raise OutOfGasException("Encountered BEGINSUB")
[docs] @StateTransition()
def jumpsub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
Jump to the subroutine
"""
disassembly = global_state.environment.code
try:
location = util.get_concrete_int(global_state.mstate.stack.pop())
except TypeError:
raise VmException("Encountered symbolic JUMPSUB location")
index = util.get_instruction_index(disassembly.instruction_list, location)
instr = disassembly.instruction_list[index]
if instr["opcode"] != "BEGINSUB":
raise VmException(
"Encountered invalid JUMPSUB location :{}".format(instr["address"])
)
global_state.mstate.subroutine_stack.append(global_state.mstate.pc + 1)
global_state.mstate.pc = location
return [global_state]
[docs] @StateTransition(increment_pc=False)
def returnsub_(self, global_state: GlobalState) -> List[GlobalState]:
"""
Returns control to the caller of the subroutine
"""
global_state.mstate.pc = global_state.mstate.subroutine_stack.pop()
return [global_state]
[docs] @StateTransition()
def pc_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
index = global_state.mstate.pc
program_counter = global_state.environment.code.instruction_list[index][
"address"
]
global_state.mstate.stack.append(symbol_factory.BitVecVal(program_counter, 256))
return [global_state]
[docs] @StateTransition()
def msize_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
global_state.mstate.stack.append(global_state.mstate.memory_size)
return [global_state]
[docs] @StateTransition()
def gas_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
# TODO: Push a Constrained variable which lies between min gas and max gas
global_state.mstate.stack.append(global_state.new_bitvec("gas", 256))
return [global_state]
[docs] @StateTransition(is_state_mutation_instruction=True)
def log_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
# TODO: implement me
state = global_state.mstate
depth = int(self.op_code[3:])
state.stack.pop(), state.stack.pop()
_ = [state.stack.pop() for _ in range(depth)]
# Not supported
return [global_state]
def _create_transaction_helper(
self, global_state, call_value, mem_offset, mem_size, create2_salt=None
) -> List[GlobalState]:
mstate = global_state.mstate
environment = global_state.environment
world_state = global_state.world_state
call_data = get_call_data(global_state, mem_offset, mem_offset + mem_size)
code_raw: List[int] = []
code_end = call_data.size
size = call_data.size
if isinstance(size, BitVec):
# Other size restriction checks handle this
if size.symbolic:
size = 10**4
else:
size = size.value
code_raw = []
constraints = global_state.world_state.constraints
try:
model = get_model(constraints)
except UnsatError:
model = None
if isinstance(call_data, ConcreteCalldata):
for element in call_data.concrete(model):
if isinstance(element, BitVec) and element.symbolic:
break
if isinstance(element, BitVec):
code_raw.append(element.value)
else:
code_raw.append(element)
if len(code_raw) < 1:
global_state.mstate.stack.append(1)
log.debug("No code found for trying to execute a create type instruction.")
return [global_state]
code_str = bytes.hex(bytes(code_raw))
next_transaction_id = tx_id_manager.get_next_tx_id()
constructor_arguments = ConcreteCalldata(
next_transaction_id, call_data[code_end:]
)
code = Disassembly(code_str)
caller = environment.active_account.address
gas_price = environment.gasprice
origin = environment.origin
contract_address: Union[BitVec, int] = None
Instruction._sha3_gas_helper(global_state, len(code_str[2:]) // 2)
if create2_salt:
if create2_salt.symbolic:
if create2_salt.size() != 256:
pad = symbol_factory.BitVecVal(0, 256 - create2_salt.size())
create2_salt = Concat(pad, create2_salt)
address = keccak_function_manager.create_keccak(
Concat(
symbol_factory.BitVecVal(255, 8),
caller,
create2_salt,
symbol_factory.BitVecVal(int(get_code_hash(code_str), 16), 256),
)
)
contract_address = Extract(255, 96, address)
else:
salt = hex(create2_salt.value)[2:]
salt = "0" * (64 - len(salt)) + salt
addr = hex(caller.value)[2:]
addr = "0" * (40 - len(addr)) + addr
contract_address = int(
get_code_hash("0xff" + addr + salt + get_code_hash(code_str)[2:])[
26:
],
16,
)
transaction = ContractCreationTransaction(
world_state=world_state,
caller=caller,
code=code,
call_data=constructor_arguments,
gas_price=gas_price,
gas_limit=mstate.gas_limit,
origin=origin,
call_value=call_value,
contract_address=contract_address,
)
log.info("Raise transaction start signal")
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs] @StateTransition(is_state_mutation_instruction=True)
def create_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
call_value, mem_offset, mem_size = global_state.mstate.pop(3)
return self._create_transaction_helper(
global_state, call_value, mem_offset, mem_size
)
[docs] @StateTransition()
def create_post(self, global_state: GlobalState) -> List[GlobalState]:
return self._handle_create_type_post(global_state)
[docs] @StateTransition(is_state_mutation_instruction=True)
def create2_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
call_value, mem_offset, mem_size, salt = global_state.mstate.pop(4)
return self._create_transaction_helper(
global_state, call_value, mem_offset, mem_size, salt
)
[docs] @StateTransition()
def create2_post(self, global_state: GlobalState) -> List[GlobalState]:
return self._handle_create_type_post(global_state, opcode="create2")
@staticmethod
def _handle_create_type_post(global_state, opcode="create"):
if opcode == "create2":
global_state.mstate.pop(4)
else:
global_state.mstate.pop(3)
if global_state.last_return_data:
return_val = symbol_factory.BitVecVal(
int(global_state.last_return_data.return_data, 16), 256
)
else:
return_val = symbol_factory.BitVecVal(0, 256)
global_state.mstate.stack.append(return_val)
return [global_state]
[docs] @StateTransition()
def return_(self, global_state: GlobalState):
"""
:param global_state:
"""
state = global_state.mstate
offset, length = state.stack.pop(), state.stack.pop()
if length.symbolic:
return_data = [global_state.new_bitvec("return_data", 8)]
log.debug("Return with symbolic length or offset. Not supported")
else:
state.mem_extend(offset, length)
StateTransition.check_gas_usage_limit(global_state)
return_data = state.memory[offset : offset + length]
global_state.current_transaction.end(
global_state, ReturnData(return_data, length)
)
[docs] @StateTransition(is_state_mutation_instruction=True)
def selfdestruct_(self, global_state: GlobalState):
"""
:param global_state:
"""
target = global_state.mstate.stack.pop()
transfer_amount = global_state.environment.active_account.balance()
# Often the target of the selfdestruct instruction will be symbolic
# If it isn't then we'll transfer the balance to the indicated contract
global_state.world_state.balances[target] += transfer_amount
global_state.environment.active_account = deepcopy(
global_state.environment.active_account
)
global_state.accounts[
global_state.environment.active_account.address.value
] = global_state.environment.active_account
global_state.environment.active_account.set_balance(0)
global_state.environment.active_account.deleted = True
global_state.current_transaction.end(global_state)
[docs] @StateTransition()
def revert_(self, global_state: GlobalState) -> None:
"""
:param global_state:
"""
state = global_state.mstate
offset, length = state.stack.pop(), state.stack.pop()
if length.symbolic is False:
return_data = [
global_state.new_bitvec(
f"{global_state.current_transaction.id}_return_data_{i}", 8
)
for i in range(length.value)
]
else:
return_data = [
If(
i < length,
global_state.new_bitvec(
f"{global_state.current_transaction.id}_return_data_{i}", 8
),
0,
)
for i in range(300)
]
try:
return_data = state.memory[
util.get_concrete_int(offset) : util.get_concrete_int(offset + length)
]
except TypeError:
log.debug("Return with symbolic length or offset. Not supported")
global_state.current_transaction.end(
global_state, return_data=ReturnData(return_data, length), revert=True
)
[docs] @StateTransition()
def assert_fail_(self, global_state: GlobalState):
"""
:param global_state:
"""
# 0xfe: designated invalid opcode
raise InvalidInstruction
[docs] @StateTransition()
def invalid_(self, global_state: GlobalState):
"""
:param global_state:
"""
raise InvalidInstruction
[docs] @StateTransition()
def stop_(self, global_state: GlobalState):
"""
:param global_state:
"""
global_state.current_transaction.end(global_state)
@staticmethod
def _write_symbolic_returndata(
global_state: GlobalState, memory_out_offset: BitVec, memory_out_size: BitVec
):
"""
Writes symbolic return-data into memory, The memory offset and size should be concrete
:param global_state:
:param memory_out_offset:
:param memory_out_size:
:return:
"""
if memory_out_offset.symbolic is True or memory_out_size.symbolic is True:
return
return_data = []
return_data_size = global_state.new_bitvec("returndatasize", 256)
for i in range(memory_out_size.value):
data = global_state.new_bitvec(
"call_output_var({})_{}".format(
simplify(memory_out_offset + i), global_state.mstate.pc
),
8,
)
return_data.append(data)
global_state.mstate.mem_extend(memory_out_offset, memory_out_size)
for i in range(memory_out_size.value):
global_state.mstate.memory[memory_out_offset + i] = If(
i <= return_data_size,
return_data[i],
global_state.mstate.memory[memory_out_offset + i],
)
global_state.last_return_data = ReturnData(
return_data=return_data, return_data_size=return_data_size
)
[docs] @StateTransition()
def call_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader, True)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
# TODO: decide what to do in this case
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if environment.static:
if isinstance(value, int) and value > 0:
raise WriteProtection(
"Cannot call with non zero value in a static call"
)
if isinstance(value, BitVec):
if value.symbolic:
global_state.world_state.constraints.append(
value == symbol_factory.BitVecVal(0, 256)
)
elif value.value > 0:
raise WriteProtection(
"Cannot call with non zero value in a static call"
)
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
caller=environment.active_account.address,
callee_account=callee_account,
call_data=call_data,
call_value=value,
static=environment.static,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs] @StateTransition()
def call_post(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
return self.post_handler(global_state, function_name="call")
[docs] @StateTransition()
def callcode_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
_,
_,
) = get_call_parameters(global_state, self.dynamic_loader, True)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.address,
callee_account=environment.active_account,
call_data=call_data,
call_value=value,
static=environment.static,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs] @StateTransition()
def callcode_post(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
(
_,
_,
_,
_,
_,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader, True)
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.world_state.constraints.append(return_value == 0)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
# Copy memory
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, global_state.last_return_data.size)
)
if global_state.last_return_data.size.symbolic:
ret_size = 500
else:
ret_size = global_state.last_return_data.size.value
for i in range(min(memory_out_size, ret_size)):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec("retval_" + str(instr["address"]), 256)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 1)
return [global_state]
[docs] @StateTransition()
def delegatecall_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
_,
_,
) = get_call_parameters(global_state, self.dynamic_loader)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = global_state.environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.sender,
callee_account=environment.active_account,
call_data=call_data,
call_value=environment.callvalue,
static=environment.static,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs] @StateTransition()
def delegatecall_post(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
try:
(
_,
_,
_,
_,
_,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader)
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 0)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
# Copy memory
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, global_state.last_return_data.size)
)
if global_state.last_return_data.size.symbolic:
ret_size = 500
else:
ret_size = global_state.last_return_data.size.value
for i in range(min(memory_out_size, ret_size)):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec("retval_" + str(instr["address"]), 256)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 1)
return [global_state]
[docs] @StateTransition()
def staticcall_(self, global_state: GlobalState) -> List[GlobalState]:
"""
:param global_state:
:return:
"""
instr = global_state.get_current_instruction()
environment = global_state.environment
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
try:
(
callee_address,
callee_account,
call_data,
value,
gas,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader)
if callee_account is not None and callee_account.code.bytecode == "":
log.debug("The call is related to ether transfer between accounts")
sender = environment.active_account.address
receiver = callee_account.address
transfer_ether(global_state, sender, receiver, value)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
except ValueError as e:
log.debug(
"Could not determine required parameters for call, putting fresh symbol on the stack. \n{}".format(
e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
native_result = native_call(
global_state, callee_address, call_data, memory_out_offset, memory_out_size
)
if native_result:
return native_result
transaction = MessageCallTransaction(
world_state=global_state.world_state,
gas_price=environment.gasprice,
gas_limit=gas,
origin=environment.origin,
code=callee_account.code,
caller=environment.address,
callee_account=callee_account,
call_data=call_data,
call_value=value,
static=True,
)
raise TransactionStartSignal(transaction, self.op_code, global_state)
[docs] @StateTransition()
def staticcall_post(self, global_state: GlobalState) -> List[GlobalState]:
return self.post_handler(global_state, function_name="staticcall")
[docs] def post_handler(self, global_state, function_name: str):
instr = global_state.get_current_instruction()
if function_name in ("staticcall", "delegatecall"):
memory_out_size, memory_out_offset = global_state.mstate.stack[-6:-4]
else:
memory_out_size, memory_out_offset = global_state.mstate.stack[-7:-5]
try:
with_value = function_name != "staticcall"
(
_,
_,
_,
_,
_,
memory_out_offset,
memory_out_size,
) = get_call_parameters(global_state, self.dynamic_loader, with_value)
except ValueError as e:
log.debug(
"Could not determine required parameters for {}, putting fresh symbol on the stack. \n{}".format(
function_name, e
)
)
self._write_symbolic_returndata(
global_state, memory_out_offset, memory_out_size
)
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
if global_state.last_return_data is None:
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(instr["address"]), 256
)
global_state.mstate.stack.append(return_value)
return [global_state]
try:
memory_out_offset = (
util.get_concrete_int(memory_out_offset)
if isinstance(memory_out_offset, Expression)
else memory_out_offset
)
memory_out_size = (
util.get_concrete_int(memory_out_size)
if isinstance(memory_out_size, Expression)
else memory_out_size
)
except TypeError:
global_state.mstate.stack.append(
global_state.new_bitvec("retval_" + str(instr["address"]), 256)
)
return [global_state]
global_state.mstate.mem_extend(
memory_out_offset, min(memory_out_size, global_state.last_return_data.size)
)
if global_state.last_return_data.size.symbolic:
ret_size = 500
else:
ret_size = global_state.last_return_data.size.value
for i in range(min(memory_out_size, ret_size)):
global_state.mstate.memory[
i + memory_out_offset
] = global_state.last_return_data[i]
# Put return value on stack
return_value = global_state.new_bitvec(
"retval_" + str(global_state.get_current_instruction()["address"]), 256
)
global_state.mstate.stack.append(return_value)
global_state.world_state.constraints.append(return_value == 1)
return [global_state]