Continue implementation of new queue system

- Move ChunkCoordinatorBuild to Core
- Add core ChunkCoordinator
- Add Factories for ChunkCoordinator and its Builder
- Reimplement refreshChunk but in WorldUtil
- Allow custom Consumers to be used by the Queue when sent to the ChunkCoordinator
- Start switching ChunkTasks to use the new ChunkCoordinator system
- Replace GlobalBlockQueue's "empty task" system with normal sync TaskManager
- Remove lombok from the queue system
- Add back forceSync and chunkObject from LocalBlockQueue
This commit is contained in:
dordsor21
2020-07-18 12:07:56 +01:00
parent 66b94ab9f1
commit 57af50ed49
23 changed files with 625 additions and 441 deletions

View File

@ -25,17 +25,19 @@
*/
package com.plotsquared.bukkit.queue;
import com.google.common.base.Preconditions;
import com.plotsquared.bukkit.BukkitPlatform;
import com.plotsquared.core.queue.ChunkCoordinator;
import com.plotsquared.core.util.task.TaskManager;
import com.plotsquared.core.util.task.TaskTime;
import com.sk89q.worldedit.math.BlockVector2;
import com.sk89q.worldedit.world.World;
import io.papermc.lib.PaperLib;
import org.bukkit.Bukkit;
import org.bukkit.Chunk;
import org.bukkit.World;
import org.bukkit.plugin.Plugin;
import org.bukkit.plugin.java.JavaPlugin;
import org.bukkit.scheduler.BukkitRunnable;
import org.jetbrains.annotations.NotNull;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@ -44,32 +46,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* Utility that allows for the loading and coordination of chunk actions
* <p>
* The coordinator takes in collection of chunk coordinates, loads them
* and allows the caller to specify a sink for the loaded chunks. The
* coordinator will prevent the chunks from being unloaded until the sink
* has fully consumed the chunk
* <p>
* Usage:
* <pre>{@code
* final ChunkCoordinator chunkCoordinator = ChunkCoordinator.builder()
* .inWorld(Objects.requireNonNull(Bukkit.getWorld("world"))).withChunk(BlockVector2.at(0, 0))
* .withConsumer(chunk -> System.out.printf("Got chunk %d;%d", chunk.getX(), chunk.getZ()))
* .withFinalAction(() -> System.out.println("All chunks have been loaded"))
* .withThrowableConsumer(throwable -> System.err.println("Something went wrong... =("))
* .withMaxIterationTime(25L)
* .build();
* chunkCoordinator.subscribeToProgress((coordinator, progress) ->
* System.out.printf("Progress: %.1f", progress * 100.0f));
* chunkCoordinator.start();
* }</pre>
*
* @author Alexander Söderberg
* @see #builder() To create a new coordinator instance
*/
public final class BukkitChunkCoordinator extends BukkitRunnable {
public final class BukkitChunkCoordinator extends ChunkCoordinator {
private final List<ProgressSubscriber> progressSubscribers = new LinkedList<>();
@ -77,52 +54,44 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
private final Queue<Chunk> availableChunks;
private final long maxIterationTime;
private final Plugin plugin;
private final Consumer<Chunk> chunkConsumer;
private final World world;
private final Consumer<BlockVector2> chunkConsumer;
private final org.bukkit.World bukkitWorld;
private final Runnable whenDone;
private final Consumer<Throwable> throwableConsumer;
private final int totalSize;
private AtomicInteger expectedSize;
private final AtomicInteger expectedSize;
private int batchSize;
private BukkitChunkCoordinator(final long maxIterationTime, final int initialBatchSize,
@NotNull final Consumer<Chunk> chunkConsumer, @NotNull final World world,
@NotNull final Collection<BlockVector2> requestedChunks, @NotNull final Runnable whenDone,
@NotNull final Consumer<Throwable> throwableConsumer) {
@Nonnull final Consumer<BlockVector2> chunkConsumer, @Nonnull final World world,
@Nonnull final Collection<BlockVector2> requestedChunks, @Nonnull final Runnable whenDone,
@Nonnull final Consumer<Throwable> throwableConsumer) {
this.requestedChunks = new LinkedBlockingQueue<>(requestedChunks);
this.availableChunks = new LinkedBlockingQueue<>();
this.totalSize = requestedChunks.size();
this.expectedSize = new AtomicInteger(this.totalSize);
this.world = world;
this.batchSize = initialBatchSize;
this.chunkConsumer = chunkConsumer;
this.maxIterationTime = maxIterationTime;
this.whenDone = whenDone;
this.throwableConsumer = throwableConsumer;
this.plugin = JavaPlugin.getPlugin(BukkitPlatform.class);
}
/**
* Create a new {@link BukkitChunkCoordinator} instance
*
* @return Coordinator builder instance
*/
@NotNull public static ChunkCoordinatorBuilder builder() {
return new ChunkCoordinatorBuilder();
this.bukkitWorld = Bukkit.getWorld(world.getName());
}
/**
* Start the coordinator instance
*/
public void start() {
@Override public void start() {
// Request initial batch
this.requestBatch();
// Wait until next tick to give the chunks a chance to be loaded
this.runTaskTimer(this.plugin, 1L, 1L);
TaskManager.runTaskLater(() -> TaskManager.runTaskRepeat(this, TaskTime.ticks(1)),
TaskTime.ticks(1));
}
@Override public void run() {
@Override public void runTask() {
Chunk chunk = this.availableChunks.poll();
if (chunk == null) {
return;
@ -132,7 +101,7 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
do {
final long start = System.currentTimeMillis();
try {
this.chunkConsumer.accept(chunk);
this.chunkConsumer.accept(BlockVector2.at(chunk.getX(), chunk.getZ()));
} catch (final Throwable throwable) {
this.throwableConsumer.accept(throwable);
}
@ -173,7 +142,7 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
BlockVector2 chunk;
for (int i = 0; i < this.batchSize && (chunk = this.requestedChunks.poll()) != null; i++) {
// This required PaperLib to be bumped to version 1.0.4 to mark the request as urgent
PaperLib.getChunkAtAsync(this.world, chunk.getX(), chunk.getZ(), true, true)
PaperLib.getChunkAtAsync(this.bukkitWorld, chunk.getX(), chunk.getZ(), true, true)
.whenComplete((chunkObject, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
@ -186,7 +155,7 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
}
}
private void processChunk(@NotNull final Chunk chunk) {
private void processChunk(@Nonnull final Chunk chunk) {
if (!chunk.isLoaded()) {
throw new IllegalArgumentException(
String.format("Chunk %d;%d is is not loaded", chunk.getX(), chunk.getZ()));
@ -195,7 +164,7 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
this.availableChunks.add(chunk);
}
private void freeChunk(@NotNull final Chunk chunk) {
private void freeChunk(@Nonnull final Chunk chunk) {
if (!chunk.isLoaded()) {
throw new IllegalArgumentException(
String.format("Chunk %d;%d is is not loaded", chunk.getX(), chunk.getZ()));
@ -208,7 +177,7 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
*
* @return Snapshot view of remaining chunk count
*/
public int getRemainingChunks() {
@Override public int getRemainingChunks() {
return this.expectedSize.get();
}
@ -217,7 +186,7 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
*
* @return Requested chunk count
*/
public int getTotalChunks() {
@Override public int getTotalChunks() {
return this.totalSize;
}
@ -227,11 +196,10 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
* @param subscriber Subscriber
*/
public void subscribeToProgress(
@NotNull final BukkitChunkCoordinator.ProgressSubscriber subscriber) {
@Nonnull final BukkitChunkCoordinator.ProgressSubscriber subscriber) {
this.progressSubscribers.add(subscriber);
}
@FunctionalInterface
public interface ProgressSubscriber {
@ -241,88 +209,9 @@ public final class BukkitChunkCoordinator extends BukkitRunnable {
* @param coordinator Coordinator instance that triggered the notification
* @param progress Progress in the range [0, 1]
*/
void notifyProgress(@NotNull final BukkitChunkCoordinator coordinator,
void notifyProgress(@Nonnull final BukkitChunkCoordinator coordinator,
final float progress);
}
public static final class ChunkCoordinatorBuilder {
private final List<BlockVector2> requestedChunks = new LinkedList<>();
private Consumer<Throwable> throwableConsumer = Throwable::printStackTrace;
private World world;
private Consumer<Chunk> chunkConsumer;
private Runnable whenDone = () -> {
};
private long maxIterationTime = 60; // A little over 1 tick;
private int initialBatchSize = 4;
private ChunkCoordinatorBuilder() {
}
@NotNull public ChunkCoordinatorBuilder inWorld(@NotNull final World world) {
this.world = Preconditions.checkNotNull(world, "World may not be null");
return this;
}
@NotNull
public ChunkCoordinatorBuilder withChunk(@NotNull final BlockVector2 chunkLocation) {
this.requestedChunks
.add(Preconditions.checkNotNull(chunkLocation, "Chunk location may not be null"));
return this;
}
@NotNull public ChunkCoordinatorBuilder withChunks(
@NotNull final Collection<BlockVector2> chunkLocations) {
chunkLocations.forEach(this::withChunk);
return this;
}
@NotNull
public ChunkCoordinatorBuilder withConsumer(@NotNull final Consumer<Chunk> chunkConsumer) {
this.chunkConsumer =
Preconditions.checkNotNull(chunkConsumer, "Chunk consumer may not be null");
return this;
}
@NotNull public ChunkCoordinatorBuilder withFinalAction(@NotNull final Runnable whenDone) {
this.whenDone = Preconditions.checkNotNull(whenDone, "Final action may not be null");
return this;
}
@NotNull public ChunkCoordinatorBuilder withMaxIterationTime(final long maxIterationTime) {
Preconditions
.checkArgument(maxIterationTime > 0, "Max iteration time must be positive");
this.maxIterationTime = maxIterationTime;
return this;
}
@NotNull public ChunkCoordinatorBuilder withInitialBatchSize(final int initialBatchSize) {
Preconditions
.checkArgument(initialBatchSize > 0, "Initial batch size must be positive");
this.initialBatchSize = initialBatchSize;
return this;
}
@NotNull public ChunkCoordinatorBuilder withThrowableConsumer(
@NotNull final Consumer<Throwable> throwableConsumer) {
this.throwableConsumer =
Preconditions.checkNotNull(throwableConsumer, "Throwable consumer may not be null");
return this;
}
@NotNull public BukkitChunkCoordinator build() {
Preconditions.checkNotNull(this.world, "No world was supplied");
Preconditions.checkNotNull(this.chunkConsumer, "No chunk consumer was supplied");
Preconditions.checkNotNull(this.whenDone, "No final action was supplied");
Preconditions
.checkNotNull(this.throwableConsumer, "No throwable consumer was supplied");
return new BukkitChunkCoordinator(this.maxIterationTime, this.initialBatchSize,
this.chunkConsumer, this.world, this.requestedChunks, this.whenDone,
this.throwableConsumer);
}
}
}

View File

@ -25,8 +25,11 @@
*/
package com.plotsquared.bukkit.queue;
import com.google.inject.Inject;
import com.plotsquared.bukkit.schematic.StateWrapper;
import com.plotsquared.bukkit.util.BukkitBlockUtil;
import com.plotsquared.core.inject.factory.ChunkCoordinatorBuilderFactory;
import com.plotsquared.core.inject.factory.ChunkCoordinatorFactory;
import com.plotsquared.core.queue.BasicQueueCoordinator;
import com.plotsquared.core.queue.LocalChunk;
import com.plotsquared.core.util.BlockUtil;
@ -42,20 +45,25 @@ import com.sk89q.worldedit.world.World;
import com.sk89q.worldedit.world.biome.BiomeType;
import com.sk89q.worldedit.world.block.BaseBlock;
import com.sk89q.worldedit.world.block.BlockState;
import org.bukkit.Bukkit;
import org.bukkit.Chunk;
import org.bukkit.Material;
import org.bukkit.block.Block;
import org.bukkit.block.Container;
import org.bukkit.block.data.BlockData;
import javax.annotation.Nonnull;
import java.util.function.Consumer;
public class BukkitQueueCoordinator extends BasicQueueCoordinator {
private final World world;
private final SideEffectSet sideEffectSet;
@Inject private ChunkCoordinatorBuilderFactory chunkCoordinatorBuilderFactory;
@Inject private ChunkCoordinatorFactory chunkCoordinatorFactory;
private Runnable whenDone;
public BukkitQueueCoordinator(World world) {
@Inject public BukkitQueueCoordinator(World world) {
super(world);
this.world = world;
sideEffectSet = SideEffectSet.none().with(SideEffect.LIGHTING, SideEffect.State.OFF)
@ -73,19 +81,18 @@ public class BukkitQueueCoordinator extends BasicQueueCoordinator {
}
@Override public boolean enqueue() {
BukkitChunkCoordinator.builder().inWorld(BukkitAdapter.adapt(world))
.withChunks(getBlockChunks().keySet()).withInitialBatchSize(3).withMaxIterationTime(40)
.withThrowableConsumer(Throwable::printStackTrace).withFinalAction(whenDone)
.withConsumer(chunk -> {
LocalChunk localChunk =
getBlockChunks().get(BlockVector2.at(chunk.getX(), chunk.getZ()));
Consumer<BlockVector2> consumer = getChunkConsumer();
if (consumer == null) {
consumer = blockVector2 -> {
LocalChunk localChunk = getBlockChunks().get(blockVector2);
if (localChunk == null) {
throw new NullPointerException(
"LocalChunk cannot be null when accessed from ChunkCoordinator");
}
World worldObj = getWorld();
int sx = chunk.getX() << 4;
int sz = chunk.getX() << 4;
org.bukkit.World bukkitWorld = null;
Chunk chunk = null;
int sx = blockVector2.getX() << 4;
int sz = blockVector2.getZ() << 4;
for (int layer = 0; layer < localChunk.getBaseblocks().length; layer++) {
BaseBlock[] blocksLayer = localChunk.getBaseblocks()[layer];
if (blocksLayer == null) {
@ -100,11 +107,19 @@ public class BukkitQueueCoordinator extends BasicQueueCoordinator {
int y = MainUtil.y_loc[layer][j];
int z = sz + MainUtil.z_loc[layer][j];
try {
worldObj.setBlock(BlockVector3.at(x, y, z), block, sideEffectSet);
world.setBlock(BlockVector3.at(x, y, z), block, sideEffectSet);
} catch (WorldEditException ignored) {
// Fallback to not so nice method
BlockData blockData = BukkitAdapter.adapt(block);
if (bukkitWorld == null) {
bukkitWorld = Bukkit.getWorld(world.getName());
}
if (chunk == null) {
chunk = bukkitWorld
.getChunkAt(blockVector2.getX(), blockVector2.getZ());
}
Block existing = chunk.getBlock(x, y, z);
final BlockState existingBaseBlock =
BukkitAdapter.adapt(existing.getBlockData());
@ -123,7 +138,7 @@ public class BukkitQueueCoordinator extends BasicQueueCoordinator {
CompoundTag tag = block.getNbtData();
StateWrapper sw = new StateWrapper(tag);
sw.restoreTag(worldObj.getName(), existing.getX(), existing.getY(),
sw.restoreTag(world.getName(), existing.getX(), existing.getY(),
existing.getZ());
}
}
@ -142,22 +157,27 @@ public class BukkitQueueCoordinator extends BasicQueueCoordinator {
int x = sx + MainUtil.x_loc[layer][j];
int y = MainUtil.y_loc[layer][j];
int z = sz + MainUtil.z_loc[layer][j];
worldObj.setBiome(BlockVector3.at(x, y, z), biome);
world.setBiome(BlockVector3.at(x, y, z), biome);
}
}
if (localChunk.getTiles().size() > 0) {
localChunk.getTiles().forEach(((blockVector3, tag) -> {
try {
BaseBlock block = worldObj.getBlock(blockVector3).toBaseBlock(tag);
worldObj.setBlock(blockVector3, block, sideEffectSet);
BaseBlock block = world.getBlock(blockVector3).toBaseBlock(tag);
world.setBlock(blockVector3, block, sideEffectSet);
} catch (WorldEditException ignored) {
StateWrapper sw = new StateWrapper(tag);
sw.restoreTag(worldObj.getName(), blockVector3.getX(),
blockVector3.getY(), blockVector3.getZ());
sw.restoreTag(world.getName(), blockVector3.getX(), blockVector3.getY(),
blockVector3.getZ());
}
}));
}
});
};
}
chunkCoordinatorBuilderFactory.create(chunkCoordinatorFactory).inWorld(world)
.withChunks(getBlockChunks().keySet()).withInitialBatchSize(3).withMaxIterationTime(40)
.withThrowableConsumer(Throwable::printStackTrace).withFinalAction(whenDone)
.withConsumer(consumer);
return super.enqueue();
}

View File

@ -39,8 +39,6 @@ import com.sk89q.worldedit.world.biome.BiomeType;
import com.sk89q.worldedit.world.block.BaseBlock;
import com.sk89q.worldedit.world.block.BlockState;
import com.sk89q.worldedit.world.block.BlockTypes;
import lombok.Getter;
import lombok.Setter;
import org.bukkit.Bukkit;
import org.bukkit.Chunk;
import org.bukkit.World;
@ -60,13 +58,21 @@ public class GenChunk extends ScopedQueueCoordinator {
public String world;
public int chunkX;
public int chunkZ;
@Getter @Setter private ChunkData chunkData = null;
private ChunkData chunkData = null;
public GenChunk() {
super(null, Location.at("", 0, 0, 0), Location.at("", 15, 255, 15));
this.biomes = Biome.values();
}
public ChunkData getChunkData() {
return this.chunkData;
}
public void setChunkData(ChunkData chunkData) {
this.chunkData = chunkData;
}
public Chunk getChunk() {
if (chunk == null) {
World worldObj = BukkitUtil.getWorld(world);