Skip to content

Utility

utility

This module provides several classes and functions to perform a number of pre-, syn-, and post-processing tasks. Users incorporate utility as required in their code, depending on what they would like to achieve.

TimestepAdaptor(dt_const, u, V, target_cfl=1.0, increase_tolerance=1.5, maximum_timestep=None)

Computes timestep based on CFL condition for provided velocity field

Parameters:

Name Type Description Default
dt_const

Constant whose value will be updated by the timestep adaptor

required
u

Velocity to base CFL condition on

required
V

FunctionSpace for reference velocity, usually velocity space

required
target_cfl

CFL number to target with chosen timestep

1.0
increase_tolerance

Maximum tolerance timestep is allowed to change by

1.5
maximum_timestep

Maximum allowable timestep

None
Source code in g-adopt/gadopt/utility.py
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(self, dt_const, u, V, target_cfl=1.0, increase_tolerance=1.5, maximum_timestep=None):
    self.dt_const = dt_const
    self.u = u
    self.target_cfl = target_cfl
    self.increase_tolerance = increase_tolerance
    self.maximum_timestep = maximum_timestep
    self.mesh = V.mesh()

    # J^-1 u is a discontinuous expression, using op2.MAX it takes the maximum value
    # in all adjacent elements when interpolating it to a continuous function space
    # We do need to ensure we reset ref_vel to zero, as it also takes the max with any previous values
    self.ref_vel_interpolator = Interpolator(abs(dot(JacobianInverse(self.mesh), self.u)), V, access=op2.MAX)

CombinedSurfaceMeasure(domain, degree)

Bases: Measure

A surface measure that combines ds_v, the integral over vertical boundary facets, and ds_t and ds_b, the integral over horizontal top and bottom facets. The vertical boundary facets are identified with the same surface ids as ds_v. The top and bottom surfaces are identified via the "top" and "bottom" ids.

Source code in g-adopt/gadopt/utility.py
128
129
130
131
def __init__(self, domain, degree):
    self.ds_v = ds_v(domain=domain, degree=degree)
    self.ds_t = ds_t(domain=domain, degree=degree)
    self.ds_b = ds_b(domain=domain, degree=degree)

__rmul__(other)

This is to handle terms to be integrated over all surfaces in the form of other*ds. Here the CombinedSurfaceMeasure ds is not called, instead we just split it up as below.

Source code in g-adopt/gadopt/utility.py
141
142
143
144
def __rmul__(self, other):
    """This is to handle terms to be integrated over all surfaces in the form of other*ds.
    Here the CombinedSurfaceMeasure ds is not called, instead we just split it up as below."""
    return other*self.ds_v + other*self.ds_t + other*self.ds_b

ExtrudedFunction(*args, mesh_3d=None, **kwargs)

Bases: Function

A 2D Function that provides a 3D view on the extruded domain.

The 3D function can be accessed as ExtrudedFunction.view_3d. The 3D function resides in V x R function space, where V is the function space of the source function. The 3D function shares the data of the 2D function.

Parameters:

Name Type Description Default
mesh_3d

Extruded 3D mesh where the function will be extended to.

None
Source code in g-adopt/gadopt/utility.py
268
269
270
271
272
273
def __init__(self, *args, mesh_3d=None, **kwargs):
    # create the 2d function
    super().__init__(*args, **kwargs)
    print(*args)
    if mesh_3d is not None:
        self.view_3d = extend_function_to_3d(self, mesh_3d)

LayerAveraging(mesh, r1d=None, quad_degree=None)

A manager for computing a vertical profile of horizontal layer averages.

Parameters:

Name Type Description Default
mesh

The mesh over which to compute averages

required
r1d

An array of either depth coordinates or radii, at which to compute layer averages. If not provided, and mesh is extruded, it uses the same layer heights. If mesh is not extruded, r1d is required.

None
Source code in g-adopt/gadopt/utility.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def __init__(self, mesh, r1d=None, quad_degree=None):
    self.mesh = mesh
    XYZ = SpatialCoordinate(mesh)

    if mesh.cartesian:
        self.r = XYZ[len(XYZ)-1]
    else:
        self.r = sqrt(dot(XYZ, XYZ))

    self.dx = dx
    if quad_degree is not None:
        self.dx = dx(degree=quad_degree)

    if r1d is not None:
        self.r1d = r1d
    else:
        try:
            nlayers = mesh.layers
        except AttributeError:
            raise ValueError("For non-extruded mesh need to specify depths array r1d.")
        CG1 = FunctionSpace(mesh, "CG", 1)
        r_func = Function(CG1).interpolate(self.r)
        self.r1d = r_func.dat.data[:nlayers]

    self.mass = np.zeros((2, len(self.r1d)))
    self.rhs = np.zeros(len(self.r1d))
    self._assemble_mass()

get_layer_average(T)

Compute the layer averages of Function T at the predefined depths.

Returns:

Type Description

A numpy array containing the averages.

Source code in g-adopt/gadopt/utility.py
408
409
410
411
412
413
414
415
416
def get_layer_average(self, T):
    """Compute the layer averages of `Function` T at the predefined depths.

    Returns:
      A numpy array containing the averages.
    """

    self._assemble_rhs(T)
    return solveh_banded(self.mass, self.rhs, lower=True)

extrapolate_layer_average(u, avg)

Given an array of layer averages avg, extrapolate to Function u

Source code in g-adopt/gadopt/utility.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def extrapolate_layer_average(self, u, avg):
    """Given an array of layer averages avg, extrapolate to `Function` u
    """

    r = self.r
    rc = Constant(self.r1d[0])
    rn = Constant(self.r1d[1])
    rp = Constant(0.)

    u.assign(0.0)

    phi = max_value(min_value((r - rp) / (rc - rp), (rn - r) / (rn - rc)), 0)
    val = Constant(0.)

    for a, rin in zip(avg[:-1], self.r1d[1:]):
        val.assign(a)
        rn.assign(rin)
        # reconstruct this layer according to the basis function
        u.interpolate(u + val * phi)

        rp.assign(rc)
        rc.assign(rn)

    phi = max_value(min_value(1, (r - rp) / (rn - rp)), 0)
    val.assign(avg[-1])
    u.interpolate(u + val * phi)

log(*args)

Log output to stdout from root processor only

Source code in g-adopt/gadopt/utility.py
29
30
31
def log(*args):
    """Log output to stdout from root processor only"""
    PETSc.Sys.Print(*args)

depends_on(ufl_expr, terminal)

Does ufl_expr depend on terminal (Function/Constant/...)?

Source code in g-adopt/gadopt/utility.py
171
172
173
def depends_on(ufl_expr, terminal):
    """Does ufl_expr depend on terminal (Function/Constant/...)?"""
    return terminal in traverse_unique_terminals(ufl_expr)

cell_edge_integral_ratio(mesh, p)

Ratio C such that \int_f u^2 <= C Area(f)/Volume(e) \int_e u^2 for facets f, elements e and polynomials u of degree p.

See eqn. (3.7) ad table 3.1 from Hillewaert's thesis: https://www.researchgate.net/publication/260085826 and its appendix C for derivation.

Source code in g-adopt/gadopt/utility.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def cell_edge_integral_ratio(mesh, p):
    r"""
    Ratio C such that \int_f u^2 <= C Area(f)/Volume(e) \int_e u^2
    for facets f, elements e and polynomials u of degree p.

    See eqn. (3.7) ad table 3.1 from Hillewaert's thesis: https://www.researchgate.net/publication/260085826
    and its appendix C for derivation."""
    cell_type = mesh.ufl_cell().cellname()
    if cell_type == "triangle":
        return (p+1)*(p+2)/2.
    elif cell_type == "quadrilateral" or cell_type == "interval * interval":
        return (p+1)**2
    elif cell_type == "triangle * interval":
        return (p+1)**2
    elif cell_type == "quadrilateral * interval":
        # if e is a wedge and f is a triangle: (p+1)**2
        # if e is a wedge and f is a quad: (p+1)*(p+2)/2
        # here we just return the largest of the the two (for p>=0)
        return (p+1)**2
    elif cell_type == "tetrahedron":
        return (p+1)*(p+3)/3
    else:
        raise NotImplementedError("Unknown cell type in mesh: {}".format(cell_type))

tensor_jump(v, n)

Jump term for vector functions based on the tensor product

\["jump"(bb u, bb n) = (bb u^+ bb n^+) + (bb u^- bb n^-)\]

This is the discrete equivalent of grad(u) as opposed to the vectorial UFL jump operator ufl.jump which represents div(u). The equivalent of nabla_grad(u) is given by tensor_jump(n, u).

Source code in g-adopt/gadopt/utility.py
219
220
221
222
223
224
225
226
227
228
229
def tensor_jump(v, n):
    r"""
    Jump term for vector functions based on the tensor product

    $$"jump"(bb u, bb n) = (bb u^+ bb n^+) + (bb u^- bb n^-)$$

    This is the discrete equivalent of grad(u) as opposed to the
    vectorial UFL jump operator `ufl.jump` which represents div(u).
    The equivalent of nabla_grad(u) is given by tensor_jump(n, u).
    """
    return outer(v('+'), n('+')) + outer(v('-'), n('-'))

extend_function_to_3d(func, mesh_extruded)

Returns a 3D view of a 2D Function on the extruded domain. The 3D function resides in V x R function space, where V is the function space of the source function. The 3D function shares the data of the 2D function.

Source code in g-adopt/gadopt/utility.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def extend_function_to_3d(func, mesh_extruded):
    """
    Returns a 3D view of a 2D `Function` on the extruded domain.
    The 3D function resides in V x R function space, where V is the function
    space of the source function. The 3D function shares the data of the 2D
    function.
    """
    fs = func.function_space()
#    assert fs.mesh().geometric_dimension() == 2, 'Function must be in 2D space'
    ufl_elem = fs.ufl_element()
    family = ufl_elem.family()
    degree = ufl_elem.degree()
    name = func.name()
    if isinstance(ufl_elem, VectorElement):
        # vector function space
        fs_extended = get_functionspace(mesh_extruded, family, degree, 'R', 0, dim=2, vector=True)
    else:
        fs_extended = get_functionspace(mesh_extruded, family, degree, 'R', 0)
    func_extended = Function(fs_extended, name=name, val=func.dat._data)
    func_extended.source = func
    return func_extended

absv(u)

Component-wise absolute value of vector for SU stabilisation

Source code in g-adopt/gadopt/utility.py
457
458
459
def absv(u):
    """Component-wise absolute value of vector for SU stabilisation"""
    return as_vector([abs(ui) for ui in u])

su_nubar(u, J, Pe)

SU stabilisation viscosity as a function of velocity, Jacobian and grid Peclet number

Source code in g-adopt/gadopt/utility.py
462
463
464
465
466
467
468
469
470
471
472
473
def su_nubar(u, J, Pe):
    """SU stabilisation viscosity as a function of velocity, Jacobian and grid Peclet number"""
    # SU(PG) ala Donea & Huerta:
    # Columns of Jacobian J are the vectors that span the quad/hex
    # which can be seen as unit-vectors scaled with the dx/dy/dz in that direction (assuming physical coordinates x,y,z aligned with local coordinates)
    # thus u^T J is (dx * u , dy * v)
    # and following (2.44c) Pe = u^T J / (2*nu)
    # beta(Pe) is the xibar vector in (2.44a)
    # then we get artifical viscosity nubar from (2.49)
    beta_pe = as_vector([1/tanh(Pei+1e-6) - 1/(Pei+1e-6) for Pei in Pe])

    return dot(absv(dot(u, J)), beta_pe)/2

node_coordinates(function)

Extract mesh coordinates and interpolate them onto the relevant function space

Source code in g-adopt/gadopt/utility.py
476
477
478
479
480
481
482
483
def node_coordinates(function):
    """Extract mesh coordinates and interpolate them onto the relevant function space"""
    func_space = function.function_space()
    mesh_coords = SpatialCoordinate(func_space.mesh())

    return [
        Function(func_space).interpolate(coords).dat.data for coords in mesh_coords
    ]

interpolate_1d_profile(function, one_d_filename)

Assign a one-dimensional profile to a Function function from a file.

The function reads a one-dimensional profile (e.g., viscosity) together with the radius/height input for the profile, from a file, broadcasts the two arrays to all processes, and then interpolates this array onto the function space of function.

Parameters:

Name Type Description Default
function Function

The function onto which the 1D profile will be assigned

required
one_d_filename str

The path to the file containing the 1D radial profile

required
Note
  • This is designed to read a file with one process and distribute in parallel with MPI.
  • The input file should contain an array of radius/height and an array of values, separated by a comma.
Source code in g-adopt/gadopt/utility.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
def interpolate_1d_profile(function: Function, one_d_filename: str):
    """
    Assign a one-dimensional profile to a Function `function` from a file.

    The function reads a one-dimensional profile (e.g., viscosity) together with the
    radius/height input for the profile, from a file, broadcasts the two arrays to all
    processes, and then interpolates this array onto the function space of `function`.

    Args:
        function: The function onto which the 1D profile will be assigned
        one_d_filename: The path to the file containing the 1D radial profile

    Note:
        - This is designed to read a file with one process and distribute in parallel with MPI.
        - The input file should contain an array of radius/height and an array of values, separated by a comma.
    """
    mesh = extract_unique_domain(function)

    if mesh.comm.rank == 0:
        rshl, one_d_data = np.loadtxt(one_d_filename, unpack=True, delimiter=",")
        if rshl[1] < rshl[0]:
            rshl = rshl[::-1]
            one_d_data = one_d_data[::-1]
        if not np.all(np.diff(rshl) > 0):
            raise ValueError("Data must be strictly monotonous.")
    else:
        one_d_data = None
        rshl = None

    one_d_data = mesh.comm.bcast(one_d_data, root=0)
    rshl = mesh.comm.bcast(rshl, root=0)

    X = SpatialCoordinate(mesh)

    upward_coord = vertical_component(X)

    rad = Function(function.function_space()).interpolate(upward_coord)

    averager = LayerAveraging(mesh, rshl if mesh.layers is None else None)
    interpolated_visc = np.interp(averager.get_layer_average(rad), rshl, one_d_data)
    averager.extrapolate_layer_average(function, interpolated_visc)