185 lines
7.5 KiB
Python
185 lines
7.5 KiB
Python
from FluidSim.StaggeredArray import StaggeredArray2D
|
|
import numpy as np
|
|
import scipy
|
|
import scipy.sparse
|
|
import scipy.sparse.linalg
|
|
|
|
class FluidSimulator2D:
|
|
def __init__(self, x_n: int, y_n: int):
|
|
self.x_n = x_n
|
|
self.y_n = y_n
|
|
|
|
self.array = StaggeredArray2D(self.x_n, self.y_n)
|
|
|
|
self.coordinate_array = np.zeros((x_n, y_n, 2), dtype=np.int)
|
|
for x in range(x_n):
|
|
for y in range(y_n):
|
|
self.coordinate_array[x, y, :] = x, y
|
|
|
|
def advect(self, field: np.ndarray, delta_t: float):
|
|
u_x, u_y = self.array.get_velocity_arrays()
|
|
u = np.stack([u_x, u_y], axis=2)
|
|
|
|
def runge_kutta_layer(input, time_elapsed, border_handling='clamp'):
|
|
shifted_pos = np.round(self.coordinate_array - u * time_elapsed).astype(np.int) * (u < 0) +\
|
|
(self.coordinate_array - u * time_elapsed).astype(np.int) * (u > 0) + \
|
|
self.coordinate_array * (u == 0)
|
|
# border handling
|
|
if border_handling == 'clamp':
|
|
shifted_pos = np.maximum(0, shifted_pos)
|
|
shifted_pos[:, :, 0] = np.minimum(len(input), shifted_pos[:, :, 0])
|
|
shifted_pos[:, :, 1] = np.minimum(len(input[0]), shifted_pos[:, :, 1])
|
|
pass
|
|
layer = np.zeros(field.shape, dtype=field.dtype)
|
|
for x in range(self.x_n):
|
|
for y in range(self.y_n):
|
|
layer[x, y] = field[shifted_pos[x, y][0], shifted_pos[x, y][1]] - field[x, y]
|
|
return layer
|
|
|
|
k1 = runge_kutta_layer(field, delta_t)
|
|
|
|
k2 = runge_kutta_layer(field + 0.5 * delta_t * k1, 0.5 * delta_t)
|
|
|
|
k3 = runge_kutta_layer(field + 0.75 * delta_t * k2, 0.75 * delta_t) # maybe 0.25 instead?
|
|
|
|
# new_field = field + 2.0 / 9.0 * delta_t * k1 + 3.0 / 9.0 * delta_t * k2 + 4.0 / 9.0 * delta_t * k3
|
|
new_field = field + k1
|
|
|
|
return new_field
|
|
|
|
def get_timestep(self, f, h=1.0, k_cfl=1.0):
|
|
f_length = np.max(np.sqrt(np.sum(np.square(f), axis=-1)))
|
|
|
|
u_x, u_y = self.array.get_velocity_arrays()
|
|
u_length = np.max(np.sqrt(np.square(u_x) + np.square(u_y)))
|
|
|
|
# return k_cfl * h / (u_length + np.sqrt(h + f_length))
|
|
return k_cfl * h / u_length
|
|
|
|
def update_velocity(self, timestep, border_handling='constant'):
|
|
if border_handling == 'constant':
|
|
p_diff_x = np.pad(self.array.p, [(0, 1), (0, 0)], mode='constant', constant_values=0) -\
|
|
np.pad(self.array.p, [(1, 0), (0, 0)], mode='constant', constant_values=0)
|
|
borders_fluid_x = np.pad(self.array.has_fluid, [(0, 1), (0, 0)], mode='constant', constant_values=False) +\
|
|
np.pad(self.array.has_fluid, [(1, 0), (0, 0)], mode='constant', constant_values=False)
|
|
|
|
p_diff_y = np.pad(self.array.p, [(0, 0), (0, 1)], mode='constant', constant_values=0) -\
|
|
np.pad(self.array.p, [(0, 0), (1, 0)], mode='constant', constant_values=0)
|
|
borders_fluid_y = np.pad(self.array.has_fluid, [(0, 0), (0, 1)], mode='constant', constant_values=False) +\
|
|
np.pad(self.array.has_fluid, [(0, 0), (1, 0)], mode='constant', constant_values=False)
|
|
else:
|
|
p_diff_x = 0
|
|
p_diff_y = 0
|
|
borders_fluid_x = False
|
|
borders_fluid_y = False
|
|
|
|
u_x_new = self.array.u_x - (timestep * p_diff_x) * (1.0 * borders_fluid_x)
|
|
u_y_new = self.array.u_y - (timestep * p_diff_y) * (1.0 * borders_fluid_y)
|
|
|
|
# clear all components that do not border the fluid
|
|
u_x_new *= (1.0 * borders_fluid_x)
|
|
u_y_new *= (1.0 * borders_fluid_y)
|
|
|
|
return u_x_new, u_y_new
|
|
|
|
def calculate_divergence(self, h=1.0):
|
|
dx_u = (self.array.u_x[1:, :] - self.array.u_x[:-1, :]) / h
|
|
dy_u = (self.array.u_y[:, 1:] - self.array.u_y[:, -1]) / h
|
|
|
|
return dx_u + dy_u
|
|
|
|
def pressure_solve(self, divergence):
|
|
new_p = np.zeros((self.array.x_n, self.array.y_n))
|
|
connection_matrix = np.zeros((self.array.x_n * self.array.y_n, self.array.x_n * self.array.y_n))
|
|
flat_divergence = np.zeros((self.array.x_n * self.array.y_n))
|
|
|
|
for x in range(self.array.x_n):
|
|
for y in range(self.array.y_n):
|
|
flat_divergence[x * self.array.y_n + y] = divergence[x, y]
|
|
|
|
neighbors = 4
|
|
if x == 0:
|
|
neighbors -= 1
|
|
else:
|
|
connection_matrix[x * self.array.y_n + y, (x - 1) * self.array.y_n + y] = 1
|
|
if y == 0:
|
|
neighbors -= 1
|
|
else:
|
|
connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y - 1] = 1
|
|
if x == self.array.x_n - 1:
|
|
neighbors -= 1
|
|
else:
|
|
connection_matrix[x * self.array.y_n + y, (x + 1) * self.array.y_n + y] = 1
|
|
if y == self.array.y_n - 1:
|
|
neighbors -= 1
|
|
else:
|
|
connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y + 1] = 1
|
|
|
|
connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y] = -neighbors
|
|
|
|
p = scipy.sparse.linalg.spsolve(connection_matrix, -flat_divergence)
|
|
|
|
for x in range(self.array.x_n):
|
|
for y in range(self.array.y_n):
|
|
new_p[x, y] = p[x * self.array.y_n + y]
|
|
return new_p
|
|
|
|
def timestep(self, external_f, h=1.0, k_cfl=1.0):
|
|
"""
|
|
:param external_f: external forces to be applied
|
|
:param h: grid size
|
|
:param k_cfl: speed up multiplier (reduces accuracy if > 1.0. Does not make much sense if smaller than 1.0)
|
|
:return:
|
|
"""
|
|
delta_t = self.get_timestep(external_f, h, k_cfl)
|
|
|
|
has_fluid = self.advect(self.array.has_fluid * 1.0, delta_t)
|
|
self.array.density = self.advect(self.array.density, delta_t)
|
|
|
|
# temp_u_x = self.advect(self.array.u_x, delta_t)
|
|
# temp_u_y = self.advect(self.array.u_y, delta_t)
|
|
# self.array.u_x = temp_u_x
|
|
# self.array.u_y = temp_u_y
|
|
#TODO advect velocity
|
|
test_u = np.stack(self.array.get_velocity_arrays(), axis=2)
|
|
test = self.advect(np.stack(self.array.get_velocity_arrays(), axis=2), delta_t)
|
|
|
|
# TODO maybe use dynamic threshold to conserve amount of cells containing fluid
|
|
self.array.has_fluid = has_fluid >= 0.5
|
|
# add more stuff to advect here. For example temperature, density, and other things. Maybe advect velocity.
|
|
|
|
# self.array.u_x, self.array.u_y = self.update_velocity(delta_t)
|
|
# TODO add forces (a = F / m) -> add a to u
|
|
|
|
dx_u = (self.array.u_x[1:, :] - self.array.u_x[:-1, :]) / h
|
|
dy_u = (self.array.u_y[:, 1:] - self.array.u_y[:, -1]) / h
|
|
|
|
dx_u = self.advect(dx_u, delta_t)
|
|
dy_u = self.advect(dy_u, delta_t)
|
|
|
|
divergence = dx_u + dy_u
|
|
# divergence = self.calculate_divergence(h)
|
|
|
|
self.array.p = self.pressure_solve(divergence)
|
|
|
|
self.array.u_x, self.array.u_y = self.update_velocity(delta_t)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
fs = FluidSimulator2D(50, 50)
|
|
|
|
fs.array.has_fluid[10, 10] = True
|
|
|
|
for i in range(100):
|
|
fs.timestep(0)
|
|
print(i)
|
|
|
|
print(fs.array.has_fluid[10, 10])
|
|
print(np.sum(fs.array.has_fluid * 1.0))
|
|
# test = fs.advect(fs.array.has_fluid * 1.0, 1.0)
|
|
#
|
|
# test2 = fs.update_velocity(1.0)
|
|
#
|
|
# test3 = fs.calculate_divergence()
|
|
#
|
|
# test4 = fs.pressure_solve(test3)
|