Source code for firedrake.mg.ufl_utils

import ufl
from ufl.corealg.map_dag import map_expr_dag
from ufl.corealg.multifunction import MultiFunction
from ufl.domain import as_domain, extract_unique_domain
from ufl.duals import is_dual

from functools import singledispatch, partial
import firedrake
from firedrake.petsc import PETSc
from firedrake.solving_utils import _SNESContext
from firedrake.dmhooks import (get_transfer_manager, get_appctx, push_appctx, pop_appctx,
                               get_parent, add_hook)

from . import utils


__all__ = ["coarsen"]


class CoarseningError(Exception):
    """Exception raised when coarsening symbolic information fails."""
    pass


class CoarsenIntegrand(MultiFunction):

    """'Coarsen' a :class:`ufl.Expr` by replacing coefficients,
    arguments and domain data with coarse mesh equivalents."""

    def __init__(self, coarsen, coefficient_mapping=None):
        if coefficient_mapping is None:
            coefficient_mapping = {}
        self.coefficient_mapping = coefficient_mapping
        self.coarsen = coarsen
        super(CoarsenIntegrand, self).__init__()

    ufl_type = MultiFunction.reuse_if_untouched

    def argument(self, o):
        V = self.coarsen(o.function_space(), self.coarsen)
        return o.reconstruct(V)

    def coefficient(self, o):
        return self.coarsen(o, self.coarsen, coefficient_mapping=self.coefficient_mapping)

    def cofunction(self, o):
        return self.coarsen(o, self.coarsen, coefficient_mapping=self.coefficient_mapping)

    def geometric_quantity(self, o):
        return type(o)(self.coarsen(extract_unique_domain(o), self.coarsen))

    def circumradius(self, o):
        mesh = self.coarsen(extract_unique_domain(o), self.coarsen)
        return firedrake.Circumradius(mesh)

    def facet_normal(self, o):
        mesh = self.coarsen(extract_unique_domain(o), self.coarsen)
        return firedrake.FacetNormal(mesh)


[docs] @singledispatch def coarsen(expr, self, coefficient_mapping=None): # Default, just send it back return expr
@coarsen.register(ufl.Mesh) def coarsen_mesh(mesh, self, coefficient_mapping=None): hierarchy, level = utils.get_level(mesh) if hierarchy is None: raise CoarseningError("No mesh hierarchy available") return hierarchy[level - 1] @coarsen.register(ufl.BaseForm) @coarsen.register(ufl.classes.Expr) def coarse_expr(expr, self, coefficient_mapping=None): if expr is None: return None mapper = CoarsenIntegrand(self, coefficient_mapping) return map_expr_dag(mapper, expr) @coarsen.register(ufl.Form) def coarsen_form(form, self, coefficient_mapping=None): """Return a coarse mesh version of a form :arg form: The :class:`~ufl.classes.Form` to coarsen. :kwarg mapping: an optional map from coefficients to their coarsened equivalents. This maps over the form and replaces coefficients and arguments with their coarse mesh equivalents.""" if form is None: return None mapper = CoarsenIntegrand(self, coefficient_mapping) integrals = [] for it in form.integrals(): integrand = map_expr_dag(mapper, it.integrand()) mesh = as_domain(it) new_mesh = self(mesh, self) if isinstance(integrand, ufl.classes.Zero): continue if it.subdomain_data() is not None: raise CoarseningError("Don't know how to coarsen subdomain data") new_itg = it.reconstruct(integrand=integrand, domain=new_mesh) integrals.append(new_itg) form = ufl.Form(integrals) return form @coarsen.register(ufl.FormSum) def coarsen_formsum(form, self, coefficient_mapping=None): return type(form)(*[(self(ci, self, coefficient_mapping=coefficient_mapping), self(wi, self, coefficient_mapping=coefficient_mapping)) for ci, wi in zip(form.components(), form.weights())]) @coarsen.register(firedrake.DirichletBC) def coarsen_bc(bc, self, coefficient_mapping=None): V = self(bc.function_space(), self, coefficient_mapping=coefficient_mapping) val = self(bc.function_arg, self, coefficient_mapping=coefficient_mapping) subdomain = bc.sub_domain return type(bc)(V, val, subdomain) @coarsen.register(firedrake.EquationBC) def coarsen_equation_bc(ebc, self, coefficient_mapping=None): J = self(ebc._J.f, self, coefficient_mapping=coefficient_mapping) Jp = self(ebc._Jp.f, self, coefficient_mapping=coefficient_mapping) u = self(ebc._F.u, self, coefficient_mapping=coefficient_mapping) sub_domain = ebc._F.sub_domain bcs = [self(bc, self, coefficient_mapping=coefficient_mapping) for bc in ebc.dirichlet_bcs()] V = self(ebc._F.function_space(), self, coefficient_mapping=coefficient_mapping) lhs = self(ebc.eq.lhs, self, coefficient_mapping=coefficient_mapping) rhs = self(ebc.eq.rhs, self, coefficient_mapping=coefficient_mapping) return type(ebc)(lhs == rhs, u, sub_domain, V=V, bcs=bcs, J=J, Jp=Jp) @coarsen.register(firedrake.functionspaceimpl.WithGeometryBase) def coarsen_function_space(V, self, coefficient_mapping=None): if hasattr(V, "_coarse"): return V._coarse V_fine = V mesh_coarse = self(V_fine.mesh(), self) name = f"coarse_{V.name}" if V.name else None V_coarse = V_fine.reconstruct(mesh=mesh_coarse, name=name) V_coarse._fine = V_fine V_fine._coarse = V_coarse return V_coarse @coarsen.register(firedrake.Cofunction) @coarsen.register(firedrake.Function) def coarsen_function(expr, self, coefficient_mapping=None): if coefficient_mapping is None: coefficient_mapping = {} new = coefficient_mapping.get(expr) if new is None: Vf = expr.function_space() Vc = self(Vf, self) new = firedrake.Function(Vc, name=f"coarse_{expr.name()}") manager = get_transfer_manager(Vf.dm) if is_dual(expr): manager.restrict(expr, new) else: manager.inject(expr, new) coefficient_mapping[expr] = new return new @coarsen.register(firedrake.NonlinearVariationalProblem) def coarsen_nlvp(problem, self, coefficient_mapping=None): if hasattr(problem, "_coarse"): return problem._coarse def inject_on_restrict(fine, restriction, rscale, injection, coarse): manager = get_transfer_manager(fine) cctx = get_appctx(coarse) cmapping = cctx._coefficient_mapping if cmapping is None: return for c in cmapping: if is_dual(c): manager.restrict(c, cmapping[c]) else: manager.inject(c, cmapping[c]) # Apply bcs if cctx.pre_apply_bcs: for bc in cctx._problem.dirichlet_bcs(): bc.apply(cctx._x) dm = problem.u_restrict.function_space().dm if not dm.getAttr("_coarsen_hook"): # The hook is persistent and cumulative, but also problem-independent. # Therefore, we are only adding it once. dm.addCoarsenHook(None, inject_on_restrict) dm.setAttr("_coarsen_hook", True) if coefficient_mapping is None: coefficient_mapping = {} bcs = [self(bc, self, coefficient_mapping=coefficient_mapping) for bc in problem.bcs] F = self(problem.F, self, coefficient_mapping=coefficient_mapping) J = self(problem.J, self, coefficient_mapping=coefficient_mapping) Jp = self(problem.Jp, self, coefficient_mapping=coefficient_mapping) u = coefficient_mapping[problem.u_restrict] fine = problem problem = firedrake.NonlinearVariationalProblem(F, u, bcs=bcs, J=J, Jp=Jp, is_linear=problem.is_linear, form_compiler_parameters=problem.form_compiler_parameters) fine._coarse = problem return problem @coarsen.register(firedrake.VectorSpaceBasis) def coarsen_vectorspacebasis(basis, self, coefficient_mapping=None): # Do not add basis._vecs to the coefficient_mapping, # as they need to be normalized, and are not meant to be reinjected coarse_vecs = [self(vec, self) for vec in basis._vecs] vsb = firedrake.VectorSpaceBasis(coarse_vecs, constant=basis._constant, comm=basis.comm) vsb.orthonormalize() return vsb @coarsen.register(firedrake.MixedVectorSpaceBasis) def coarsen_mixedvectorspacebasis(mspbasis, self, coefficient_mapping=None): coarse_V = self(mspbasis._function_space, self, coefficient_mapping=coefficient_mapping) coarse_bases = [] for basis in mspbasis._bases: if isinstance(basis, firedrake.VectorSpaceBasis): coarse_bases.append(self(basis, self, coefficient_mapping=coefficient_mapping)) elif basis.index is not None: coarse_bases.append(coarse_V.sub(basis.index)) else: raise RuntimeError("MixedVectorSpaceBasis can only contain vector space bases or indexed function spaces") return firedrake.MixedVectorSpaceBasis(coarse_V, coarse_bases) @coarsen.register(_SNESContext) def coarsen_snescontext(context, self, coefficient_mapping=None): if coefficient_mapping is None: coefficient_mapping = {} # Have we already done this? coarse = context._coarse if coarse is not None: return coarse problem = self(context._problem, self, coefficient_mapping=coefficient_mapping) appctx = context.appctx new_appctx = {} for k in sorted(appctx.keys()): v = appctx[k] if k != "state": # Constructor makes this one. try: new_appctx[k] = self(v, self, coefficient_mapping=coefficient_mapping) except CoarseningError: # Assume not something that needs coarsening (e.g. float) new_appctx[k] = v _, level = utils.get_level(problem.u_restrict.function_space().mesh()) if level == 0: # Use different mat_type on coarsest level opts = PETSc.Options(context.options_prefix) if opts.getString("snes_type", "") == "fas": solver_prefix = "fas_" else: solver_prefix = "mg_" coarse_mat_type = opts.getString(f"{solver_prefix}coarse_mat_type", "") if coarse_mat_type == "": coarse_mat_type = context.mat_type default_pmat_type = context.pmat_type else: default_pmat_type = coarse_mat_type coarse_pmat_type = opts.getString(f"{solver_prefix}coarse_pmat_type", default_pmat_type) coarse_sub_mat_type = opts.getString(f"{solver_prefix}coarse_sub_mat_type", "") if coarse_sub_mat_type == "": coarse_sub_mat_type = context.sub_mat_type default_sub_pmat_type = context.sub_pmat_type else: default_sub_pmat_type = coarse_sub_mat_type coarse_sub_pmat_type = opts.getString(f"{solver_prefix}coarse_sub_pmat_type", default_sub_pmat_type or "") or None else: coarse_mat_type = context.mat_type coarse_pmat_type = context.pmat_type coarse_sub_mat_type = context.sub_mat_type coarse_sub_pmat_type = context.sub_pmat_type coarse = _SNESContext(problem, mat_type=coarse_mat_type, pmat_type=coarse_pmat_type, sub_mat_type=coarse_sub_mat_type, sub_pmat_type=coarse_sub_pmat_type, appctx=new_appctx, options_prefix=context.options_prefix, transfer_manager=context.transfer_manager, pre_apply_bcs=context.pre_apply_bcs) coarse._coefficient_mapping = coefficient_mapping coarse._fine = context context._coarse = coarse solutiondm = context._problem.u_restrict.function_space().dm parentdm = get_parent(solutiondm) # Now that we have the coarse snescontext, push it to the coarsened DMs # Otherwise they won't have the right transfer manager when they are # coarsened in turn for val in coefficient_mapping.values(): if isinstance(val, (firedrake.Function, firedrake.Cofunction)): V = val.function_space() coarseneddm = V.dm # Now attach the hook to the parent DM if get_appctx(coarseneddm) is None: push_appctx(coarseneddm, coarse) if parentdm.getAttr("__setup_hooks__"): add_hook(parentdm, teardown=partial(pop_appctx, coarseneddm, coarse)) ises = problem.J.arguments()[0].function_space()._ises coarse._nullspace = self(context._nullspace, self, coefficient_mapping=coefficient_mapping) coarse.set_nullspace(coarse._nullspace, ises, transpose=False, near=False) coarse._nullspace_T = self(context._nullspace_T, self, coefficient_mapping=coefficient_mapping) coarse.set_nullspace(coarse._nullspace_T, ises, transpose=True, near=False) coarse._near_nullspace = self(context._near_nullspace, self, coefficient_mapping=coefficient_mapping) coarse.set_nullspace(coarse._near_nullspace, ises, transpose=False, near=True) return coarse @coarsen.register(firedrake.slate.AssembledVector) def coarsen_slate_assembled_vector(tensor, self, coefficient_mapping=None): form = self(tensor.form, self, coefficient_mapping=coefficient_mapping) return type(tensor)(form) @coarsen.register(firedrake.slate.BlockAssembledVector) def coarsen_slate_block_assembled_vector(tensor, self, coefficient_mapping=None): form = self(tensor.form, self, coefficient_mapping=coefficient_mapping) block = self(tensor.block, self, coefficient_mapping=coefficient_mapping) return type(tensor)(form, *block.children, block.indices) @coarsen.register(firedrake.slate.Block) def coarsen_slate_block(tensor, self, coefficient_mapping=None): children = (self(c, self, coefficient_mapping=coefficient_mapping) for c in tensor.children) return type(tensor)(*children, indices=tensor._indices) @coarsen.register(firedrake.slate.Factorization) def coarsen_slate_factorization(tensor, self, coefficient_mapping=None): children = (self(c, self, coefficient_mapping=coefficient_mapping) for c in tensor.children) return type(tensor)(*children, decomposition=tensor.decomposition) @coarsen.register(firedrake.slate.Tensor) def coarsen_slate_tensor(tensor, self, coefficient_mapping=None): form = self(tensor.form, self, coefficient_mapping=coefficient_mapping) return type(tensor)(form, diagonal=tensor.diagonal) @coarsen.register(firedrake.slate.TensorOp) def coarsen_slate_tensor_op(tensor, self, coefficient_mapping=None): children = (self(c, self, coefficient_mapping=coefficient_mapping) for c in tensor.children) return type(tensor)(*children) class Interpolation(object): def __init__(self, Vcoarse, Vfine, manager, cbcs=None, fbcs=None): self.cprimal = firedrake.Function(Vcoarse) self.fprimal = firedrake.Function(Vfine) self.cdual = firedrake.Cofunction(Vcoarse.dual()) self.fdual = firedrake.Cofunction(Vfine.dual()) self.cbcs = cbcs or [] self.fbcs = fbcs or [] self.manager = manager def mult(self, mat, x, y, inc=False): with self.cprimal.dat.vec_wo as v: x.copy(v) self.manager.prolong(self.cprimal, self.fprimal) for bc in self.fbcs: bc.zero(self.fprimal) with self.fprimal.dat.vec_ro as v: if inc: y.axpy(1.0, v) else: v.copy(y) def multAdd(self, mat, x, y, w): if y.handle == w.handle: self.mult(mat, x, w, inc=True) else: self.mult(mat, x, w) w.axpy(1.0, y) def multTranspose(self, mat, x, y, inc=False): with self.fdual.dat.vec_wo as v: x.copy(v) self.manager.restrict(self.fdual, self.cdual) for bc in self.cbcs: bc.zero(self.cdual) with self.cdual.dat.vec_ro as v: if inc: y.axpy(1.0, v) else: v.copy(y) def multTransposeAdd(self, mat, x, y, w): if y.handle == w.handle: self.multTranspose(mat, x, w, inc=True) else: self.multTranspose(mat, x, w) w.axpy(1.0, y) class Injection(object): def __init__(self, Vcoarse, Vfine, manager, cbcs=None): self.cfn = firedrake.Function(Vcoarse) self.ffn = firedrake.Function(Vfine) self.cbcs = cbcs or [] self.manager = manager def mult(self, mat, x, y): with self.ffn.dat.vec_wo as v: x.copy(v) self.manager.inject(self.ffn, self.cfn) for bc in self.cbcs: bc.apply(self.cfn) with self.cfn.dat.vec_ro as v: v.copy(y) def create_interpolation(dmc, dmf): cctx = get_appctx(dmc) fctx = get_appctx(dmf) V_c = cctx._problem.u_restrict.function_space() V_f = fctx._problem.u_restrict.function_space() row_size = V_f.dof_dset.layout_vec.getSizes() col_size = V_c.dof_dset.layout_vec.getSizes() cbcs = tuple(cctx._problem.dirichlet_bcs()) fbcs = tuple(fctx._problem.dirichlet_bcs()) manager = get_transfer_manager(dmf) ctx = Interpolation(V_c, V_f, manager, cbcs, fbcs) mat = PETSc.Mat().create(comm=dmc.comm) mat.setSizes((row_size, col_size)) mat.setType(mat.Type.PYTHON) mat.setPythonContext(ctx) mat.setUp() if row_size == col_size: # PETSc cannot determine the coarse space if the dimensions are equal. # The coarse space is identified by the dimension of rscale, so we provide one. rscale = mat.createVecRight() rscale.set(1.0) else: rscale = None return mat, rscale def create_injection(dmc, dmf): cctx = get_appctx(dmc) fctx = get_appctx(dmf) V_c = cctx._problem.u_restrict.function_space() V_f = fctx._problem.u_restrict.function_space() row_size = V_c.dof_dset.layout_vec.getSizes() col_size = V_f.dof_dset.layout_vec.getSizes() if (V_c.ufl_element().family() == "Real" and V_f.ufl_element().family() == "Real"): assert row_size == col_size # If the coarse and fine spaces have equal size # PETSc will apply the transpose of the injection. # It does not make sense to implement Injection.multTranspose, # instead we return a concrete identity matrix. dvec = V_c.dof_dset.layout_vec.duplicate() dvec.set(1.0) return PETSc.Mat().createDiagonal(dvec) manager = get_transfer_manager(dmf) ctx = Injection(V_c, V_f, manager) mat = PETSc.Mat().create(comm=dmc.comm) mat.setSizes((row_size, col_size)) mat.setType(mat.Type.PYTHON) mat.setPythonContext(ctx) mat.setUp() return mat