]> sigrok.org Git - sigrok-util.git/blob - firmware/kingst-la/sigrok-fwextract-kingst-la2016
sigrok-fwextract-kingst-la2016: handle compressed Qt resources
[sigrok-util.git] / firmware / kingst-la / sigrok-fwextract-kingst-la2016
1 #!/usr/bin/python3
2 ##
3 ## This file is part of the sigrok-util project.
4 ##
5 ## Copyright (C) 2020 Florian Schmidt <schmidt_florian@gmx.de>
6 ##
7 ## This program is free software; you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation; either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 # This utility extracts FX2 MCU firmware and FPGA bitstream images from
22 # the "KingstVIS" vendor software. The blobs are kept in Qt resources
23 # sections. The script was tested with several v3.5 software versions.
24
25 import argparse
26 import os
27 import sys
28 import re
29 import struct
30 import codecs
31 import importlib.util
32 import zlib
33
34 # Reuse the parseelf.py module from saleae-logic16.
35 fwdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
36 parseelf_py = os.path.join(fwdir, "saleae-logic16", "parseelf.py")
37 spec = importlib.util.spec_from_file_location("parseelf", parseelf_py)
38 parseelf = importlib.util.module_from_spec(spec)
39 spec.loader.exec_module(parseelf)
40
41 class qt_resources(object):
42     def __init__(self, program):
43         self._elf = parseelf.elf(program)
44         self._elf_sections = {} # idx -> data
45         self._read_resources()
46
47     def _get_elf_section(self, idx):
48         s = self._elf_sections.get(idx)
49         if s is None:
50             shdr = self._elf.shdrs[idx]
51             s = self._elf.read_section(shdr), shdr
52             self._elf_sections[idx] = s
53         return s
54
55     def _get_elf_sym_value(self, sname):
56         sym = self._elf.symtab[sname]
57         section, shdr = self._get_elf_section(sym["st_shndx"])
58         addr = sym["st_value"] - shdr["sh_addr"]
59         value = section[addr:addr + sym["st_size"]]
60         if len(value) != sym["st_size"]:
61             print("warning: symbol %s should be %d bytes, but in section is only %d bytes" % (
62                 sname, sym["st_size"], len(value)))
63         return value
64
65     # Qt resource stuff.
66     def _get_resource_name(self, offset):
67         length, i = struct.unpack(">HI", self._res_names[offset:offset + 2 + 4])
68         offset += 2 + 4
69         name = self._res_names[offset:offset + 2 * length].decode("utf-16be")
70         return name
71
72     def _get_resource_data(self, offset):
73         length = struct.unpack(">I", self._res_datas[offset:offset + 4])[0]
74         offset += 4
75         return self._res_datas[offset:offset + length]
76
77     def _read_resources(self):
78         RCCFileInfo_Directory = 0x02
79         def read_table():
80             table = []
81             offset = 0
82             while offset < len(self._res_struct):
83                 name_offset, flags = struct.unpack(">IH", self._res_struct[offset:offset+4+2])
84                 offset += 6
85                 name = self._get_resource_name(name_offset)
86                 if flags & RCCFileInfo_Directory:
87                     child_count, first_child_offset = struct.unpack(">II", self._res_struct[offset:offset + 4 + 4])
88                     offset += 4 + 4
89                     table.append((name, flags, child_count, first_child_offset))
90                 else:
91                     country, language, data_offset = struct.unpack(">HHI", self._res_struct[offset:offset + 2 + 2 + 4])
92                     offset += 2 + 2 + 4
93                     table.append((name, flags, country, language, data_offset))
94             return table
95         def read_dir_entries(table, which, parents=[]):
96             name, flags = which[:2]
97             if not flags & RCCFileInfo_Directory:
98                 raise Exception("not a directory!")
99             child_count, first_child = which[2:]
100             for i in range(child_count):
101                 child = table[first_child + i]
102                 flags = child[1]
103                 if flags & RCCFileInfo_Directory:
104                     read_dir_entries(table, child, parents + [child[0]])
105                 else:
106                     country, language, data_offset = child[2:]
107                     full_name = "/".join(parents + [child[0]])
108                     self._resources[full_name] = data_offset
109                     self._resource_flags[full_name] = flags
110
111         self._res_datas = self._get_elf_sym_value("_ZL16qt_resource_data")
112         self._res_names = self._get_elf_sym_value("_ZL16qt_resource_name")
113         self._res_struct = self._get_elf_sym_value("_ZL18qt_resource_struct")
114
115         self._resources = {} # res_fn -> res_offset
116         self._resource_flags = {} # res_fn -> RCC_flags
117         table = read_table()
118         read_dir_entries(table, table[0])
119
120     def get_resource(self, res_fn):
121         RCCFileInfo_Compressed = 1
122         offset = self._resources[res_fn]
123         flags = self._resource_flags[res_fn]
124         data = self._get_resource_data(offset)
125         if flags & RCCFileInfo_Compressed:
126             data = zlib.decompress(data[4:])
127         return data
128
129     def find_resource_names(self, res_fn_re):
130         for key in self._resources.keys():
131             m = re.match(res_fn_re, key)
132             if m is not None:
133                 yield key
134
135 class res_writer(object):
136     def __init__(self, res):
137         self.res = res
138
139     def _decode_crc(self, data, decoder=None):
140         if decoder is not None:
141             data = decoder(data)
142         data = bytearray(data)
143         crc = zlib.crc32(data) & 0xffffffff
144         return data, crc
145
146     def _write_file(self, fn, data):
147         with open(fn, "wb") as fp:
148             fp.write(data)
149
150     def extract_re(self, resource_pattern, fname_pattern, decoder=None):
151         resources = sorted(res.find_resource_names(resource_pattern))
152         for resource in resources:
153             fname = re.sub(resource_pattern, fname_pattern, resource)
154             fname = fname.lower()
155             data = self.res.get_resource(resource)
156             data, crc = self._decode_crc(data, decoder=decoder)
157             self._write_file(fname, data)
158             print("resource {rsc}, file {fname}, size {size}, checksum {crc:08x}".format(
159                 rsc = resource, fname = fname, size = len(data), crc = crc,
160             ))
161
162 def decode_intel_hex(hexdata):
163     """ return list of (address, data)
164     """
165     datas = []
166     # Assume LF-only or CR-LF style end-of-line.
167     for line in hexdata.split(b"\n"):
168         line = line.strip()
169         if chr(line[0]) != ":": raise Exception("invalid line: %r" % line)
170         offset = 1
171         record = codecs.decode(line[offset:], "hex")
172         byte_count, address, record_type = struct.unpack(">BHB", record[:1 + 2 + 1])
173         offset = 1 + 2 + 1
174         if byte_count > 0:
175             data = record[offset:offset + byte_count]
176             offset += byte_count
177         checksum = record[offset]
178         ex_checksum = (~sum(record[:offset]) + 1) & 0xff
179         if ex_checksum != checksum: raise Exception("invalid checksum %#x in %r" % (checksum, line))
180         if record_type == 0:
181             datas.append((address, data))
182         elif record_type == 1:
183             break
184     return datas
185
186 def intel_hex_as_blob(hexdata):
187     """ return continuous bytes sequence including all data
188     (loosing start address here)
189     """
190     data = decode_intel_hex(hexdata)
191     data.sort()
192     last = data[-1]
193     length = last[0] + len(last[1])
194     img = bytearray(length)
195     for off, part in data:
196         img[off:off + len(part)] = part
197     return img
198
199 def maybe_intel_hex_as_blob(data):
200     if data[0] == ord(":") and max(data) < 127:
201           return intel_hex_as_blob(data)
202     return data # Keep binary data.
203
204 if __name__ == "__main__":
205     parser = argparse.ArgumentParser(description = "KingstVIS firmware extraction")
206     parser.add_argument('executable', help = "KingstVIS executable file")
207     options = parser.parse_args()
208     exe_fn = options.executable
209
210     res = qt_resources(exe_fn)
211     writer = res_writer(res)
212
213     # Extract all MCU firmware and FPGA bitstream images. The sigrok
214     # project may not cover all KingstVIS supported devices. Users can
215     # either just copy those files which are strictly required for their
216     # specific device (diagnostics will identify those). Or just copy a
217     # few more files while some of them remain unused later (their size
218     # is small). Seeing which files would be contained is considered
219     # valuable, to identify device variants or candidate models.
220     writer.extract_re(r"fwusb/fw(.*)", r"kingst-la-\1.fw", decoder=maybe_intel_hex_as_blob)
221     writer.extract_re(r"fwfpga/(.*)", r"kingst-\1-fpga.bitstream")