diff --git a/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java b/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java index ecb0f3500..5cc5de1b1 100644 --- a/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java +++ b/Bukkit/src/main/java/com/plotsquared/bukkit/queue/BukkitChunkCoordinator.java @@ -44,8 +44,35 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +/** + * Utility that allows for the loading and coordination of chunk actions + *

+ * The coordinator takes in collection of chunk coordinates, loads them + * and allows the caller to specify a sink for the loaded chunks. The + * coordinator will prevent the chunks from being unloaded until the sink + * has fully consumed the chunk + *

+ * Usage: + *

{@code
+ * final ChunkCoordinator chunkCoordinator = ChunkCoordinator.builder()
+ *     .inWorld(Objects.requireNonNull(Bukkit.getWorld("world"))).withChunk(BlockVector2.at(0, 0))
+ *     .withConsumer(chunk -> System.out.printf("Got chunk %d;%d", chunk.getX(), chunk.getZ()))
+ *     .withFinalAction(() -> System.out.println("All chunks have been loaded"))
+ *     .withThrowableConsumer(throwable -> System.err.println("Something went wrong... =("))
+ *     .withMaxIterationTime(25L)
+ *     .build();
+ * chunkCoordinator.subscribeToProgress((coordinator, progress) ->
+ *     System.out.printf("Progress: %.1f", progress * 100.0f));
+ * chunkCoordinator.start();
+ * }
+ * + * @author Alexander Söderberg + * @see #builder() To create a new coordinator instance + */ public final class BukkitChunkCoordinator extends BukkitRunnable { + private final List progressSubscribers = new LinkedList<>(); + private final Queue requestedChunks; private final Queue availableChunks; private final long maxIterationTime; @@ -53,32 +80,48 @@ public final class BukkitChunkCoordinator extends BukkitRunnable { private final Consumer chunkConsumer; private final World world; private final Runnable whenDone; + private final Consumer throwableConsumer; + private final int totalSize; private AtomicInteger expectedSize; private int batchSize; private BukkitChunkCoordinator(final long maxIterationTime, final int initialBatchSize, @NotNull final Consumer chunkConsumer, @NotNull final World world, - @NotNull final Collection requestedChunks, @NotNull final Runnable whenDone) { + @NotNull final Collection requestedChunks, @NotNull final Runnable whenDone, + @NotNull final Consumer throwableConsumer) { this.requestedChunks = new LinkedBlockingQueue<>(requestedChunks); this.availableChunks = new LinkedBlockingQueue<>(); - this.expectedSize = new AtomicInteger(requestedChunks.size()); + this.totalSize = requestedChunks.size(); + this.expectedSize = new AtomicInteger(this.totalSize); this.world = world; this.batchSize = initialBatchSize; this.chunkConsumer = chunkConsumer; this.maxIterationTime = maxIterationTime; this.whenDone = whenDone; + this.throwableConsumer = throwableConsumer; this.plugin = JavaPlugin.getPlugin(BukkitPlatform.class); + } + + /** + * Create a new {@link BukkitChunkCoordinator} instance + * + * @return Coordinator builder instance + */ + @NotNull public static ChunkCoordinatorBuilder builder() { + return new ChunkCoordinatorBuilder(); + } + + /** + * Start the coordinator instance + */ + public void start() { // Request initial batch this.requestBatch(); // Wait until next tick to give the chunks a chance to be loaded this.runTaskTimer(this.plugin, 1L, 1L); } - @NotNull public static ChunkCoordinatorBuilder builder() { - return new ChunkCoordinatorBuilder(); - } - @Override public void run() { Chunk chunk = this.availableChunks.poll(); if (chunk == null) { @@ -90,7 +133,8 @@ public final class BukkitChunkCoordinator extends BukkitRunnable { final long start = System.currentTimeMillis(); try { this.chunkConsumer.accept(chunk); - } catch (final Exception ignored) { + } catch (final Throwable throwable) { + this.throwableConsumer.accept(throwable); } this.freeChunk(chunk); processedChunks++; @@ -103,10 +147,19 @@ public final class BukkitChunkCoordinator extends BukkitRunnable { // Adjust batch size based on the amount of processed chunks per tick this.batchSize = processedChunks; } - if (this.expectedSize.addAndGet(-processedChunks) <= 0) { + + final int expected = this.expectedSize.addAndGet(-processedChunks); + + final float progress = ((float) totalSize - (float) expected) / (float) totalSize; + for (final ProgressSubscriber subscriber : this.progressSubscribers) { + subscriber.notifyProgress(this, progress); + } + + if (expected <= 0) { try { this.whenDone.run(); - } catch (final Exception ignored) { + } catch (final Throwable throwable) { + this.throwableConsumer.accept(throwable); } this.cancel(); } else { @@ -150,10 +203,54 @@ public final class BukkitChunkCoordinator extends BukkitRunnable { chunk.removePluginChunkTicket(this.plugin); } + /** + * Get the amount of remaining chunks (at the time of the method call) + * + * @return Snapshot view of remaining chunk count + */ + public int getRemainingChunks() { + return this.expectedSize.get(); + } + + /** + * Get the amount of requested chunks + * + * @return Requested chunk count + */ + public int getTotalChunks() { + return this.totalSize; + } + + /** + * Subscribe to coordinator progress updates + * + * @param subscriber Subscriber + */ + public void subscribeToProgress( + @NotNull final BukkitChunkCoordinator.ProgressSubscriber subscriber) { + this.progressSubscribers.add(subscriber); + } + + + @FunctionalInterface + public interface ProgressSubscriber { + + /** + * Notify about a progress update in the coordinator + * + * @param coordinator Coordinator instance that triggered the notification + * @param progress Progress in the range [0, 1] + */ + void notifyProgress(@NotNull final BukkitChunkCoordinator coordinator, + final float progress); + + } + public static final class ChunkCoordinatorBuilder { private final List requestedChunks = new LinkedList<>(); + private Consumer throwableConsumer = Throwable::printStackTrace; private World world; private Consumer chunkConsumer; private Runnable whenDone = () -> { @@ -208,12 +305,22 @@ public final class BukkitChunkCoordinator extends BukkitRunnable { return this; } + @NotNull public ChunkCoordinatorBuilder withThrowableConsumer( + @NotNull final Consumer throwableConsumer) { + this.throwableConsumer = + Preconditions.checkNotNull(throwableConsumer, "Throwable consumer may not be null"); + return this; + } + @NotNull public BukkitChunkCoordinator build() { Preconditions.checkNotNull(this.world, "No world was supplied"); Preconditions.checkNotNull(this.chunkConsumer, "No chunk consumer was supplied"); Preconditions.checkNotNull(this.whenDone, "No final action was supplied"); + Preconditions + .checkNotNull(this.throwableConsumer, "No throwable consumer was supplied"); return new BukkitChunkCoordinator(this.maxIterationTime, this.initialBatchSize, - this.chunkConsumer, this.world, this.requestedChunks, this.whenDone); + this.chunkConsumer, this.world, this.requestedChunks, this.whenDone, + this.throwableConsumer); } }