Skip to content

Commit

Permalink
Rewrite ABI support.
Browse files Browse the repository at this point in the history
  • Loading branch information
palkeo committed Mar 11, 2023
1 parent ee589f8 commit e0709d1
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 379 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,13 @@ Here is an example for cryptokitties: https://oko.palkeo.com/0x06012c8cf97BEaD5d
## Caveats

Windows is not supported currently.

## Changelog

### 0.5.0

* Added support for the BASEFEE opcode.
* Updated bytecode database / ABI definitions.
* Ability to decompile the Solidity-generated Panic reverts.
* Lots of simplification / code cleanup.
* Moved to Poetry as the dependency management tool.
Binary file added panoramix/data/abi_dump.xz
Binary file not shown.
Binary file removed panoramix/data/supplement.db.xz
Binary file not shown.
2 changes: 1 addition & 1 deletion panoramix/decompiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def _decompile_with_loader(loader, only_func_name=None) -> Decompilation:
# skip all the functions that are not it
continue

logger.info("Parsing %s...", fname)
logger.info("Decompiling %s...", fname)
logger.debug("stack %s", stack)

try:
Expand Down
55 changes: 19 additions & 36 deletions panoramix/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
find_f_list,
padded_hex,
pretty_bignum,
cache_dir,
)
from panoramix.utils.opcode_dict import opcode_dict
from panoramix.utils.signatures import get_func_name, make_abi
Expand Down Expand Up @@ -56,18 +55,16 @@ def find_sig(sig, add_color=False):
return None

# duplicate of get_func_name from signatures
if "params" in a:
res = "{}({})".format(
a["name"],
", ".join(
[
colorize(x["type"], COLOR_GRAY, add_color) + " " + x["name"][1:]
for x in a["params"]
]
),
)
else:
res = a["folded_name"]
assert "inputs" in a
res = "{}({})".format(
a["name"],
", ".join(
[
colorize(x["type"], COLOR_GRAY, add_color) + " " + x["name"][1:]
for x in a["inputs"]
]
),
)

cache_sigs[add_color][sig] = res
return res
Expand All @@ -85,25 +82,11 @@ def load_addr(self, address):
assert address.isalnum()
address = address.lower()

dir_ = cache_dir() / "code" / address[:5]
if not dir_.is_dir():
dir_.mkdir(parents=True)

cache_fname = dir_ / f"{address}.bin"

if cache_fname.is_file():
logger.info("Code for %s found in cache...", address)
with cache_fname.open() as source_file:
code = source_file.read().strip()
else:
logger.info("Fetching code for %s...", address)
from web3 import Web3
from web3.auto import w3
logger.info("Fetching code for %s...", address)
from web3 import Web3
from web3.auto import w3

code = w3.eth.get_code(Web3.to_checksum_address(address)).hex()[2:]
if code:
with cache_fname.open("w+") as f:
f.write(code)
code = w3.eth.get_code(Web3.to_checksum_address(address)).hex()[2:]

self.load_binary(code)

Expand Down Expand Up @@ -145,11 +128,11 @@ def find_default(exp):
return int(m2.jd)

default = find_f(trace, find_default) if func_list else None
self.add_func(default or 0, name="_fallback()")
self.add_func(default or 0, name="_fallback")

except Exception:
logger.exception("Loader issue.")
self.add_func(0, name="_fallback()")
self.add_func(0, name="_fallback")

make_abi(self.hash_targets)
for hash, (target, stack) in self.hash_targets.items():
Expand All @@ -175,7 +158,7 @@ def add_func(self, target, hash=None, name=None, stack=()):
if padded in self.signatures:
name = self.signatures[padded]
else:
name = "unknown_{}(?????)".format(padded)
name = "unknown_{}".format(padded)
self.signatures[padded] = name

if hash is None:
Expand Down Expand Up @@ -222,7 +205,7 @@ def load_binary(self, source):
if op == "jumpdest":
self.jump_dests.append(line)

if op[:4] == "push":
if op.startswith("push"):
num_words = int(op[4:])

param = 0
Expand All @@ -241,7 +224,7 @@ def load_binary(self, source):
self.lines = {}

for line_no, op, param in parsed_lines:
if op[:4] == "push" and param > 1000000000000000:
if op.startswith("push") and param > 1000000000000000:
param = pretty_bignum(
param
) # convert big numbers into strings if possibble
Expand Down
145 changes: 43 additions & 102 deletions panoramix/utils/signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
opcode,
cache_dir,
)
from panoramix.utils.supplement import fetch_sigs
from panoramix.utils.supplement import fetch_sig

logger = logging.getLogger(__name__)

Expand All @@ -32,12 +32,12 @@


def set_func_params_if_none(params):
if "params" not in _func:
if "inputs" not in _func:
res = []
for t, n in params.values():
res.append({"type": t, "name": n})

_func["params"] = res
_func["inputs"] = res


def set_func(hash):
Expand All @@ -50,7 +50,6 @@ def set_func(hash):

def get_param_name(cd, add_color=False, func=None):
global _func
global _abi
loc = match(cd, ("cd", ":loc")).loc

if _abi is None:
Expand All @@ -59,7 +58,7 @@ def get_param_name(cd, add_color=False, func=None):
if _func is None:
return cd

if "params" not in _func:
if "inputs" not in _func:
return cd

if type(loc) != int:
Expand All @@ -86,136 +85,78 @@ def get_param_name(cd, add_color=False, func=None):
return cd

if (loc - 4) % 32 != 0: # unusual parameter
return cd #
return cd

num = (cd[1] - 4) // 32
if num >= len(_func["params"]):
if num >= len(_func["inputs"]):
return cd

assert num < len(_func["params"]), str(cd) + " // " + str(func["params"])
assert num < len(_func["inputs"]), str(cd) + " // " + str(func["inputs"])

return colorize(_func["params"][num]["name"], COLOR_GREEN, add_color)
return colorize(_func["inputs"][num]["name"], COLOR_GREEN, add_color)


def get_abi_name(hash):
a = _abi[hash]
if "params" in a:
return "{}({})".format(a["name"], ",".join([x["type"] for x in a["params"]]))
assert "inputs" in a, a
return "{}({})".format(a["name"], ",".join([x["type"] for x in a["inputs"]]))


def get_func_params(hash):
a = _abi[hash]
if "params" in a:
return a["params"]
else:
return []
assert "inputs" in a, a
return a["inputs"]


def get_func_name(hash, add_color=False):
a = _abi[hash]
if "params" in a:
return "{}({})".format(
a["name"],
", ".join(
[
x["type"]
+ " "
+ colorize(
x["name"][:-1] if x["name"][-1] == "_" else x["name"],
COLOR_GREEN,
add_color,
)
for x in a["params"]
]
),
)
else:
return a["folded_name"]


def match_score(func, hashes):
# returns % score of this function's

score_a = 0

for h in hashes:
if h in func["cooccurs"]:
score_a += 1

score_a = 10 * score_a / len(hashes)

score_b = 0

for h in func["cooccurs"]:
if h in hashes:
score_b += 1

score_b = score_b / len(func["cooccurs"])

score_c = 0 if "param" in str(func["params"]) else 100

return score_a + score_b + score_c
logger.debug("get_func_name for abi %s", a)
assert "inputs" in a, a
return "{}({})".format(
a["name"],
", ".join(
[
x["type"]
+ " "
+ colorize(
x["name"].removesuffix("_"),
COLOR_GREEN,
add_color,
)
for x in a["inputs"]
]
),
)


def make_abi(hash_targets):
global _abi

hash_name = str(sorted(list(hash_targets.keys()))).encode()
hash_name = hashlib.sha256(hash_name).hexdigest()

dir_name = (
cache_dir() / "pabi" / hash_name[:3]
) #:3, because there's not '0x' at the beginning
if not dir_name.is_dir():
dir_name.mkdir(parents=True)

cache_fname = dir_name / (hash_name + ".pabi")

if cache_fname.is_file():
with cache_fname.open() as f:
_abi = json.loads(f.read())
logger.info("Cache for PABI found.")
return _abi

logger.info("Cache for PABI not found, generating...")

hashes = list(hash_targets.keys())

result = {}

for h, target in hash_targets.items():
res = {
"fname": "unknown" + h[2:] + "()",
"folded_name": "unknown" + h[2:] + "(?)",
"name": "unknown" + h[2:],
"inputs": [],
}

if "0x" not in h: # assuming index is a name - e.g. for _fallback()
res["fname"] = h
res["folded_name"] = h

else:
sigs = fetch_sigs(h)
if len(sigs) > 0:
best_score = 0
for f in sigs:
score = match_score(f, hashes)
if score > best_score:
best_score = score
res = {
"name": f["name"],
"folded_name": f["folded_name"],
"params": f["params"],
}
if h.startswith("0x"):
sig = fetch_sig(h)
if sig:
res = {
"name": sig["name"],
"inputs": sig["inputs"],
}
else: # assuming index is a name - e.g. for _fallback()
res = {
"name": h,
"inputs": [],
}

res["target"] = target

result[h] = res

_abi = result

with cache_fname.open("w+") as f:
f.write(json.dumps(result, indent=2))

logger.info("Cache for PABI generated.")

return result
Loading

0 comments on commit e0709d1

Please sign in to comment.