travking fluid particle

This commit is contained in:
zomseffen 2021-12-20 17:21:46 +01:00
parent 8a5de47da3
commit 54bda855e5
10 changed files with 702 additions and 37 deletions

View file

@ -1,7 +1,7 @@
class FluidSimParameter:
viscosity = 0.1 / 3.0
# Pr = 1.0
Pr = 100.0
Pr = 1.0
# vc = 1.0
vc = 0.5

185
FluidSim/FluidSimulator.py Normal file
View file

@ -0,0 +1,185 @@
from FluidSim.StaggeredArray import StaggeredArray2D
import numpy as np
import scipy
import scipy.sparse
import scipy.sparse.linalg
class FluidSimulator2D:
def __init__(self, x_n: int, y_n: int):
self.x_n = x_n
self.y_n = y_n
self.array = StaggeredArray2D(self.x_n, self.y_n)
self.coordinate_array = np.zeros((x_n, y_n, 2), dtype=np.int)
for x in range(x_n):
for y in range(y_n):
self.coordinate_array[x, y, :] = x, y
def advect(self, field: np.ndarray, delta_t: float):
u_x, u_y = self.array.get_velocity_arrays()
u = np.stack([u_x, u_y], axis=2)
def runge_kutta_layer(input, time_elapsed, border_handling='clamp'):
shifted_pos = np.round(self.coordinate_array - u * time_elapsed).astype(np.int) * (u < 0) +\
(self.coordinate_array - u * time_elapsed).astype(np.int) * (u > 0) + \
self.coordinate_array * (u == 0)
# border handling
if border_handling == 'clamp':
shifted_pos = np.maximum(0, shifted_pos)
shifted_pos[:, :, 0] = np.minimum(len(input), shifted_pos[:, :, 0])
shifted_pos[:, :, 1] = np.minimum(len(input[0]), shifted_pos[:, :, 1])
pass
layer = np.zeros(field.shape, dtype=field.dtype)
for x in range(self.x_n):
for y in range(self.y_n):
layer[x, y] = field[shifted_pos[x, y][0], shifted_pos[x, y][1]] - field[x, y]
return layer
k1 = runge_kutta_layer(field, delta_t)
k2 = runge_kutta_layer(field + 0.5 * delta_t * k1, 0.5 * delta_t)
k3 = runge_kutta_layer(field + 0.75 * delta_t * k2, 0.75 * delta_t) # maybe 0.25 instead?
# new_field = field + 2.0 / 9.0 * delta_t * k1 + 3.0 / 9.0 * delta_t * k2 + 4.0 / 9.0 * delta_t * k3
new_field = field + k1
return new_field
def get_timestep(self, f, h=1.0, k_cfl=1.0):
f_length = np.max(np.sqrt(np.sum(np.square(f), axis=-1)))
u_x, u_y = self.array.get_velocity_arrays()
u_length = np.max(np.sqrt(np.square(u_x) + np.square(u_y)))
# return k_cfl * h / (u_length + np.sqrt(h + f_length))
return k_cfl * h / u_length
def update_velocity(self, timestep, border_handling='constant'):
if border_handling == 'constant':
p_diff_x = np.pad(self.array.p, [(0, 1), (0, 0)], mode='constant', constant_values=0) -\
np.pad(self.array.p, [(1, 0), (0, 0)], mode='constant', constant_values=0)
borders_fluid_x = np.pad(self.array.has_fluid, [(0, 1), (0, 0)], mode='constant', constant_values=False) +\
np.pad(self.array.has_fluid, [(1, 0), (0, 0)], mode='constant', constant_values=False)
p_diff_y = np.pad(self.array.p, [(0, 0), (0, 1)], mode='constant', constant_values=0) -\
np.pad(self.array.p, [(0, 0), (1, 0)], mode='constant', constant_values=0)
borders_fluid_y = np.pad(self.array.has_fluid, [(0, 0), (0, 1)], mode='constant', constant_values=False) +\
np.pad(self.array.has_fluid, [(0, 0), (1, 0)], mode='constant', constant_values=False)
else:
p_diff_x = 0
p_diff_y = 0
borders_fluid_x = False
borders_fluid_y = False
u_x_new = self.array.u_x - (timestep * p_diff_x) * (1.0 * borders_fluid_x)
u_y_new = self.array.u_y - (timestep * p_diff_y) * (1.0 * borders_fluid_y)
# clear all components that do not border the fluid
u_x_new *= (1.0 * borders_fluid_x)
u_y_new *= (1.0 * borders_fluid_y)
return u_x_new, u_y_new
def calculate_divergence(self, h=1.0):
dx_u = (self.array.u_x[1:, :] - self.array.u_x[:-1, :]) / h
dy_u = (self.array.u_y[:, 1:] - self.array.u_y[:, -1]) / h
return dx_u + dy_u
def pressure_solve(self, divergence):
new_p = np.zeros((self.array.x_n, self.array.y_n))
connection_matrix = np.zeros((self.array.x_n * self.array.y_n, self.array.x_n * self.array.y_n))
flat_divergence = np.zeros((self.array.x_n * self.array.y_n))
for x in range(self.array.x_n):
for y in range(self.array.y_n):
flat_divergence[x * self.array.y_n + y] = divergence[x, y]
neighbors = 4
if x == 0:
neighbors -= 1
else:
connection_matrix[x * self.array.y_n + y, (x - 1) * self.array.y_n + y] = 1
if y == 0:
neighbors -= 1
else:
connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y - 1] = 1
if x == self.array.x_n - 1:
neighbors -= 1
else:
connection_matrix[x * self.array.y_n + y, (x + 1) * self.array.y_n + y] = 1
if y == self.array.y_n - 1:
neighbors -= 1
else:
connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y + 1] = 1
connection_matrix[x * self.array.y_n + y, x * self.array.y_n + y] = -neighbors
p = scipy.sparse.linalg.spsolve(connection_matrix, -flat_divergence)
for x in range(self.array.x_n):
for y in range(self.array.y_n):
new_p[x, y] = p[x * self.array.y_n + y]
return new_p
def timestep(self, external_f, h=1.0, k_cfl=1.0):
"""
:param external_f: external forces to be applied
:param h: grid size
:param k_cfl: speed up multiplier (reduces accuracy if > 1.0. Does not make much sense if smaller than 1.0)
:return:
"""
delta_t = self.get_timestep(external_f, h, k_cfl)
has_fluid = self.advect(self.array.has_fluid * 1.0, delta_t)
self.array.density = self.advect(self.array.density, delta_t)
# temp_u_x = self.advect(self.array.u_x, delta_t)
# temp_u_y = self.advect(self.array.u_y, delta_t)
# self.array.u_x = temp_u_x
# self.array.u_y = temp_u_y
#TODO advect velocity
test_u = np.stack(self.array.get_velocity_arrays(), axis=2)
test = self.advect(np.stack(self.array.get_velocity_arrays(), axis=2), delta_t)
# TODO maybe use dynamic threshold to conserve amount of cells containing fluid
self.array.has_fluid = has_fluid >= 0.5
# add more stuff to advect here. For example temperature, density, and other things. Maybe advect velocity.
# self.array.u_x, self.array.u_y = self.update_velocity(delta_t)
# TODO add forces (a = F / m) -> add a to u
dx_u = (self.array.u_x[1:, :] - self.array.u_x[:-1, :]) / h
dy_u = (self.array.u_y[:, 1:] - self.array.u_y[:, -1]) / h
dx_u = self.advect(dx_u, delta_t)
dy_u = self.advect(dy_u, delta_t)
divergence = dx_u + dy_u
# divergence = self.calculate_divergence(h)
self.array.p = self.pressure_solve(divergence)
self.array.u_x, self.array.u_y = self.update_velocity(delta_t)
if __name__ == '__main__':
fs = FluidSimulator2D(50, 50)
fs.array.has_fluid[10, 10] = True
for i in range(100):
fs.timestep(0)
print(i)
print(fs.array.has_fluid[10, 10])
print(np.sum(fs.array.has_fluid * 1.0))
# test = fs.advect(fs.array.has_fluid * 1.0, 1.0)
#
# test2 = fs.update_velocity(1.0)
#
# test3 = fs.calculate_divergence()
#
# test4 = fs.pressure_solve(test3)

View file

@ -37,6 +37,10 @@ def main():
# Initial Conditions
N = np.ones((Ny, Nx, NL)) # * rho0 / NL
temperature = np.ones((Ny, Nx, NL), np.float) # * rho0 / NL
tracked_fluid = np.zeros((Ny, Nx, NL), np.float) # * rho0 / NL
tracked_fluid[50:, :, 0] = 1
has_fluid = np.ones((Ny, Nx), dtype=np.bool)
has_fluid[int(Ny/2):, :] = False
np.random.seed(42)
@ -56,13 +60,17 @@ def main():
# Cylinder boundary
X, Y = np.meshgrid(range(Nx), range(Ny))
cylinder = (X - Nx / 4) ** 2 + (Y - Ny / 2) ** 2 < (Ny / 4) ** 2
inner_cylinder = (X - Nx / 4) ** 2 + (Y - Ny / 2) ** 2 < (Ny / 4 - 2) ** 2
cylinder[:, :] = False
N[cylinder] = 0
N[0, :] = 0
N[Ny - 1, :] = 0
temperature[cylinder] = 0
tracked_fluid[cylinder] = 0
# N[int(Ny/2):, :] = 0
has_fluid[cylinder] = False
@ -98,14 +106,17 @@ def main():
temperature[:, :, i] = np.roll(temperature[:, :, i], cx, axis=1)
temperature[:, :, i] = np.roll(temperature[:, :, i], cy, axis=0)
tracked_fluid[:, :, i] = np.roll(tracked_fluid[:, :, i], cx, axis=1)
tracked_fluid[:, :, i] = np.roll(tracked_fluid[:, :, i], cy, axis=0)
# has_fluid = new_has_fluid > 0.5
# new_has_fluid[F_sum == 0] += has_fluid[F_sum == 0] * 1.0
# new_has_fluid[(np.abs(F_sum) < 0.000000001)] = 0
fluid_sum = np.sum(has_fluid * 1.0)
has_fluid = (new_has_fluid / np.sum(new_has_fluid * 1.0)) * fluid_sum
print('fluid_cells: %d' % np.sum(has_fluid * 1))
#
# fluid_sum = np.sum(has_fluid * 1.0)
# has_fluid = (new_has_fluid / np.sum(new_has_fluid * 1.0)) * fluid_sum
#
# print('fluid_cells: %d' % np.sum(has_fluid * 1))
# for i in idxs:
# N[:, :, i] *= has_fluid
@ -126,9 +137,13 @@ def main():
bndryTemp = temperature[bndry, :]
bndryTemp = bndryTemp[:, reflection_mapping]
bndryTracked = tracked_fluid[bndry, :]
bndryTracked = bndryTracked[:, reflection_mapping]
sum_f = np.sum(N)
print('Sum of Particles: %f' % sum_f)
print('Sum of Temperature: %f' % np.sum(temperature))
print('Sum of tacked particles: %f' % np.sum(tracked_fluid))
# sum_f_cyl = np.sum(N[cylinder])
# print('Sum of Forces in cylinder: %f' % sum_f_cyl)
@ -147,6 +162,9 @@ def main():
# Calculate fluid variables
rho = np.sum(N, 2)
temp_rho = np.sum(temperature, 2)
tracked_rho = np.sum(tracked_fluid, 2)
ux = np.sum(N * cxs, 2) / rho
uy = np.sum(N * cys, 2) / rho
@ -154,16 +172,28 @@ def main():
uy[(np.abs(rho) < epsilon)] = 0
g = -params.g * (temp_rho - yy / Ny)
uy1 = (uy + g * params.t1 * (tracked_rho * 0.9 + 0.1))
uy2 = (uy + g * params.t2 * (tracked_rho * 0.9 + 0.1))
# uy[np.abs(rho) >= epsilon] += g[np.abs(rho) >= epsilon] / 2.0
uy += g / 2.0
# uy += g / 2.0
# u_length = np.maximum(np.abs(ux), np.abs(uy))
u_length = np.sqrt(np.square(ux) + np.square(uy))
u_length1 = np.sqrt(np.square(ux) + np.square(uy1))
u_length2 = np.sqrt(np.square(ux) + np.square(uy2))
u_max_length1 = np.max(u_length1)
u_max_length2 = np.max(u_length2)
u_max_length = max(u_max_length1, u_max_length2)
if u_max_length > 2:
test = 1
u_max_length = np.max(u_length)
if u_max_length > np.sqrt(2):
ux = (ux / u_max_length) * np.sqrt(2)
uy = (uy / u_max_length) * np.sqrt(2)
uy1 = (uy1 / u_max_length) * np.sqrt(2)
uy2 = (uy2 / u_max_length) * np.sqrt(2)
print('max vector part: %f' % u_max_length)
# ux /= u_max_length
@ -193,13 +223,17 @@ def main():
# Apply Collision
temperature_eq = np.zeros(temperature.shape)
tracked_eq = np.zeros(temperature.shape)
Neq = np.zeros(N.shape)
for i, cx, cy, w in zip(idxs, cxs, cys, weights):
Neq[:, :, i] = rho * w * (
1 + 3 * (cx * ux + cy * uy) + 9 * (cx * ux + cy * uy) ** 2 / 2 - 3 * (ux ** 2 + uy ** 2) / 2)
1 + 3 * (cx * ux + cy * uy1) + 9 * (cx * ux + cy * uy1) ** 2 / 2 - 3 * (ux ** 2 + uy1 ** 2) / 2)
temperature_eq[:, :, i] = temp_rho * w * (
1 + 3 * (cx * ux + cy * uy) + 9 * (cx * ux + cy * uy) ** 2 / 2 - 3 * (ux ** 2 + uy ** 2) / 2)
1 + 3 * (cx * ux + cy * uy2) + 9 * (cx * ux + cy * uy2) ** 2 / 2 - 3 * (ux ** 2 + uy2 ** 2) / 2)
tracked_eq[:, :, i] = tracked_rho * w * (
1 + 3 * (cx * ux + cy * uy1) + 9 * (cx * ux + cy * uy1) ** 2 / 2 - 3 * (ux ** 2 + uy1 ** 2) / 2)
# test1 = np.sum(Neq)
test2 = np.sum(N-Neq)
if abs(test2) > 0.0001:
@ -212,10 +246,12 @@ def main():
N += -(1.0 / params.t1) * (N - Neq)
temperature += -(1.0 / params.t2) * (temperature - temperature_eq)
tracked_fluid += -(1.0 / params.t1) * (tracked_fluid - tracked_eq)
# Apply boundary
N[bndry, :] = bndryN
temperature[bndry, :] = bndryTemp
tracked_fluid[bndry, :] = bndryTracked
# temperature[0, :, 0] = 0
# temperature[1, :, 0] = 0
@ -223,8 +259,9 @@ def main():
temperature[0, :, 0] /= 2
temperature[1, :, 0] /= 2
temperature[Ny - 1, :, 0] = 1
temperature[Ny - 2, :, 0] = 1
if it <= 3000:
temperature[Ny - 1, :, 0] = 1
temperature[Ny - 2, :, 0] = 1
# n_sum = np.sum(N, 2)
# n_sum_min = np.min(n_sum)
@ -280,8 +317,12 @@ def main():
plt.subplot(2, 2, 3)
max_N = np.max(np.sum(N, 2))
plt.imshow(np.sum(N, 2) / max_N * 2.0 - 1.0, cmap='bwr')
plt.clim(-.1, .1)
plt.subplot(2, 2, 4)
plt.imshow(np.sum(tracked_fluid, 2) * 2.0 - 1.0, cmap='bwr')
plt.clim(-.1, .1)
# ax = plt.gca()
# ax.invert_yaxis()
# ax.get_xaxis().set_visible(False)

108
FluidSim/StaggeredArray.py Normal file
View file

@ -0,0 +1,108 @@
import numpy as np
from scipy.signal import convolve
class StaggeredArray2D:
def __init__(self, x_n, y_n):
"""
creates a staggered array
:param x_n: x size of the array
:param y_n: y size of the array
"""
self.x_n = x_n
self.y_n = y_n
self.p = np.zeros((x_n, y_n), dtype=np.float)
self.u_x = np.zeros((x_n + 1, y_n), dtype=np.float)
self.u_y = np.zeros((x_n, y_n + 1), dtype=np.float)
self.has_fluid = np.zeros((x_n, y_n), dtype=np.bool)
self.density = np.zeros((x_n, y_n), dtype=np.float)
def get_velocity(self, x, y):
"""
get mid point value for the coordinates
:param x: x coordinate
:param y: y coordinate
:return:
"""
assert 0 <= x < self.x_n, 'x value out of bounds!'
assert 0 <= y < self.y_n, 'y value out of bounds!'
lower_x = self.u_x[x, y]
upper_x = self.u_x[x + 1, y]
lower_y = self.u_y[x, y]
upper_y = self.u_y[x, y + 1]
return (lower_x + upper_x) / 2.0, (lower_y + upper_y) / 2.0
def get_velocity_arrays(self):
c_x = np.array([[0.5], [0.5]])
u_x = convolve(self.u_x, c_x, mode='valid')
c_y = np.array([[0.5, 0.5]])
u_y = convolve(self.u_y, c_y, mode='valid')
return u_x, u_y
class StaggeredArray3D:
def __init__(self, x_n, y_n, z_n):
"""
creates a staggered array
:param x_n: x size of the array
:param y_n: y size of the array
:param z_n: z size of the array
"""
self.x_n = x_n
self.y_n = y_n
self.z_n = z_n
self.p = np.zeros((x_n, y_n, z_n), dtype=np.float)
self.u_x = np.zeros((x_n + 1, y_n, z_n), dtype=np.float)
self.u_y = np.zeros((x_n, y_n + 1, z_n), dtype=np.float)
self.u_z = np.zeros((x_n, y_n, z_n + 1), dtype=np.float)
self.has_fluid = np.zeros((x_n, y_n, z_n), dtype=np.bool)
def get_velocity(self, x, y, z):
"""
get mid point value for the coordinates
:param x: x coordinate
:param y: y coordinate
:param z: z coordinate
:return:
"""
assert 0 <= x < self.x_n, 'x value out of bounds!'
assert 0 <= y < self.y_n, 'y value out of bounds!'
assert 0 <= z < self.z_n, 'y value out of bounds!'
lower_x = self.u_x[x, y, z]
upper_x = self.u_x[x + 1, y, z]
lower_y = self.u_y[x, y, z]
upper_y = self.u_y[x, y + 1, z]
lower_z = self.u_z[x, y, z]
upper_z = self.u_z[x, y, z + 1]
return (lower_x + upper_x) / 2.0, (lower_y + upper_y) / 2.0, (lower_z + upper_z) / 2.0
if __name__ == '__main__':
sa = StaggeredArray2D(10, 10)
for x in range(11):
for y in range(10):
sa.u_x[x, y] = y
for x in range(10):
for y in range(11):
sa.u_y[x, y] = x
ux, uy = sa.get_velocity_arrays()
ux2, uy2 = sa.get_velocity(0, 0)

0
FluidSim/__init__.py Normal file
View file