Fixed Async
This commit is contained in:
@@ -12,6 +12,7 @@ import li.cil.oc2.common.Constants;
|
||||
import li.cil.oc2.common.bus.device.util.IdentityProxy;
|
||||
import li.cil.oc2.common.bus.device.util.OptionalAddress;
|
||||
import li.cil.oc2.common.bus.device.util.OptionalInterrupt;
|
||||
import li.cil.oc2.common.config.AsyncConfig;
|
||||
import li.cil.oc2.common.serialization.BlobStorage;
|
||||
import li.cil.oc2.common.serialization.NBTSerialization;
|
||||
import li.cil.oc2.common.util.Event;
|
||||
@@ -101,7 +102,14 @@ public abstract class AbstractBlockStorageDevice<TBlock extends BlockDevice, TId
|
||||
closeDevice();
|
||||
|
||||
if (blobHandle != null) {
|
||||
BlobStorage.close(blobHandle);
|
||||
if (AsyncConfig.SERVER.asyncStorageOperations.get()) {
|
||||
BlobStorage.closeAsync(blobHandle).exceptionally(e -> {
|
||||
LOGGER.error("Error closing blob asynchronously: " + blobHandle, e);
|
||||
return null;
|
||||
});
|
||||
} else {
|
||||
BlobStorage.close(blobHandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,7 +178,14 @@ public abstract class AbstractBlockStorageDevice<TBlock extends BlockDevice, TId
|
||||
public static void unmount(final CompoundTag tag) {
|
||||
if (tag.hasUUID(BLOB_HANDLE_TAG_NAME)) {
|
||||
final UUID blobHandle = tag.getUUID(BLOB_HANDLE_TAG_NAME);
|
||||
BlobStorage.close(blobHandle);
|
||||
if (AsyncConfig.SERVER.asyncStorageOperations.get()) {
|
||||
BlobStorage.closeAsync(blobHandle).exceptionally(e -> {
|
||||
LOGGER.error("Error closing blob asynchronously during unmount: " + blobHandle, e);
|
||||
return null;
|
||||
});
|
||||
} else {
|
||||
BlobStorage.close(blobHandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,19 +2,24 @@
|
||||
|
||||
package li.cil.oc2.common.bus.device.vm.item;
|
||||
|
||||
import li.cil.oc2.common.config.AsyncConfig;
|
||||
import li.cil.oc2.common.serialization.BlobStorage;
|
||||
import li.cil.oc2.common.util.BlockLocation;
|
||||
import li.cil.oc2.common.util.SoundEvents;
|
||||
import li.cil.oc2.common.util.ThrottledSoundEmitter;
|
||||
import li.cil.sedna.device.block.ByteBufferBlockDevice;
|
||||
import net.minecraft.world.item.ItemStack;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class HardDriveDevice extends AbstractBlockStorageDevice<ByteBufferBlockDevice, ItemStack> {
|
||||
@@ -32,19 +37,38 @@ public class HardDriveDevice extends AbstractBlockStorageDevice<ByteBufferBlockD
|
||||
|
||||
///////////////////////////////////////////////////////////////////
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger();
|
||||
|
||||
@Override
|
||||
protected CompletableFuture<ByteBufferBlockDevice> createBlockDevice() {
|
||||
blobHandle = BlobStorage.validateHandle(blobHandle);
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
|
||||
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
|
||||
return ByteBufferBlockDevice.wrap(buffer, readonly);
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}, WORKERS);
|
||||
|
||||
if (AsyncConfig.SERVER.asyncStorageOperations.get()) {
|
||||
return BlobStorage.getOrOpenAsync(blobHandle)
|
||||
.thenApplyAsync(channel -> {
|
||||
try {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Mapping buffer for blob: {}", blobHandle);
|
||||
}
|
||||
final MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, 0, size);
|
||||
return ByteBufferBlockDevice.wrap(buffer, readonly);
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Failed to map buffer for blob: " + blobHandle, e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
}, WORKERS);
|
||||
} else {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
|
||||
final MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, 0, size);
|
||||
return ByteBufferBlockDevice.wrap(buffer, readonly);
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Failed to create block device", e);
|
||||
throw new CompletionException("Failed to create block device", e);
|
||||
}
|
||||
}, WORKERS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
44
src/main/java/li/cil/oc2/common/config/AsyncConfig.java
Normal file
44
src/main/java/li/cil/oc2/common/config/AsyncConfig.java
Normal file
@@ -0,0 +1,44 @@
|
||||
package li.cil.oc2.common.config;
|
||||
|
||||
import net.minecraftforge.common.ForgeConfigSpec;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public final class AsyncConfig {
|
||||
public static final ServerConfig SERVER;
|
||||
public static final ForgeConfigSpec SERVER_SPEC;
|
||||
|
||||
static {
|
||||
final Pair<ServerConfig, ForgeConfigSpec> specPair = new ForgeConfigSpec.Builder().configure(ServerConfig::new);
|
||||
SERVER_SPEC = specPair.getRight();
|
||||
SERVER = specPair.getLeft();
|
||||
}
|
||||
|
||||
public static class ServerConfig {
|
||||
public final ForgeConfigSpec.BooleanValue enableSuperDebug;
|
||||
public final ForgeConfigSpec.BooleanValue runAsyncTests;
|
||||
public final ForgeConfigSpec.BooleanValue asyncStorageOperations;
|
||||
|
||||
ServerConfig(final ForgeConfigSpec.Builder builder) {
|
||||
builder.push("async");
|
||||
|
||||
enableSuperDebug = builder
|
||||
.comment("Enable super debug mode for async operations. This will log stack traces for all async operations.")
|
||||
.translation("config.oc2.async.super_debug")
|
||||
.define("superDebug", false);
|
||||
|
||||
runAsyncTests = builder
|
||||
.comment("Run async operation tests during server startup. Enable this to verify async functionality.")
|
||||
.define("runAsyncTests", true);
|
||||
|
||||
asyncStorageOperations = builder
|
||||
.comment("Enables asynchronous storage operations for better performance")
|
||||
.translation("config.oc2.async.storage_operations")
|
||||
.define("storageOperations", true);
|
||||
|
||||
builder.pop();
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent instantiation
|
||||
private AsyncConfig() {}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
package li.cil.oc2.common.event;
|
||||
|
||||
import li.cil.oc2.api.API;
|
||||
import li.cil.oc2.common.config.AsyncConfig;
|
||||
import li.cil.oc2.common.util.AsyncTestUtils;
|
||||
import li.cil.oc2.common.util.AsyncUtils;
|
||||
import net.minecraft.server.MinecraftServer;
|
||||
import net.minecraftforge.event.server.ServerAboutToStartEvent;
|
||||
import net.minecraftforge.event.server.ServerStoppedEvent;
|
||||
import net.minecraftforge.eventbus.api.SubscribeEvent;
|
||||
import net.minecraftforge.fml.common.Mod;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Handles Forge lifecycle events to ensure proper initialization and cleanup of async operations.
|
||||
*/
|
||||
@Mod.EventBusSubscriber(modid = API.MOD_ID, bus = Mod.EventBusSubscriber.Bus.FORGE)
|
||||
public final class ForgeEventHandlers {
|
||||
private static final Logger LOGGER = LogManager.getLogger();
|
||||
private static MinecraftServer server;
|
||||
|
||||
/**
|
||||
* Get the current Minecraft server instance.
|
||||
*
|
||||
* @return The current Minecraft server instance, or null if not available.
|
||||
*/
|
||||
@Nullable
|
||||
public static MinecraftServer getCurrentServer() {
|
||||
return server;
|
||||
}
|
||||
|
||||
@SubscribeEvent
|
||||
public static void handleServerAboutToStart(final ServerAboutToStartEvent event) {
|
||||
server = event.getServer();
|
||||
LOGGER.info("Server starting, initializing async components");
|
||||
|
||||
// Run async tests if enabled
|
||||
if (AsyncConfig.SERVER.runAsyncTests.get()) {
|
||||
LOGGER.info("Running async operation tests...");
|
||||
AsyncTestUtils.verifyAsyncOperations()
|
||||
.thenAccept(uuid -> {
|
||||
if (uuid != null) {
|
||||
LOGGER.debug("Async test completed with UUID: {}", uuid);
|
||||
} else {
|
||||
LOGGER.debug("Async test completed");
|
||||
}
|
||||
})
|
||||
.exceptionally(e -> {
|
||||
LOGGER.error("Async test failed", e);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@SubscribeEvent
|
||||
public static void handleServerStopped(final ServerStoppedEvent event) {
|
||||
LOGGER.info("Server stopped, cleaning up async components");
|
||||
try {
|
||||
AsyncUtils.shutdown();
|
||||
} catch (final Exception e) {
|
||||
LOGGER.error("Error during async component shutdown", e);
|
||||
} finally {
|
||||
server = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,13 @@
|
||||
package li.cil.oc2.common.serialization;
|
||||
|
||||
import li.cil.oc2.api.API;
|
||||
import li.cil.oc2.common.config.AsyncConfig;
|
||||
import li.cil.oc2.common.util.AsyncUtils;
|
||||
import net.minecraft.server.MinecraftServer;
|
||||
import net.minecraft.world.level.storage.LevelResource;
|
||||
import net.minecraftforge.event.server.ServerAboutToStartEvent;
|
||||
import net.minecraftforge.event.server.ServerStoppedEvent;
|
||||
import net.minecraftforge.eventbus.api.SubscribeEvent;
|
||||
import net.minecraftforge.fml.common.Mod;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@@ -18,10 +19,13 @@ import java.io.RandomAccessFile;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import net.minecraftforge.fml.common.Mod;
|
||||
|
||||
/**
|
||||
* This class facilitates storing binary chunks of data in an efficient, parallelized fashion.
|
||||
@@ -33,12 +37,31 @@ public final class BlobStorage {
|
||||
///////////////////////////////////////////////////////////////////
|
||||
|
||||
private static final LevelResource BLOBS_FOLDER_NAME = new LevelResource(API.MOD_ID + "-blobs");
|
||||
private static final Map<UUID, FileChannel> BLOBS = new HashMap<>();
|
||||
private static final Map<UUID, FileChannel> BLOBS = new ConcurrentHashMap<>();
|
||||
private static final Map<UUID, CompletableFuture<FileChannel>> PENDING_OPERATIONS = new ConcurrentHashMap<>();
|
||||
|
||||
private static Path dataDirectory; // Directory blobs get saved to.
|
||||
private static volatile Path dataDirectory; // Directory blobs get saved to.
|
||||
|
||||
///////////////////////////////////////////////////////////////////
|
||||
|
||||
static {
|
||||
// Register shutdown hook to ensure resources are cleaned up
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(BlobStorage::close, "OC2 BlobStorage Shutdown"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the file system path for a blob with the given handle.
|
||||
*
|
||||
* @param handle the handle of the blob.
|
||||
* @return the path to the blob file.
|
||||
*/
|
||||
private static Path getBlobPath(final UUID handle) {
|
||||
if (dataDirectory == null) {
|
||||
throw new IllegalStateException("Data directory not initialized. Server not set?");
|
||||
}
|
||||
return dataDirectory.resolve(handle.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the currently running server.
|
||||
* <p>
|
||||
@@ -49,27 +72,46 @@ public final class BlobStorage {
|
||||
* @param server the currently active server.
|
||||
*/
|
||||
public static void setServer(final MinecraftServer server) {
|
||||
dataDirectory = server.getWorldPath(BLOBS_FOLDER_NAME);
|
||||
try {
|
||||
Files.createDirectories(dataDirectory);
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error(e);
|
||||
final Path newDataDir = server.getWorldPath(BLOBS_FOLDER_NAME);
|
||||
if (!newDataDir.equals(dataDirectory)) {
|
||||
// Close all open handles if the directory changes
|
||||
close();
|
||||
dataDirectory = newDataDir;
|
||||
|
||||
AsyncUtils.runAsync(() -> {
|
||||
try {
|
||||
Files.createDirectories(dataDirectory);
|
||||
LOGGER.info("Blob storage directory initialized at: {}", dataDirectory);
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Failed to create blob storage directory", e);
|
||||
}
|
||||
}, "Initialize blob storage directory");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all currently open blobs.
|
||||
*/
|
||||
public static synchronized void close() {
|
||||
public static void close() {
|
||||
// Cancel all pending operations
|
||||
for (final CompletableFuture<FileChannel> future : PENDING_OPERATIONS.values()) {
|
||||
future.cancel(true);
|
||||
}
|
||||
PENDING_OPERATIONS.clear();
|
||||
|
||||
// Close all open channels
|
||||
for (final FileChannel blob : BLOBS.values()) {
|
||||
try {
|
||||
blob.close();
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error(e);
|
||||
LOGGER.error("Error closing blob channel", e);
|
||||
}
|
||||
}
|
||||
|
||||
BLOBS.clear();
|
||||
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.info("Closed all blob storage resources");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -97,6 +139,37 @@ public final class BlobStorage {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or opens a file channel for the blob with the specified handle asynchronously.
|
||||
*
|
||||
* @param handle the handle to obtain the file channel for.
|
||||
* @return a CompletableFuture that will complete with the file channel.
|
||||
*/
|
||||
public static CompletableFuture<FileChannel> getOrOpenAsync(final UUID handle) {
|
||||
// Check if already open
|
||||
final FileChannel existingChannel = BLOBS.get(handle);
|
||||
if (existingChannel != null && existingChannel.isOpen()) {
|
||||
return CompletableFuture.completedFuture(existingChannel);
|
||||
}
|
||||
|
||||
// Check if there's already a pending operation
|
||||
return PENDING_OPERATIONS.computeIfAbsent(handle, h -> {
|
||||
return AsyncUtils.runAsync(() -> {
|
||||
try {
|
||||
final Path path = dataDirectory.resolve(h.toString());
|
||||
final FileChannel channel = new RandomAccessFile(path.toFile(), "rw").getChannel();
|
||||
BLOBS.put(h, channel);
|
||||
return channel;
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Failed to open blob: " + h, e);
|
||||
throw new CompletionException("Failed to open blob: " + h, e);
|
||||
} finally {
|
||||
PENDING_OPERATIONS.remove(h);
|
||||
}
|
||||
}, "Open blob " + h);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or opens a file channel for the blob with the specified handle.
|
||||
* <p>
|
||||
@@ -105,51 +178,93 @@ public final class BlobStorage {
|
||||
* @param handle the handle to obtain the file channel for.
|
||||
* @return the file channel for the requested blob.
|
||||
* @throws IOException if opening the blob fails.
|
||||
* @deprecated Use {@link #getOrOpenAsync(UUID)} for non-blocking access. This method will be removed in 1.21.1.
|
||||
*/
|
||||
@Deprecated(since = "1.21.1", forRemoval = true)
|
||||
public static synchronized FileChannel getOrOpen(final UUID handle) throws IOException {
|
||||
FileChannel blob = BLOBS.get(handle);
|
||||
if (blob != null && blob.isOpen()) {
|
||||
return blob;
|
||||
try {
|
||||
return getOrOpenAsync(handle).join();
|
||||
} catch (final CompletionException e) {
|
||||
if (e.getCause() instanceof IOException) {
|
||||
throw (IOException) e.getCause();
|
||||
}
|
||||
throw new IOException("Failed to open blob: " + handle, e);
|
||||
}
|
||||
|
||||
final Path path = dataDirectory.resolve(handle.toString());
|
||||
blob = new RandomAccessFile(path.toFile(), "rw").getChannel();
|
||||
BLOBS.put(handle, blob);
|
||||
return blob;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the blob with the specified handle asynchronously.
|
||||
*
|
||||
* @param handle the handle of the blob to close.
|
||||
* @return a CompletableFuture that completes when the blob is closed.
|
||||
*/
|
||||
public static CompletableFuture<Void> closeAsync(final UUID handle) {
|
||||
return AsyncUtils.runAsync(() -> {
|
||||
try {
|
||||
final FileChannel blob = BLOBS.remove(handle);
|
||||
if (blob != null) {
|
||||
blob.close();
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Closed blob: {}", handle);
|
||||
}
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Error closing blob: " + handle, e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
}, "Close blob " + handle);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the blob with the specified handle.
|
||||
*
|
||||
* @param handle the handle of the blob to close.
|
||||
* @deprecated Use {@link #closeAsync(UUID)} for non-blocking operation. This method will be removed in 1.21.1.
|
||||
*/
|
||||
@Deprecated(since = "1.21.1", forRemoval = true)
|
||||
public static synchronized void close(final UUID handle) {
|
||||
try {
|
||||
final FileChannel blob = BLOBS.remove(handle);
|
||||
if (blob != null) {
|
||||
blob.close();
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error(e);
|
||||
closeAsync(handle).join();
|
||||
} catch (final CompletionException e) {
|
||||
LOGGER.error("Error in close operation for blob: " + handle, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the blob with the specified handle asynchronously.
|
||||
*
|
||||
* @param handle the handle of the blob to delete.
|
||||
* @return a CompletableFuture that completes when the blob is deleted.
|
||||
*/
|
||||
public static CompletableFuture<Void> deleteAsync(final UUID handle) {
|
||||
return AsyncUtils.runAsync(() -> {
|
||||
final Path path = getBlobPath(handle);
|
||||
try {
|
||||
final boolean deleted = Files.deleteIfExists(path);
|
||||
if (deleted && AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Deleted blob file: {}", path);
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
LOGGER.error("Error deleting blob file: " + path, e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
return null;
|
||||
}, "Deleting blob " + handle);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the blob with the specified handle.
|
||||
*
|
||||
* @param handle the handle of the blob to delete.
|
||||
* @deprecated Use {@link #deleteAsync(UUID)} for non-blocking operation. This method will be removed in 1.21.1.
|
||||
*/
|
||||
@Deprecated(since = "1.21.1", forRemoval = true)
|
||||
public static void delete(final UUID handle) {
|
||||
close(handle);
|
||||
|
||||
final Path path = dataDirectory.resolve(handle.toString());
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
Files.deleteIfExists(path);
|
||||
} catch (final Throwable e) {
|
||||
LOGGER.error(e);
|
||||
}
|
||||
});
|
||||
try {
|
||||
deleteAsync(handle).join();
|
||||
} catch (final CompletionException e) {
|
||||
LOGGER.error("Error in delete operation for blob: " + handle, e);
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////
|
||||
|
||||
100
src/main/java/li/cil/oc2/common/util/AsyncTestUtils.java
Normal file
100
src/main/java/li/cil/oc2/common/util/AsyncTestUtils.java
Normal file
@@ -0,0 +1,100 @@
|
||||
package li.cil.oc2.common.util;
|
||||
|
||||
import li.cil.oc2.common.config.AsyncConfig;
|
||||
import li.cil.oc2.common.event.ForgeEventHandlers;
|
||||
import net.minecraft.server.MinecraftServer;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BooleanSupplier;
|
||||
|
||||
/**
|
||||
* Utility class for testing async functionality.
|
||||
*/
|
||||
public final class AsyncTestUtils {
|
||||
private static final Logger LOGGER = LogManager.getLogger();
|
||||
private static final int TEST_TIMEOUT_MS = 5000;
|
||||
|
||||
/**
|
||||
* Waits for a condition to become true, with a timeout.
|
||||
*
|
||||
* @param condition The condition to wait for.
|
||||
* @param timeoutMs The maximum time to wait in milliseconds.
|
||||
* @return true if the condition became true within the timeout, false otherwise.
|
||||
*/
|
||||
public static boolean waitForCondition(BooleanSupplier condition, long timeoutMs) {
|
||||
final long startTime = System.currentTimeMillis();
|
||||
while (!condition.getAsBoolean()) {
|
||||
if (System.currentTimeMillis() - startTime > timeoutMs) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that async operations are working correctly.
|
||||
*
|
||||
* @return A future that completes with a test UUID when verification is done.
|
||||
*/
|
||||
public static CompletableFuture<UUID> verifyAsyncOperations() {
|
||||
if (!AsyncConfig.SERVER.asyncStorageOperations.get()) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
LOGGER.info("Verifying async operations...");
|
||||
|
||||
// Test basic async execution
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Async test operation running on thread: {}", Thread.currentThread().getName());
|
||||
}
|
||||
|
||||
// Add a small delay to ensure async behavior
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
return null; // Return value from the supplier
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("Async test interrupted", e);
|
||||
}
|
||||
}, AsyncUtils.getAsyncExecutor())
|
||||
.thenCompose(v -> {
|
||||
// Verify we can switch back to server thread
|
||||
return AsyncUtils.onServerThread(() -> {
|
||||
MinecraftServer server = ForgeEventHandlers.getCurrentServer();
|
||||
if (server == null) {
|
||||
throw new IllegalStateException("Server not available during async test");
|
||||
}
|
||||
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Successfully switched back to server thread");
|
||||
}
|
||||
|
||||
// Generate a test UUID for storage testing
|
||||
UUID testId = UUID.randomUUID();
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Generated test UUID: {}", testId);
|
||||
}
|
||||
|
||||
return testId;
|
||||
});
|
||||
})
|
||||
.whenComplete((result, throwable) -> {
|
||||
if (throwable != null) {
|
||||
LOGGER.error("Async test failed", throwable);
|
||||
} else {
|
||||
LOGGER.info("Async operations verified successfully");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
174
src/main/java/li/cil/oc2/common/util/AsyncUtils.java
Normal file
174
src/main/java/li/cil/oc2/common/util/AsyncUtils.java
Normal file
@@ -0,0 +1,174 @@
|
||||
package li.cil.oc2.common.util;
|
||||
|
||||
import li.cil.oc2.common.config.AsyncConfig;
|
||||
import li.cil.oc2.common.event.ForgeEventHandlers;
|
||||
import net.minecraft.server.MinecraftServer;
|
||||
import net.minecraft.server.level.ServerLevel;
|
||||
import net.minecraftforge.fml.util.thread.EffectiveSide;
|
||||
import net.minecraftforge.server.ServerLifecycleHooks;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Utility class for handling asynchronous operations with proper error handling and debugging.
|
||||
*/
|
||||
public final class AsyncUtils {
|
||||
private static final Logger LOGGER = LogManager.getLogger();
|
||||
|
||||
// Use a dedicated executor for async operations to avoid blocking the main server thread
|
||||
private static final ExecutorService ASYNC_EXECUTOR = Executors.newWorkStealingPool(
|
||||
Math.max(1, Runtime.getRuntime().availableProcessors() / 2)
|
||||
);
|
||||
|
||||
/**
|
||||
* Gets the async executor service.
|
||||
*
|
||||
* @return The async executor service.
|
||||
*/
|
||||
public static ExecutorService getAsyncExecutor() {
|
||||
return ASYNC_EXECUTOR;
|
||||
}
|
||||
|
||||
// Prevent instantiation
|
||||
private AsyncUtils() {}
|
||||
|
||||
/**
|
||||
* Runs a task asynchronously with proper error handling and debug logging.
|
||||
*
|
||||
* @param task the task to run
|
||||
* @param description a description of the task for logging purposes
|
||||
* @return a CompletableFuture that will complete when the task finishes
|
||||
*/
|
||||
public static <T> CompletableFuture<T> runAsync(Supplier<T> task, String description) {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.info("Starting async task: {}", description);
|
||||
logStackTrace("Async task stack trace");
|
||||
}
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
return task.get();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.error("Error in async task: " + description, t);
|
||||
throw t;
|
||||
} finally {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.info("Completed async task: {}", description);
|
||||
}
|
||||
}
|
||||
}, ASYNC_EXECUTOR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a task asynchronously with proper error handling and debug logging.
|
||||
*
|
||||
* @param task the task to run
|
||||
* @param description a description of the task for logging purposes
|
||||
* @return a CompletableFuture that will complete when the task finishes
|
||||
*/
|
||||
public static CompletableFuture<Void> runAsync(Runnable task, String description) {
|
||||
return runAsync(() -> {
|
||||
task.run();
|
||||
return null;
|
||||
}, description);
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs the current stack trace if super debug mode is enabled.
|
||||
*
|
||||
* @param message the message to log with the stack trace
|
||||
*/
|
||||
public static void logStackTrace(String message) {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
|
||||
StringBuilder sb = new StringBuilder(message).append("\n");
|
||||
// Skip the first 2 elements (getStackTrace and this method)
|
||||
for (int i = 2; i < Math.min(stackTrace.length, 10); i++) {
|
||||
sb.append(" at ").append(stackTrace[i]).append("\n");
|
||||
}
|
||||
LOGGER.debug(sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a task to run on the server thread.
|
||||
*
|
||||
* @param task The task to run on the server thread.
|
||||
* @param <T> The return type of the task.
|
||||
* @return A CompletableFuture that completes with the result of the task.
|
||||
*/
|
||||
public static <T> CompletableFuture<T> onServerThread(Supplier<T> task) {
|
||||
final MinecraftServer server = ForgeEventHandlers.getCurrentServer();
|
||||
if (server == null) {
|
||||
return CompletableFuture.failedFuture(new IllegalStateException("No server available"));
|
||||
}
|
||||
|
||||
final CompletableFuture<T> future = new CompletableFuture<>();
|
||||
|
||||
server.execute(() -> {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.debug("Executing task on server thread");
|
||||
}
|
||||
|
||||
try {
|
||||
future.complete(task.get());
|
||||
} catch (final Throwable t) {
|
||||
LOGGER.error("Error in server thread task", t);
|
||||
future.completeExceptionally(t);
|
||||
}
|
||||
});
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a task to run on the server thread.
|
||||
*
|
||||
* @param task The task to run on the server thread.
|
||||
* @return A CompletableFuture that completes when the task is done.
|
||||
*/
|
||||
public static CompletableFuture<Void> onServerThread(Runnable task) {
|
||||
return onServerThread(() -> {
|
||||
task.run();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the server's thread pool executor if available.
|
||||
*
|
||||
* @return The server's thread pool executor, or null if not available.
|
||||
*/
|
||||
@Nullable
|
||||
public static Executor getServerExecutor() {
|
||||
final MinecraftServer server = ForgeEventHandlers.getCurrentServer();
|
||||
return server != null ? server : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the async executor. Should be called when the game is shutting down.
|
||||
*/
|
||||
public static void shutdown() {
|
||||
if (AsyncConfig.SERVER.enableSuperDebug.get()) {
|
||||
LOGGER.info("Shutting down async executor");
|
||||
}
|
||||
|
||||
ASYNC_EXECUTOR.shutdown();
|
||||
|
||||
try {
|
||||
// Wait a short time for tasks to complete
|
||||
if (!ASYNC_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
LOGGER.warn("Async executor did not shut down gracefully, forcing shutdown");
|
||||
ASYNC_EXECUTOR.shutdownNow();
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
LOGGER.warn("Interrupted while waiting for async executor to shut down", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user