Source code for cocotb_wrapper.axi

# ============================================================
#   _____       ______  _____
#  |_   _|     |  ____|/ ____|
#    | |  _ __ | |__  | (___    Institute of Embedded Systems
#    | | | '_ \|  __|  \___ \   Zurich University of
#   _| |_| | | | |____ ____) |  Applied Sciences
#  |_____|_| |_|______|_____/   8401 Winterthur, Switzerland
# ============================================================

"""Provide a wrapper classes around `cocotbext-axi <https://github.com/alexforencich/cocotbext-axi>`_."""

from __future__ import annotations

__author__ = "Thierry Delafontaine"
__mail__ = "deaa@zhaw.ch"
__copyright__ = "2022 ZHAW Institute of Embedded Systems"
__date__ = "2022-12-20"

from collections.abc import Iterator
from enum import IntEnum, IntFlag
from itertools import cycle
from random import getrandbits

import cocotbext.axi as axi
from cocotb.handle import HierarchyObject
from cocotb.log import SimLog
from cocotb.triggers import Event


def bits_to_bytes(b: int) -> int:
    """Convert a number of bits to bytes.

    Args:
        b: The number of bits

    Returns:
        The number of bytes
    """
    return (b + 7) // 8


[docs]class AxiBurstType(IntEnum): """The burst type used during the AXI write transaction.""" FIXED = 0b00 """A fixed size burst. In a fixed size burst. * The address is the same for every transfer in the burst. * The byte lanes that are valid are constant for all beats in the burst. However, within those byte lanes, the actual bytes that thave **WSTRB** asserted can differ for each beat in the burst. """ INCR = 0b01 """An incrementing burst. In an incrementing burst, the address for each tranfer in the burst is an increment of the address for the previous transfer. The inceremnnt value depends on the size of the transfer. For example, for an aligned start address, the address for each trasnfer in a burst with a size of 4 bytes is the previous address plus four. """ WRAP = 0b10 """A wrapping burst. A wrapping burst is similar to an incrementing burst, except that the address wraps around to a lower address if an upper address limit is reached. The following restrictions apply to wrapping bursts: * The start address must be aligned to the size of each transfer. * the length of the burst must be 2, 4, 8, or 16 transfers. The behavior of a wrapping burst is: * The lowest address that is used by the burst is aligned to the total size of the adat to be transferred, that is, to ((size of each transfer in the burst) x (number of transfers in the burst)). This address is defined as the *wrap boundary*. * After each transfer, the address increments in the same way as for an INCR burst. However, if this inceremented address is ((wrap boundary) + (total size of data to be transferred)), then the address wraps round to the wrap boundary. * The first transfer in the burst can use an address that is higher than the wrap boundary, subject to the restrictions that apply to wrapping bursts. The address wraps for any WRAP burst when the first address is higher than the wrap boundary. This burst type is used for cache line accesses. """
[docs]class AxiBurstSize(IntEnum): """The number of bytes in each data transfer in a write transaction. The burst size must be a power of 2, that is 1, 2, 4, 8, 16, 32, 64, or 128 bytes. """ SIZE_1 = 0b000 """A burst size of 1.""" SIZE_2 = 0b001 """A burst size of 2.""" SIZE_4 = 0b010 """A burst size of 4.""" SIZE_8 = 0b011 """A burst size of 8.""" SIZE_16 = 0b100 """A burst size of 16.""" SIZE_32 = 0b101 """A burst size of 32.""" SIZE_64 = 0b110 """A burst size of 64.""" SIZE_128 = 0b111 """A burst size of 128."""
[docs]class AxiLockType(IntEnum): """The AXI Lock type supported by AXI3. AXI4 does not support locked transactions. In AXI3 the **AxLOCK** signals specify normal, exclusive, and locked accesses. """ NORMAL = 0b0 """Normal access.""" EXCLUSIVE = 0b1 """Exclusize access."""
[docs]class AxiCacheBit(IntFlag): """The different bits of the **AxCACHE** attribute. The **AxCACHE** signals specify the Bufferabel, Modifiable, and Allocate attributes of the transaction. """ B = 0b0001 """Bufferable bit. When this bit is asserted, the interconnect, or any component, can delay the transaction reaching its final destination for any number of cycles. """ M = 0b0010 """Modifiable bit. When this bit is asserted the characteristics of the transaction can be modified. """ RA = 0b0100 """Read-Allocate bit. When this bit is asserted, read allocation of the transaction is recomended but is not mandatory. The `RA` bit must be asserted if the `M` bit deasserted. """ WA = 0b1000 """Write-Allocate bit. When this bit is asserted, write allocation of the transaction is recomended but is not mandatory. The `WA` bit must be asserted if the `M` bit deasserted. """
[docs]class AxiProt(IntFlag): """The permission signals that can be used to protect against illegal transactions.""" PRIVILEGED = 0b001 """Privileged access.""" NONSECURE = 0b010 """Non-secure access.""" INSTRUCTION = 0b100 """Instruction access. If this bit is not asserted then the access is a data access. """
[docs]class AxiResp(IntEnum): """The response signals.""" OKAY = 0b00 """Normal access success. Indicates that a normal access has been successful. Can laso indicate theat an aexclusie access has failed. """ EXOKAY = 0b01 """Exclusive access okay. Indicates that a either the read or write access of an exclusive access has been successful. """ SLVERR = 0b10 """Subordinate error. Used when the access has reached the Subordinate successfully, but the Subordinate wishes to return an error condition to the originating Manager. """ DECERR = 0b11 """Decode error. Generated, typically by an interconnect component, to indicate that there is no Subordinate at the transaction address. """
[docs]class AxiMaster: """A Wrapper around `cocotbext-axi AXI <https://github.com/alexforencich/cocotbext-axi#axi-and-axi-lite-master>`_. Todo: Add a usage example. """
[docs] def __init__( self, bus_prefix: str, clk: str, rst: str, reset_active_level: int, max_burst_length: int = 256, ): """Initialize an instance. Args: bus_prefix: The prefix of signals belonging to the source bus clk: The name of the clock rst: The name of the reset reset_active_level: 1 if active high 0 if active low max_burst_length: The maximum burst length in cycles (1 - 256). """ self._bus_prefix = bus_prefix self._clk = clk self._rst = rst self._reset_active_level = reset_active_level self._max_burst_length = max_burst_length self._log = SimLog(self._bus_prefix)
[docs] def setup(self, dut: HierarchyObject) -> None: """Setup the AXI source. Args: dut: The device under test Raises: AttributeError: If `dut` does not contain the handles of given with `clk` and `rst`. """ self._bus = axi.AxiMaster( bus=axi.AxiBus.from_prefix(dut, self._bus_prefix), clock=getattr(dut, self._clk), reset=getattr(dut, self._rst), reset_active_level=bool(self._reset_active_level), max_burst_length=self._max_burst_length, )
[docs] async def write( self, address: int, data: bytes, id: int | None = None, burst: AxiBurstType = AxiBurstType.INCR, burst_size: AxiBurstSize | None = None, lock: AxiLockType = AxiLockType.NORMAL, cache: AxiCacheBit = AxiCacheBit.B & AxiCacheBit.M, prot: AxiProt = AxiProt.NONSECURE, qos: int = 0, region: int = 0, user: int = 0, wuser: int = 0, ) -> axi.axi_master.AxiWriteResp: """Write `data` to the `address`. Args: address: The write address data: The write data id: The AXI burst ID burst: The AXI burst type burst_size: The AXI burst size lock: The AXI lock type cache: The AXI cache bits prot: The AXI protection flag qos: The AXI quality of service field region: The AXI region field user: The AXI user signal wuser: The AXI write user signal Returns: The response to the write operation """ return await self._bus.write( address, data, id=id, burst=burst, burst_size=burst_size, lock=lock, cache=cache, prot=prot, qos=qos, region=region, user=user, wuser=wuser, )
[docs] async def read( self, address: int, length: int, id: int | None = None, burst: AxiBurstType = AxiBurstType.INCR, burst_size: AxiBurstSize | None = None, lock: AxiLockType = AxiLockType.NORMAL, cache: AxiCacheBit = AxiCacheBit.B & AxiCacheBit.M, prot: AxiProt = AxiProt.NONSECURE, qos: int = 0, region: int = 0, user: int = 0, wuser: int = 0, ) -> axi.axi_master.AxiReadResp: """Read `length` bytes from `address`. Args: address: The read start address length: The read size in bytes id: The AXI burst ID. Defaults to `None` which assigns an ID automatically burst: The AXI burst type burst_size: The AXI burst size lock: The AXI lock type cache: The AXI cache bits enables buffering prot: The AXI protection flag qos: The AXI quality of service field region: The AXI region field user: The AXI user signal wuser: The AXI write user signal Returns: The response to the read operation, which also contains the data in the `data` attribute """ return await self._bus.read( address, length, burst=burst, burst_size=burst_size, lock=lock, cache=cache, prot=prot, qos=qos, region=region, user=user, wuser=wuser, )
[docs] def set_idle_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the write bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._bus.write_if.aw_channel, self._bus.write_if.w_channel, self._bus.read_if.ar_channel, ]: channel.set_pause_generator(generator)
[docs] def set_backpressure_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the read bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._bus.write_if.b_channel, self._bus.read_if.r_channel, ]: channel.set_pause_generator(generator)
[docs] def disable(self) -> None: """Disable the AXI master interface.""" self.set_idle_generator(cycle([1])) self.set_backpressure_generator(cycle([1]))
[docs] def enable(self) -> None: """Enable the AXI master interface.""" self.set_idle_generator(cycle([0])) self.set_backpressure_generator(cycle([0]))
[docs]class AxiRam: """A Wrapper around `cocotbext-axi AXI RAM <https://github.com/alexforencich/cocotbext-axi#axi-and-axi-lite-ram>`_. Todo: Add a usage example. """
[docs] def __init__( self, bus_prefix: str, clk: str, rst: str, reset_active_level: int, size: int, ): """Initialize an instance. Args: bus_prefix: The prefix of signals belonging to the source bus tdata_width_bits: The width of `tdata` in bits clk: The name of the clock rst: The name of the reset reset_active_level: 1 if active high 0 if active low size: The memory size in bytes """ self._bus_prefix = bus_prefix self._clk = clk self._rst = rst self._reset_active_level = bool(reset_active_level) self._size = size self._log = SimLog(self._bus_prefix)
[docs] def setup(self, dut: HierarchyObject) -> None: """Setup the AXI RAM. Args: dut: The device under test Raises: AttributeError: If `dut` does not contain the handles of given with `clk` and `rst`. """ self._ram = axi.AxiRam( bus=axi.AxiBus.from_prefix(dut, self._bus_prefix), clock=getattr(dut, self._clk), reset=getattr(dut, self._rst), reset_active_level=self._reset_active_level, size=self._size, )
[docs] def write(self, address: int, data: bytes) -> None: """Write `data` to the `address`. Args: address: The write address data: The write data """ self._ram.write(address, data)
[docs] def read(self, address: int, length: int) -> bytes: """Read `length` bytes from `address`. Args: address: The read start address length: The read size in bytes Returns: The content of the RAM at `address` """ return bytes(self._ram.read(address, length))
[docs] def hexdump(self, address: int, length: int, prefix: str = "RAM") -> None: """Dump the content of the RAM to the stdout. Args: address: The read start address length: The read size in bytes prefix: A prefix to each output line """ self._ram.hexdump(address, length, prefix)
[docs] def set_idle_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the write bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._ram.write_if.b_channel, self._ram.read_if.r_channel, ]: channel.set_pause_generator(generator)
[docs] def set_backpressure_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the read bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._ram.write_if.aw_channel, self._ram.write_if.w_channel, self._ram.read_if.ar_channel, ]: channel.set_pause_generator(generator)
[docs] def disable(self) -> None: """Disable the AXI RAM.""" self.set_idle_generator(cycle([1])) self.set_backpressure_generator(cycle([1]))
[docs] def enable(self) -> None: """Enable the AXI RAM.""" self.set_idle_generator(cycle([0])) self.set_backpressure_generator(cycle([0]))
[docs]class RandomAxiPayloadGenerator: """A generator class that gives back a random payload. Todo: Add a usage example. """
[docs] def __init__(self, data_width_bits: int): """Initialize an instance. Args: data_width_bits: The width of each value in bits """ self._data_width_bits = data_width_bits
[docs] def get_payload(self) -> bytes: """Get a random payload. Returns: The random payload """ return getrandbits(self._data_width_bits).to_bytes( bits_to_bytes(self._data_width_bits), byteorder="big" )
[docs]class AxiLiteMaster: """A wrapper class around `cocotbext-axi AXI-Lite Master <https://github.com/alexforencich/cocotbext-axi#axi-and-axi-lite-master>`_. Todo: Add a usage example. """
[docs] def __init__( self, bus_prefix: str, clk: str, rst: str, reset_active_level: int, ): """Initialize an instance. Args: bus_prefix: The prefix of signals belonging to the source bus clk: The name of the clock rst: The name of the reset reset_active_level: 1 if active high 0 if active low """ self._bus_prefix = bus_prefix self._clk = clk self._rst = rst self._reset_active_level = reset_active_level self._log = SimLog(self._bus_prefix)
[docs] def setup(self, dut: HierarchyObject) -> None: """Setup the AXI-Lite source. Args: dut: The device under test Raises: AttributeError: If `dut` does not contain the handles of given with `clk` and `rst`. """ self._bus = axi.AxiLiteMaster( bus=axi.AxiLiteBus.from_prefix(dut, self._bus_prefix), clock=getattr(dut, self._clk), reset=getattr(dut, self._rst), reset_active_level=bool(self._reset_active_level), )
[docs] async def write( self, address: int, data: bytes, prot: AxiProt = AxiProt.NONSECURE, ) -> axi.axil_master.AxiLiteWriteResp: """Write `data` to the `address`. Args: address: The write address data: The write data prot: The AXI protection flag Returns: The response to the write operation """ return await self._bus.write(address, data, prot=prot)
[docs] async def read( self, address: int, length: int, prot: AxiProt = AxiProt.NONSECURE, ) -> axi.axil_master.AxiLiteReadResp: """Read `length` bytes from `address`. Args: address: The read start address length: The read size in bytes prot: The AXI protection flag Returns: The response to the read operation, which also contains the data in the `data` attribute """ return await self._bus.read(address, length, prot=prot)
[docs] def set_idle_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the write bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._bus.write_if.aw_channel, self._bus.write_if.w_channel, self._bus.read_if.ar_channel, ]: channel.set_pause_generator(generator)
[docs] def set_backpressure_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the read bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._bus.write_if.b_channel, self._bus.read_if.r_channel, ]: channel.set_pause_generator(generator)
[docs] def disable(self) -> None: """Disable the AXI-Lite master interface.""" self.set_idle_generator(cycle([1])) self.set_backpressure_generator(cycle([1]))
[docs] def enable(self) -> None: """Enable the AXI-Lite master interface.""" self.set_idle_generator(cycle([0])) self.set_backpressure_generator(cycle([0]))
[docs]class AxiLiteRam: """A Wrapper around `cocotbext-axi AXI-Lite RAM <https://github.com/alexforencich/cocotbext-axi#axi-and-axi-lite-ram>`_. Todo: Add a usage example. """
[docs] def __init__( self, bus_prefix: str, clk: str, rst: str, reset_active_level: int, size: int, ): """Initialize an instance. Args: bus_prefix: The prefix of signals belonging to the source bus tdata_width_bits: The width of `tdata` in bits clk: The name of the clock rst: The name of the reset reset_active_level: 1 if active high 0 if active low size: The memory size in bytes """ self._bus_prefix = bus_prefix self._clk = clk self._rst = rst self._reset_active_level = bool(reset_active_level) self._size = size self._log = SimLog(self._bus_prefix)
[docs] def setup(self, dut: HierarchyObject) -> None: """Setup the AXI-Lite RAM. Args: dut: The device under test Raises: AttributeError: If `dut` does not contain the handles of given with `clk` and `rst`. """ self._ram = axi.AxiLiteRam( bus=axi.AxiLiteBus.from_prefix(dut, self._bus_prefix), clock=getattr(dut, self._clk), reset=getattr(dut, self._rst), reset_active_level=self._reset_active_level, size=self._size, )
[docs] def write(self, address: int, data: bytes) -> None: """Write `data` to the `address`. Args: address: The write address data: The write data """ self._ram.write(address, data)
[docs] def read(self, address: int, length: int) -> bytes: """Read `length` bytes from `address`. Args: address: The read start address length: The read size in bytes Returns: The content of the RAM at `address` """ return bytes(self._ram.read(address, length))
[docs] def hexdump(self, address: int, length: int, prefix: str = "RAM") -> None: """Dump the content of the RAM to the stdout. Args: address: The read start address length: The read size in bytes prefix: A prefix to each output line """ self._ram.hexdump(address, length, prefix)
[docs] def set_idle_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the write bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._ram.write_if.b_channel, self._ram.read_if.r_channel, ]: channel.set_pause_generator(generator)
[docs] def set_backpressure_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the read bus lanes given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ for channel in [ self._ram.write_if.aw_channel, self._ram.write_if.w_channel, self._ram.read_if.ar_channel, ]: channel.set_pause_generator(generator)
[docs] def disable(self) -> None: """Disable the AXI-Lite RAM.""" self.set_idle_generator(cycle([1])) self.set_backpressure_generator(cycle([1]))
[docs] def enable(self) -> None: """Enable the AXI-Lite RAM.""" self.set_idle_generator(cycle([0])) self.set_backpressure_generator(cycle([0]))
[docs]class RandomAxiLitePayloadGenerator: """A generator class that gives back a random payload. Todo: Add a usage example. """
[docs] def __init__(self, data_width_bits: int): """Initialize an instance. Args: data_width_bits: The width of each value in bits """ self._data_width_bits = data_width_bits
[docs] def get_payload(self) -> bytes: """Get a random payload. Returns: The random payload """ return getrandbits(self._data_width_bits).to_bytes( bits_to_bytes(self._data_width_bits), byteorder="big" )
[docs]class AxiStreamSource: """A wrapper class around `cocotbext-axi AXI-Stream source <https://github.com/alexforencich/cocotbext-axi#axi-stream>`_. Todo: Add a usage example. """
[docs] def __init__( self, bus_prefix: str, tdata_width_bits: int, clk: str, rst: str, reset_active_level: int, ): """Initialize an instance. Args: bus_prefix: The prefix of signals belonging to the source bus tdata_width_bits: The width of `tdata` in bits clk: The name of the clock rst: The name of the reset reset_active_level: 1 if active high 0 if active low """ self._bus_prefix = bus_prefix self._tdata_width_bits = tdata_width_bits self._clk = clk self._rst = rst self._reset_active_level = reset_active_level self._log = SimLog(self._bus_prefix)
[docs] def setup(self, dut: HierarchyObject) -> None: """Setup the AXI-Stream source. Args: dut: The device under test Raises: AttributeError: If `dut` does not contain the handles of given with `clk` and `rst`. """ self._bus = axi.AxiStreamSource( bus=axi.AxiStreamBus.from_prefix(dut, self._bus_prefix), clock=getattr(dut, self._clk), reset=getattr(dut, self._rst), reset_active_level=bool(self._reset_active_level), byte_lanes=bits_to_bytes(self._tdata_width_bits), )
[docs] async def write( self, frame_data: bytes, event: Event | None = None ) -> None: """Write an AXI-Stream frame. Args: frame_data: The frame data event: An :class:`~cocotb.triggers.Event` object to await the completion of the frame transmission. The event gets triggered when the frame has been transmitted """ frame = axi.AxiStreamFrame(frame_data, tx_complete=event) await self._bus.write(frame)
[docs] def set_pause_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the bus given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ self._bus.set_pause_generator(generator)
[docs] def disable(self) -> None: """Disable the AXI-Stream source.""" self._bus.set_pause_generator(cycle([1]))
[docs] def enable(self) -> None: """Enable the AXI-Stream source.""" self._bus.set_pause_generator(cycle([0]))
[docs]class AxiStreamSink: """A Wrapper around `cocotbext-axi AXI-Stream sink <https://github.com/alexforencich/cocotbext-axi#axi-stream>`_. Todo: Add a usage example. """
[docs] def __init__( self, bus_prefix: str, tdata_width_bits: int, clk: str, rst: str, reset_active_level: int, ): """Initialize an instance. Args: bus_prefix: The prefix of signals belonging to the source bus tdata_width_bits: The width of `tdata` in bits clk: The name of the clock rst: The name of the reset reset_active_level: 1 if active high 0 if active low """ self._bus_prefix = bus_prefix self._tdata_width_bits = tdata_width_bits self._clk = clk self._rst = rst self._reset_active_level = bool(reset_active_level) self._log = SimLog(self._bus_prefix)
[docs] def setup(self, dut: HierarchyObject) -> None: """Setup the AXI-Stream sink. Args: dut: The device under test Raises: AttributeError: If `dut` does not contain the handles of given with `clk` and `rst`. """ self._bus = axi.AxiStreamSink( bus=axi.AxiStreamBus.from_prefix(dut, self._bus_prefix), clock=getattr(dut, self._clk), reset=getattr(dut, self._rst), reset_active_level=self._reset_active_level, byte_lanes=bits_to_bytes(self._tdata_width_bits), )
[docs] async def read(self) -> bytes: """Read an AXI-Stream frame. Returns: The frame data """ frame = await self._bus.recv() return frame.tdata # type: ignore[no-any-return]
[docs] def set_pause_generator(self, generator: Iterator[int]) -> None: """Toggle pauses on the bus given a generator function. Args: generator: A signal generator for the tready flag. The tready signal will be low if the Iterator yields a ``'1'`` """ self._bus.set_pause_generator(generator)
[docs] def disable(self) -> None: """Disable the AXI-Stream sink.""" self.set_pause_generator(cycle([1]))
[docs] def enable(self) -> None: """Enable the AXI-Stream sink.""" self.set_pause_generator(cycle([0]))
[docs]class RandomAxiStreamPayloadGenerator: """A generator class that gives back a random payload. Todo: Add a usage example. """
[docs] def __init__(self, frame_length: int, data_width_bits: int): """Initialize an instance. Args: frame_length: The length of the frame data_width_bits: The width of each value in bits """ self._frame_length = frame_length self._data_width_bits = data_width_bits
[docs] def get_payload(self) -> bytes: """Get a random payload. Returns: The random payload """ return b"".join( [ getrandbits(self._data_width_bits).to_bytes( bits_to_bytes(self._data_width_bits), byteorder="big" ) for _ in range(self._frame_length) ] )