From 71c5de412c8929aeefec5b4ae2354bc50658f220 Mon Sep 17 00:00:00 2001 From: Robert MacRae Date: Tue, 19 Aug 2025 15:17:01 -0300 Subject: [PATCH] fixes issue #72 You now can re-enter worlds --- .../oc2/common/event/ForgeEventHandlers.java | 12 +- .../oc2/common/serialization/BlobStorage.java | 48 ++++++-- .../li/cil/oc2/common/util/AsyncUtils.java | 109 ++++++++++++++---- 3 files changed, 132 insertions(+), 37 deletions(-) diff --git a/src/main/java/li/cil/oc2/common/event/ForgeEventHandlers.java b/src/main/java/li/cil/oc2/common/event/ForgeEventHandlers.java index 88cc433e..aa65b2b3 100644 --- a/src/main/java/li/cil/oc2/common/event/ForgeEventHandlers.java +++ b/src/main/java/li/cil/oc2/common/event/ForgeEventHandlers.java @@ -37,8 +37,16 @@ public final class ForgeEventHandlers { server = event.getServer(); LOGGER.info("Server starting, initializing async components"); - // Run async tests if enabled - if (AsyncConfig.SERVER.runAsyncTests.get()) { + // Safely check if we should run async tests + boolean shouldRunTests = false; + try { + shouldRunTests = AsyncConfig.SERVER != null && AsyncConfig.SERVER.runAsyncTests.get(); + } catch (IllegalStateException e) { + LOGGER.warn("Config not available, skipping async tests"); + } + + // Run async tests if enabled and config is available + if (shouldRunTests) { LOGGER.info("Running async operation tests..."); AsyncTestUtils.verifyAsyncOperations() .thenAccept(uuid -> { diff --git a/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java b/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java index 109a7bef..31ab6ce7 100644 --- a/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java +++ b/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java @@ -78,14 +78,14 @@ public final class BlobStorage { close(); dataDirectory = newDataDir; - AsyncUtils.runAsync(() -> { - try { - Files.createDirectories(dataDirectory); - LOGGER.info("Blob storage directory initialized at: {}", dataDirectory); - } catch (final IOException e) { - LOGGER.error("Failed to create blob storage directory", e); - } - }, "Initialize blob storage directory"); + try { + // Create directories synchronously since this is called during server startup + // when the config system might not be fully initialized yet + Files.createDirectories(dataDirectory); + LOGGER.info("Blob storage directory initialized at: {}", dataDirectory); + } catch (final IOException e) { + LOGGER.error("Failed to create blob storage directory", e); + } } } @@ -109,7 +109,15 @@ public final class BlobStorage { } BLOBS.clear(); - if (AsyncConfig.SERVER.enableSuperDebug.get()) { + // Safely check debug config if available + boolean debug = false; + try { + debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get(); + } catch (IllegalStateException ignored) { + // Config system might be shutting down, continue with debug disabled + } + + if (debug) { LOGGER.info("Closed all blob storage resources"); } } @@ -199,12 +207,21 @@ public final class BlobStorage { * @return a CompletableFuture that completes when the blob is closed. */ public static CompletableFuture closeAsync(final UUID handle) { + // Check debug state before async operation to avoid config access issues + boolean debug = false; + try { + debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get(); + } catch (IllegalStateException ignored) { + // Config system might be shutting down, continue with debug disabled + } + + final boolean finalDebug = debug; return AsyncUtils.runAsync(() -> { try { final FileChannel blob = BLOBS.remove(handle); if (blob != null) { blob.close(); - if (AsyncConfig.SERVER.enableSuperDebug.get()) { + if (finalDebug) { LOGGER.debug("Closed blob: {}", handle); } } @@ -237,11 +254,20 @@ public final class BlobStorage { * @return a CompletableFuture that completes when the blob is deleted. */ public static CompletableFuture deleteAsync(final UUID handle) { + // Check debug state before async operation to avoid config access issues + boolean debug = false; + try { + debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get(); + } catch (IllegalStateException ignored) { + // Config system might be shutting down, continue with debug disabled + } + + final boolean finalDebug = debug; return AsyncUtils.runAsync(() -> { final Path path = getBlobPath(handle); try { final boolean deleted = Files.deleteIfExists(path); - if (deleted && AsyncConfig.SERVER.enableSuperDebug.get()) { + if (deleted && finalDebug) { LOGGER.debug("Deleted blob file: {}", path); } } catch (final IOException e) { diff --git a/src/main/java/li/cil/oc2/common/util/AsyncUtils.java b/src/main/java/li/cil/oc2/common/util/AsyncUtils.java index 4116c572..7c39c592 100644 --- a/src/main/java/li/cil/oc2/common/util/AsyncUtils.java +++ b/src/main/java/li/cil/oc2/common/util/AsyncUtils.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import javax.annotation.Nullable; import java.util.concurrent.*; +import java.util.List; import java.util.function.Consumer; import java.util.function.Supplier; @@ -19,15 +20,44 @@ import java.util.function.Supplier; */ public final class AsyncUtils { private static final Logger LOGGER = LogManager.getLogger(); - + // Use a dedicated executor for async operations to avoid blocking the main server thread - private static final ExecutorService ASYNC_EXECUTOR = Executors.newWorkStealingPool( - Math.max(1, Runtime.getRuntime().availableProcessors() / 2) - ); + private static final ExecutorService ASYNC_EXECUTOR; + static { + ASYNC_EXECUTOR = new ForkJoinPool( + Math.max(1, Runtime.getRuntime().availableProcessors() / 2), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + (t, e) -> LOGGER.error("Uncaught exception in async executor thread", e), + true + ) { + @Override + public List shutdownNow() { + // Ensure all threads are interrupted when shutting down + List tasks = super.shutdownNow(); + // Force an interrupt on all threads in the pool + for (Thread worker : getActiveWorkers()) { + worker.interrupt(); + } + return tasks; + } + + // Helper method to get active worker threads + private java.util.Set getActiveWorkers() { + java.util.Set workers = java.util.concurrent.ConcurrentHashMap.newKeySet(); + for (Thread thread : Thread.getAllStackTraces().keySet()) { + if (thread.getName().startsWith("ForkJoinPool") && thread.isAlive()) { + workers.add(thread); + } + } + return workers; + } + }; + } + /** * Gets the async executor service. - * + * * @return The async executor service. */ public static ExecutorService getAsyncExecutor() { @@ -49,7 +79,7 @@ public final class AsyncUtils { LOGGER.info("Starting async task: {}", description); logStackTrace("Async task stack trace"); } - + return CompletableFuture.supplyAsync(() -> { try { return task.get(); @@ -97,7 +127,7 @@ public final class AsyncUtils { /** * Schedules a task to run on the server thread. - * + * * @param task The task to run on the server thread. * @param The return type of the task. * @return A CompletableFuture that completes with the result of the task. @@ -107,14 +137,14 @@ public final class AsyncUtils { if (server == null) { return CompletableFuture.failedFuture(new IllegalStateException("No server available")); } - + final CompletableFuture future = new CompletableFuture<>(); - + server.execute(() -> { if (AsyncConfig.SERVER.enableSuperDebug.get()) { LOGGER.debug("Executing task on server thread"); } - + try { future.complete(task.get()); } catch (final Throwable t) { @@ -122,13 +152,13 @@ public final class AsyncUtils { future.completeExceptionally(t); } }); - + return future; } - + /** * Schedules a task to run on the server thread. - * + * * @param task The task to run on the server thread. * @return A CompletableFuture that completes when the task is done. */ @@ -138,10 +168,10 @@ public final class AsyncUtils { return null; }); } - + /** * Gets the server's thread pool executor if available. - * + * * @return The server's thread pool executor, or null if not available. */ @Nullable @@ -149,25 +179,56 @@ public final class AsyncUtils { final MinecraftServer server = ForgeEventHandlers.getCurrentServer(); return server != null ? server : null; } - + /** * Shuts down the async executor. Should be called when the game is shutting down. + * Uses a shorter timeout and more aggressive cancellation to speed up shutdown. */ public static void shutdown() { - if (AsyncConfig.SERVER.enableSuperDebug.get()) { - LOGGER.info("Shutting down async executor"); + if (ASYNC_EXECUTOR.isShutdown()) { + return; // Already shut down } - + + boolean debug = false; + try { + // Safely check debug config if available + debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get(); + } catch (IllegalStateException ignored) { + // Config system might be shutting down, continue with debug disabled + } + + if (debug) { + LOGGER.info("Initiating async executor shutdown..."); + } + + // Disable new tasks from being submitted ASYNC_EXECUTOR.shutdown(); - + try { // Wait a short time for tasks to complete - if (!ASYNC_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) { - LOGGER.warn("Async executor did not shut down gracefully, forcing shutdown"); - ASYNC_EXECUTOR.shutdownNow(); + if (!ASYNC_EXECUTOR.awaitTermination(1, TimeUnit.SECONDS)) { + if (debug) { + LOGGER.warn("Async executor did not shut down within timeout, forcing immediate shutdown"); + } else { + LOGGER.warn("Async executor did not shut down within timeout, forcing immediate shutdown"); + } + + // Cancel currently executing tasks + final var runningTasks = ASYNC_EXECUTOR.shutdownNow(); + + if (debug && !runningTasks.isEmpty()) { + LOGGER.warn("Cancelled {} running tasks", runningTasks.size()); + } + + // Wait a bit more for tasks to respond to being cancelled + if (!ASYNC_EXECUTOR.awaitTermination(100, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Some tasks did not respond to cancellation"); + } } } catch (final InterruptedException e) { - LOGGER.warn("Interrupted while waiting for async executor to shut down", e); + LOGGER.warn("Interrupted while waiting for async executor to shut down, forcing immediate shutdown", e); + // Try one last time with extreme prejudice + ASYNC_EXECUTOR.shutdownNow(); Thread.currentThread().interrupt(); } }