fixes issue #72 You now can re-enter worlds
This commit is contained in:
@@ -37,8 +37,16 @@ public final class ForgeEventHandlers {
|
||||
server = event.getServer();
|
||||
LOGGER.info("Server starting, initializing async components");
|
||||
|
||||
// Run async tests if enabled
|
||||
if (AsyncConfig.SERVER.runAsyncTests.get()) {
|
||||
// Safely check if we should run async tests
|
||||
boolean shouldRunTests = false;
|
||||
try {
|
||||
shouldRunTests = AsyncConfig.SERVER != null && AsyncConfig.SERVER.runAsyncTests.get();
|
||||
} catch (IllegalStateException e) {
|
||||
LOGGER.warn("Config not available, skipping async tests");
|
||||
}
|
||||
|
||||
// Run async tests if enabled and config is available
|
||||
if (shouldRunTests) {
|
||||
LOGGER.info("Running async operation tests...");
|
||||
AsyncTestUtils.verifyAsyncOperations()
|
||||
.thenAccept(uuid -> {
|
||||
|
||||
@@ -78,14 +78,14 @@ public final class BlobStorage {
|
||||
close();
|
||||
dataDirectory = newDataDir;
|
||||
|
||||
AsyncUtils.runAsync(() -> {
|
||||
try {
|
||||
Files.createDirectories(dataDirectory);
|
||||
LOGGER.info("Blob storage directory initialized at: {}", dataDirectory);
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Failed to create blob storage directory", e);
|
||||
}
|
||||
}, "Initialize blob storage directory");
|
||||
try {
|
||||
// Create directories synchronously since this is called during server startup
|
||||
// when the config system might not be fully initialized yet
|
||||
Files.createDirectories(dataDirectory);
|
||||
LOGGER.info("Blob storage directory initialized at: {}", dataDirectory);
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Failed to create blob storage directory", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +109,15 @@ public final class BlobStorage {
|
||||
}
|
||||
BLOBS.clear();
|
||||
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
// Safely check debug config if available
|
||||
boolean debug = false;
|
||||
try {
|
||||
debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get();
|
||||
} catch (IllegalStateException ignored) {
|
||||
// Config system might be shutting down, continue with debug disabled
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
LOGGER.info("Closed all blob storage resources");
|
||||
}
|
||||
}
|
||||
@@ -199,12 +207,21 @@ public final class BlobStorage {
|
||||
* @return a CompletableFuture that completes when the blob is closed.
|
||||
*/
|
||||
public static CompletableFuture<Void> closeAsync(final UUID handle) {
|
||||
// Check debug state before async operation to avoid config access issues
|
||||
boolean debug = false;
|
||||
try {
|
||||
debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get();
|
||||
} catch (IllegalStateException ignored) {
|
||||
// Config system might be shutting down, continue with debug disabled
|
||||
}
|
||||
|
||||
final boolean finalDebug = debug;
|
||||
return AsyncUtils.runAsync(() -> {
|
||||
try {
|
||||
final FileChannel blob = BLOBS.remove(handle);
|
||||
if (blob != null) {
|
||||
blob.close();
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
if (finalDebug) {
|
||||
LOGGER.debug("Closed blob: {}", handle);
|
||||
}
|
||||
}
|
||||
@@ -237,11 +254,20 @@ public final class BlobStorage {
|
||||
* @return a CompletableFuture that completes when the blob is deleted.
|
||||
*/
|
||||
public static CompletableFuture<Void> deleteAsync(final UUID handle) {
|
||||
// Check debug state before async operation to avoid config access issues
|
||||
boolean debug = false;
|
||||
try {
|
||||
debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get();
|
||||
} catch (IllegalStateException ignored) {
|
||||
// Config system might be shutting down, continue with debug disabled
|
||||
}
|
||||
|
||||
final boolean finalDebug = debug;
|
||||
return AsyncUtils.runAsync(() -> {
|
||||
final Path path = getBlobPath(handle);
|
||||
try {
|
||||
final boolean deleted = Files.deleteIfExists(path);
|
||||
if (deleted && AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
if (deleted && finalDebug) {
|
||||
LOGGER.debug("Deleted blob file: {}", path);
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
|
||||
@@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@@ -19,15 +20,44 @@ import java.util.function.Supplier;
|
||||
*/
|
||||
public final class AsyncUtils {
|
||||
private static final Logger LOGGER = LogManager.getLogger();
|
||||
|
||||
|
||||
// Use a dedicated executor for async operations to avoid blocking the main server thread
|
||||
private static final ExecutorService ASYNC_EXECUTOR = Executors.newWorkStealingPool(
|
||||
Math.max(1, Runtime.getRuntime().availableProcessors() / 2)
|
||||
);
|
||||
private static final ExecutorService ASYNC_EXECUTOR;
|
||||
|
||||
static {
|
||||
ASYNC_EXECUTOR = new ForkJoinPool(
|
||||
Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
|
||||
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
|
||||
(t, e) -> LOGGER.error("Uncaught exception in async executor thread", e),
|
||||
true
|
||||
) {
|
||||
@Override
|
||||
public List<Runnable> shutdownNow() {
|
||||
// Ensure all threads are interrupted when shutting down
|
||||
List<Runnable> tasks = super.shutdownNow();
|
||||
// Force an interrupt on all threads in the pool
|
||||
for (Thread worker : getActiveWorkers()) {
|
||||
worker.interrupt();
|
||||
}
|
||||
return tasks;
|
||||
}
|
||||
|
||||
// Helper method to get active worker threads
|
||||
private java.util.Set<Thread> getActiveWorkers() {
|
||||
java.util.Set<Thread> workers = java.util.concurrent.ConcurrentHashMap.newKeySet();
|
||||
for (Thread thread : Thread.getAllStackTraces().keySet()) {
|
||||
if (thread.getName().startsWith("ForkJoinPool") && thread.isAlive()) {
|
||||
workers.add(thread);
|
||||
}
|
||||
}
|
||||
return workers;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the async executor service.
|
||||
*
|
||||
*
|
||||
* @return The async executor service.
|
||||
*/
|
||||
public static ExecutorService getAsyncExecutor() {
|
||||
@@ -49,7 +79,7 @@ public final class AsyncUtils {
|
||||
LOGGER.info("Starting async task: {}", description);
|
||||
logStackTrace("Async task stack trace");
|
||||
}
|
||||
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return task.get();
|
||||
@@ -97,7 +127,7 @@ public final class AsyncUtils {
|
||||
|
||||
/**
|
||||
* Schedules a task to run on the server thread.
|
||||
*
|
||||
*
|
||||
* @param task The task to run on the server thread.
|
||||
* @param <T> The return type of the task.
|
||||
* @return A CompletableFuture that completes with the result of the task.
|
||||
@@ -107,14 +137,14 @@ public final class AsyncUtils {
|
||||
if (server == null) {
|
||||
return CompletableFuture.failedFuture(new IllegalStateException("No server available"));
|
||||
}
|
||||
|
||||
|
||||
final CompletableFuture<T> future = new CompletableFuture<>();
|
||||
|
||||
|
||||
server.execute(() -> {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Executing task on server thread");
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
future.complete(task.get());
|
||||
} catch (final Throwable t) {
|
||||
@@ -122,13 +152,13 @@ public final class AsyncUtils {
|
||||
future.completeExceptionally(t);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Schedules a task to run on the server thread.
|
||||
*
|
||||
*
|
||||
* @param task The task to run on the server thread.
|
||||
* @return A CompletableFuture that completes when the task is done.
|
||||
*/
|
||||
@@ -138,10 +168,10 @@ public final class AsyncUtils {
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets the server's thread pool executor if available.
|
||||
*
|
||||
*
|
||||
* @return The server's thread pool executor, or null if not available.
|
||||
*/
|
||||
@Nullable
|
||||
@@ -149,25 +179,56 @@ public final class AsyncUtils {
|
||||
final MinecraftServer server = ForgeEventHandlers.getCurrentServer();
|
||||
return server != null ? server : null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shuts down the async executor. Should be called when the game is shutting down.
|
||||
* Uses a shorter timeout and more aggressive cancellation to speed up shutdown.
|
||||
*/
|
||||
public static void shutdown() {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.info("Shutting down async executor");
|
||||
if (ASYNC_EXECUTOR.isShutdown()) {
|
||||
return; // Already shut down
|
||||
}
|
||||
|
||||
|
||||
boolean debug = false;
|
||||
try {
|
||||
// Safely check debug config if available
|
||||
debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get();
|
||||
} catch (IllegalStateException ignored) {
|
||||
// Config system might be shutting down, continue with debug disabled
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
LOGGER.info("Initiating async executor shutdown...");
|
||||
}
|
||||
|
||||
// Disable new tasks from being submitted
|
||||
ASYNC_EXECUTOR.shutdown();
|
||||
|
||||
|
||||
try {
|
||||
// Wait a short time for tasks to complete
|
||||
if (!ASYNC_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
LOGGER.warn("Async executor did not shut down gracefully, forcing shutdown");
|
||||
ASYNC_EXECUTOR.shutdownNow();
|
||||
if (!ASYNC_EXECUTOR.awaitTermination(1, TimeUnit.SECONDS)) {
|
||||
if (debug) {
|
||||
LOGGER.warn("Async executor did not shut down within timeout, forcing immediate shutdown");
|
||||
} else {
|
||||
LOGGER.warn("Async executor did not shut down within timeout, forcing immediate shutdown");
|
||||
}
|
||||
|
||||
// Cancel currently executing tasks
|
||||
final var runningTasks = ASYNC_EXECUTOR.shutdownNow();
|
||||
|
||||
if (debug && !runningTasks.isEmpty()) {
|
||||
LOGGER.warn("Cancelled {} running tasks", runningTasks.size());
|
||||
}
|
||||
|
||||
// Wait a bit more for tasks to respond to being cancelled
|
||||
if (!ASYNC_EXECUTOR.awaitTermination(100, TimeUnit.MILLISECONDS)) {
|
||||
LOGGER.warn("Some tasks did not respond to cancellation");
|
||||
}
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
LOGGER.warn("Interrupted while waiting for async executor to shut down", e);
|
||||
LOGGER.warn("Interrupted while waiting for async executor to shut down, forcing immediate shutdown", e);
|
||||
// Try one last time with extreme prejudice
|
||||
ASYNC_EXECUTOR.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user