#!/usr/bin/env python3

# SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

"""Generate the per-device KDK, public-key and (optionally) Silicon ID CSR databases."""

import argparse
import binascii
import contextlib
import csv
import errno
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.kdf import kbkdf
from ecdsa import NIST256p, SigningKey

from lib.aes_drbg import AES_DRBG
from lib.silicon_id_csr import Device_Gen_Silicon_ID_CSR

# Directory (relative to the current working directory) receiving the CSV files.
OUTPUT_DIR = "ftpm_kdk"

_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def encode(data: str, is_hex: bool, desc: str) -> bytes:
    """Convert *data* to bytes, either by hex-decoding it or by ASCII-encoding it.

    Args:
        data: The text to convert.
        is_hex: When true, *data* is parsed as a hex string; otherwise it is
            encoded as ASCII.
        desc: Short name of the value, used only in error messages.

    Raises:
        ValueError: If *data* is not valid hex (``is_hex=True``) or not valid
            ASCII (``is_hex=False``).
    """
    if is_hex:
        try:
            return bytes.fromhex(data)
        except ValueError as e:
            raise ValueError("Assumed hex=True, but {}:{} is invalid hex!".format(desc, e)) from e
    try:
        return data.encode("ascii")
    except UnicodeError as e:
        raise ValueError("Assumed hex=False, but {}:{} is invalid ASCII!".format(desc, e)) from e


def kdf(kdk: bytes, label: bytes, context: bytes, L: int = 256, rlen: int = 32, order=None) -> bytes:
    """Derive KDF from root key and NIST SP800-108 input (label and context).

    Uses HMAC-SHA256 in counter mode.

    Args:
        kdk: The key-derivation key.
        label: The SP800-108 label.
        context: The SP800-108 context; an empty value is replaced by a single
            zero byte.
        L: Length of the derived key, in bits.
        rlen: Length of the counter, in bits.
        order: Counter location; defaults to ``CounterLocation.BeforeFixed``.

    Returns:
        The derived key (``L // 8`` bytes).
    """
    if not context:
        context = b"\x00"
    if order is None:
        order = kbkdf.CounterLocation.BeforeFixed
    derivation = kbkdf.KBKDFHMAC(
        algorithm=hashes.SHA256(),
        mode=kbkdf.Mode.CounterMode,
        length=L // 8,
        rlen=rlen // 8,
        llen=4,
        location=order,
        label=label,
        context=context,
        fixed=None,
        backend=default_backend(),
    )
    return derivation.derive(kdk)


# This function uses the algorithm defined in NIST SP800-56Ar3,
# 5.6.1.2.2 Key Pair Generation by Testing Candidates,
# to generate an ECDSA private key from a given seed
def gen_ecdsa_private_key(seed: bytes):
    """Derive a NIST P-256 private key from *seed* with an AES-256 CTR-DRBG.

    Args:
        seed: Seed material; it is right-padded with zero bytes to 48 bytes.

    Returns:
        The 32-byte private key, or ``b''`` if no valid candidate was found
        within the retry budget.

    Raises:
        ValueError: If *seed* is ``None``.
    """
    if seed is None:
        raise ValueError("No seed is provided when generating ECDSA private key!")

    drbg = AES_DRBG(256)
    # For AES-256 CTR-DRBG, the seed size is 48 bytes
    # Refer to NIST SP800-90Ar1, 10.2.1 CTR_DRBG
    if len(seed) < 48:
        seed = seed + b"\x00" * (48 - len(seed))
    drbg.instantiate(seed)

    retries = 100
    for _ in range(retries):
        # Only the first 32 bytes of each 64-byte draw are used as a candidate.
        candidate = drbg.generate(64)[0:32]
        if 0 < int.from_bytes(candidate, "big") < NIST256p.order:
            return candidate
    return b''


def convert_sn_endian(sn: bytes) -> bytes:
    """Return *sn* with its first two bytes (the OEM ID) swapped."""
    # The device SN is 10 bytes: a 2-byte oem_id followed by an 8-byte serial number.
    # oem_id is stored in fuse "odm_info"
    # All 4-byte or less Tegra fuses are stored in little-endian so we need
    # to convert the endian of oem_id in device SN before doing the key calculation
    # The left 8 bytes don't have an endian issue - it is big-endian everywhere
    result = bytearray(sn)
    result[0] = sn[1]
    result[1] = sn[0]
    return bytes(result)


def gen_silicon_id_pubkey_and_csr(kdk: str, sn: bytes, gen_csr: bool, csr_common_name: str,
                                  csr_org_name: str, csr_country_name: str):
    """Derive the Silicon ID key pair for one device and optionally build its CSR.

    Args:
        kdk: The device KDK as a hex string.
        sn: The device serial number bytes (see ``get_device_sn``).
        gen_csr: When true, also build the Silicon ID CSR.
        csr_common_name: Common name passed to the CSR builder.
        csr_org_name: Organization name passed to the CSR builder.
        csr_country_name: Country name passed to the CSR builder.

    Returns:
        With ``gen_csr=False``: the public key as a hex string, or ``""`` if no
        ECDSA key could be generated.
        With ``gen_csr=True``: a ``(public_key_hex, csr)`` tuple, or
        ``("", "")`` if no ECDSA key could be generated, so that callers can
        always unpack two values.
    """
    # Prepare the SILICON ID key pair
    kdk_nrk = kdf(encode(kdk, True, "KDK"), encode("NRK", False, "Label"), encode("00", True, "Context"))
    silicon_id = kdf(kdk_nrk, encode("ECA_SEED", False, "Label"), convert_sn_endian(sn))
    silicon_id_asym_origin = kdf(silicon_id, encode("Asym", False, "Label"), encode("00", True, "Context"))

    hm = hmac.HMAC(silicon_id_asym_origin, hashes.SHA256(), backend=default_backend())
    hm.update("Asym".encode(encoding="utf8"))
    silicon_id_asym = hm.finalize()

    ecdsa_key = gen_ecdsa_private_key(silicon_id_asym)
    if len(ecdsa_key) == 0:
        # Keep the failure value the same shape as the success value.
        return ("", "") if gen_csr else ""

    silicon_id_private_key = SigningKey.from_string(ecdsa_key, curve=NIST256p)
    silicon_id_public_key = silicon_id_private_key.get_verifying_key()
    public_key_hex = silicon_id_public_key.to_string().hex()

    if not gen_csr:
        return public_key_hex

    silicon_id_csr = Device_Gen_Silicon_ID_CSR(csr_common_name, csr_org_name, csr_country_name,
                                               silicon_id_private_key.to_der(),
                                               silicon_id_public_key.to_der())
    return public_key_hex, silicon_id_csr.build_csr()


def clean(input):
    """Rewrite the space-delimited CSV *input* in place without its last column.

    Every remaining column is lower-cased. Blank rows are skipped. The new
    content is written to a temporary file next to *input* and then moved over
    it, so *input* is left untouched if rewriting fails.
    """
    # NOTE: the parameter name shadows the builtin ``input``; it is kept so
    # that existing callers (including keyword callers) keep working.
    target = os.fspath(input)
    tmp_path = target + ".tmp"
    try:
        with open(target, "r", newline="") as src, open(tmp_path, "w", newline="") as dst:
            reader = csv.reader(src, delimiter=' ')
            writer = csv.writer(dst, delimiter=' ')
            for row in reader:
                if not row:
                    # A blank line has no last column to drop.
                    continue
                writer.writerow([col.lower() for col in row[:-1]])
    except Exception:
        with contextlib.suppress(OSError):
            os.remove(tmp_path)
        raise
    os.replace(tmp_path, target)


def get_device_sn(oem_id: int, sn: int) -> bytes:
    """Build the 10-byte device SN: 2-byte big-endian *oem_id* + 8-byte big-endian *sn*.

    Raises:
        OverflowError: If *oem_id* does not fit in 16 bits or the upper half
            of *sn* does not fit in 32 bits.
    """
    sn_high = (sn >> 32).to_bytes(4, "big")
    sn_low = (sn & 0xffffffff).to_bytes(4, "big")
    return oem_id.to_bytes(2, "big") + sn_high + sn_low


# Get the kdk string by stripping the hex prefix, check its length and validity as a hex string
# Returning empty string if the provided kdk string does not meet the standard
def get_kdk_string(kdk: str) -> str:
    """Normalize a KDK string to 64 lower-case hex digits, or return ``""`` if invalid."""
    kdk_str = kdk.strip()
    if kdk_str.startswith(("0x", "0X")):
        kdk_str = kdk_str[2:]
    if len(kdk_str) != 64:
        return ""
    # int(s, 16) would also accept "_" separators, a sign or a second "0x"
    # prefix, none of which bytes.fromhex() can decode later.
    if not all(ch in _HEX_DIGITS for ch in kdk_str):
        return ""
    return kdk_str.lower()


def _random_kdk() -> str:
    """Return a fresh random 256-bit KDK as a hex string."""
    return binascii.b2a_hex(os.urandom(32)).decode()


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser of the tool."""
    parser = argparse.ArgumentParser(description='The KDK db generation tool.')
    parser.add_argument('--oem_id', help='The ID of the OEM vendor. A positive 16-bit integer in hex format')
    parser.add_argument('--sn', help='The beginning of device serial number. A positive 64-bit unsigned integer in hex format')
    parser.add_argument('--num_devices', type=int, help='the number of devices')
    parser.add_argument("--destroy", action='store', help='destroy the kdk database')
    parser.add_argument("--preset", help='use the preset KDK database')
    parser.add_argument("--fixed", action='store_true', help='Use the fixed KDK value. Requires the --preset option to be set, with the first value in the file used as the fixed KDK.')
    parser.add_argument("--prov_mode", type=str, default="offline", help="The fTPM provisioning mode (offline or online). The default is offline.")
    parser.add_argument("--silicon_id_csr_cn", type=str, default="Silicon-ID-ECA", help="The common name of the Silicon ID CSR (Certificate Signing Request).")
    parser.add_argument("--silicon_id_csr_og", type=str, default="fTPM Corp", help="The organization name of the Silicon ID CSR (Certificate Signing Request).")
    parser.add_argument("--silicon_id_csr_Cn", type=str, default="US", help="The country name of the Silicon ID CSR (Certificate Signing Request).")
    return parser


def _ensure_output_dir(output_dir: str) -> None:
    """Create *output_dir* if needed and verify that it is a writable directory."""
    if not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
        except OSError as e:
            # Another process may have created it in the meantime.
            if e.errno != errno.EEXIST:
                raise
    elif not os.path.isdir(output_dir):
        raise NotADirectoryError(str(output_dir) + " is not a directory or does not exist")
    elif not os.access(output_dir, os.W_OK):
        raise PermissionError("Write access denied on " + str(output_dir))


def _load_preset_kdks(preset_path: str, use_fixed: bool, num_devices: int):
    """Read and validate the preset KDK file.

    Returns:
        A ``(preset_kdks, fixed_kdk)`` pair. With *use_fixed*, only the first
        line is validated and returned as ``fixed_kdk`` (``preset_kdks`` is
        empty). Otherwise ``preset_kdks`` holds one normalized KDK per device
        and ``fixed_kdk`` is ``""``.

    Raises:
        ValueError: If the number of lines is wrong or a KDK is invalid.
    """
    with open(preset_path, 'r') as file:
        lines = file.readlines()

    if use_fixed:
        if len(lines) < 1:
            raise ValueError("Error: incorrect number of values specified by --preset: {}".format(len(lines)))
        fixed_kdk = get_kdk_string(lines[0])
        if not fixed_kdk:
            raise ValueError("Invalid preset KDK@1 found: " + lines[0])
        return [], fixed_kdk

    if len(lines) != num_devices:
        raise ValueError("The number of preset KDKs is not equal to the number of devices")

    preset_kdks = []
    for index, line in enumerate(lines, start=1):
        device_kdk = get_kdk_string(line)
        if not device_kdk:
            raise ValueError("Invalid preset KDK@" + str(index) + " found: " + line)
        preset_kdks.append(device_kdk)
    return preset_kdks, ""


def main():
    """Entry point: parse the command line and generate (or destroy) the databases."""
    parser = _build_arg_parser()
    args = parser.parse_args()

    if args.destroy is not None and (args.oem_id or args.sn):
        parser.print_help()
        return

    if not (args.destroy or args.oem_id or args.sn or args.num_devices):
        parser.print_help()
        return

    if args.destroy:
        clean(args.destroy)
        return

    # Generation mode needs all three of these; fail with a usage error
    # instead of an AttributeError/TypeError further down.
    if args.oem_id is None or args.sn is None or args.num_devices is None:
        parser.error("--oem_id, --sn and --num_devices are all required to generate the databases")
    if args.num_devices < 1:
        parser.error("--num_devices must be a positive integer")

    if not args.oem_id.lower().startswith("0x") or not args.sn.lower().startswith("0x"):
        raise ValueError("parameter in hex format needs to start with \"0x\"")

    args.oem_id = int(args.oem_id, 16)
    args.sn = int(args.sn, 16)

    if args.oem_id < 0 or args.oem_id.bit_length() > 16:
        raise ValueError("oem_id must be a 16-bit integer")

    if args.sn < 0 or args.sn.bit_length() > 64:
        raise ValueError("sn must be a 64-bit integer")

    # Serial numbers are assigned consecutively; make sure the last one still
    # fits before any output file is written.
    if (args.sn + args.num_devices - 1).bit_length() > 64:
        raise ValueError("sn + num_devices exceeds the 64-bit serial number range")

    if args.preset is not None and not os.path.isfile(args.preset):
        raise ValueError("Invalid preset KDK database")

    _ensure_output_dir(OUTPUT_DIR)

    if args.fixed and args.preset is None:
        raise ValueError("--fixed option must be used with the --preset option")

    preset_kdks = []
    fixed_kdk = ""
    if args.preset is not None:
        preset_kdks, fixed_kdk = _load_preset_kdks(args.preset, args.fixed, args.num_devices)

    online = args.prov_mode == "online"

    # Output files are named <kind>_db_<oem_id>-<starting sn>-<num_devices>.csv
    first_sn_hex = get_device_sn(args.oem_id, args.sn).hex()
    name_suffix = '{}-{}-{}.csv'.format(first_sn_hex[:4], first_sn_hex[4:], args.num_devices)
    kdkfilename = '{}/kdk_db_{}'.format(OUTPUT_DIR, name_suffix)
    pubkeyfilename = '{}/pubkey_db_{}'.format(OUTPUT_DIR, name_suffix)
    silicon_id_csr_filename = '{}/silicon_id_csr_db_{}'.format(OUTPUT_DIR, name_suffix)

    def open_db_writer(stack, path):
        # Register the file with the ExitStack so it is closed on every exit path.
        db_file = stack.enter_context(open(path, 'w', newline=''))
        return csv.writer(db_file, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)

    with contextlib.ExitStack() as stack:
        silicon_id_csr_writer = None
        if online:
            # Add Silicon ID CSR DB for online provisioning mode
            silicon_id_csr_writer = open_db_writer(stack, silicon_id_csr_filename)
        writer = open_db_writer(stack, kdkfilename)
        pubkeywriter = open_db_writer(stack, pubkeyfilename)

        for i in range(args.num_devices):
            if fixed_kdk:
                device_kdk = fixed_kdk
            elif preset_kdks:
                device_kdk = preset_kdks[i]
            else:
                device_kdk = _random_kdk()

            # The device SN is 10 bytes:
            # OEM_ID is burnt into the fuse ODM_INFO
            # High 32 bits of SN is burnt into the fuse ODM_ID0
            # Low 32 bits of SN is burnt into the fuse ODM_ID1
            device_sn = get_device_sn(args.oem_id, args.sn + i)
            sn_hex = device_sn.hex()

            silicon_id_csr = None
            while True:
                if online:
                    silicon_id_csr_cn = '{}_{}'.format(sn_hex, args.silicon_id_csr_cn)
                    device_pubkey, silicon_id_csr = gen_silicon_id_pubkey_and_csr(
                        device_kdk, device_sn, True, silicon_id_csr_cn,
                        args.silicon_id_csr_og, args.silicon_id_csr_Cn)
                else:
                    device_pubkey = gen_silicon_id_pubkey_and_csr(device_kdk, device_sn, False, "", "", "")

                if len(device_pubkey) != 0:
                    break
                # No usable ECDSA key for this KDK: retry with a fresh random one.
                print("Generate ECDSA key pair using KDK: " + device_kdk + " failed. Will start over.")
                device_kdk = _random_kdk()

            writer.writerow([sn_hex[:4], sn_hex[4:], device_kdk])
            pubkeywriter.writerow([sn_hex[:4], sn_hex[4:], device_pubkey])
            if online:
                silicon_id_csr_writer.writerow([sn_hex[:4], sn_hex[4:], silicon_id_csr])


if __name__ == '__main__':
    main()