From b4b2fe1d0e0cae63781e1707e27dfcb559c102cb Mon Sep 17 00:00:00 2001 From: Robert MacRae Date: Wed, 3 Sep 2025 13:49:40 -0300 Subject: [PATCH] fixes for #83, #81 and the memory leak --- .../bus/device/vm/block/DiskDriveDevice.java | 8 +- .../vm/block/FlashMemoryFlasherDevice.java | 8 +- .../bus/device/vm/block/MonitorDevice.java | 4 +- .../device/vm/block/PciCardCageDevice.java | 4 +- .../bus/device/vm/block/ProjectorDevice.java | 4 +- .../vm/item/AbstractBlockStorageDevice.java | 82 +++++++- .../vm/item/ByteBufferFlashStorageDevice.java | 5 +- .../bus/device/vm/item/HardDriveDevice.java | 16 +- .../bus/device/vm/item/MemoryDevice.java | 55 ++++- .../oc2/common/serialization/BlobStorage.java | 199 +++++++++++------- 10 files changed, 261 insertions(+), 124 deletions(-) diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/block/DiskDriveDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/block/DiskDriveDevice.java index a8bfbf27..d686a9e4 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/block/DiskDriveDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/block/DiskDriveDevice.java @@ -92,11 +92,11 @@ public final class DiskDriveDevice e blobHandle = BlobStorage.validateHandle(blobHandle); return CompletableFuture.supplyAsync(() -> { try { - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity); + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + buffer.limit(capacity); return ByteBufferBlockDevice.wrap(buffer, false); - } catch (final IOException e) { - throw new RuntimeException(e); + } catch (final Exception e) { + throw new RuntimeException("Failed to create block device", e); } }, WORKERS); } diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/block/FlashMemoryFlasherDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/block/FlashMemoryFlasherDevice.java index 07e30b8f..f3a99cec 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/block/FlashMemoryFlasherDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/block/FlashMemoryFlasherDevice.java @@ -92,11 +92,11 @@ public final class FlashMemoryFlasherDevice { try { - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity); + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + buffer.limit(capacity); return ByteBufferBlockDevice.wrap(buffer, false); - } catch (final IOException e) { - throw new RuntimeException(e); + } catch (final Exception e) { + throw new RuntimeException("Failed to create block device", e); } }, WORKERS); } diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/block/MonitorDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/block/MonitorDevice.java index da75c96e..357cdd35 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/block/MonitorDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/block/MonitorDevice.java @@ -142,8 +142,8 @@ public final class MonitorDevice extends IdentityProxy implements V private SimpleFramebufferDevice createFrameBufferDevice() throws IOException { blobHandle = BlobStorage.validateHandle(blobHandle); - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE); + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + buffer.limit(WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE); return new SimpleFramebufferDevice(WIDTH, HEIGHT, buffer); } } diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/block/PciCardCageDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/block/PciCardCageDevice.java index 2bfb8050..8cd5b54d 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/block/PciCardCageDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/block/PciCardCageDevice.java @@ -132,8 +132,8 @@ public final class PciCardCageDevice extends IdentityProxy implemen private PciRootPortDevice createPciRootPortDevice() throws IOException { blobHandle = BlobStorage.validateHandle(blobHandle); - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, WINDOW_SIZE * 2); + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + buffer.limit(WINDOW_SIZE * 2); return new PciRootPortDevice(BUS_COUNT, WINDOW_SIZE, buffer); } } diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/block/ProjectorDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/block/ProjectorDevice.java index 09379400..c1911984 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/block/ProjectorDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/block/ProjectorDevice.java @@ -142,8 +142,8 @@ public final class ProjectorDevice extends IdentityProxy implements private SimpleFramebufferDevice createFrameBufferDevice() throws IOException { blobHandle = BlobStorage.validateHandle(blobHandle); - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE); + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + buffer.limit(WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE); return new SimpleFramebufferDevice(WIDTH, HEIGHT, buffer); } } diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/item/AbstractBlockStorageDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/item/AbstractBlockStorageDevice.java index 2f10a525..caa50cb8 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/item/AbstractBlockStorageDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/item/AbstractBlockStorageDevice.java @@ -74,26 +74,28 @@ public abstract class AbstractBlockStorageDevice implements VMDevice, ItemDevice, FirmwareLoader { public static final String DATA_TAG_NAME = "data"; @@ -120,7 +121,9 @@ public final class ByteBufferFlashStorageDevice extends IdentityProxy data.clear(); CompoundTag tag = ItemStackUtils.getModDataTag(identity).getCompound(DATA_TAG_NAME); if (tag.hasUUID("blob")) { - BlobStorage.getOrOpen(tag.getUUID("blob")).read(data, 0); + MappedByteBuffer sourceBuffer = BlobStorage.getOrOpen(tag.getUUID("blob")); + sourceBuffer.rewind(); + sourceBuffer.get(data.array(), 0, Math.min(sourceBuffer.remaining(), data.remaining())); } } catch(Exception e) { System.out.println("Error message: " + e.getMessage()); diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/item/HardDriveDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/item/HardDriveDevice.java index b97cba9b..23dce2e8 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/item/HardDriveDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/item/HardDriveDevice.java @@ -54,7 +54,7 @@ public class HardDriveDevice extends AbstractBlockStorageDevice { + .thenApplyAsync(buffer -> { try { boolean debug = false; try { @@ -66,21 +66,21 @@ public class HardDriveDevice extends AbstractBlockStorageDevice { try { - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, 0, size); + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + buffer.limit(size); return ByteBufferBlockDevice.wrap(buffer, readonly); } catch (final IOException e) { LOGGER.error("Failed to create block device", e); diff --git a/src/main/java/li/cil/oc2/common/bus/device/vm/item/MemoryDevice.java b/src/main/java/li/cil/oc2/common/bus/device/vm/item/MemoryDevice.java index 5ed766ef..47b6197a 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/vm/item/MemoryDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/vm/item/MemoryDevice.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.UUID; +import java.io.RandomAccessFile; public final class MemoryDevice extends IdentityProxy implements VMDevice, ItemDevice { private static final Logger LOGGER = LogManager.getLogger(); @@ -113,16 +114,34 @@ public final class MemoryDevice extends IdentityProxy implements VMDe } try { + // Ensure we have a valid blob handle blobHandle = BlobStorage.validateHandle(blobHandle); - final FileChannel channel = BlobStorage.getOrOpen(blobHandle); - final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size); - device = new ByteBufferMemory(size, buffer); - } catch (final IOException e) { - LOGGER.error(e); - return false; + + try { + // Try to get or open the blob + final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle); + if (buffer != null) { + buffer.limit(size); + device = new ByteBufferMemory(size, buffer); + return true; + } + } catch (final Exception e) { + // If we get here, the async operation failed, likely due to executor shutdown + LOGGER.warn("Async blob operation failed, falling back to direct file access: {}", e.getMessage()); + + // Fallback to using a direct byte buffer since we can't access the blob file directly + LOGGER.warn("Using fallback memory allocation for blob: {}", blobHandle); + final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocateDirect(size); + device = new ByteBufferMemory(size, buffer); + return true; + } + } catch (final Exception e) { + LOGGER.error("Failed to allocate memory device", e); } - return true; + // If we get here, something went wrong + closeDevice(); + return false; } private void closeDevice() { @@ -131,11 +150,25 @@ public final class MemoryDevice extends IdentityProxy implements VMDe } try { + // Close the device first device.close(); - } catch (final Exception e) { - LOGGER.error(e); - } - device = null; + // Release the memory mapping + if (blobHandle != null) { + try { + BlobStorage.close(blobHandle); + } catch (Exception e) { + LOGGER.warn("Failed to close blob storage", e); + } + blobHandle = null; + } + } catch (final Exception e) { + LOGGER.error("Error closing memory device", e); + } finally { + device = null; + + // Suggest garbage collection to help with memory cleanup + System.gc(); + } } } diff --git a/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java b/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java index 31ab6ce7..26458964 100644 --- a/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java +++ b/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java @@ -16,9 +16,11 @@ import org.apache.logging.log4j.Logger; import javax.annotation.Nullable; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -37,9 +39,9 @@ public final class BlobStorage { /////////////////////////////////////////////////////////////////// private static final LevelResource BLOBS_FOLDER_NAME = new LevelResource(API.MOD_ID + "-blobs"); - private static final Map BLOBS = new ConcurrentHashMap<>(); - private static final Map> PENDING_OPERATIONS = new ConcurrentHashMap<>(); - + private static final Map BLOBS = new ConcurrentHashMap<>(); + private static final Map> PENDING_OPERATIONS = new ConcurrentHashMap<>(); + private static final int MAX_MEMORY_MAPPED_SIZE = 64 * 1024 * 1024; // 64MB max size per blob private static volatile Path dataDirectory; // Directory blobs get saved to. /////////////////////////////////////////////////////////////////// @@ -77,7 +79,7 @@ public final class BlobStorage { // Close all open handles if the directory changes close(); dataDirectory = newDataDir; - + try { // Create directories synchronously since this is called during server startup // when the config system might not be fully initialized yet @@ -93,22 +95,38 @@ public final class BlobStorage { * Closes all currently open blobs. */ public static void close() { - // Cancel all pending operations - for (final CompletableFuture future : PENDING_OPERATIONS.values()) { - future.cancel(true); - } - PENDING_OPERATIONS.clear(); - - // Close all open channels - for (final FileChannel blob : BLOBS.values()) { + // Cancel all pending operations with minimal blocking + for (final Map.Entry> entry : PENDING_OPERATIONS.entrySet()) { try { - blob.close(); - } catch (final IOException e) { - LOGGER.error("Error closing blob channel", e); + entry.getValue().cancel(true); + } catch (final Exception e) { + LOGGER.trace("Error cancelling pending operation", e); } } + PENDING_OPERATIONS.clear(); + + // Clean up all mapped buffers + Map buffersToClean = new HashMap<>(BLOBS); BLOBS.clear(); - + + for (final MappedByteBuffer buffer : buffersToClean.values()) { + if (buffer != null) { + try { + // Clean up the mapped buffer + ((java.nio.Buffer) buffer).clear(); + } catch (final Exception e) { + LOGGER.trace("Error cleaning up mapped buffer", e); + } + } + } + + // Force cleanup of mapped buffers + System.gc(); + System.runFinalization(); + + // Clear any interrupted status that might have been set by cancellation + Thread.interrupted(); + // Safely check debug config if available boolean debug = false; try { @@ -116,10 +134,13 @@ public final class BlobStorage { } catch (IllegalStateException ignored) { // Config system might be shutting down, continue with debug disabled } - + if (debug) { LOGGER.info("Closed all blob storage resources"); } + + // Force a GC to help clean up any lingering file handles + System.gc(); } /** @@ -153,31 +174,56 @@ public final class BlobStorage { * @param handle the handle to obtain the file channel for. * @return a CompletableFuture that will complete with the file channel. */ - public static CompletableFuture getOrOpenAsync(final UUID handle) { + public static CompletableFuture getOrOpenAsync(final UUID handle) { // Check if already open - final FileChannel existingChannel = BLOBS.get(handle); - if (existingChannel != null && existingChannel.isOpen()) { - return CompletableFuture.completedFuture(existingChannel); + MappedByteBuffer buffer = BLOBS.get(handle); + if (buffer != null && buffer.capacity() > 0) { + return CompletableFuture.completedFuture(buffer); + } else if (buffer != null) { + // Remove stale reference + BLOBS.remove(handle); } - + // Check if there's already a pending operation return PENDING_OPERATIONS.computeIfAbsent(handle, h -> { return AsyncUtils.runAsync(() -> { + if (dataDirectory == null) { + throw new CompletionException("Data directory not initialized. Server not ready?", null); + } + + final Path path = dataDirectory.resolve(h.toString()); try { - final Path path = dataDirectory.resolve(h.toString()); - final FileChannel channel = new RandomAccessFile(path.toFile(), "rw").getChannel(); - BLOBS.put(h, channel); - return channel; + // Ensure parent directory exists + Files.createDirectories(path.getParent()); + + // Use memory-mapped files instead of direct file channel access + try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "rw"); + FileChannel channel = file.getChannel()) { + + // Map the file into memory + MappedByteBuffer mappedBuffer = channel.map( + FileChannel.MapMode.READ_WRITE, + 0, + Math.min(channel.size(), MAX_MEMORY_MAPPED_SIZE) + ); + + // Store the mapped buffer + BLOBS.put(h, mappedBuffer); + return mappedBuffer; + } catch (final IOException e) { + LOGGER.error("Failed to open or map blob: " + h, e); + throw new CompletionException("Failed to open or map blob: " + h, e); + } finally { + PENDING_OPERATIONS.remove(h); + } } catch (final IOException e) { - LOGGER.error("Failed to open blob: " + h, e); - throw new CompletionException("Failed to open blob: " + h, e); - } finally { - PENDING_OPERATIONS.remove(h); + LOGGER.error("Failed to open or map blob: " + h, e); + throw new CompletionException("Failed to open or map blob: " + h, e); } }, "Open blob " + h); }); } - + /** * Get or opens a file channel for the blob with the specified handle. *

@@ -189,7 +235,7 @@ public final class BlobStorage { * @deprecated Use {@link #getOrOpenAsync(UUID)} for non-blocking access. This method will be removed in 1.21.1. */ @Deprecated(since = "1.21.1", forRemoval = true) - public static synchronized FileChannel getOrOpen(final UUID handle) throws IOException { + public static synchronized MappedByteBuffer getOrOpen(final UUID handle) throws IOException { try { return getOrOpenAsync(handle).join(); } catch (final CompletionException e) { @@ -207,31 +253,18 @@ public final class BlobStorage { * @return a CompletableFuture that completes when the blob is closed. */ public static CompletableFuture closeAsync(final UUID handle) { - // Check debug state before async operation to avoid config access issues - boolean debug = false; - try { - debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get(); - } catch (IllegalStateException ignored) { - // Config system might be shutting down, continue with debug disabled - } - - final boolean finalDebug = debug; - return AsyncUtils.runAsync(() -> { - try { - final FileChannel blob = BLOBS.remove(handle); - if (blob != null) { - blob.close(); - if (finalDebug) { - LOGGER.debug("Closed blob: {}", handle); - } + return CompletableFuture.runAsync(() -> { + final MappedByteBuffer buffer = BLOBS.remove(handle); + if (buffer != null) { + try { + ((java.nio.Buffer) buffer).clear(); + } catch (final Exception e) { + LOGGER.trace("Error cleaning up mapped buffer for blob: " + handle, e); } - } catch (final IOException e) { - LOGGER.error("Error closing blob: " + handle, e); - throw new CompletionException(e); } - }, "Close blob " + handle); + }, AsyncUtils.getAsyncExecutor()); } - + /** * Closes the blob with the specified handle. * @@ -239,11 +272,14 @@ public final class BlobStorage { * @deprecated Use {@link #closeAsync(UUID)} for non-blocking operation. This method will be removed in 1.21.1. */ @Deprecated(since = "1.21.1", forRemoval = true) - public static synchronized void close(final UUID handle) { - try { - closeAsync(handle).join(); - } catch (final CompletionException e) { - LOGGER.error("Error in close operation for blob: " + handle, e); + public static void close(final UUID handle) { + final MappedByteBuffer buffer = BLOBS.remove(handle); + if (buffer != null) { + try { + ((java.nio.Buffer) buffer).clear(); + } catch (final Exception e) { + LOGGER.trace("Error cleaning up mapped buffer for blob: " + handle, e); + } } } @@ -254,30 +290,16 @@ public final class BlobStorage { * @return a CompletableFuture that completes when the blob is deleted. */ public static CompletableFuture deleteAsync(final UUID handle) { - // Check debug state before async operation to avoid config access issues - boolean debug = false; - try { - debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get(); - } catch (IllegalStateException ignored) { - // Config system might be shutting down, continue with debug disabled - } - - final boolean finalDebug = debug; - return AsyncUtils.runAsync(() -> { + return closeAsync(handle).thenRunAsync(() -> { final Path path = getBlobPath(handle); try { - final boolean deleted = Files.deleteIfExists(path); - if (deleted && finalDebug) { - LOGGER.debug("Deleted blob file: {}", path); - } + Files.deleteIfExists(path); } catch (final IOException e) { - LOGGER.error("Error deleting blob file: " + path, e); - throw new CompletionException(e); + LOGGER.error("Failed to delete blob: " + handle, e); } - return null; - }, "Deleting blob " + handle); + }, AsyncUtils.getAsyncExecutor()); } - + /** * Deletes the blob with the specified handle. * @@ -302,6 +324,25 @@ public final class BlobStorage { @SubscribeEvent public static void handleServerStopped(final ServerStoppedEvent event) { - BlobStorage.close(); + try { + // Close all open resources when server stops + close(); + } catch (Exception e) { + // Don't let exceptions during shutdown prevent server from stopping + LOGGER.error("Error during blob storage shutdown:", e); + } finally { + // Always clear the data directory reference + dataDirectory = null; + + // Force a GC to help clean up any lingering file handles + System.gc(); + + // Give GC a chance to run + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } } }