From 8f0ae7e51d9acd05cf030ae7ecb1c6af7dcc4dca Mon Sep 17 00:00:00 2001 From: Traks <58818927+traksag@users.noreply.github.com> Date: Thu, 19 Aug 2021 13:31:47 +0200 Subject: [PATCH] Clean up chunk coordinators and queue coordinators (#3208) --- .../bukkit/queue/BukkitChunkCoordinator.java | 57 ++++++++++++++----- .../bukkit/queue/BukkitQueueCoordinator.java | 1 - .../core/queue/ChunkCoordinator.java | 24 ++------ .../core/queue/GlobalBlockQueue.java | 35 ------------ .../core/queue/QueueCoordinator.java | 15 ++++- .../subscriber/DefaultProgressSubscriber.java | 6 +- 6 files changed, 63 insertions(+), 75 deletions(-) diff --git a/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java b/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java index 77d40d6ee..2b26f1750 100644 --- a/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java +++ b/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java @@ -30,11 +30,13 @@ import com.google.inject.assistedinject.Assisted; import com.plotsquared.bukkit.BukkitPlatform; import com.plotsquared.core.queue.ChunkCoordinator; import com.plotsquared.core.queue.subscriber.ProgressSubscriber; +import com.plotsquared.core.util.task.PlotSquaredTask; import com.plotsquared.core.util.task.TaskManager; import com.plotsquared.core.util.task.TaskTime; import com.sk89q.worldedit.math.BlockVector2; import com.sk89q.worldedit.world.World; import io.papermc.lib.PaperLib; + import org.bukkit.Bukkit; import org.bukkit.Chunk; import org.bukkit.plugin.Plugin; @@ -75,6 +77,9 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator { private final AtomicInteger expectedSize; private int batchSize; + private PlotSquaredTask task; + private boolean shouldCancel; + private boolean finished; @Inject private BukkitChunkCoordinator( @@ -108,11 +113,41 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator { // Request initial batch this.requestBatch(); // Wait until next tick to give the chunks a chance to be loaded - TaskManager.runTaskLater(() -> TaskManager.runTaskRepeat(this, TaskTime.ticks(1)), TaskTime.ticks(1)); + TaskManager.runTaskLater(() -> task = TaskManager.runTaskRepeat(this, TaskTime.ticks(1)), TaskTime.ticks(1)); } @Override - public void runTask() { + public void cancel() { + shouldCancel = true; + } + + private void finish() { + try { + this.whenDone.run(); + } catch (final Throwable throwable) { + this.throwableConsumer.accept(throwable); + } finally { + for (final ProgressSubscriber subscriber : this.progressSubscribers) { + subscriber.notifyEnd(); + } + task.cancel(); + finished = true; + } + } + + @Override + public void run() { + if (shouldCancel) { + if (unloadAfter) { + Chunk chunk; + while ((chunk = availableChunks.poll()) != null) { + freeChunk(chunk); + } + } + finish(); + return; + } + Chunk chunk = this.availableChunks.poll(); if (chunk == null) { return; @@ -143,16 +178,7 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator { final int expected = this.expectedSize.addAndGet(-processedChunks); if (expected <= 0) { - try { - this.whenDone.run(); - } catch (final Throwable throwable) { - this.throwableConsumer.accept(throwable); - } finally { - for (final ProgressSubscriber subscriber : this.progressSubscribers) { - subscriber.notifyEnd(); - } - this.cancel(); - } + finish(); } else { if (this.availableChunks.size() < processedChunks) { final double progress = ((double) totalSize - (double) expected) / (double) totalSize; @@ -186,12 +212,17 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator { } /** - * Once a chunk has been loaded, process it (add a plugin ticket and add to available chunks list) + * Once a chunk has been loaded, process it (add a plugin ticket and add to + * available chunks list). It is important that this gets executed on the + * server's main thread. */ private void processChunk(final @NonNull Chunk chunk) { if (!chunk.isLoaded()) { throw new IllegalArgumentException(String.format("Chunk %d;%d is is not loaded", chunk.getX(), chunk.getZ())); } + if (finished) { + return; + } chunk.addPluginChunkTicket(this.plugin); this.availableChunks.add(chunk); } diff --git a/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitQueueCoordinator.java b/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitQueueCoordinator.java index d1445b343..3256b2fa5 100644 --- a/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitQueueCoordinator.java +++ b/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitQueueCoordinator.java @@ -92,7 +92,6 @@ public class BukkitQueueCoordinator extends BasicQueueCoordinator { chunkCoordinator.start(); } - //TODO: implement cancellation @Override public void cancel() { chunkCoordinator.cancel(); diff --git a/Core/src/main/java/com/plotsquared/core/queue/ChunkCoordinator.java b/Core/src/main/java/com/plotsquared/core/queue/ChunkCoordinator.java index e1ba7ed32..de9ed61d3 100644 --- a/Core/src/main/java/com/plotsquared/core/queue/ChunkCoordinator.java +++ b/Core/src/main/java/com/plotsquared/core/queue/ChunkCoordinator.java @@ -25,24 +25,7 @@ */ package com.plotsquared.core.queue; -import com.plotsquared.core.util.task.PlotSquaredTask; - -public abstract class ChunkCoordinator implements PlotSquaredTask { - - private boolean cancelled = false; - - @Override - public abstract void runTask(); - - @Override - public boolean isCancelled() { - return cancelled; - } - - @Override - public void cancel() { - this.cancelled = true; - } +public abstract class ChunkCoordinator implements Runnable { /** * Starts the chunk coordinator. This will usually (implementation-specific-permitting) mark chunks to be loaded in batches, @@ -51,6 +34,11 @@ public abstract class ChunkCoordinator implements PlotSquaredTask { */ public abstract void start(); + /** + * Cancel the chunk coordinator. + */ + public abstract void cancel(); + /** * Get the amount of remaining chunks (at the time of the method call) * diff --git a/Core/src/main/java/com/plotsquared/core/queue/GlobalBlockQueue.java b/Core/src/main/java/com/plotsquared/core/queue/GlobalBlockQueue.java index b379a163c..b78327d77 100644 --- a/Core/src/main/java/com/plotsquared/core/queue/GlobalBlockQueue.java +++ b/Core/src/main/java/com/plotsquared/core/queue/GlobalBlockQueue.java @@ -29,18 +29,12 @@ import com.plotsquared.core.PlotSquared; import com.sk89q.worldedit.world.World; import org.checkerframework.checker.nullness.qual.NonNull; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedDeque; - public class GlobalBlockQueue { - private final ConcurrentLinkedDeque activeQueues; private QueueProvider provider; public GlobalBlockQueue(@NonNull QueueProvider provider) { this.provider = provider; - this.activeQueues = new ConcurrentLinkedDeque<>(); } /** @@ -64,33 +58,4 @@ public class GlobalBlockQueue { this.provider = provider; } - /** - * Place an instance of {@link QueueCoordinator} into a list incase access is needed - * and then start it. - * - * @param queue {@link QueueCoordinator} instance to start. - * @return true if added to queue, false otherwise - */ - public boolean enqueue(@NonNull QueueCoordinator queue) { - boolean success = false; - if (queue.size() > 0 && !activeQueues.contains(queue)) { - success = activeQueues.add(queue); - queue.start(); - } - return success; - } - - public void dequeue(@NonNull QueueCoordinator queue) { - queue.cancel(); - activeQueues.remove(queue); - } - - public @NonNull List getActiveQueues() { - return new ArrayList<>(activeQueues); - } - - public boolean isDone() { - return activeQueues.size() == 0; - } - } diff --git a/Core/src/main/java/com/plotsquared/core/queue/QueueCoordinator.java b/Core/src/main/java/com/plotsquared/core/queue/QueueCoordinator.java index 5c0f88c88..ac30f0b63 100644 --- a/Core/src/main/java/com/plotsquared/core/queue/QueueCoordinator.java +++ b/Core/src/main/java/com/plotsquared/core/queue/QueueCoordinator.java @@ -39,6 +39,9 @@ import com.sk89q.worldedit.world.World; import com.sk89q.worldedit.world.biome.BiomeType; import com.sk89q.worldedit.world.block.BaseBlock; import com.sk89q.worldedit.world.block.BlockState; + +import java.util.concurrent.atomic.AtomicBoolean; + import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -51,6 +54,7 @@ public abstract class QueueCoordinator { private boolean forceSync = false; @Nullable private Object chunkObject; + private final AtomicBoolean enqueued = new AtomicBoolean(); @Inject private GlobalBlockQueue blockQueue; @@ -319,12 +323,17 @@ public abstract class QueueCoordinator { } /** - * Enqueue the queue with the {@link GlobalBlockQueue} + * Enqueue the queue to start it * * @return success or not */ public boolean enqueue() { - return blockQueue.enqueue(this); + boolean success = false; + if (enqueued.compareAndSet(false, true)) { + success = true; + start(); + } + return success; } /** @@ -333,7 +342,7 @@ public abstract class QueueCoordinator { public abstract void start(); /** - * Cancel the queue. Not yet implemented. + * Cancel the queue */ public abstract void cancel(); diff --git a/Core/src/main/java/com/plotsquared/core/queue/subscriber/DefaultProgressSubscriber.java b/Core/src/main/java/com/plotsquared/core/queue/subscriber/DefaultProgressSubscriber.java index 0d1a272f1..10ee06c33 100644 --- a/Core/src/main/java/com/plotsquared/core/queue/subscriber/DefaultProgressSubscriber.java +++ b/Core/src/main/java/com/plotsquared/core/queue/subscriber/DefaultProgressSubscriber.java @@ -107,11 +107,7 @@ public class DefaultProgressSubscriber implements ProgressSubscriber { @Override public void notifyProgress(@NonNull ChunkCoordinator coordinator, double progress) { this.progress.set(progress); - if (coordinator.isCancelled() || progress >= 1) { - if (task != null) { - task.cancel(); - } - } else if (started.compareAndSet(false, true)) { + if (started.compareAndSet(false, true)) { TaskManager.getPlatformImplementation().taskLater(() -> task = TaskManager .getPlatformImplementation() .taskRepeat(() -> {