Clean up chunk coordinators and queue coordinators (#3208)

This commit is contained in:
Traks 2021-08-19 13:31:47 +02:00 committed by GitHub
parent 2988ad6b11
commit 8f0ae7e51d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 63 additions and 75 deletions

View File

@ -30,11 +30,13 @@ import com.google.inject.assistedinject.Assisted;
import com.plotsquared.bukkit.BukkitPlatform; import com.plotsquared.bukkit.BukkitPlatform;
import com.plotsquared.core.queue.ChunkCoordinator; import com.plotsquared.core.queue.ChunkCoordinator;
import com.plotsquared.core.queue.subscriber.ProgressSubscriber; import com.plotsquared.core.queue.subscriber.ProgressSubscriber;
import com.plotsquared.core.util.task.PlotSquaredTask;
import com.plotsquared.core.util.task.TaskManager; import com.plotsquared.core.util.task.TaskManager;
import com.plotsquared.core.util.task.TaskTime; import com.plotsquared.core.util.task.TaskTime;
import com.sk89q.worldedit.math.BlockVector2; import com.sk89q.worldedit.math.BlockVector2;
import com.sk89q.worldedit.world.World; import com.sk89q.worldedit.world.World;
import io.papermc.lib.PaperLib; import io.papermc.lib.PaperLib;
import org.bukkit.Bukkit; import org.bukkit.Bukkit;
import org.bukkit.Chunk; import org.bukkit.Chunk;
import org.bukkit.plugin.Plugin; import org.bukkit.plugin.Plugin;
@ -75,6 +77,9 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator {
private final AtomicInteger expectedSize; private final AtomicInteger expectedSize;
private int batchSize; private int batchSize;
private PlotSquaredTask task;
private boolean shouldCancel;
private boolean finished;
@Inject @Inject
private BukkitChunkCoordinator( private BukkitChunkCoordinator(
@ -108,11 +113,41 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator {
// Request initial batch // Request initial batch
this.requestBatch(); this.requestBatch();
// Wait until next tick to give the chunks a chance to be loaded // Wait until next tick to give the chunks a chance to be loaded
TaskManager.runTaskLater(() -> TaskManager.runTaskRepeat(this, TaskTime.ticks(1)), TaskTime.ticks(1)); TaskManager.runTaskLater(() -> task = TaskManager.runTaskRepeat(this, TaskTime.ticks(1)), TaskTime.ticks(1));
} }
@Override @Override
public void runTask() { public void cancel() {
shouldCancel = true;
}
private void finish() {
try {
this.whenDone.run();
} catch (final Throwable throwable) {
this.throwableConsumer.accept(throwable);
} finally {
for (final ProgressSubscriber subscriber : this.progressSubscribers) {
subscriber.notifyEnd();
}
task.cancel();
finished = true;
}
}
@Override
public void run() {
if (shouldCancel) {
if (unloadAfter) {
Chunk chunk;
while ((chunk = availableChunks.poll()) != null) {
freeChunk(chunk);
}
}
finish();
return;
}
Chunk chunk = this.availableChunks.poll(); Chunk chunk = this.availableChunks.poll();
if (chunk == null) { if (chunk == null) {
return; return;
@ -143,16 +178,7 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator {
final int expected = this.expectedSize.addAndGet(-processedChunks); final int expected = this.expectedSize.addAndGet(-processedChunks);
if (expected <= 0) { if (expected <= 0) {
try { finish();
this.whenDone.run();
} catch (final Throwable throwable) {
this.throwableConsumer.accept(throwable);
} finally {
for (final ProgressSubscriber subscriber : this.progressSubscribers) {
subscriber.notifyEnd();
}
this.cancel();
}
} else { } else {
if (this.availableChunks.size() < processedChunks) { if (this.availableChunks.size() < processedChunks) {
final double progress = ((double) totalSize - (double) expected) / (double) totalSize; final double progress = ((double) totalSize - (double) expected) / (double) totalSize;
@ -186,12 +212,17 @@ public final class BukkitChunkCoordinator extends ChunkCoordinator {
} }
/** /**
* Once a chunk has been loaded, process it (add a plugin ticket and add to available chunks list) * Once a chunk has been loaded, process it (add a plugin ticket and add to
* available chunks list). It is important that this gets executed on the
* server's main thread.
*/ */
private void processChunk(final @NonNull Chunk chunk) { private void processChunk(final @NonNull Chunk chunk) {
if (!chunk.isLoaded()) { if (!chunk.isLoaded()) {
throw new IllegalArgumentException(String.format("Chunk %d;%d is is not loaded", chunk.getX(), chunk.getZ())); throw new IllegalArgumentException(String.format("Chunk %d;%d is is not loaded", chunk.getX(), chunk.getZ()));
} }
if (finished) {
return;
}
chunk.addPluginChunkTicket(this.plugin); chunk.addPluginChunkTicket(this.plugin);
this.availableChunks.add(chunk); this.availableChunks.add(chunk);
} }

View File

@ -92,7 +92,6 @@ public class BukkitQueueCoordinator extends BasicQueueCoordinator {
chunkCoordinator.start(); chunkCoordinator.start();
} }
//TODO: implement cancellation
@Override @Override
public void cancel() { public void cancel() {
chunkCoordinator.cancel(); chunkCoordinator.cancel();

View File

@ -25,24 +25,7 @@
*/ */
package com.plotsquared.core.queue; package com.plotsquared.core.queue;
import com.plotsquared.core.util.task.PlotSquaredTask; public abstract class ChunkCoordinator implements Runnable {
public abstract class ChunkCoordinator implements PlotSquaredTask {
private boolean cancelled = false;
@Override
public abstract void runTask();
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public void cancel() {
this.cancelled = true;
}
/** /**
* Starts the chunk coordinator. This will usually (implementation-specific-permitting) mark chunks to be loaded in batches, * Starts the chunk coordinator. This will usually (implementation-specific-permitting) mark chunks to be loaded in batches,
@ -51,6 +34,11 @@ public abstract class ChunkCoordinator implements PlotSquaredTask {
*/ */
public abstract void start(); public abstract void start();
/**
* Cancel the chunk coordinator.
*/
public abstract void cancel();
/** /**
* Get the amount of remaining chunks (at the time of the method call) * Get the amount of remaining chunks (at the time of the method call)
* *

View File

@ -29,18 +29,12 @@ import com.plotsquared.core.PlotSquared;
import com.sk89q.worldedit.world.World; import com.sk89q.worldedit.world.World;
import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
public class GlobalBlockQueue { public class GlobalBlockQueue {
private final ConcurrentLinkedDeque<QueueCoordinator> activeQueues;
private QueueProvider provider; private QueueProvider provider;
public GlobalBlockQueue(@NonNull QueueProvider provider) { public GlobalBlockQueue(@NonNull QueueProvider provider) {
this.provider = provider; this.provider = provider;
this.activeQueues = new ConcurrentLinkedDeque<>();
} }
/** /**
@ -64,33 +58,4 @@ public class GlobalBlockQueue {
this.provider = provider; this.provider = provider;
} }
/**
* Place an instance of {@link QueueCoordinator} into a list incase access is needed
* and then start it.
*
* @param queue {@link QueueCoordinator} instance to start.
* @return true if added to queue, false otherwise
*/
public boolean enqueue(@NonNull QueueCoordinator queue) {
boolean success = false;
if (queue.size() > 0 && !activeQueues.contains(queue)) {
success = activeQueues.add(queue);
queue.start();
}
return success;
}
public void dequeue(@NonNull QueueCoordinator queue) {
queue.cancel();
activeQueues.remove(queue);
}
public @NonNull List<QueueCoordinator> getActiveQueues() {
return new ArrayList<>(activeQueues);
}
public boolean isDone() {
return activeQueues.size() == 0;
}
} }

View File

@ -39,6 +39,9 @@ import com.sk89q.worldedit.world.World;
import com.sk89q.worldedit.world.biome.BiomeType; import com.sk89q.worldedit.world.biome.BiomeType;
import com.sk89q.worldedit.world.block.BaseBlock; import com.sk89q.worldedit.world.block.BaseBlock;
import com.sk89q.worldedit.world.block.BlockState; import com.sk89q.worldedit.world.block.BlockState;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
@ -51,6 +54,7 @@ public abstract class QueueCoordinator {
private boolean forceSync = false; private boolean forceSync = false;
@Nullable @Nullable
private Object chunkObject; private Object chunkObject;
private final AtomicBoolean enqueued = new AtomicBoolean();
@Inject @Inject
private GlobalBlockQueue blockQueue; private GlobalBlockQueue blockQueue;
@ -319,12 +323,17 @@ public abstract class QueueCoordinator {
} }
/** /**
* Enqueue the queue with the {@link GlobalBlockQueue} * Enqueue the queue to start it
* *
* @return success or not * @return success or not
*/ */
public boolean enqueue() { public boolean enqueue() {
return blockQueue.enqueue(this); boolean success = false;
if (enqueued.compareAndSet(false, true)) {
success = true;
start();
}
return success;
} }
/** /**
@ -333,7 +342,7 @@ public abstract class QueueCoordinator {
public abstract void start(); public abstract void start();
/** /**
* Cancel the queue. Not yet implemented. * Cancel the queue
*/ */
public abstract void cancel(); public abstract void cancel();

View File

@ -107,11 +107,7 @@ public class DefaultProgressSubscriber implements ProgressSubscriber {
@Override @Override
public void notifyProgress(@NonNull ChunkCoordinator coordinator, double progress) { public void notifyProgress(@NonNull ChunkCoordinator coordinator, double progress) {
this.progress.set(progress); this.progress.set(progress);
if (coordinator.isCancelled() || progress >= 1) { if (started.compareAndSet(false, true)) {
if (task != null) {
task.cancel();
}
} else if (started.compareAndSet(false, true)) {
TaskManager.getPlatformImplementation().taskLater(() -> task = TaskManager TaskManager.getPlatformImplementation().taskLater(() -> task = TaskManager
.getPlatformImplementation() .getPlatformImplementation()
.taskRepeat(() -> { .taskRepeat(() -> {