Source code for irksome.galerkin_stepper
from FIAT import (Bernstein, DiscontinuousLagrange,
GaussRadau, IntegratedLegendre, Lagrange,
NodalEnrichedElement, RestrictedElement)
from ufl.classes import Zero
from ufl import as_ufl, as_tensor
from .base_time_stepper import StageCoupledTimeStepper
from .bcs import bc2space, extract_bcs, stage2spaces4bc
from .deriv import TimeDerivative, expand_time_derivatives
from .estimate_degrees import TimeDegreeEstimator, get_degree_mapping
from .labeling import split_quadrature, as_form
from .scheme import GalerkinCollocationScheme, create_time_quadrature, ufc_line
from .tools import AI, IA, dot, fields_to_components, reshape, replace, vecconst
from .discontinuous_galerkin_stepper import getElement as getTestElement
from .integrated_lagrange import IntegratedLagrange
from .ButcherTableaux import CollocationButcherTableau
from .stage_derivative import getForm
from .stage_value import getFormStage
import numpy as np
from firedrake import TestFunction, Constant
from firedrake.bcs import EquationBCSplit
[docs]
def getTrialElement(basis_type, order):
if basis_type is not None:
basis_type = basis_type.lower()
if basis_type == "bernstein":
return Bernstein(ufc_line, order)
elif basis_type == "integral":
return IntegratedLegendre(ufc_line, order)
else:
# Let recursivenodes handle the general case
variant = None if basis_type == "lagrange" else basis_type
return Lagrange(ufc_line, order, variant=variant)
[docs]
def getElements(basis_type, order):
if isinstance(basis_type, (tuple, list)):
trial_type, test_type = basis_type
else:
trial_type = basis_type
test_type = basis_type
L_test = getTestElement(test_type, order-1)
# Deal with GalerkinCollocationScheme
if trial_type == "deriv":
L_trial = IntegratedLagrange(L_test)
elif trial_type == "value":
if len(L_test.entity_dofs()[0][0]) != 0:
# Confluent case
H = IntegratedLagrange(L_test)
indices = [k for k, node in enumerate(H.dual)
if (0.0,) in node.deriv_dict]
R = RestrictedElement(H, indices=indices)
else:
CG = getTrialElement("spectral", order)
R = RestrictedElement(CG, indices=CG.entity_dofs()[0][0])
L_trial = NodalEnrichedElement(R, L_test)
else:
L_trial = getTrialElement(trial_type, order)
return L_trial, L_test
[docs]
def getTermGalerkin(F, L_trial, L_test, Q, t, dt, u0, stages, test, aux_indices):
# preprocess time derivatives
F = expand_time_derivatives(F, t=t, timedep_coeffs=(u0,))
v, = F.arguments()
V = v.function_space()
assert V == u0.function_space()
i0, = L_trial.entity_dofs()[0][0]
qpts = Q.get_points()
qwts = Q.get_weights()
assert qpts.size >= L_test.space_dimension()
tabulate_trials = L_trial.tabulate(1, qpts)
trial_vals = tabulate_trials[(0,)]
trial_dvals = tabulate_trials[(1,)]
test_vals = L_test.tabulate(0, qpts)[(0,)]
test_vals_w = np.multiply(test_vals, qwts)
trial_vals = vecconst(trial_vals)
trial_dvals = vecconst(trial_dvals)
test_vals = vecconst(test_vals)
test_vals_w = vecconst(test_vals_w)
qpts = vecconst(np.reshape(qpts, (-1,)))
# set up the pieces we need to work with to do our substitutions
v_np = reshape(test, (-1, *v.ufl_shape))
w_np = reshape(stages, (-1, *u0.ufl_shape))
u_np = np.insert(w_np, i0, reshape(u0, (1, *u0.ufl_shape)), axis=0)
vsub = dot(test_vals_w.T, v_np)
usub = dot(trial_vals.T, u_np)
dtu0sub = dot(trial_dvals.T, u_np)
dtu0 = TimeDerivative(u0)
# discretize the auxiliary fields in the DG test space
if aux_indices is not None:
aux_components = fields_to_components(V, aux_indices)
usub[:, aux_components] = dot(test_vals.T, w_np[:, aux_components])
# now loop over quadrature points
repl = {}
for q in range(len(qpts)):
repl[q] = {t: t + qpts[q] * dt,
v: vsub[q] * dt,
u0: usub[q],
dtu0: dtu0sub[q] / dt}
Fnew = sum(replace(F, repl[q]) for q in repl)
return Fnew
[docs]
def getFormGalerkin(F, L_trial, L_test, Qdefault, t, dt, u0, stages, bcs=None, bc_type=None, aux_indices=None):
"""Given a time-dependent variational form, trial and test spaces, and
a quadrature rule, produce UFL for the Galerkin-in-Time method.
:arg F: UFL form for the semidiscrete ODE/DAE
:arg L_trial: A :class:`FIAT.FiniteElement` for the trial functions in time
:arg L_test: A :class:`FIAT.FinteElement` for the test functions in time
:arg Qdefault: A :class:`FIAT.QuadratureRule` for the time integration.
This rule will be used for all terms in the semidiscrete
variational form that aren't specifically tagged with another
quadrature rule.
:arg t: a :class:`Function` on the Real space over the same mesh as
`u0`. This serves as a variable referring to the current time.
:arg dt: a :class:`Function` on the Real space over the same mesh as
`u0`. This serves as a variable referring to the current time step.
The user may adjust this value between time steps.
:arg u0: a :class:`Function` referring to the state of
the PDE system at time `t`
:arg stages: a :class:`Function` representing the stages to be solved for.
:kwarg bcs: optionally, a :class:`DirichletBC` object (or iterable thereof)
containing (possibly time-dependent) boundary conditions imposed
on the system.
:kwarg bc_type: How to manipulate the strongly-enforced boundary
conditions to derive the stage boundary conditions. Should
be a string, either "DAE", which implements BCs as
constraints in the style of a differential-algebraic
equation, or "ODE", which evaluates the temporal degrees of
freedom of the boundary data.
Currently there is no support for `firedrake.EquationBC`.
:kwarg aux_indices: a list of field indices to be discretized in the test space
rather than trial space.
:returns: a 2-tuple of
- `Fnew`, the :class:`Form` corresponding to the Galerkin-in-Time discretized problem
- `bcnew`, a list of :class:`firedrake.DirichletBC` objects to be posed
on the Galerkin-in-time solution
"""
if bc_type is None:
bc_type = "DAE"
assert L_test.get_reference_element() == L_trial.get_reference_element()
assert L_trial.space_dimension() == L_test.space_dimension() + 1
Vbig = stages.function_space()
test = TestFunction(Vbig)
degree_mapping = get_degree_mapping(as_form(F), L_test.degree(), L_trial.degree(), t=t, timedep_coeffs=(u0,))
degree_estimator = TimeDegreeEstimator(degree_mapping=degree_mapping)
splitting = split_quadrature(F, degree_estimator=degree_estimator, Qdefault=Qdefault)
Fnew = sum(getTermGalerkin(Fcur, L_trial, L_test, Q, t, dt, u0, stages, test, aux_indices)
for Q, Fcur in splitting.items())
# Oh, honey, is it the boundary conditions?
V = u0.function_space()
if bcs is None:
bcs = []
bcs = extract_bcs(bcs)
bcsnew = []
# list of dictionaries mapping time coordinates to weights to evaluate DOFs
test_dicts = [{Constant(c): Constant(sum(w for (w, *_) in wts))
for (c,), wts in node.pt_dict.items()} for node in L_test.dual_basis()]
def evaluate_dof(dof, g):
if isinstance(g, Zero):
return g
return sum(replace(g, {t: t + c * dt}) * w for c, w in dof.items())
aux_bc_data = lambda bc: [evaluate_dof(dof, as_ufl(bc._original_arg)) for dof in test_dicts]
i0, = L_trial.entity_dofs()[0][0]
if bc_type == "ODE" or len(bcs) == 0:
nodes = list(L_trial.dual_basis())
del nodes[i0]
trial_dicts = [{Constant(c): Constant(sum(w for (w, *_) in wts))
for (c,), wts in node.pt_dict.items()} for node in nodes]
dtrial_dicts = [{Constant(c): Constant(sum(w for (w, *_) in wts))
for (c,), wts in node.deriv_dict.items()} for node in nodes]
def evaluate_trial_dof(bc, value_dict, deriv_dict):
g = as_ufl(bc._original_arg)
gi = evaluate_dof(value_dict, g)
if deriv_dict:
gt = expand_time_derivatives(TimeDerivative(g), t=t, timedep_coeffs=(u0,))
gi += dt * evaluate_dof(deriv_dict, gt)
return gi
bc_data = lambda bc: [evaluate_trial_dof(bc, pt, dpt) for pt, dpt in zip(trial_dicts, dtrial_dicts)]
elif bc_type == "DAE":
if Qdefault is None or isinstance(Qdefault, str):
# create a quadrature for the boundary conditions
bc_degree = max(max(degree_estimator(as_ufl(bc._original_arg)) for bc in bcs), L_trial.degree())
Qdefault = create_time_quadrature(bc_degree + L_test.degree(), scheme=Qdefault)
# mass-ish matrix for BC, based on default quadrature rule
qpts = Qdefault.get_points()
qwts = Qdefault.get_weights()
trial_vals = L_trial.tabulate(0, qpts)[(0,)]
test_vals = L_test.tabulate(0, qpts)[(0,)]
test_vals_w = np.multiply(test_vals, qwts)
mmat = test_vals_w @ np.delete(trial_vals, i0, axis=0).T
try:
trial_proj = vecconst(np.linalg.solve(mmat, test_vals_w))
except np.linalg.LinAlgError:
raise NotImplementedError("Cannot have DAE BCs for this basis type")
trial_vals0 = vecconst(trial_vals[i0])
qpts = vecconst(qpts.reshape((-1,)))
def bc_data(bc):
g = as_ufl(bc._original_arg)
if isinstance(g, Zero):
return [g]*len(test_dicts)
ucur = bc2space(bc, u0)
gq = np.array([replace(g, {t: t + q * dt}) - phi0 * ucur
for phi0, q in zip(trial_vals0, qpts)])
return dot(trial_proj, gq)
else:
raise ValueError(f"Unrecognised bc_type: {bc_type}")
num_stages = L_test.space_dimension()
for bc in bcs:
if isinstance(bc, EquationBCSplit):
raise NotImplementedError("EquationBC not implemented for Galerkin-in-Time")
if aux_indices and bc.function_space().index in aux_indices:
g_np = aux_bc_data(bc)
else:
g_np = bc_data(bc)
for i in range(num_stages):
Vbigi = stage2spaces4bc(bc, V, Vbig, i)
bcsnew.append(bc.reconstruct(V=Vbigi, g=as_tensor(g_np[i])))
return Fnew, bcsnew
[docs]
class ContinuousPetrovGalerkinTimeStepper(StageCoupledTimeStepper):
"""Front-end class for advancing a time-dependent PDE via a Galerkin
in time method
:arg F: A :class:`ufl.Form` instance describing the semi-discrete problem
F(t, u; v) == 0, where `u` is the unknown
:class:`firedrake.Function and `v` is the
:class:firedrake.TestFunction`.
:arg scheme: :class:`ContinuousPetrovGalerkinScheme` encoding the order,
basis type, and default quadrature rule of the method.
:arg t: a :class:`Function` on the Real space over the same mesh as
`u0`. This serves as a variable referring to the current time.
:arg dt: a :class:`Function` on the Real space over the same mesh as
`u0`. This serves as a variable referring to the current time step.
The user may adjust this value between time steps.
:arg u0: A :class:`firedrake.Function` containing the current
state of the problem to be solved.
:kwarg bcs: An iterable of :class:`firedrake.DirichletBC` containing
the strongly-enforced boundary conditions. Irksome will
manipulate these to obtain boundary conditions for each
stage of the method.
:kwarg bc_type: How to manipulate the strongly-enforced boundary
conditions to derive the stage boundary conditions. Should
be a string, either "DAE", which implements BCs as
constraints in the style of a differential-algebraic
equation, or "ODE", which evaluates the temporal degrees of
freedom of the boundary data.
Currently there is no support for `firedrake.EquationBC`.
:kwarg solver_parameters: A :class:`dict` of solver parameters that
will be used in solving the algebraic problem associated
with each time step.
:kwarg appctx: An optional :class:`dict` containing application context.
This gets included with particular things that Irksome will
pass into the nonlinear solver so that, say, user-defined preconditioners
have access to it.
:kwarg nullspace: A list of tuples of the form (index, VSB) where
index is an index into the function space associated with
`u` and VSB is a :class: `firedrake.VectorSpaceBasis`
instance to be passed to a
`firedrake.MixedVectorSpaceBasis` over the larger space
associated with the Runge-Kutta method
:kwarg aux_indices: a list of field indices to be discretized in the test space
rather than trial space.
"""
def __init__(self, F, scheme, t, dt, u0, bcs=None,
bc_type=None, aux_indices=None, **kwargs):
self.order = scheme.order
self.basis_type = scheme.basis_type
V = u0.function_space()
self.num_fields = len(V)
self.trial_el, self.test_el = getElements(self.basis_type, self.order)
num_stages = self.test_el.space_dimension()
quad_degree = scheme.quadrature_degree
if quad_degree is None:
quad_degree = self.trial_el.degree() + self.test_el.degree()
quad_scheme = scheme.quadrature_scheme
if quad_scheme is None and isinstance(self.test_el, GaussRadau):
quad_scheme = "radau"
if quad_degree == "auto":
quadrature = quad_scheme
else:
quadrature = create_time_quadrature(quad_degree, scheme=quad_scheme)
self.quadrature = quadrature
self.aux_indices = aux_indices
if isinstance(scheme, GalerkinCollocationScheme):
self.butcher_tableau = CollocationButcherTableau(self.test_el, None)
else:
self.butcher_tableau = None
super().__init__(F, t, dt, u0, num_stages, bcs=bcs, bc_type=bc_type, **kwargs)
self.set_initial_guess()
self.set_update_expressions()
[docs]
def get_form_and_bcs(self, stages, F=None, bcs=None,
tableau=None, basis_type=None, order=None,
quadrature=None, aux_indices=None):
F = F or self.F
if bcs is None:
bcs = self.orig_bcs
if basis_type is None:
basis_type = self.basis_type
aux_indices = aux_indices or self.aux_indices
if tableau is not None:
# Galerkin collocation is equivalent to an IRK up to row scaling
row_scale = tableau.b
def scaledIA(A):
# For stage-value the splitting exposes row scaling
A1, A2 = IA(A)
np.multiply(A1, row_scale, out=A1)
np.multiply(1/row_scale, A2, out=A2)
return A1, A2
# Construct the equivalent IRK stage residual
trial_type, test_type = basis_type
if trial_type == "value":
splitting = scaledIA
get_irk_form = getFormStage
elif trial_type == "deriv":
splitting = AI
get_irk_form = getForm
else:
raise ValueError("Expecting a GalerkinCollocationScheme")
Fnew, bcnew = get_irk_form(F, tableau, self.t, self.dt, self.u0, stages,
bcs=bcs, splitting=splitting, aux_indices=aux_indices)
if splitting != scaledIA:
v0, = F.arguments()
test, = Fnew.arguments()
test_np = reshape(test, (-1, *v0.ufl_shape))
test_np = np.multiply(vecconst(row_scale).reshape(-1, *(1,)*len(v0.ufl_shape)), test_np)
test_np = test_np.reshape(test.ufl_shape)
Fnew = replace(Fnew, {test: test_np})
return Fnew, bcnew
if order is None:
order = self.order
if basis_type == self.basis_type and order == self.order:
trial_el = self.trial_el
test_el = self.test_el
else:
trial_el, test_el = getElements(basis_type, order)
quadrature = quadrature or self.quadrature
return getFormGalerkin(F, trial_el, test_el, quadrature,
self.t, self.dt, self.u0, stages,
bcs=bcs, bc_type=self.bc_type,
aux_indices=aux_indices)
def _update(self):
for u0bit, expr in zip(self.u0.subfunctions, self.u_update):
u0bit.assign(expr)
[docs]
def set_update_expressions(self):
"""Set up symbolic expressions for the update."""
# Tabulate the trial and test basis functions at the final time
update_trial = vecconst(self.trial_el.tabulate(0, (1.0,))[(0,)])
update_test = vecconst(self.test_el.tabulate(0, (1.0,))[(0,)])
i0, = self.trial_el.entity_dofs()[0][0]
self.u_update = []
for i in range(self.num_fields):
ks = list(self.stages.subfunctions[i::self.num_fields])
if self.aux_indices and i in self.aux_indices:
wts = update_test
else:
wts = update_trial
ks.insert(i0, self.u0.subfunctions[i])
self.u_update.append(sum(w * k for w, k in zip(wts, ks)))
[docs]
def set_initial_guess(self):
"""Set a constant-in-time initial guess."""
ref_el = self.test_el.get_reference_element()
P0 = DiscontinuousLagrange(ref_el, 0)
P0 = P0.get_nodal_basis()
B = P0.get_coeffs()
test_dual = self.test_el.get_dual_set()
test_dofs = np.dot(test_dual.to_riesz(P0), B)
i0, = self.trial_el.entity_dofs()[0][0]
trial_dual = self.trial_el.get_dual_set()
trial_dofs = np.dot(trial_dual.to_riesz(P0), B)
trial_dofs = np.delete(trial_dofs, i0, axis=0)
dof = Constant(0)
for k in range(self.num_stages):
for i, u0bit in enumerate(self.u0.subfunctions):
sbit = self.stages.subfunctions[self.num_fields*k+i]
if self.aux_indices and i in self.aux_indices:
dof.assign(test_dofs[k])
else:
dof.assign(trial_dofs[k])
if abs(float(dof)) < 1E-12:
sbit.zero()
else:
sbit.assign(u0bit * dof)