Skip to content

Linear requirements

The backend for linear requirements — linear arithmetic inequalities over the variables.

Shield Layer

shield_layer

The Shield Layer for linear requirements.

Defines :class:ShieldLayer, a differentiable PyTorch module that corrects a batch of predictions so they satisfy a set of linear inequality requirements, by clipping each variable into the feasible interval implied by its requirements following a fixed variable ordering.

ShieldLayer

ShieldLayer(num_variables: int, requirements_filepath: str, ordering_choice: str = 'given')

Bases: Module

Differentiable layer that corrects predictions so they satisfy a set of linear inequality requirements.

The correction works variable by variable, following a fixed ordering. For each variable x, the requirements that bound x are rewritten as a lower bound and an upper bound, each expressed as a linear function of the other variables. At inference time the variables are corrected one at a time in this order: x is clipped into the [lower bound, upper bound] interval implied by the requirements, using the already-corrected values of the variables that precede it. Because every variable is only ever clipped using values that are already feasible, correcting one variable can never break a requirement that was already satisfied, so all requirements hold after the pass.

Attributes:

Name Type Description
num_variables

Number of variables (labels or features) in a prediction.

ordering

The ordering of variables used to correct predictions.

constraints

The full list of parsed linear requirements.

sets_of_constr

Mapping from each variable to the requirements bounding it.

pos_matrices

Per-variable coefficient matrices encoding lower bounds.

neg_matrices

Per-variable coefficient matrices encoding upper bounds.

dense_ordering

The ordering restricted to variables that appear in some requirement.

Parameters:

Name Type Description Default
num_variables int

Number of variables in each prediction vector.

required
requirements_filepath str

Path to the requirements file (containing the variable ordering and the linear requirements).

required
ordering_choice str

How to pick the correction ordering: 'given' to use the ordering in the file, or 'random' for a random permutation.

'given'
Example

layer = ShieldLayer(num_variables=4, requirements_filepath='reqs.txt') corrected = layer(predictions) # corrected satisfies all requirements

Build the layer: parse requirements and precompute per-variable bounds.

Source code in pishield/linear_requirements/shield_layer.py
def __init__(self, num_variables: int, requirements_filepath: str, ordering_choice: str = 'given'):
    """Build the layer: parse requirements and precompute per-variable bounds."""
    super().__init__()
    self.num_variables = num_variables
    # Read the variable ordering and the list of linear constraints from the requirements file.
    ordering, constraints = parse_constraints_file(requirements_filepath)
    # clustered_constraints = split_constraints(ordering, constraints)
    # Decide the order in which variables will be corrected ('given', 'random' or custom).
    self.ordering = set_ordering(ordering, ordering_choice)
    self.constraints = constraints
    # Group the constraints by the variable they bound (its "level" in the ordering).
    self.sets_of_constr = compute_sets_of_constraints(ordering, constraints, verbose=True)
    # Precompute, per variable, the coefficient matrices encoding its lower/upper bounds.
    self.pos_matrices, self.neg_matrices = self.create_matrices()
    # Restrict the ordering to the variables that actually appear in some constraint.
    self.dense_ordering = self.get_dense_ordering()  # requires self.sets_of_constraints

create_matrices

create_matrices()

Precompute the lower- and upper-bound matrices for every variable.

For each variable x, builds a matrix C+ encoding the requirements that lower-bound x and a matrix C- encoding those that upper-bound it.

Returns:

Type Description

A tuple (pos_matrices, neg_matrices) of dicts mapping each

class:Variable to its lower-bound and upper-bound representation

(a tensor, or +/-inf when x is unbounded on that side).

Source code in pishield/linear_requirements/shield_layer.py
def create_matrices(self):
    """Precompute the lower- and upper-bound matrices for every variable.

    For each variable ``x``, builds a matrix ``C+`` encoding the requirements
    that lower-bound ``x`` and a matrix ``C-`` encoding those that upper-bound
    it.

    Returns:
        A tuple ``(pos_matrices, neg_matrices)`` of dicts mapping each
        :class:`Variable` to its lower-bound and upper-bound representation
        (a tensor, or ``+/-inf`` when ``x`` is unbounded on that side).
    """
    # This function creates matrices C+ and C- for each variable x_i.
    # C+ (pos) holds the constraints that lower-bound x_i; C- (neg) those that upper-bound it.
    # Note that the column corresponding to x_i in the matrices will be 0s.
    pos_matrices: {Variable: torch.Tensor} = {}
    neg_matrices: {Variable: torch.Tensor} = {}
    for x in self.sets_of_constr:
        x: Variable
        # print(x.id)
        x_constr = get_constr_at_level_x(x, self.sets_of_constr)
        # Split x's constraints by the sign of x: positive occurrences give lower bounds on x,
        # negative occurrences give upper bounds.
        pos_x_constr, neg_x_constr = get_pos_neg_x_constr(x, x_constr)

        pos_matrices[x] = self.create_matrix(x, pos_x_constr, positive_x=True)
        neg_matrices[x] = self.create_matrix(x, neg_x_constr, positive_x=False)
    return pos_matrices, neg_matrices

get_dense_ordering

get_dense_ordering() -> List[Variable]

Restrict the ordering to variables that have requirements.

Returns:

Type Description
List[Variable]

The variables of self.ordering that appear in at least one

List[Variable]

requirement, in order. Requires self.sets_of_constr to be set.

Source code in pishield/linear_requirements/shield_layer.py
def get_dense_ordering(self) -> List[Variable]:
    """Restrict the ordering to variables that have requirements.

    Returns:
        The variables of ``self.ordering`` that appear in at least one
        requirement, in order. Requires ``self.sets_of_constr`` to be set.
    """
    dense_ordering = []
    for x in self.ordering:
        x_constr = get_constr_at_level_x(x, self.sets_of_constr)
        if len(x_constr) == 0:
            continue
        else:
            dense_ordering.append(x)
    return dense_ordering

create_matrix

create_matrix(x: Variable, x_constr: List[Constraint], positive_x: bool) -> Union[torch.Tensor, float]

Build the coefficient matrix encoding one side of x's bounds.

Each requirement contributes a row that expresses the bound it places on x as a linear function of the other variables, plus a bias column for the requirement's constant. Evaluating a row against a prediction vector yields the value x must be >= (when positive_x) or <= (when not positive_x).

Parameters:

Name Type Description Default
x Variable

The variable whose bound matrix is built.

required
x_constr List[Constraint]

The requirements that bound x on the relevant side.

required
positive_x bool

True to build the lower-bound matrix (positive occurrences of x), False for the upper-bound matrix.

required

Returns:

Type Description
Union[Tensor, float]

A tensor of shape (num_constraints, num_variables + 1), or

Union[Tensor, float]

-inf / +inf when x is unbounded on that side.

Source code in pishield/linear_requirements/shield_layer.py
def create_matrix(self, x: Variable, x_constr: List[Constraint], positive_x: bool) -> Union[torch.Tensor, float]:
    """Build the coefficient matrix encoding one side of ``x``'s bounds.

    Each requirement contributes a row that expresses the bound it places on
    ``x`` as a linear function of the other variables, plus a bias column for
    the requirement's constant. Evaluating a row against a prediction vector
    yields the value ``x`` must be ``>=`` (when ``positive_x``) or ``<=``
    (when not ``positive_x``).

    Args:
        x: The variable whose bound matrix is built.
        x_constr: The requirements that bound ``x`` on the relevant side.
        positive_x: ``True`` to build the lower-bound matrix (positive
            occurrences of ``x``), ``False`` for the upper-bound matrix.

    Returns:
        A tensor of shape ``(num_constraints, num_variables + 1)``, or
        ``-inf`` / ``+inf`` when ``x`` is unbounded on that side.
    """
    # Build one row per constraint, expressing the bound it places on x as a linear function of
    # the other variables (plus a bias term for the constraint's constant). Evaluating a row
    # against a prediction vector yields the value x must be >= (positive_x) or <= (negative_x).
    # With no constraints, x is unbounded on that side: return -inf (lower) or +inf (upper).
    if len(x_constr) == 0:
        return -INFINITY if positive_x else INFINITY

    matrix = torch.zeros((len(x_constr), self.num_variables), dtype=torch.float)
    x_unsigned_coefficients = torch.ones((len(x_constr),), dtype=torch.float)  # bias (i.e. the constraint constant)
    bias = torch.zeros((len(x_constr),), dtype=torch.float)
    for constr_index, constr in enumerate(x_constr):
        constr: Constraint

        is_strict_inequality = True if constr.single_inequality.ineq_sign == '>' else False
        constant = constr.single_inequality.constant
        epsilon = EPSILON if is_strict_inequality else 0.
        bias[constr_index] = constant + epsilon
        complementary_atoms: List[Atom] = constr.get_body_atoms()
        for atom in complementary_atoms:
            atom_id = atom.variable.id
            if atom_id == x.id:
                x_unsigned_coefficients[constr_index] = atom.coefficient
                continue
            else:
                signed_coefficient = atom.get_signed_coefficient()
                matrix[constr_index, atom_id] = signed_coefficient

    # next, divide by the unsigned coefficients of x:
    matrix = matrix / x_unsigned_coefficients.unsqueeze(-1)  # num constraints that contain x x num variables

    # if x is positive, multiply by -1 the matrix
    if positive_x:
        matrix *= (-1.)

    # add bias (constraint constant)
    bias = bias / x_unsigned_coefficients
    if not positive_x:
        bias *= (-1.)

    matrix = torch.cat([matrix, bias.unsqueeze(1)], dim=1)
    return matrix

apply_matrix

apply_matrix(preds: Tensor, matrix: Union[Tensor, float], reduction='none') -> torch.Tensor

Evaluate a bound matrix against predictions and reduce over requirements.

Computes, for every requirement row, the bound value as a dot product with the prediction vector, then optionally reduces across rows to obtain the tightest bound.

Parameters:

Name Type Description Default
preds Tensor

Prediction tensor of shape (batch_size, num_variables + 1) (the trailing column is the bias constant).

required
matrix Union[Tensor, float]

The bound matrix for the variable, or a float when the variable is unbounded on that side (returned unchanged).

required
reduction

'amax' for the tightest lower bound, 'amin' for the tightest upper bound, or 'none' for no reduction.

'none'

Returns:

Type Description
Tensor

The per-sample bound value(s); shape (batch_size,) when reduced,

Tensor

or the float matrix when it is not a tensor.

Source code in pishield/linear_requirements/shield_layer.py
def apply_matrix(self, preds: torch.Tensor, matrix: Union[torch.Tensor, float], reduction='none') -> torch.Tensor:
    """Evaluate a bound matrix against predictions and reduce over requirements.

    Computes, for every requirement row, the bound value as a dot product
    with the prediction vector, then optionally reduces across rows to obtain
    the tightest bound.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables + 1)``
            (the trailing column is the bias constant).
        matrix: The bound matrix for the variable, or a float when the
            variable is unbounded on that side (returned unchanged).
        reduction: ``'amax'`` for the tightest lower bound, ``'amin'`` for the
            tightest upper bound, or ``'none'`` for no reduction.

    Returns:
        The per-sample bound value(s); shape ``(batch_size,)`` when reduced,
        or the float ``matrix`` when it is not a tensor.
    """
    if type(matrix) != torch.Tensor:
        return matrix
    else:
        matrix = matrix.to(preds.device)

    B = preds.shape[0]  # batch size
    C = matrix.shape[0]  # num constraints in the current set
    N = matrix.shape[1]  # num variables

    # expand tensors
    preds = preds.unsqueeze(1).expand((B, C, N))
    matrix = matrix.clone().unsqueeze(0).expand((B, C, N))

    result = (preds * matrix).sum(dim=2)
    if reduction == 'amax':
        result = result.amax(dim=1)
    elif reduction == 'amin':
        result = result.amin(dim=1)
    else:
        pass
    return result

Parser

parser

Parser for linear requirements files.

Reads a requirements file into the constraint data model: the variable ordering and a list of :class:Constraint objects built from the textual linear inequalities (with support for or disjunctions and neg negation).

neg_postprocess_ineq

neg_postprocess_ineq(ineq: Inequality) -> Inequality

Negate an inequality (logical negation of the linear constraint).

Negates every atom and flips the inequality sign between '>' and '>=', e.g. neg (x1 + x2 - x3 >= 0) becomes -x1 - x2 + x3 > 0.

Parameters:

Name Type Description Default
ineq Inequality

The inequality to negate.

required

Returns:

Type Description
Inequality

A new :class:Inequality representing the negation.

Source code in pishield/linear_requirements/parser.py
def neg_postprocess_ineq(ineq: Inequality) -> Inequality:
    """Negate an inequality (logical negation of the linear constraint).

    Negates every atom and flips the inequality sign between ``'>'`` and
    ``'>='``, e.g. ``neg (x1 + x2 - x3 >= 0)`` becomes ``-x1 - x2 + x3 > 0``.

    Args:
        ineq: The inequality to negate.

    Returns:
        A new :class:`Inequality` representing the negation.
    """
    # equivalent to negating the sign of the inequality and the multiplying by -1
    # e.g. neg x1+x2-x3 >=0 becomes -x1-x2+x3 > 0
    atoms_list, ineq_sign, constant = ineq.get_ineq_attributes()

    # negate all atoms in the inequality
    neg_atoms_list = []
    for atom in atoms_list:
        variable, coefficient, positive_sign = atom.get_atom_attributes()
        neg_atom = Atom(variable, coefficient, not positive_sign)
        neg_atoms_list.append(neg_atom)

    # change the sign of the inequality
    neg_ineq_sign = '>' if ineq_sign == '>=' else '>='
    neg_ineq = Inequality(neg_atoms_list, neg_ineq_sign, constant)
    return neg_ineq

parse_atom

parse_atom(x)

Parse a single textual atom into an :class:Atom.

Parameters:

Name Type Description Default
x

The atom string, e.g. 'y1', '-2y2', '-y23' or '6y3'. A missing coefficient defaults to 1; a leading '-' sets a negative sign.

required

Returns:

Type Description

The parsed :class:Atom, or None if the string is empty.

Source code in pishield/linear_requirements/parser.py
def parse_atom(x):
    """Parse a single textual atom into an :class:`Atom`.

    Args:
        x: The atom string, e.g. ``'y1'``, ``'-2y2'``, ``'-y23'`` or ``'6y3'``.
            A missing coefficient defaults to 1; a leading ``'-'`` sets a negative
            sign.

    Returns:
        The parsed :class:`Atom`, or ``None`` if the string is empty.
    """
    x = x.strip().replace(" ", "")
    if x == '':
        return None
    positive_sign = False if '-' in x else True

    if '-' in x or '+' in x:
        x = x[1:]

    coefficient, var_id = x.split('y')
    coefficient = float(coefficient) if coefficient != '' else 1
    var = Variable('y' + var_id)

    atom = Atom(var, coefficient, positive_sign)
    return atom

parse_inequality

parse_inequality(inequality)

Parse a textual linear inequality into an :class:Inequality.

Parameters:

Name Type Description Default
inequality

The inequality string, e.g. 'y1-2y2>0' or 'y1+y2>=0'. The body atoms are split on the +, -, * and / operators and the right-hand side is read as a constant.

required

Returns:

Type Description

The parsed :class:Inequality.

Source code in pishield/linear_requirements/parser.py
def parse_inequality(inequality):
    """Parse a textual linear inequality into an :class:`Inequality`.

    Args:
        inequality: The inequality string, e.g. ``'y1-2y2>0'`` or ``'y1+y2>=0'``.
            The body atoms are split on the ``+``, ``-``, ``*`` and ``/``
            operators and the right-hand side is read as a constant.

    Returns:
        The parsed :class:`Inequality`.
    """
    ineq_sign = None
    inequality = inequality.strip()
    for sign in ALLOWED_INEQ_SIGNS:
        if sign in inequality:
            ineq_sign = sign
            break
    body, constant = inequality.split(ineq_sign)
    constant = float(constant)

    atoms_list = []
    constructed_atom = ''
    for elem in body:
        if elem in ALLOWED_OPS:
            if constructed_atom == '':
                constructed_atom += elem
            else:
                atom = parse_atom(constructed_atom)
                if atom is not None:
                    atoms_list.append(atom)
                constructed_atom = elem
        else:
            constructed_atom += elem
    atom = parse_atom(constructed_atom)
    if atom is not None:
        atoms_list.append(atom)

    ineq = Inequality(atoms_list, ineq_sign, constant)
    return ineq

parse_constraint

parse_constraint(constr) -> Constraint

Parse one textual requirement line into a :class:Constraint.

Splits the line on or into disjunct inequalities, applying neg negation where present, and wraps the resulting inequalities in a constraint.

Parameters:

Name Type Description Default
constr

The requirement line, e.g. 'y1>0 or neg y2>=0'.

required

Returns:

Type Description
Constraint

The parsed :class:Constraint.

Source code in pishield/linear_requirements/parser.py
def parse_constraint(constr) -> Constraint:
    """Parse one textual requirement line into a :class:`Constraint`.

    Splits the line on ``or`` into disjunct inequalities, applying ``neg``
    negation where present, and wraps the resulting inequalities in a constraint.

    Args:
        constr: The requirement line, e.g. ``'y1>0 or neg y2>=0'``.

    Returns:
        The parsed :class:`Constraint`.
    """
    ineqs = []
    neg_postprocess = False

    disjunct_inequalities = constr.split('or')
    for ineq in disjunct_inequalities:
        if 'neg' in ineq:
            ineq = ineq.split('neg')[-1]
            neg_postprocess = True
        ineq = parse_inequality(ineq)
        if neg_postprocess:
            ineq = neg_postprocess_ineq(ineq)
            neg_postprocess = False
        ineqs.append(ineq)

    constr = Constraint(ineqs)
    return constr

parse_constraints_file

parse_constraints_file(file: str) -> (List[Variable], List[Constraint])

Parse a requirements file into a variable ordering and requirements.

The file must contain an ordering line listing the variables, followed by one requirement per line.

Parameters:

Name Type Description Default
file str

Path to the requirements file.

required

Returns:

Type Description
(List[Variable], List[Constraint])

A tuple (ordering, constraints) of the parsed variable ordering and

(List[Variable], List[Constraint])

the list of :class:Constraint objects.

Raises:

Type Description
Exception

If the file does not provide a variable ordering.

Source code in pishield/linear_requirements/parser.py
def parse_constraints_file(file: str) -> (List[Variable], List[Constraint]):
    """Parse a requirements file into a variable ordering and requirements.

    The file must contain an ``ordering`` line listing the variables, followed by
    one requirement per line.

    Args:
        file: Path to the requirements file.

    Returns:
        A tuple ``(ordering, constraints)`` of the parsed variable ordering and
        the list of :class:`Constraint` objects.

    Raises:
        Exception: If the file does not provide a variable ordering.
    """
    f = open(file, 'r')
    constraints = []
    ordering = []
    for line in f:
        line = line.strip()
        if 'ordering' in line:
            str_ordering = line.split('ordering ')[-1].split()
            for elem in str_ordering:
                ordering.append(Variable(elem.replace(" ", ""))) # remove all whitespace from variable string
            continue
        constraints.append(parse_constraint(line))
    if len(ordering) == 0:
        raise Exception('An ordering of the variables must be provided!')
    return ordering, constraints

split_constraints

split_constraints(ordering: List[Variable], constraints: List[Constraint])

Cluster requirements into variable-disjoint groups.

Splits a list of requirements into groups Gi of requirements such that Vars(Gi) \intersect Vars(Gj) = null for distinct groups, i.e. requirements are grouped together whenever they share any variable (transitively).

Parameters:

Name Type Description Default
ordering List[Variable]

The variable ordering, used to order each group's variables.

required
constraints List[Constraint]

The requirements to cluster.

required

Returns:

Type Description

A tuple (clustered_orderings, clustered_constraints) of lists, where

each entry is, respectively, the variable ordering and the requirement

list of one variable-disjoint cluster.

Source code in pishield/linear_requirements/parser.py
def split_constraints(ordering: List[Variable], constraints: List[Constraint]):
    r"""Cluster requirements into variable-disjoint groups.

    Splits a list of requirements into groups ``Gi`` of requirements such that
    ``Vars(Gi) \intersect Vars(Gj) = null`` for distinct groups, i.e. requirements
    are grouped together whenever they share any variable (transitively).

    Args:
        ordering: The variable ordering, used to order each group's variables.
        constraints: The requirements to cluster.

    Returns:
        A tuple ``(clustered_orderings, clustered_constraints)`` of lists, where
        each entry is, respectively, the variable ordering and the requirement
        list of one variable-disjoint cluster.
    """
    constr_vars = {}
    for i,constr in enumerate(constraints):
        vars_in_constr = constr.single_inequality.get_body_variables()
        constr_vars[i] = set([var.id for var in vars_in_constr])

    clustered_constraints_ids = list(range(len(constraints)))
    for constr_i in range(len(constraints)):
        vars_in_constr_i = constr_vars[constr_i]
        for constr_j in range(constr_i+1, len(constraints)):

            vars_in_constr_j = constr_vars[constr_j]
            if vars_in_constr_i.intersection(vars_in_constr_j):
                clustered_constraints_ids[constr_j] = clustered_constraints_ids[constr_i]

    clustered_constraints = {id:[] for id in set(clustered_constraints_ids)}
    clustered_orderings = {id:[] for id in set(clustered_constraints_ids)}
    var_ordering = [var.id for var in ordering]
    for cluster_id in clustered_constraints:
        ids_cluster_components = [id for id, group_id in enumerate(clustered_constraints_ids) if group_id == cluster_id]
        clustered_constraints[cluster_id] = [constraints[cluster_component] for cluster_component in ids_cluster_components]

        cluster_ordering = set.union(*[constr_vars[constr_i] for constr_i in ids_cluster_components])
        clustered_orderings[cluster_id] = [var for var in var_ordering if var in cluster_ordering]

    clustered_constraints = list(clustered_constraints.values())
    clustered_orderings = list(clustered_orderings.values())
    return clustered_orderings, clustered_constraints

Classes

classes

Data model for linear requirements.

Defines the building blocks of a linear requirement: a :class:Variable, a signed scaled :class:Atom (a coefficient times a variable), an :class:Inequality (a body of atoms compared against a constant), and a :class:Constraint wrapping one inequality. These classes also provide satisfaction checks against batches of predictions.

Variable

Variable(variable: str)

A variable (a label or feature) identified by a string name.

Attributes:

Name Type Description
variable

The variable's string name (e.g. 'y_3').

id

The integer id parsed from the trailing part of the name.

Parameters:

Name Type Description Default
variable str

The variable's string name, whose trailing token after the final underscore is parsed as the integer id.

required

Initialize the variable and parse its integer id from the name.

Source code in pishield/linear_requirements/classes.py
def __init__(self, variable: str):
    """Initialize the variable and parse its integer id from the name."""
    super().__init__()
    self.variable = variable
    self.id = self.get_variable_id()

readable

readable()

Return the human-readable string name of the variable.

Returns:

Type Description

The variable's string name.

Source code in pishield/linear_requirements/classes.py
def readable(self):
    """Return the human-readable string name of the variable.

    Returns:
        The variable's string name.
    """
    return self.variable

get_variable_id

get_variable_id()

Parse the integer id from the variable name.

Returns:

Type Description

The integer following the final underscore in the variable name.

Source code in pishield/linear_requirements/classes.py
def get_variable_id(self):
    """Parse the integer id from the variable name.

    Returns:
        The integer following the final underscore in the variable name.
    """
    id = int(self.variable.split('_')[-1])
    return id

Atom

Atom(variable: Variable, coefficient: float, positive_sign: bool)

A signed, scaled occurrence of a variable in a linear requirement.

An atom represents (+/-) coefficient * variable.

Attributes:

Name Type Description
variable

The :class:Variable the atom refers to.

coefficient

The (unsigned) magnitude of the coefficient.

positive_sign

True if the term is added, False if subtracted.

Parameters:

Name Type Description Default
variable Variable

The :class:Variable the atom refers to.

required
coefficient float

The unsigned coefficient magnitude.

required
positive_sign bool

Whether the term carries a positive sign.

required

Initialize the atom from a variable, coefficient and sign.

Source code in pishield/linear_requirements/classes.py
def __init__(self, variable: Variable, coefficient: float, positive_sign: bool):
    """Initialize the atom from a variable, coefficient and sign."""
    super().__init__()
    self.variable = variable
    self.coefficient = coefficient
    self.positive_sign = positive_sign

get_variable_id

get_variable_id()

Return the integer id of the atom's variable.

Returns:

Type Description

The variable's integer id.

Source code in pishield/linear_requirements/classes.py
def get_variable_id(self):
    """Return the integer id of the atom's variable.

    Returns:
        The variable's integer id.
    """
    return self.variable.get_variable_id()

eval

eval(x_value)

Evaluate the atom for a given value of its variable.

Parameters:

Name Type Description Default
x_value

The value(s) of the atom's variable.

required

Returns:

Type Description

x_value multiplied by the atom's signed coefficient.

Source code in pishield/linear_requirements/classes.py
def eval(self, x_value):
    """Evaluate the atom for a given value of its variable.

    Args:
        x_value: The value(s) of the atom's variable.

    Returns:
        ``x_value`` multiplied by the atom's signed coefficient.
    """
    return x_value * self.get_signed_coefficient()

get_signed_coefficient

get_signed_coefficient()

Return the coefficient with its sign applied.

Returns:

Type Description

+coefficient if the atom is positive, -coefficient otherwise.

Source code in pishield/linear_requirements/classes.py
def get_signed_coefficient(self):
    """Return the coefficient with its sign applied.

    Returns:
        ``+coefficient`` if the atom is positive, ``-coefficient`` otherwise.
    """
    return self.coefficient if self.positive_sign else -1 * self.coefficient

readable

readable()

Return a human-readable string for the atom, e.g. ' + 2.00y_3'.

Returns:

Type Description

A string with the sign, coefficient (omitted when 1) and variable.

Source code in pishield/linear_requirements/classes.py
def readable(self):
    """Return a human-readable string for the atom, e.g. ``' + 2.00y_3'``.

    Returns:
        A string with the sign, coefficient (omitted when 1) and variable.
    """
    readable = ' + ' if self.positive_sign else ' - '
    readable += (f'{self.coefficient:.2f}' if self.coefficient != int(
        self.coefficient) else f'{self.coefficient:.0f}') if self.coefficient != 1 else ''
    readable += self.variable.readable()
    return readable

get_atom_attributes

get_atom_attributes()

Return the atom's defining attributes.

Returns:

Type Description

A tuple (variable, coefficient, positive_sign).

Source code in pishield/linear_requirements/classes.py
def get_atom_attributes(self):
    """Return the atom's defining attributes.

    Returns:
        A tuple ``(variable, coefficient, positive_sign)``.
    """
    return self.variable, self.coefficient, self.positive_sign

Inequality

Inequality(body: List[Atom], ineq_sign: str, constant: float)

A linear inequality: a body of atoms compared against a constant.

Represents sum(body) ineq_sign constant (e.g. 2y_1 - y_2 >= 0).

Attributes:

Name Type Description
ineq_sign

The inequality operator, '>' or '>='.

constant

The right-hand-side constant.

body

The list of :class:Atom objects on the left-hand side.

Parameters:

Name Type Description Default
body List[Atom]

The atoms forming the left-hand side.

required
ineq_sign str

The inequality operator ('>' or '>=').

required
constant float

The right-hand-side constant.

required

Initialize the inequality from its body, sign and constant.

Source code in pishield/linear_requirements/classes.py
def __init__(self, body: List[Atom], ineq_sign: str, constant: float):
    """Initialize the inequality from its body, sign and constant."""
    super().__init__()
    self.ineq_sign = ineq_sign
    self.constant = constant
    self.body = body

readable

readable()

Return a human-readable string for the inequality.

Returns:

Type Description

A string combining the readable body atoms, the sign and the

constant.

Source code in pishield/linear_requirements/classes.py
def readable(self):
    """Return a human-readable string for the inequality.

    Returns:
        A string combining the readable body atoms, the sign and the
        constant.
    """
    readable_ineq = ''
    for elem in self.body:
        readable_ineq += elem.readable()
    readable_ineq += ' ' + self.ineq_sign + ' ' + str(self.constant)
    return readable_ineq

get_body_variables

get_body_variables()

Return the variables appearing in the inequality body.

Returns:

Type Description

A list of :class:Variable objects, one per body atom.

Source code in pishield/linear_requirements/classes.py
def get_body_variables(self):
    """Return the variables appearing in the inequality body.

    Returns:
        A list of :class:`Variable` objects, one per body atom.
    """
    var_list = []
    for atom in self.body:
        var_list.append(atom.variable)
    return var_list

get_body_atoms

get_body_atoms()

Return the atoms forming the inequality body.

Returns:

Type Description

A list of the body :class:Atom objects.

Source code in pishield/linear_requirements/classes.py
def get_body_atoms(self):
    """Return the atoms forming the inequality body.

    Returns:
        A list of the body :class:`Atom` objects.
    """
    atom_list = []
    for atom in self.body:
        atom_list.append(atom)
    return atom_list

get_x_complement_body_atoms

get_x_complement_body_atoms(x: Variable) -> (List[Atom], Atom, bool)

Split the body into x's atom and the remaining atoms.

Given an inequality in which x appears, separate the single atom over x from the rest of the body.

Parameters:

Name Type Description Default
x Variable

The variable to isolate.

required

Returns:

Type Description
(List[Atom], Atom, bool)

A tuple ``(complementary_atom_list, x_atom_occurrences, constant,

(List[Atom], Atom, bool)

is_strict_inequality)wherecomplementary_atom_list`` are the body

(List[Atom], Atom, bool)

atoms other than x, x_atom_occurrences is the single

(List[Atom], Atom, bool)

class:Atom over x (or an empty list if x is absent),

(List[Atom], Atom, bool)

constant is the inequality constant and is_strict_inequality

(List[Atom], Atom, bool)

is True when the sign is '>'.

Raises:

Type Description
AssertionError

If x appears in more than one atom (atoms over x should be merged first via collapse_atoms).

Source code in pishield/linear_requirements/classes.py
def get_x_complement_body_atoms(self, x: Variable) -> (List[Atom], Atom, bool):
    """Split the body into ``x``'s atom and the remaining atoms.

    Given an inequality in which ``x`` appears, separate the single atom over
    ``x`` from the rest of the body.

    Args:
        x: The variable to isolate.

    Returns:
        A tuple ``(complementary_atom_list, x_atom_occurrences, constant,
        is_strict_inequality)`` where ``complementary_atom_list`` are the body
        atoms other than ``x``, ``x_atom_occurrences`` is the single
        :class:`Atom` over ``x`` (or an empty list if ``x`` is absent),
        ``constant`` is the inequality constant and ``is_strict_inequality``
        is ``True`` when the sign is ``'>'``.

    Raises:
        AssertionError: If ``x`` appears in more than one atom (atoms over
            ``x`` should be merged first via ``collapse_atoms``).
    """
    # given a constraint constr in which variable x appears,
    # return the body of the constraint (i.e. the left-hand side of the inequality)
    # from which x occurrences have been removed
    complementary_atom_list = []
    x_atom_occurrences = []
    for atom in self.body:
        if atom.variable.id != x.id:
            complementary_atom_list.append(atom)
        else:
            x_atom_occurrences.append(atom)
    assert len(x_atom_occurrences) <= 1, "variable {x.id} appears more than one time, function collapse_atoms() from compute_sets_of_constraints should be applied"
    if len(x_atom_occurrences) == 1:
        x_atom_occurrences = x_atom_occurrences[0]
    is_strict_inequality = True if self.ineq_sign == '>' else False
    return complementary_atom_list, x_atom_occurrences, self.constant, is_strict_inequality

get_ineq_attributes

get_ineq_attributes()

Return the inequality's defining attributes.

Returns:

Type Description

A tuple (body, ineq_sign, constant).

Source code in pishield/linear_requirements/classes.py
def get_ineq_attributes(self):
    """Return the inequality's defining attributes.

    Returns:
        A tuple ``(body, ineq_sign, constant)``.
    """
    return self.body, self.ineq_sign, self.constant

contains_variable

contains_variable(x: Variable)

Check whether a variable appears in the inequality body.

Parameters:

Name Type Description Default
x Variable

The variable to look for.

required

Returns:

Type Description

True if x (matched by id) appears in the body.

Source code in pishield/linear_requirements/classes.py
def contains_variable(self, x: Variable):
    """Check whether a variable appears in the inequality body.

    Args:
        x: The variable to look for.

    Returns:
        ``True`` if ``x`` (matched by id) appears in the body.
    """
    body_variables = [elem.id for elem in self.get_body_variables()]
    return x.id in body_variables

check_satisfaction

check_satisfaction(preds: Tensor) -> torch.Tensor

Check, per sample, whether predictions satisfy the inequality.

The body is evaluated against the predictions and compared to the constant up to TOLERANCE.

Parameters:

Name Type Description Default
preds Tensor

Prediction tensor of shape (batch_size, num_variables).

required

Returns:

Type Description
Tensor

A boolean tensor of shape (batch_size,), one entry per sample.

Source code in pishield/linear_requirements/classes.py
def check_satisfaction(self, preds: torch.Tensor) -> torch.Tensor:
    """Check, per sample, whether predictions satisfy the inequality.

    The body is evaluated against the predictions and compared to the
    constant up to ``TOLERANCE``.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.

    Returns:
        A boolean tensor of shape ``(batch_size,)``, one entry per sample.
    """
    eval_body_value = eval_atoms_list(self.body, preds)
    if self.ineq_sign == '>':
        results = eval('(eval_body_value > self.constant - TOLERANCE) | (eval_body_value > self.constant + TOLERANCE)')
    elif self.ineq_sign == '>=':
        results = eval('(eval_body_value >= self.constant - TOLERANCE) | (eval_body_value >= self.constant + TOLERANCE)')
    # if not results.all():
    #     print('Problem here:', eval_body_value[eval_body_value<=self.constant-TOLERANCE])
    return results #.all()

detailed_sat_check

detailed_sat_check(preds: Tensor) -> torch.Tensor

Check satisfaction and also return the intermediate evaluation values.

Parameters:

Name Type Description Default
preds Tensor

Prediction tensor of shape (batch_size, num_variables).

required

Returns:

Type Description
Tensor

A tuple (results, eval_body_value, constant, ineq_sign) where

Tensor

results is the per-sample boolean satisfaction tensor,

Tensor

eval_body_value is the evaluated body, and constant /

Tensor

ineq_sign are the inequality's constant and operator.

Source code in pishield/linear_requirements/classes.py
def detailed_sat_check(self, preds: torch.Tensor) -> torch.Tensor:
    """Check satisfaction and also return the intermediate evaluation values.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.

    Returns:
        A tuple ``(results, eval_body_value, constant, ineq_sign)`` where
        ``results`` is the per-sample boolean satisfaction tensor,
        ``eval_body_value`` is the evaluated body, and ``constant`` /
        ``ineq_sign`` are the inequality's constant and operator.
    """
    eval_body_value = eval_atoms_list(self.body, preds)
    if self.ineq_sign == '>':
        results = eval('(eval_body_value > self.constant - TOLERANCE) | (eval_body_value > self.constant + TOLERANCE)')
    elif self.ineq_sign == '>=':
        results = eval('(eval_body_value >= self.constant - TOLERANCE) | (eval_body_value >= self.constant + TOLERANCE)')
    return results, eval_body_value, self.constant, self.ineq_sign

Constraint

Constraint(inequality_list: List[Inequality])

A linear requirement, wrapping its inequality (or disjunction thereof).

Most methods delegate to the first inequality in the list (single_inequality).

Attributes:

Name Type Description
inequality_list

The list of :class:Inequality objects making up the requirement.

single_inequality

The first inequality, used as the primary inequality.

Parameters:

Name Type Description Default
inequality_list List[Inequality]

The list of inequalities forming the requirement; the first element becomes single_inequality.

required

Initialize the requirement and select its primary inequality.

Source code in pishield/linear_requirements/classes.py
def __init__(self, inequality_list: List[Inequality]):
    """Initialize the requirement and select its primary inequality."""
    super().__init__()
    self.inequality_list = inequality_list
    self.single_inequality = self.inequality_list[0]

readable

readable()

Return a human-readable string for the requirement.

Returns:

Type Description

The readable form of the primary inequality.

Source code in pishield/linear_requirements/classes.py
def readable(self):
    """Return a human-readable string for the requirement.

    Returns:
        The readable form of the primary inequality.
    """
    readable_constr = self.single_inequality.readable()
    return readable_constr

verbose_readable

verbose_readable()

Return a human-readable string for the requirement's first inequality.

Returns:

Type Description

The readable form of the first inequality.

Source code in pishield/linear_requirements/classes.py
def verbose_readable(self):
    """Return a human-readable string for the requirement's first inequality.

    Returns:
        The readable form of the first inequality.
    """
    readable_constr = self.inequality_list[0].readable()
    return readable_constr

contains_variable

contains_variable(x: Variable)

Check whether the requirement involves a given variable.

Parameters:

Name Type Description Default
x Variable

The variable to look for.

required

Returns:

Type Description

True if x appears in the primary inequality.

Source code in pishield/linear_requirements/classes.py
def contains_variable(self, x: Variable):
    """Check whether the requirement involves a given variable.

    Args:
        x: The variable to look for.

    Returns:
        ``True`` if ``x`` appears in the primary inequality.
    """
    return self.single_inequality.contains_variable(x)

get_body_atoms

get_body_atoms()

Return the atoms of the requirement's primary inequality body.

Returns:

Type Description

A list of :class:Atom objects.

Source code in pishield/linear_requirements/classes.py
def get_body_atoms(self):
    """Return the atoms of the requirement's primary inequality body.

    Returns:
        A list of :class:`Atom` objects.
    """
    return self.single_inequality.get_body_atoms()

check_satisfaction

check_satisfaction(preds)

Check whether the whole batch satisfies the requirement.

Parameters:

Name Type Description Default
preds

Prediction tensor of shape (batch_size, num_variables).

required

Returns:

Type Description

True if every sample in the batch satisfies the requirement.

Source code in pishield/linear_requirements/classes.py
def check_satisfaction(self, preds):
    """Check whether the whole batch satisfies the requirement.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.

    Returns:
        ``True`` if every sample in the batch satisfies the requirement.
    """
    return self.single_inequality.check_satisfaction(preds).all()  # for the whole batch

check_satisfaction_per_sample

check_satisfaction_per_sample(preds)

Check requirement satisfaction for each sample individually.

Parameters:

Name Type Description Default
preds

Prediction tensor of shape (batch_size, num_variables).

required

Returns:

Type Description

A boolean tensor of shape (batch_size,).

Source code in pishield/linear_requirements/classes.py
def check_satisfaction_per_sample(self, preds):
    """Check requirement satisfaction for each sample individually.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.

    Returns:
        A boolean tensor of shape ``(batch_size,)``.
    """
    return self.single_inequality.check_satisfaction(preds)

detailed_sample_sat_check

detailed_sample_sat_check(preds)

Return per-sample satisfaction along with intermediate values.

Parameters:

Name Type Description Default
preds

Prediction tensor of shape (batch_size, num_variables).

required

Returns:

Type Description

The tuple produced by :meth:Inequality.detailed_sat_check.

Source code in pishield/linear_requirements/classes.py
def detailed_sample_sat_check(self, preds):
    """Return per-sample satisfaction along with intermediate values.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.

    Returns:
        The tuple produced by :meth:`Inequality.detailed_sat_check`.
    """
    return self.single_inequality.detailed_sat_check(preds)

Computing sets of constraints

compute_sets_of_constraints

Derive, per variable, the set of linear requirements that bound it.

This module performs Fourier-Motzkin-style variable elimination over the linear requirements: following the (reversed) variable ordering, it repeatedly eliminates the highest-ranking remaining variable by combining the requirements that contain it, producing for each variable the set of requirements that bound it in terms of lower-ranked variables only.

collapse_atoms

collapse_atoms(atom_list)

Merge atoms that refer to the same variable into a single atom.

Atoms over the same variable are combined by summing their signed coefficients; any atom whose resulting coefficient is zero is dropped.

Parameters:

Name Type Description Default
atom_list

List of :class:Atom objects, possibly with duplicate variables.

required

Returns:

Type Description

A list of :class:Atom objects with at most one atom per variable and

no zero-coefficient atoms.

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def collapse_atoms(atom_list):
    """Merge atoms that refer to the same variable into a single atom.

    Atoms over the same variable are combined by summing their signed
    coefficients; any atom whose resulting coefficient is zero is dropped.

    Args:
        atom_list: List of :class:`Atom` objects, possibly with duplicate
            variables.

    Returns:
        A list of :class:`Atom` objects with at most one atom per variable and
        no zero-coefficient atoms.
    """
    # merge any duplicated atoms in an atom list
    merged_atoms = {}
    merged_atoms: {int: Atom}
    for atom in atom_list:
        var = atom.variable.id
        if var not in merged_atoms.keys():
            merged_atoms[var] = atom
        else:
            variable, coefficient, positive_sign = merged_atoms[var].get_atom_attributes()
            existing_coeff = coefficient if positive_sign else -coefficient
            current_coeff = atom.coefficient if atom.positive_sign else -atom.coefficient
            new_coefficient = existing_coeff + current_coeff
            if new_coefficient != 0:
                new_atom = Atom(variable, float(np.abs(new_coefficient)), True if new_coefficient > 0 else False)
                merged_atoms[var] = new_atom
    return list(merged_atoms.values())

split_constr_atoms

split_constr_atoms(y: Variable, constr: Constraint)

Separate the coefficient of y from the rest of a constraint's body.

Parameters:

Name Type Description Default
y Variable

The variable to extract.

required
constr Constraint

The constraint whose body is split.

required

Returns:

Type Description

A tuple (red_coefficient, complementary_atoms) where

red_coefficient is the (positive) coefficient of y and

complementary_atoms is the list of the remaining body atoms.

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def split_constr_atoms(y: Variable, constr: Constraint):
    """Separate the coefficient of ``y`` from the rest of a constraint's body.

    Args:
        y: The variable to extract.
        constr: The constraint whose body is split.

    Returns:
        A tuple ``(red_coefficient, complementary_atoms)`` where
        ``red_coefficient`` is the (positive) coefficient of ``y`` and
        ``complementary_atoms`` is the list of the remaining body atoms.
    """
    complementary_atoms = []
    for atom in constr.get_body_atoms():
        if atom.variable.id == y.id:
            red_coefficient = atom.coefficient  # note this is a positive real constant
        else:
            complementary_atoms.append(atom)
    return red_coefficient, complementary_atoms

multiply_coefficients_of_atoms

multiply_coefficients_of_atoms(atoms: List[Atom], coeff: float)

Scale every atom's coefficient by a constant factor.

Parameters:

Name Type Description Default
atoms List[Atom]

The atoms to scale.

required
coeff float

The multiplicative factor applied to each coefficient.

required

Returns:

Type Description

A new list of :class:Atom objects with scaled coefficients; the

variables and signs are preserved.

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def multiply_coefficients_of_atoms(atoms: List[Atom], coeff: float):
    """Scale every atom's coefficient by a constant factor.

    Args:
        atoms: The atoms to scale.
        coeff: The multiplicative factor applied to each coefficient.

    Returns:
        A new list of :class:`Atom` objects with scaled coefficients; the
        variables and signs are preserved.
    """
    new_atoms = []
    for atom in atoms:
        variable, coefficient, positive_sign = atom.get_atom_attributes()
        new_atom = Atom(variable, coefficient*coeff, positive_sign)
        new_atoms.append(new_atom)
    return new_atoms

create_constr_by_reduction

create_constr_by_reduction(y: Variable, constraints_with_y: List[Constraint])

Eliminate a variable by pairwise combination of its requirements.

Splits the requirements containing y by the sign of y and, for every pair of a positive and a negative occurrence, derives a new requirement in which y has been cancelled out (a Fourier-Motzkin elimination step).

Parameters:

Name Type Description Default
y Variable

The variable to eliminate.

required
constraints_with_y List[Constraint]

The requirements that contain y.

required

Returns:

Type Description

A list of new :class:Constraint objects that no longer mention y.

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def create_constr_by_reduction(y: Variable, constraints_with_y: List[Constraint]):
    """Eliminate a variable by pairwise combination of its requirements.

    Splits the requirements containing ``y`` by the sign of ``y`` and, for every
    pair of a positive and a negative occurrence, derives a new requirement in
    which ``y`` has been cancelled out (a Fourier-Motzkin elimination step).

    Args:
        y: The variable to eliminate.
        constraints_with_y: The requirements that contain ``y``.

    Returns:
        A list of new :class:`Constraint` objects that no longer mention ``y``.
    """
    red_constr = []
    # separate the constraints in two sets by the sign of y (pos or neg)
    pos_constr, neg_constr = get_pos_neg_x_constr(y, constraints_with_y)

    # then obtain new constr by reduction on y from pairs of constr (p,q)
    # where p is from pos_constr and q is from neg_constr
    for p in pos_constr:
        for q in neg_constr:
            p_coeff, p_complementary_body = split_constr_atoms(y, p)
            q_coeff, q_complementary_body = split_constr_atoms(y, q)
            # if p_complementary_body == []:
            #     break
            # if q_complementary_body == []:
            #     continue

            # multiply all coeff in p by q_coeff
            p_complementary_body = multiply_coefficients_of_atoms(p_complementary_body, q_coeff/p_coeff)

            # take the union of the p and q lists of atoms,
            # excluding the atom corresponding to y (whose coefficient is now 0)
            p_complementary_body.extend(q_complementary_body)

            # merge any atom duplicates
            p_complementary_body = collapse_atoms(p_complementary_body)

            _, ineq_sign_p, constant_p = p.single_inequality.get_ineq_attributes()
            _, ineq_sign_q, constant_q = q.single_inequality.get_ineq_attributes()
            new_ineq_sign = ineq_sign_p
            new_constant = constant_p + constant_q
            if p_complementary_body != []:
                new_inequality = Inequality(p_complementary_body, new_ineq_sign, new_constant)
                red_constr.append(Constraint([new_inequality]))

    return red_constr

get_pos_neg_x_constr

get_pos_neg_x_constr(y, constraints_with_y)

Partition requirements by the sign of a variable's occurrence.

Parameters:

Name Type Description Default
y

The variable whose sign is inspected.

required
constraints_with_y

Requirements that contain y.

required

Returns:

Type Description

A tuple (pos_constr, neg_constr) of the requirements in which y

occurs with a positive and a negative coefficient, respectively.

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def get_pos_neg_x_constr(y, constraints_with_y):
    """Partition requirements by the sign of a variable's occurrence.

    Args:
        y: The variable whose sign is inspected.
        constraints_with_y: Requirements that contain ``y``.

    Returns:
        A tuple ``(pos_constr, neg_constr)`` of the requirements in which ``y``
        occurs with a positive and a negative coefficient, respectively.
    """
    pos_constr, neg_constr = [], []
    for constr in constraints_with_y:
        for atom in constr.get_body_atoms():
            if atom.variable.id == y.id:
                if atom.positive_sign:
                    pos_constr.append(constr)
                else:
                    neg_constr.append(constr)
                break
    return pos_constr, neg_constr

compute_set_of_constraints_for_variable

compute_set_of_constraints_for_variable(x: Variable, prev_x: Variable, constraints_at_previous_level: List[Constraint], verbose)

Derive the requirement set for one variable by eliminating the previous one.

Takes the requirements available at the previous level (those that bound prev_x), eliminates prev_x from the requirements that contain it, and returns the union of the requirements that did not contain prev_x with the newly reduced requirements.

Parameters:

Name Type Description Default
x Variable

The variable whose level is being computed.

required
prev_x Variable

The previously processed (higher-ranked) variable to eliminate.

required
constraints_at_previous_level List[Constraint]

Requirements available at prev_x's level.

required
verbose

Whether to print diagnostic information.

required

Returns:

Type Description

The list of :class:Constraint objects forming the requirement set at

x's level (none of which mention prev_x).

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def compute_set_of_constraints_for_variable(x: Variable, prev_x: Variable, constraints_at_previous_level: List[Constraint], verbose):
    """Derive the requirement set for one variable by eliminating the previous one.

    Takes the requirements available at the previous level (those that bound
    ``prev_x``), eliminates ``prev_x`` from the requirements that contain it, and
    returns the union of the requirements that did not contain ``prev_x`` with
    the newly reduced requirements.

    Args:
        x: The variable whose level is being computed.
        prev_x: The previously processed (higher-ranked) variable to eliminate.
        constraints_at_previous_level: Requirements available at ``prev_x``'s level.
        verbose: Whether to print diagnostic information.

    Returns:
        The list of :class:`Constraint` objects forming the requirement set at
        ``x``'s level (none of which mention ``prev_x``).
    """
    # create two sets starting from constraints_at_previous_level:
    # one containing only the constraints which variable prev_x appears in
    # and its complement
    constraints_without_prev = []
    constraints_with_prev = []

    for constr in constraints_at_previous_level:
        if constr.contains_variable(prev_x):
            constraints_with_prev.append(constr)
        else:
            constraints_without_prev.append(constr)

    # then compute a new set of constraints derived by algebraic manipulation on constraints containing prev_x
    # note that this new set of constraints will not have occurrences of prev_x by construction
    reduced_constr = create_constr_by_reduction(prev_x, constraints_with_prev)

    # finally, get the union of constraints which do not contain y (constraints_without_prev U reduced_constr)
    constraints_without_prev.extend(reduced_constr)

    # if verbose:
    #     print('\nLEVEL', x.readable())
    #     print('-------------------Constraints at this level:')
    #     for constr in constraints_without_prev:
    #         print(constr.readable())

    return constraints_without_prev

compute_sets_of_constraints

compute_sets_of_constraints(ordering: List[Variable], constraints: List[Constraint], verbose) -> {Variable: List[Constraint]}

Compute the requirement set bounding each variable along the ordering.

Processes the variables from the highest-ranked to the lowest-ranked (the reversed ordering), at each step eliminating the previously processed variable, so that each variable is associated with the requirements that bound it in terms of lower-ranked variables only.

Parameters:

Name Type Description Default
ordering List[Variable]

The ordering of the variables.

required
constraints List[Constraint]

All linear requirements.

required
verbose

Whether to print diagnostic information.

required

Returns:

Type Description
{Variable: List[Constraint]}

A dict mapping each :class:Variable to the list of :class:Constraint

{Variable: List[Constraint]}

objects that bound it at its level.

Source code in pishield/linear_requirements/compute_sets_of_constraints.py
def compute_sets_of_constraints(ordering: List[Variable], constraints: List[Constraint], verbose) -> {Variable: List[Constraint]}:
    """Compute the requirement set bounding each variable along the ordering.

    Processes the variables from the highest-ranked to the lowest-ranked (the
    reversed ordering), at each step eliminating the previously processed
    variable, so that each variable is associated with the requirements that
    bound it in terms of lower-ranked variables only.

    Args:
        ordering: The ordering of the variables.
        constraints: All linear requirements.
        verbose: Whether to print diagnostic information.

    Returns:
        A dict mapping each :class:`Variable` to the list of :class:`Constraint`
        objects that bound it at its level.
    """
    # reverse the ordering:
    ordering = list(reversed(ordering))
    prev_x = ordering[0]

    # add all constraints for the highest ranking variable w.r.t. ordering
    ordered_constraints = {prev_x: constraints}
    print('All constraints')
    for constr in constraints:
        print(constr.readable())

    for x in ordering[1:]:
        constraints_at_previous_level = ordered_constraints[prev_x]
        set_of_constraints = compute_set_of_constraints_for_variable(x, prev_x, constraints_at_previous_level, verbose)
        ordered_constraints[x] = set_of_constraints
        prev_x = x

    # print('-'*80)
    # for x in ordering:
    #     print(f' *** Constraints for {x.readable()} ***')
    #     for i,constr in enumerate(ordered_constraints[x]):
    #         print(f'constr number {i}')
    #         print(constr.readable())
    #     if len(ordered_constraints[x]) == 0:
    #         print('empty set')
    #     print('***\n')
    # print('-'*80)

    return ordered_constraints

Correcting predictions

correct_predictions

Helpers for clipping predictions into the interval allowed by requirements.

Provides utilities to look up the requirement set bounding a given variable, to clip a variable's predicted value into the feasible interval implied by its lower and upper bounds, and to check that corrected predictions satisfy all requirements.

get_constr_at_level_x

get_constr_at_level_x(x, sets_of_constr)

Look up the requirement set associated with a given variable.

Parameters:

Name Type Description Default
x

The :class:Variable whose requirement set is requested.

required
sets_of_constr

Mapping from variables to their requirement sets, as produced by :func:compute_sets_of_constraints.

required

Returns:

Type Description

The list of requirements bounding x (matched by variable id), or

None if x is not present in the mapping.

Source code in pishield/linear_requirements/correct_predictions.py
def get_constr_at_level_x(x, sets_of_constr):
    """Look up the requirement set associated with a given variable.

    Args:
        x: The :class:`Variable` whose requirement set is requested.
        sets_of_constr: Mapping from variables to their requirement sets, as
            produced by :func:`compute_sets_of_constraints`.

    Returns:
        The list of requirements bounding ``x`` (matched by variable id), or
        ``None`` if ``x`` is not present in the mapping.
    """
    for var in sets_of_constr:
        if var.id == x.id:
            return sets_of_constr[var]

get_final_x_correction

get_final_x_correction(initial_x_val: Tensor, pos_x_corrected: Tensor, neg_x_corrected: Tensor) -> torch.Tensor

Clip a variable's value into the interval allowed by its constraints.

initial_x_val is the model's (per-sample) prediction for x; pos_x_corrected is the tightest lower bound implied by x's positive constraints, and neg_x_corrected the tightest upper bound from its negative ones. The corrected value is therefore min(max(x, lower bound), upper bound), i.e. x is only moved if it falls outside the feasible interval. A non-tensor bound (or an inf entry) means "unbounded on that side", in which case the initial value is kept.

Source code in pishield/linear_requirements/correct_predictions.py
def get_final_x_correction(initial_x_val: torch.Tensor, pos_x_corrected: torch.Tensor,
                           neg_x_corrected: torch.Tensor) -> torch.Tensor:
    """
    Clip a variable's value into the interval allowed by its constraints.

    `initial_x_val` is the model's (per-sample) prediction for x; `pos_x_corrected` is the tightest
    lower bound implied by x's positive constraints, and `neg_x_corrected` the tightest upper bound
    from its negative ones. The corrected value is therefore min(max(x, lower bound), upper bound),
    i.e. x is only moved if it falls outside the feasible interval. A non-tensor bound (or an inf
    entry) means "unbounded on that side", in which case the initial value is kept.
    """

    # Enforce the lower bound: result_1 = max(initial value, lower bound).
    if type(pos_x_corrected) is not torch.Tensor:
        result_1 = initial_x_val
    else:
        pos_x_corrected = pos_x_corrected.where(~(pos_x_corrected.isinf()), initial_x_val)
        result_1 = torch.cat([initial_x_val.unsqueeze(1), pos_x_corrected.unsqueeze(1)], dim=1).amax(dim=1)

    # Enforce the upper bound: result_2 = min(result_1, upper bound).
    if type(neg_x_corrected) is not torch.Tensor:
        result_2 = result_1
    else:
        # keep_initial_neg_mask = neg_x_corrected.isinf()
        # neg_x_corrected[keep_initial_neg_mask] = initial_x_val[keep_initial_neg_mask]
        neg_x_corrected = neg_x_corrected.where(~(neg_x_corrected.isinf()), initial_x_val)
        result_2 = torch.cat([result_1.unsqueeze(1), neg_x_corrected.unsqueeze(1)], dim=1).amin(dim=1)

    return result_2

example_predictions_heloc

example_predictions_heloc()

Load a pickled tensor of example HELOC predictions for debugging.

Returns:

Type Description

The unpickled predictions object loaded from a hard-coded local path.

Source code in pishield/linear_requirements/correct_predictions.py
def example_predictions_heloc():
    """Load a pickled tensor of example HELOC predictions for debugging.

    Returns:
        The unpickled predictions object loaded from a hard-coded local path.
    """
    # data = pd.read_csv(f"../data/heloc/test_data.csv")
    # data = data.to_numpy().astype(float)
    # return torch.tensor(data)

    data = pkl.load(open('/home/mihian/DEL_unsat/TEMP_uncons.pkl', 'rb'))
    return data

check_all_constraints_are_sat

check_all_constraints_are_sat(constraints, preds, corrected_preds, verbose=False)

Check whether corrected predictions satisfy all requirements (batch-wise).

Parameters:

Name Type Description Default
constraints

The requirements to check.

required
preds

The original (uncorrected) predictions.

required
corrected_preds

The predictions after correction.

required
verbose

Whether to print which requirements are violated/satisfied.

False

Returns:

Type Description

True if every requirement is satisfied across the whole batch after

correction, False otherwise.

Source code in pishield/linear_requirements/correct_predictions.py
def check_all_constraints_are_sat(constraints, preds, corrected_preds, verbose=False):
    """Check whether corrected predictions satisfy all requirements (batch-wise).

    Args:
        constraints: The requirements to check.
        preds: The original (uncorrected) predictions.
        corrected_preds: The predictions after correction.
        verbose: Whether to print which requirements are violated/satisfied.

    Returns:
        ``True`` if every requirement is satisfied across the whole batch after
        correction, ``False`` otherwise.
    """
    # print('sat req?:')
    for constr in constraints:
        sat = constr.check_satisfaction(preds)
        if not sat and verbose:
            print('Not satisfied!', constr.readable())

    # print('*' * 80)
    all_sat_after_correction = True
    for constr in constraints:
        sat = constr.check_satisfaction(corrected_preds)
        if not sat:
            all_sat_after_correction = False
            # print('Not satisfied!', constr.readable())
    if all_sat_after_correction and verbose:
        print('All constraints are satisfied after correction!')
    if not all_sat_after_correction:
        print('There are still constraint violations!!!')
        # with open('./TEMP_uncons.pkl', 'wb') as f:
        #     pkl.dump(preds, f, -1)
        # with open('./TEMP_cons.pkl', 'wb') as f:
        #     pkl.dump(corrected_preds, f, -1)
    return all_sat_after_correction

check_all_constraints_sat

check_all_constraints_sat(corrected_preds, constraints, error_raise=True)

Assert that corrected predictions satisfy all requirements per sample.

Parameters:

Name Type Description Default
corrected_preds

The predictions after correction.

required
constraints

The requirements to check.

required
error_raise

Retained for backward compatibility; the function raises on violation regardless of this flag.

True

Returns:

Type Description

True if no violation is found.

Raises:

Type Description
Exception

If any requirement is violated by any sample.

Source code in pishield/linear_requirements/correct_predictions.py
def check_all_constraints_sat(corrected_preds, constraints, error_raise=True):
    """Assert that corrected predictions satisfy all requirements per sample.

    Args:
        corrected_preds: The predictions after correction.
        constraints: The requirements to check.
        error_raise: Retained for backward compatibility; the function raises on
            violation regardless of this flag.

    Returns:
        ``True`` if no violation is found.

    Raises:
        Exception: If any requirement is violated by any sample.
    """
    all_sat_after_correction = True
    for constr in constraints:
        sat = constr.check_satisfaction_per_sample(corrected_preds)
        if not sat.all():
            # all_sat_after_correction = False
            print(corrected_preds[~sat], 'aaa')
            sample_sat, eval_body_value, constant, ineq_sign = constr.detailed_sample_sat_check(corrected_preds)
            print(sample_sat.all(), eval_body_value[~sample_sat], constant, ineq_sign)
            raise Exception('Not satisfied!', constr.readable())
    return all_sat_after_correction

Manual correction

manual_correct

Reference (loop-based) implementation of the prediction correction.

Provides an explicit, per-variable implementation of the correction that the :class:~pishield.linear_requirements.shield_layer.ShieldLayer performs with precomputed matrices. It clips each variable into the interval implied by its linear requirements, following the variable ordering.

get_partial_x_correction

get_partial_x_correction(x: Variable, x_positive: bool, x_constraints: List[Constraint], preds: Tensor, epsilon=1e-12) -> torch.Tensor

Compute one side of the corrected value for a variable from its requirements.

For each requirement bounding x, eliminates x from the body, weights the evaluated remainder by -1/coeff (positive x) or 1/coeff (negative x), adds the weighted constant and an epsilon margin for strict inequalities, then reduces across requirements: the tightest lower bound (max) when x_positive, the tightest upper bound (min) otherwise.

Parameters:

Name Type Description Default
x Variable

The variable being corrected.

required
x_positive bool

True to compute the lower bound from x's positive occurrences, False to compute the upper bound from its negative occurrences.

required
x_constraints List[Constraint]

The requirements bounding x on the relevant side.

required
preds Tensor

Prediction tensor of shape (batch_size, num_variables).

required
epsilon

Margin added for strict ('>') inequalities.

1e-12

Returns:

Type Description
Tensor

A per-sample tensor with the partial bound on x, or -inf /

Tensor

+inf when there is no applicable requirement.

Source code in pishield/linear_requirements/manual_correct.py
def get_partial_x_correction(x: Variable, x_positive: bool, x_constraints: List[Constraint],
                             preds: torch.Tensor, epsilon=1e-12) -> torch.Tensor:
    """Compute one side of the corrected value for a variable from its requirements.

    For each requirement bounding ``x``, eliminates ``x`` from the body, weights
    the evaluated remainder by ``-1/coeff`` (positive ``x``) or ``1/coeff``
    (negative ``x``), adds the weighted constant and an ``epsilon`` margin for
    strict inequalities, then reduces across requirements: the tightest lower
    bound (max) when ``x_positive``, the tightest upper bound (min) otherwise.

    Args:
        x: The variable being corrected.
        x_positive: ``True`` to compute the lower bound from ``x``'s positive
            occurrences, ``False`` to compute the upper bound from its negative
            occurrences.
        x_constraints: The requirements bounding ``x`` on the relevant side.
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.
        epsilon: Margin added for strict (``'>'``) inequalities.

    Returns:
        A per-sample tensor with the partial bound on ``x``, or ``-inf`` /
        ``+inf`` when there is no applicable requirement.
    """
    # if x.id == 25:
    #     print('DEBUG!!!')
    if len(x_constraints) == 0:
        if x_positive:
            return -INFINITY
        else:
            return INFINITY

    # dependency_complements = [preds[:, x.id]] # Note: the original prediction cannot be here, as this function gets called twice: for pos and neg occurences of x!
    # so using the original value of x here can undo the partial correction for pos occurrences of x!
    dependency_complements = [] # size: num_atom_dependencies x B (i.e. B=batch size)
    mask_sat_batch_elem = None
    for constr in x_constraints:
        # print(constr.readable(), 'LLLLLLLLLLLLLLLL')
        mask_sat_batch_elem = None  # NOTE: the mask should be reset when a new constraint is considered, regardless of its type (ineq or disj of ineq!)
        # print(constr.single_inequality.readable(), 'AAAAAA')
        if len(constr.inequality_list) == 1:
            complement_body_atoms, x_atom, constr_constant, is_strict_inequality = constr.single_inequality.get_x_complement_body_atoms(x)
            # print('first branch', [e.readable() for e in complement_body_atoms], x_atom, constr_constant, is_strict_inequality)
        else:
            (complement_body_atoms, x_atom, constr_constant, is_strict_inequality), mask_sat_batch_elem = constr.single_inequality.get_x_complement_body_atoms(x, preds)
            # print('second branch', preds[:,2])
            if x_atom is None:
                # print('atom is none')
                # print(constr.readable(), 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAaaa!!!!!!!!!!!!!!1')
                continue
            # else:
                # print('first branch', [e.readable() for e in complement_body_atoms], x_atom, constr_constant, is_strict_inequality)

                # print(constr.readable(), 'MMMMMMMMMMMMMMMMMMMM!!!!!!!!!!!!!!1')

        # get the coefficient of variable x in constr
        x_coeff = x_atom.coefficient  # do not use self.get_signed_coefficient() here!
        # print(x_coeff,'x coeff')
        # print(constr_constant, '@')
        # if x is y1 and inequality is -y1>0, then add 0+bias to dependency_complements
        if len(complement_body_atoms) == 0:  # TODO: assumes that right hand side of ineq is 0, need to consider cases where the constant/bias is non-zero
            evaluated_complement_body = torch.zeros(preds.shape[0])  # shape (B,)
            # print('first br', evaluated_complement_body)
            # continue
        else:
            # evaluate the body of constr, after eliminating x occurrences from it
            evaluated_complement_body = eval_atoms_list(complement_body_atoms, preds)  # shape (B,)
            # print('second br', evaluated_complement_body)

        # print('x_coeff', x_coeff)
        # print('evaluated_complement_body', evaluated_complement_body)
        # weigh the evaluated complement body by the -1/(coefficient of x) if x occurs positively in constr
        # and by 1/(coefficient of x) if x occurs negatively in constr
        x_weight = -1. / x_coeff if x_positive else 1. / x_coeff
        evaluated_complement_body *= x_weight
        # print('evaluated_complement_body after weighing', evaluated_complement_body)

        # now add the weighted bias:
        evaluated_complement_body += constr_constant * (-x_weight)

        # then add/subtract epsilon if the ineq is strict: +epsilon if positive x, -epsilon otherwise
        if is_strict_inequality:
            # print(is_strict_inequality)
            evaluated_complement_body += epsilon if x_positive else -epsilon

        # if the constr is a disj of inqs, mask out the batch elements for which mask_sat_batch_elem is True
        if mask_sat_batch_elem is not None:
            evaluated_complement_body[mask_sat_batch_elem] += INFINITY * x_weight

        dependency_complements.append(evaluated_complement_body)

        # print('evaluated_complement_body after adding bias', evaluated_complement_body, dependency_complements)

    # if x.id == 25:
    #     print('END DEBUG!!!')

    if len(dependency_complements) > 1:
        dependency_complements = torch.stack(dependency_complements, dim=1)
    elif len(dependency_complements) == 1:
        dependency_complements = dependency_complements[0].unsqueeze(1)
    else:
        return -INFINITY if x_positive else INFINITY

    # print('@@@@@@@@@@@', complement_body_atoms, dependency_complements, len(dependency_complements), constr_constant)

    if x_positive:
        partially_corrected_val = dependency_complements.amax(dim=1)
        # print(partially_corrected_val, x.id, 'AAA')
    else:
        partially_corrected_val = dependency_complements.amin(dim=1)  # TODO: be careful here! the original value of h' to be corrected should not be added multiple times!! it shouldn't be added to the dependecy_complements
        # print(partially_corrected_val, 'BBB')

    # print('partially corrected val', partially_corrected_val)
    return partially_corrected_val

correct_preds

correct_preds(preds: Tensor, ordering: List[Variable], sets_of_constr: {Variable: List[Constraint]})

Correct predictions variable by variable so they satisfy all requirements.

Iterates over the variables in ordering and, for each, clips its predicted value into the [lower bound, upper bound] interval implied by its requirements, using the already-corrected values of earlier variables.

Parameters:

Name Type Description Default
preds Tensor

Prediction tensor of shape (batch_size, num_variables).

required
ordering List[Variable]

The order in which variables are corrected.

required
sets_of_constr {Variable: List[Constraint]}

Mapping from each variable to the requirements bounding it, as produced by :func:compute_sets_of_constraints.

required

Returns:

Type Description

A tensor of the same shape as preds whose entries satisfy all linear

requirements.

Source code in pishield/linear_requirements/manual_correct.py
def correct_preds(preds: torch.Tensor, ordering: List[Variable], sets_of_constr: {Variable: List[Constraint]}):
    """Correct predictions variable by variable so they satisfy all requirements.

    Iterates over the variables in ``ordering`` and, for each, clips its
    predicted value into the ``[lower bound, upper bound]`` interval implied by
    its requirements, using the already-corrected values of earlier variables.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.
        ordering: The order in which variables are corrected.
        sets_of_constr: Mapping from each variable to the requirements bounding
            it, as produced by :func:`compute_sets_of_constraints`.

    Returns:
        A tensor of the same shape as ``preds`` whose entries satisfy all linear
        requirements.
    """
    t1=time.time()
    # num_variables = preds.shape[-1]
    # from constraints_code.constraint_layer import ConstraintLayer
    # CL = ConstraintLayer(num_variables, ordering, sets_of_constr)
    # CL_corrected_preds = CL(preds.clone())
    # t2 = time.time()
    # print(t2-t1, 'timed')
    # return CL_corrected_preds

    # given a NN's preds [h1, h2, .., hn],
    # an ordering of the n variables and
    # a set of constraints computed at each variable w.r.t. descending order of the provided ordering
    # correct the preds according to the constraints in ascending order w.r.t. provided ordering
    corrected_preds = preds.clone()

    for x in ordering:
        pos = x.id
        x_constr = get_constr_at_level_x(x, sets_of_constr)
        if len(x_constr) == 0:
            continue
        # print(x.id, [e.readable() for e in x_constr], 'KKKK')  # .readable()],'@@@')
        pos_x_constr, neg_x_constr = get_pos_neg_x_constr(x, x_constr)

        pos_x_corrected = get_partial_x_correction(x, True, pos_x_constr, preds)
        neg_x_corrected = get_partial_x_correction(x, False, neg_x_constr, preds)

        # print('pos', [e.readable() for e in pos_x_constr], preds[pos], pos_x_corrected)
        # print('neg', [e.readable() for e in neg_x_constr], preds[pos], neg_x_corrected)

        corrected_preds[:,pos] = get_final_x_correction(preds[:,pos], pos_x_corrected, neg_x_corrected)
        preds = corrected_preds.clone()
        corrected_preds = preds.clone()

    t2 = time.time()
    print(t2-t1, 'timed')
    return corrected_preds

Feature orderings

feature_orderings

Selection of the variable ordering used to correct predictions.

Provides helpers to choose the ordering in which variables are corrected by the Shield Layer: either the ordering given in the requirements file or a random permutation of it.

set_random_ordering

set_random_ordering(ordering: List[Variable])

Shuffle a variable ordering in place into a random permutation.

Parameters:

Name Type Description Default
ordering List[Variable]

The list of variables to shuffle (modified in place).

required

Returns:

Type Description

The same list, randomly permuted.

Source code in pishield/linear_requirements/feature_orderings.py
def set_random_ordering(ordering: List[Variable]):
    """Shuffle a variable ordering in place into a random permutation.

    Args:
        ordering: The list of variables to shuffle (modified in place).

    Returns:
        The same list, randomly permuted.
    """
    random.shuffle(ordering) # in-place shuffling
    return ordering

set_ordering

set_ordering(ordering: List[Variable], label_ordering_choice: str)

Select the variable ordering according to the chosen strategy.

Parameters:

Name Type Description Default
ordering List[Variable]

The variable ordering parsed from the requirements file.

required
label_ordering_choice str

Either 'random' (return a random permutation) or 'given' (return the ordering unchanged).

required

Returns:

Type Description

The chosen ordering of variables, or None if label_ordering_choice

is neither 'random' nor 'given'.

Source code in pishield/linear_requirements/feature_orderings.py
def set_ordering(ordering: List[Variable], label_ordering_choice: str):
    """Select the variable ordering according to the chosen strategy.

    Args:
        ordering: The variable ordering parsed from the requirements file.
        label_ordering_choice: Either ``'random'`` (return a random permutation)
            or ``'given'`` (return the ordering unchanged).

    Returns:
        The chosen ordering of variables, or ``None`` if ``label_ordering_choice``
        is neither ``'random'`` nor ``'given'``.
    """
    if label_ordering_choice == 'random':
        ordering = set_random_ordering(ordering)
        return ordering
    elif label_ordering_choice == 'given':
        return ordering

Utilities

utils

Utility functions for evaluating atoms and checking requirement satisfaction.

eval_atoms_list

eval_atoms_list(atoms_list: List, preds: Tensor, reduction='sum')

Evaluate a list of atoms against a batch of predictions.

Each atom is evaluated by multiplying its variable's predicted value by the atom's signed coefficient; the per-atom values are then reduced across atoms.

Parameters:

Name Type Description Default
atoms_list List

The :class:Atom objects to evaluate (the body of an inequality). An empty list evaluates to zeros.

required
preds Tensor

Prediction tensor of shape (batch_size, num_variables).

required
reduction

How to combine the per-atom values. Only 'sum' is supported.

'sum'

Returns:

Type Description

A tensor of shape (batch_size,) with the reduced atom values.

Raises:

Type Description
Exception

If reduction is not 'sum'.

Source code in pishield/linear_requirements/utils.py
def eval_atoms_list(atoms_list: List, preds: torch.Tensor, reduction='sum'):
    """Evaluate a list of atoms against a batch of predictions.

    Each atom is evaluated by multiplying its variable's predicted value by the
    atom's signed coefficient; the per-atom values are then reduced across atoms.

    Args:
        atoms_list: The :class:`Atom` objects to evaluate (the body of an
            inequality). An empty list evaluates to zeros.
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.
        reduction: How to combine the per-atom values. Only ``'sum'`` is
            supported.

    Returns:
        A tensor of shape ``(batch_size,)`` with the reduced atom values.

    Raises:
        Exception: If ``reduction`` is not ``'sum'``.
    """
    evaluated_atoms = []
    for atom in atoms_list:
        atom_value = preds[:, atom.variable.id]
        evaluated_atoms.append(atom.eval(atom_value))

    if evaluated_atoms == []:
        return torch.zeros(preds.shape[0])

    evaluated_atoms = torch.stack(evaluated_atoms, dim=1)
    if reduction == 'sum':
        result = evaluated_atoms.sum(dim=1)
    else:
        raise Exception(f'{reduction} reduction not implemented!')
    return result

check_constraint_satisfaction

check_constraint_satisfaction(preds: Tensor, constraints: List) -> bool

Check whether predictions satisfy all requirements, printing the outcome.

Parameters:

Name Type Description Default
preds Tensor

Prediction tensor of shape (batch_size, num_variables).

required
constraints List

The requirements to check.

required

Returns:

Type Description
bool

True if every requirement is satisfied by every sample, False

bool

otherwise.

Source code in pishield/linear_requirements/utils.py
def check_constraint_satisfaction(preds: torch.Tensor, constraints: List) -> bool:
    """Check whether predictions satisfy all requirements, printing the outcome.

    Args:
        preds: Prediction tensor of shape ``(batch_size, num_variables)``.
        constraints: The requirements to check.

    Returns:
        ``True`` if every requirement is satisfied by every sample, ``False``
        otherwise.
    """
    all_constr_sat = True
    for constr in constraints:
        sat = constr.check_satisfaction_per_sample(preds)
        if not sat.all():
            all_constr_sat = False
            # raise Exception('Not satisfied!', constr.readable())
            print('Not satisfied!', constr.readable())
    if all_constr_sat:
        print('All constraints are satisfied!')
    return all_constr_sat