#!/usr/bin/python
#
# Dump Qcow2 file structure
#
# Example images to try this on:
# qemu-img create -f qcow2 -o preallocation=off /tmp/test.qcow2 1G
# qemu-img create -f qcow2 -o preallocation=metadata /tmp/test.qcow2 1G
# qemu-img snapshot -c Test1234 /tmp/test.qcow2
# qemu-io -c 'write 0 512' -c flush -c quit /tmp/test.qcow2
#
# The code below is written to run unchanged on Python 2 and Python 3:
# print() is always called with one pre-formatted string, integer division
# uses '//', and the image is handled as bytes.
import sys
import os
import math
from struct import Struct as _Struct
import time


class Struct(_Struct):
    '''struct.Struct accepting a multi-line format with '#' comments.

    For every line of fmt the text from the first '#' onwards is dropped
    and the rest is stripped; the remaining pieces are concatenated and
    used as the real struct format string.
    '''

    def __init__(self, fmt):
        pieces = (line.split('#', 1)[0].strip() for line in fmt.splitlines())
        _Struct.__init__(self, ''.join(pieces))


QCowHeader = Struct('''>  #big-endian
    4s  #uint32_t magic;
    I   #uint32_t version;
    Q   #uint64_t backing_file_offset;
    I   #uint32_t backing_file_size;
    I   #uint32_t cluster_bits;
    Q   #uint64_t size; /* in bytes */
    I   #uint32_t crypt_method;
    I   #uint32_t l1_size;
    Q   #uint64_t l1_table_offset;
    Q   #uint64_t refcount_table_offset;
    I   #uint32_t refcount_table_clusters;
    I   #uint32_t nb_snapshots;
    Q   #uint64_t snapshots_offset;
''')

QCowSnapshotHeader = Struct('''>  #big-endian
    Q   #uint64_t l1_table_offset;
    I   #uint32_t l1_size;
    H   #uint16_t id_str_size;
    H   #uint16_t name_size;
    I   #uint32_t date_sec;
    I   #uint32_t date_nsec;
    Q   #uint64_t vm_clock_nsec;
    I   #uint32_t vm_state_size;
    I   #uint32_t extra_data_size; /* for extension */
    #/* extra data follows */
    #/* id_str follows */
    #/* name follows */
''')

# Smallest and largest cluster_bits reported as valid. Written as exact
# integers instead of int(math.log(x) / math.log(2)), which goes through
# floating point and truncates.
LD512 = 9   # log2(512)
LD2M = 21   # log2(2 << 20)

# L1 and L2 table entries carry two flag bits above the offset:
# bit 63 ("copied") and bit 62 ("compressed").
_FLAG_COMPRESSED = 1 << 62
_OFFSET_MASK = ~(3 << 62)


def human(size):
    '''Return size (a number of bytes) as human readable string.

    The value is truncated, not rounded, to the largest unit up to PiB.

    >>> human(0)
    '0 B'
    >>> human(1)
    '1 B'
    >>> human(1023)
    '1023 B'
    >>> human(1024)
    '1 KiB'
    >>> human(1536)
    '1 KiB'
    '''
    for unit in ('B', 'KiB', 'MiB', 'GiB', 'TiB'):
        if size < 1024:
            return '%s %s' % (size, unit)
        size >>= 10
    return '%s PiB' % size


def round_up(value, size):
    '''Round value up to the next multiple of size (a power of 2).

    >>> round_up(0, 2)
    0
    >>> round_up(1, 2)
    2
    >>> round_up(4, 4)
    4
    '''
    bitmask = size - 1
    assert not (size & bitmask), "size is not a power of 2"
    return (value + bitmask) & ~bitmask


def aligned(value, size):
    '''Test if value is aligned to size (a power of 2).

    >>> aligned(1, 2)
    False
    >>> aligned(4, 2)
    True
    '''
    return value & (size - 1) == 0


def _file_size(f):
    '''Return the size of seekable file f without moving its position.'''
    pos = f.tell()
    f.seek(0, os.SEEK_END)
    try:
        return f.tell()
    finally:
        f.seek(pos, os.SEEK_SET)


def dump_QCowHeader(f):
    '''Print the Qcow2 header read from f and everything it references.

    The header is read from the current position of f (a seekable binary
    file). Each line is printed as "name=value: check", where check is the
    boolean result of a plausibility test. Afterwards the refcount table,
    the active L1/L2 tables and all snapshot headers are dumped.

    Sets the module globals fsize, cluster_size, l2_bits, l2_size and
    l1_size_min, which dump_L1, dump_L2 and dump_QCowSnapshotHeader use.
    '''
    global fsize, cluster_size, l2_bits, l2_size, l1_size_min
    # Determined here so the dump functions also work when this module is
    # imported instead of being run as a script.
    fsize = _file_size(f)

    d = f.read(QCowHeader.size)
    (magic, version, backing_file_offset, backing_file_size, cluster_bits,
     size, crypt_method, l1_size, l1_table_offset, refcount_table_offset,
     refcount_table_clusters, nb_snapshots,
     snapshots_offset) = QCowHeader.unpack_from(d)

    print('magic=%r: %s' % (magic, magic == b'QFI\xfb'))
    print('version=%d: %s' % (version, version == 2))
    print('backing_file_offset=0x%016x: %s' % (
        backing_file_offset, 0 <= backing_file_offset < fsize))
    print('backing_file_size=0x%08x: %s' % (
        backing_file_size,
        0 <= backing_file_offset + backing_file_size < fsize))
    print('cluster_bits=%d: %s' % (
        cluster_bits, LD512 <= cluster_bits <= LD2M))

    cluster_size = 1 << cluster_bits
    # An L2 table fills one cluster with 8-byte entries.
    l2_bits = cluster_bits - 3
    assert 0 < l2_bits
    l2_size = 1 << l2_bits

    print('size=%s: %s' % (human(size), True))
    print('crypt_method=0x%08x: %s' % (crypt_method, 0 <= crypt_method <= 1))
    l1_size_min = size // cluster_size // l2_size
    print('l1_size=%d: %s' % (l1_size, l1_size_min <= l1_size))
    print('l1_table_offset=0x%016x: %s' % (
        l1_table_offset,
        0 <= l1_table_offset < fsize
        and aligned(l1_table_offset, cluster_size)))
    print('refcount_table_offset=0x%016x: %s' % (
        refcount_table_offset,
        0 <= refcount_table_offset < fsize
        and aligned(refcount_table_offset, cluster_size)))
    print('refcount_table_clusters=%d: %s' % (
        refcount_table_clusters,
        0 <= refcount_table_offset
        + refcount_table_clusters * cluster_size < fsize))
    print('nb_snapshots=%d: %s' % (nb_snapshots, 0 <= nb_snapshots))
    print('snapshots_offset=0x%016x: %s' % (
        snapshots_offset,
        0 <= snapshots_offset < fsize
        and aligned(snapshots_offset, cluster_size)))

    if backing_file_offset and backing_file_size > 0:
        f.seek(backing_file_offset, os.SEEK_SET)
        backing_file = f.read(backing_file_size)
        print('backing_file=%r: %s' % (backing_file, True))

    # Refcount table: 8-byte offsets of refcount blocks; every block is one
    # cluster of 2-byte counters.
    f.seek(refcount_table_offset)
    d = f.read(cluster_size * refcount_table_clusters)
    refcount_table_len = cluster_size * refcount_table_clusters // 8
    refcounts_per_block = cluster_size // 2
    RCT = Struct('>%dQ' % refcount_table_len)
    RCTE = Struct('>%dH' % refcounts_per_block)
    for i, offset in enumerate(RCT.unpack_from(d)):
        if not offset:
            continue
        print('refcount_table[%d]=0x%016x: %s' % (
            i, offset, 0 <= offset < fsize and aligned(offset, cluster_size)))
        f.seek(offset)
        d = f.read(cluster_size)
        for j, refcount in enumerate(RCTE.unpack_from(d)):
            if not refcount:
                continue
            cluster = i * refcounts_per_block + j
            print(' [%d][%d]=[%d]=%d: %s' % (
                i, j, cluster, refcount,
                0 <= refcount <= nb_snapshots + 1
                and 0 <= cluster * cluster_size < fsize))

    f.seek(l1_table_offset)
    dump_L1(f, l1_size)

    if nb_snapshots > 0:
        f.seek(snapshots_offset, os.SEEK_SET)
        for snapshot in range(nb_snapshots):
            print(' Snapshot #%d' % (snapshot,))
            dump_QCowSnapshotHeader(f)


def dump_L1(f, l1_size, prefix=''):
    '''Print the L1 table at the current position of f and its L2 tables.

    l1_size is the number of 8-byte entries; prefix is prepended to every
    printed line. Relies on the globals set by dump_QCowHeader.
    '''
    L1 = Struct('>%dQ  #offset' % l1_size)
    d = f.read(round_up(L1.size, cluster_size))
    for i, entry in enumerate(L1.unpack_from(d)):
        offset = entry & _OFFSET_MASK
        print('%sL1[%d]=0x%016x: %s' % (
            prefix, i, entry, 0 <= offset < fsize))
        if offset > 0:
            f.seek(offset)
            dump_L2(f, prefix)


def dump_L2(f, prefix=''):
    '''Print all non-empty entries of the L2 table at the position of f.

    prefix is prepended to every printed line. Relies on the globals set
    by dump_QCowHeader.
    '''
    L2 = Struct('>%dQ  #offset' % l2_size)
    d = f.read(L2.size)
    for i, entry in enumerate(L2.unpack_from(d)):
        offset = entry & _OFFSET_MASK
        if not offset:
            continue
        print('%s L2[%d]=0x%016x: %s' % (
            prefix, i, entry, 0 <= offset < fsize))


def dump_QCowSnapshotHeader(f):
    '''Print one snapshot table entry read from the position of f.

    Also dumps the L1/L2 tables of the snapshot. On return f is positioned
    at the start of the next snapshot table entry. Relies on the globals
    set by dump_QCowHeader.
    '''
    d = f.read(QCowSnapshotHeader.size)
    (l1_table_offset, l1_size, id_str_size, name_size, date_sec, date_nsec,
     vm_clock_nsec, vm_state_size,
     extra_data_size) = QCowSnapshotHeader.unpack_from(d)

    print(' l1_table_offset=0x%016x: %s' % (
        l1_table_offset,
        0 <= l1_table_offset < fsize
        and aligned(l1_table_offset, cluster_size)))
    print(' l1_size=%d: %s' % (l1_size, l1_size_min <= l1_size))
    print(' id_str_size=%d: %s' % (id_str_size, 0 <= id_str_size))
    print(' name_size=%d: %s' % (name_size, 0 <= name_size))
    print(' date_sec=%s: %s' % (
        time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(date_sec)),
        0 <= date_sec))
    print(' date_nsec=%d: %s' % (date_nsec, 0 <= date_nsec < 1000000000))
    print(' vm_clock_nsec=%d: %s' % (vm_clock_nsec, 0 <= vm_clock_nsec))
    print(' vm_state_size=0x%08x: %s' % (vm_state_size, 0 <= vm_state_size))
    print(' extra_data_size=0x%08x: %s' % (
        extra_data_size, 0 <= extra_data_size))

    if extra_data_size > 0:
        extra_data = f.read(extra_data_size)
        print(' extra_data=%d: %s' % (
            len(extra_data), len(extra_data) == extra_data_size))
    if id_str_size > 0:
        id_str = f.read(id_str_size)
        print(' id_str=%r: %s' % (id_str, len(id_str) == id_str_size))
    if name_size > 0:
        name = f.read(name_size)
        print(' name=%r: %s' % (name, len(name) == name_size))

    # The VM state is NOT part of the snapshot table entry (the qcow2
    # format stores it like guest data), so it is neither read here nor
    # counted in the entry size. The entry is padded to a multiple of 8.
    entry_size = (QCowSnapshotHeader.size + extra_data_size
                  + id_str_size + name_size)
    padding = round_up(entry_size, 8) - entry_size
    if padding:
        f.read(padding)

    # Dump the snapshot's tables, then return to the next table entry.
    pos = f.tell()
    f.seek(l1_table_offset)
    dump_L1(f, l1_size, ' ')
    f.seek(pos)


def read(f, pos, count=1):
    '''Read the guest byte at offset pos from the Qcow2 image f.

    f is a seekable binary file. count is accepted but currently unused.

    Returns a string of one byte, or None if the byte lies in an
    unallocated L2 table or cluster.

    Raises EOFError if pos is at or beyond the virtual disk size,
    ValueError if pos is negative, NotImplementedError for encrypted or
    compressed images and IndexError if the data cluster is truncated.
    '''
    if pos < 0:
        raise ValueError('negative offset: %d' % pos)

    f.seek(0)
    d = f.read(QCowHeader.size)
    (magic, version, backing_file_offset, backing_file_size, cluster_bits,
     size, crypt_method, l1_size, l1_table_offset, refcount_table_offset,
     refcount_table_clusters, nb_snapshots,
     snapshots_offset) = QCowHeader.unpack_from(d)
    if crypt_method:
        raise NotImplementedError('Qcow2 encryption')

    cluster_size = 1 << cluster_bits
    l2_bits = cluster_bits - 3
    l2_size = 1 << l2_bits
    if pos >= size:
        raise EOFError()

    # Split pos into L1 index, L2 index and offset inside the cluster.
    cluster_index = pos & (cluster_size - 1)
    pos >>= cluster_bits
    l2_index = pos & (l2_size - 1)
    l1_index = pos >> l2_bits

    f.seek(l1_table_offset)
    L1 = Struct('>%dQ  #offset' % l1_size)
    d = f.read(round_up(L1.size, cluster_size))
    l2_table_offset = L1.unpack_from(d)[l1_index]
    if not l2_table_offset:
        return None  # whole L2 table unallocated
    if l2_table_offset & _FLAG_COMPRESSED:
        raise NotImplementedError('Qcow2 compression')
    l2_table_offset &= _OFFSET_MASK

    f.seek(l2_table_offset)
    L2 = Struct('>%dQ  #offset' % l2_size)
    d = f.read(L2.size)
    cluster_offset = L2.unpack_from(d)[l2_index]
    if not cluster_offset:
        return None  # cluster unallocated
    if cluster_offset & _FLAG_COMPRESSED:
        raise NotImplementedError('Qcow2 compression')
    cluster_offset &= _OFFSET_MASK

    f.seek(cluster_offset)
    d = f.read(cluster_size)
    if cluster_index >= len(d):
        raise IndexError('cluster at 0x%x is truncated' % cluster_offset)
    # A slice yields a one-byte string on Python 2 and Python 3 alike.
    return d[cluster_index:cluster_index + 1]


if __name__ == '__main__':
    import doctest
    doctest.testmod()
    try:
        NAME = sys.argv[1]
    except IndexError:
        NAME = '/tmp/test.qcow2'
    # Binary mode: the image is raw bytes, not text.
    with open(NAME, 'rb') as f:
        dump_QCowHeader(f)
        # Single guest bytes can be fetched with read(f, offset),
        # for example read(f, 0) or read(f, 511).