Skip to content

Utility

utility

This module provides several classes and functions to perform a number of pre-, syn-, and post-processing tasks. Users incorporate utility as required in their code, depending on what they would like to achieve.

TimestepAdaptor(dt_const, u, V, target_cfl=1.0, increase_tolerance=1.5, maximum_timestep=None)

Computes timestep based on CFL condition for provided velocity field

Parameters:

Name Type Description Default
dt_const

Constant whose value will be updated by the timestep adaptor

required
u

Velocity to base CFL condition on

required
V

FunctionSpace for reference velocity, usually velocity space

required
target_cfl

CFL number to target with chosen timestep

1.0
increase_tolerance

Maximum tolerance timestep is allowed to change by

1.5
maximum_timestep

Maximum allowable timestep

None
Source code in g-adopt/gadopt/utility.py
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(self, dt_const, u, V, target_cfl=1.0, increase_tolerance=1.5, maximum_timestep=None):
    self.dt_const = dt_const
    self.u = u
    self.target_cfl = target_cfl
    self.increase_tolerance = increase_tolerance
    self.maximum_timestep = maximum_timestep
    self.mesh = V.mesh()

    # J^-1 u is a discontinuous expression, using op2.MAX it takes the maximum value
    # in all adjacent elements when interpolating it to a continuous function space
    # We do need to ensure we reset ref_vel to zero, as it also takes the max with any previous values
    self.ref_vel_interpolator = Interpolator(abs(dot(JacobianInverse(self.mesh), self.u)), V, access=op2.MAX)

CombinedSurfaceMeasure(domain, degree)

Bases: Measure

A surface measure that combines ds_v, the integral over vertical boundary facets, and ds_t and ds_b, the integral over horizontal top and bottom facets. The vertical boundary facets are identified with the same surface ids as ds_v. The top and bottom surfaces are identified via the "top" and "bottom" ids.

Source code in g-adopt/gadopt/utility.py
128
129
130
131
def __init__(self, domain, degree):
    self.ds_v = ds_v(domain=domain, degree=degree)
    self.ds_t = ds_t(domain=domain, degree=degree)
    self.ds_b = ds_b(domain=domain, degree=degree)

__rmul__(other)

This is to handle terms to be integrated over all surfaces in the form of other*ds. Here the CombinedSurfaceMeasure ds is not called, instead we just split it up as below.

Source code in g-adopt/gadopt/utility.py
141
142
143
144
def __rmul__(self, other):
    """This is to handle terms to be integrated over all surfaces in the form of other*ds.
    Here the CombinedSurfaceMeasure ds is not called, instead we just split it up as below."""
    return other*self.ds_v + other*self.ds_t + other*self.ds_b

ExtrudedFunction(*args, mesh_3d=None, **kwargs)

Bases: Function

A 2D Function that provides a 3D view on the extruded domain.

The 3D function can be accessed as ExtrudedFunction.view_3d. The 3D function resides in V x R function space, where V is the function space of the source function. The 3D function shares the data of the 2D function.

Parameters:

Name Type Description Default
mesh_3d

Extruded 3D mesh where the function will be extended to.

None
Source code in g-adopt/gadopt/utility.py
243
244
245
246
247
248
def __init__(self, *args, mesh_3d=None, **kwargs):
    # create the 2d function
    super().__init__(*args, **kwargs)
    print(*args)
    if mesh_3d is not None:
        self.view_3d = extend_function_to_3d(self, mesh_3d)

LayerAveraging(mesh, r1d=None, quad_degree=None)

A manager for computing a vertical profile of horizontal layer averages.

Parameters:

Name Type Description Default
mesh

The mesh over which to compute averages

required
r1d

An array of either depth coordinates or radii, at which to compute layer averages. If not provided, and mesh is extruded, it uses the same layer heights. If mesh is not extruded, r1d is required.

None
Source code in g-adopt/gadopt/utility.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def __init__(self, mesh, r1d=None, quad_degree=None):
    self.mesh = mesh
    XYZ = SpatialCoordinate(mesh)

    if mesh.cartesian:
        self.r = XYZ[len(XYZ)-1]
    else:
        self.r = sqrt(dot(XYZ, XYZ))

    self.dx = dx
    if quad_degree is not None:
        self.dx = dx(degree=quad_degree)

    if r1d is not None:
        self.r1d = r1d
    else:
        try:
            nlayers = mesh.layers
        except AttributeError:
            raise ValueError("For non-extruded mesh need to specify depths array r1d.")
        CG1 = FunctionSpace(mesh, "CG", 1)
        r_func = Function(CG1).interpolate(self.r)
        self.r1d = r_func.dat.data[:nlayers]

    self.mass = np.zeros((2, len(self.r1d)))
    self.rhs = np.zeros(len(self.r1d))
    self._assemble_mass()

get_layer_average(T)

Compute the layer averages of Function T at the predefined depths.

Returns:

Type Description

A numpy array containing the averages.

Source code in g-adopt/gadopt/utility.py
383
384
385
386
387
388
389
390
391
def get_layer_average(self, T):
    """Compute the layer averages of `Function` T at the predefined depths.

    Returns:
      A numpy array containing the averages.
    """

    self._assemble_rhs(T)
    return solveh_banded(self.mass, self.rhs, lower=True)

extrapolate_layer_average(u, avg)

Given an array of layer averages avg, extrapolate to Function u

Source code in g-adopt/gadopt/utility.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def extrapolate_layer_average(self, u, avg):
    """Given an array of layer averages avg, extrapolate to `Function` u
    """

    r = self.r
    rc = Constant(self.r1d[0])
    rn = Constant(self.r1d[1])
    rp = Constant(0.)

    u.assign(0.0)

    phi = max_value(min_value((r - rp) / (rc - rp), (rn - r) / (rn - rc)), 0)
    val = Constant(0.)

    for a, rin in zip(avg[:-1], self.r1d[1:]):
        val.assign(a)
        rn.assign(rin)
        # reconstruct this layer according to the basis function
        u.interpolate(u + val * phi)

        rp.assign(rc)
        rc.assign(rn)

    phi = max_value(min_value(1, (r - rp) / (rn - rp)), 0)
    val.assign(avg[-1])
    u.interpolate(u + val * phi)

InteriorBC

Bases: DirichletBC

DirichletBC applied to anywhere that is not on the specified boundary

log(*args)

Log output to stdout from root processor only

Source code in g-adopt/gadopt/utility.py
29
30
31
def log(*args):
    """Log output to stdout from root processor only"""
    PETSc.Sys.Print(*args)

depends_on(ufl_expr, terminal)

Does ufl_expr depend on terminal (Function/Constant/...)?

Source code in g-adopt/gadopt/utility.py
171
172
173
def depends_on(ufl_expr, terminal):
    """Does ufl_expr depend on terminal (Function/Constant/...)?"""
    return terminal in traverse_unique_terminals(ufl_expr)

tensor_jump(v, n)

Jump term for vector functions based on the tensor product

\["jump"(bb u, bb n) = (bb u^+ bb n^+) + (bb u^- bb n^-)\]

This is the discrete equivalent of grad(u) as opposed to the vectorial UFL jump operator ufl.jump which represents div(u). The equivalent of nabla_grad(u) is given by tensor_jump(n, u).

Source code in g-adopt/gadopt/utility.py
194
195
196
197
198
199
200
201
202
203
204
def tensor_jump(v, n):
    r"""
    Jump term for vector functions based on the tensor product

    $$"jump"(bb u, bb n) = (bb u^+ bb n^+) + (bb u^- bb n^-)$$

    This is the discrete equivalent of grad(u) as opposed to the
    vectorial UFL jump operator `ufl.jump` which represents div(u).
    The equivalent of nabla_grad(u) is given by tensor_jump(n, u).
    """
    return outer(v('+'), n('+')) + outer(v('-'), n('-'))

extend_function_to_3d(func, mesh_extruded)

Returns a 3D view of a 2D Function on the extruded domain. The 3D function resides in V x R function space, where V is the function space of the source function. The 3D function shares the data of the 2D function.

Source code in g-adopt/gadopt/utility.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def extend_function_to_3d(func, mesh_extruded):
    """
    Returns a 3D view of a 2D `Function` on the extruded domain.
    The 3D function resides in V x R function space, where V is the function
    space of the source function. The 3D function shares the data of the 2D
    function.
    """
    fs = func.function_space()
#    assert fs.mesh().geometric_dimension() == 2, 'Function must be in 2D space'
    ufl_elem = fs.ufl_element()
    family = ufl_elem.family()
    degree = ufl_elem.degree()
    name = func.name()
    if isinstance(ufl_elem, VectorElement):
        # vector function space
        fs_extended = get_functionspace(mesh_extruded, family, degree, 'R', 0, dim=2, vector=True)
    else:
        fs_extended = get_functionspace(mesh_extruded, family, degree, 'R', 0)
    func_extended = Function(fs_extended, name=name, val=func.dat._data)
    func_extended.source = func
    return func_extended

absv(u)

Component-wise absolute value of vector for SU stabilisation

Source code in g-adopt/gadopt/utility.py
439
440
441
def absv(u):
    """Component-wise absolute value of vector for SU stabilisation"""
    return as_vector([abs(ui) for ui in u])

su_nubar(u, J, Pe)

SU stabilisation viscosity as a function of velocity, Jacobian and grid Peclet number

Source code in g-adopt/gadopt/utility.py
444
445
446
447
448
449
450
451
452
453
454
455
def su_nubar(u, J, Pe):
    """SU stabilisation viscosity as a function of velocity, Jacobian and grid Peclet number"""
    # SU(PG) ala Donea & Huerta:
    # Columns of Jacobian J are the vectors that span the quad/hex
    # which can be seen as unit-vectors scaled with the dx/dy/dz in that direction (assuming physical coordinates x,y,z aligned with local coordinates)
    # thus u^T J is (dx * u , dy * v)
    # and following (2.44c) Pe = u^T J / (2*nu)
    # beta(Pe) is the xibar vector in (2.44a)
    # then we get artifical viscosity nubar from (2.49)
    beta_pe = as_vector([1/tanh(Pei+1e-6) - 1/(Pei+1e-6) for Pei in Pe])

    return dot(absv(dot(u, J)), beta_pe)/2

node_coordinates(function)

Extract mesh coordinates and interpolate them onto the relevant function space

Source code in g-adopt/gadopt/utility.py
467
468
469
470
471
472
473
474
def node_coordinates(function):
    """Extract mesh coordinates and interpolate them onto the relevant function space"""
    func_space = function.function_space()
    mesh_coords = SpatialCoordinate(func_space.mesh())

    return [
        Function(func_space).interpolate(coords).dat.data for coords in mesh_coords
    ]

interpolate_1d_profile(function, one_d_filename)

Assign a one-dimensional profile to a Function function from a file.

The function reads a one-dimensional profile (e.g., viscosity) together with the radius/height input for the profile, from a file, broadcasts the two arrays to all processes, and then interpolates this array onto the function space of function.

Parameters:

Name Type Description Default
function Function

The function onto which the 1D profile will be assigned

required
one_d_filename str

The path to the file containing the 1D radial profile

required
Note
  • This is designed to read a file with one process and distribute in parallel with MPI.
  • The input file should contain an array of radius/height and an array of values, separated by a comma.
Source code in g-adopt/gadopt/utility.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def interpolate_1d_profile(function: Function, one_d_filename: str):
    """
    Assign a one-dimensional profile to a Function `function` from a file.

    The function reads a one-dimensional profile (e.g., viscosity) together with the
    radius/height input for the profile, from a file, broadcasts the two arrays to all
    processes, and then interpolates this array onto the function space of `function`.

    Args:
        function: The function onto which the 1D profile will be assigned
        one_d_filename: The path to the file containing the 1D radial profile

    Note:
        - This is designed to read a file with one process and distribute in parallel with MPI.
        - The input file should contain an array of radius/height and an array of values, separated by a comma.
    """
    mesh = extract_unique_domain(function)

    if mesh.comm.rank == 0:
        rshl, one_d_data = np.loadtxt(one_d_filename, unpack=True, delimiter=",")
        if rshl[1] < rshl[0]:
            rshl = rshl[::-1]
            one_d_data = one_d_data[::-1]
        if not np.all(np.diff(rshl) > 0):
            raise ValueError("Data must be strictly monotonous.")
    else:
        one_d_data = None
        rshl = None

    one_d_data = mesh.comm.bcast(one_d_data, root=0)
    rshl = mesh.comm.bcast(rshl, root=0)

    X = SpatialCoordinate(mesh)

    upward_coord = vertical_component(X)

    rad = Function(function.function_space()).interpolate(upward_coord)

    averager = LayerAveraging(mesh, rshl if mesh.layers is None else None)
    interpolated_visc = np.interp(averager.get_layer_average(rad), rshl, one_d_data)
    averager.extrapolate_layer_average(function, interpolated_visc)