Revert "fixes for #83, #81 and the memory leak"

This reverts commit b4b2fe1d0e.
This commit is contained in:
Robert MacRae
2025-09-05 16:02:15 -03:00
parent b4b2fe1d0e
commit 41012242b8
10 changed files with 122 additions and 259 deletions

View File

@@ -92,11 +92,11 @@ public final class DiskDriveDevice<T extends BlockEntity & DiskDriveContainer> e
blobHandle = BlobStorage.validateHandle(blobHandle);
return CompletableFuture.supplyAsync(() -> {
try {
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
buffer.limit(capacity);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
return ByteBufferBlockDevice.wrap(buffer, false);
} catch (final Exception e) {
throw new RuntimeException("Failed to create block device", e);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}, WORKERS);
}

View File

@@ -92,11 +92,11 @@ public final class FlashMemoryFlasherDevice<T extends BlockEntity & FlashMemoryF
blobHandle = BlobStorage.validateHandle(blobHandle);
return CompletableFuture.supplyAsync(() -> {
try {
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
buffer.limit(capacity);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
return ByteBufferBlockDevice.wrap(buffer, false);
} catch (final Exception e) {
throw new RuntimeException("Failed to create block device", e);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}, WORKERS);
}

View File

@@ -142,8 +142,8 @@ public final class MonitorDevice extends IdentityProxy<BlockEntity> implements V
private SimpleFramebufferDevice createFrameBufferDevice() throws IOException {
blobHandle = BlobStorage.validateHandle(blobHandle);
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
buffer.limit(WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE);
return new SimpleFramebufferDevice(WIDTH, HEIGHT, buffer);
}
}

View File

@@ -132,8 +132,8 @@ public final class PciCardCageDevice extends IdentityProxy<BlockEntity> implemen
private PciRootPortDevice createPciRootPortDevice() throws IOException {
blobHandle = BlobStorage.validateHandle(blobHandle);
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
buffer.limit(WINDOW_SIZE * 2);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, WINDOW_SIZE * 2);
return new PciRootPortDevice(BUS_COUNT, WINDOW_SIZE, buffer);
}
}

View File

@@ -142,8 +142,8 @@ public final class ProjectorDevice extends IdentityProxy<BlockEntity> implements
private SimpleFramebufferDevice createFrameBufferDevice() throws IOException {
blobHandle = BlobStorage.validateHandle(blobHandle);
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
buffer.limit(WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, WIDTH * HEIGHT * SimpleFramebufferDevice.STRIDE);
return new SimpleFramebufferDevice(WIDTH, HEIGHT, buffer);
}
}

View File

@@ -74,28 +74,26 @@ public abstract class AbstractBlockStorageDevice<TBlock extends BlockDevice, TId
@Override
public VMDeviceLoadResult mount(final VMContext context) {
this.context = context; // Store context for later cleanup
if (!allocateDevice(context)) {
closeDevice();
return VMDeviceLoadResult.fail();
}
if (!address.claim(context, device)) {
closeDevice();
return VMDeviceLoadResult.fail();
}
if (interrupt.claim(context)) {
device.getInterrupt().set(interrupt.getAsInt(), context.getInterruptController());
} else {
closeDevice();
return VMDeviceLoadResult.fail();
}
loadPersistedState();
context.getEventBus().register(this);
if (deviceTag != null) {
NBTSerialization.deserialize(deviceTag, device);
}
return VMDeviceLoadResult.success();
}
@@ -220,26 +218,6 @@ public abstract class AbstractBlockStorageDevice<TBlock extends BlockDevice, TId
protected void handleDataAccess() {
}
/**
* Loads the device's persisted state from the saved deviceTag.
* This is called after the device has been created but before it's registered with the VM.
*/
protected void loadPersistedState() {
if (device == null || deviceTag == null) {
return;
}
try {
// Deserialize the device state from the saved tag
NBTSerialization.deserialize(deviceTag, device);
// Clear the tag to avoid holding onto the data
deviceTag = null;
} catch (final Exception e) {
LOGGER.error("Failed to load persisted device state", e);
}
}
///////////////////////////////////////////////////////////////
private boolean allocateDevice(final VMContext context) {
@@ -262,30 +240,10 @@ public abstract class AbstractBlockStorageDevice<TBlock extends BlockDevice, TId
return true;
}
private VMContext context; // Store context for memory deallocation
//
// @Override
// public VMDeviceLoadResult mount(final VMContext context) {
// this.context = context; // Store context for later cleanup
//
// if (!allocateDevice(context)) {
// return VMDeviceLoadResult.fail();
// }
//
// if (!address.claim(context, device)) {
// closeDevice();
// return VMDeviceLoadResult.fail();
// }
//
// loadPersistedState();
//
// context.getEventBus().register(this);
//
// return VMDeviceLoadResult.success();
// }
private void closeDevice() {
// Join the init job before releasing the device to avoid writes to a closed device
// Join the init job before releasing the device to avoid writes from thread to closed device.
// Since we use memory mapped memory, closing the device leads to it holding a dead pointer,
// meaning further access to it will hard-crash the JVM.
joinOpenJob();
if (device == null) {
@@ -293,30 +251,12 @@ public abstract class AbstractBlockStorageDevice<TBlock extends BlockDevice, TId
}
try {
// Close the underlying device
device.close();
// Note: Memory is automatically released when the VM is stopped
// as per the MemoryAllocator API design
// Close blob storage if it exists
if (blobHandle != null) {
try {
BlobStorage.close(blobHandle);
} catch (Exception e) {
LOGGER.warn("Failed to close blob storage", e);
}
blobHandle = null;
}
} catch (final IOException e) {
LOGGER.error("Error closing device", e);
} finally {
device = null;
context = null; // Clear the context reference
// Explicitly suggest garbage collection to help with memory cleanup
System.gc();
LOGGER.error(e);
}
device = null;
}
///////////////////////////////////////////////////////////////

View File

@@ -24,7 +24,6 @@ import net.minecraft.network.chat.Component;
import net.minecraft.world.item.ItemStack;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
public final class ByteBufferFlashStorageDevice extends IdentityProxy<ItemStack> implements VMDevice, ItemDevice, FirmwareLoader {
public static final String DATA_TAG_NAME = "data";
@@ -121,9 +120,7 @@ public final class ByteBufferFlashStorageDevice extends IdentityProxy<ItemStack>
data.clear();
CompoundTag tag = ItemStackUtils.getModDataTag(identity).getCompound(DATA_TAG_NAME);
if (tag.hasUUID("blob")) {
MappedByteBuffer sourceBuffer = BlobStorage.getOrOpen(tag.getUUID("blob"));
sourceBuffer.rewind();
sourceBuffer.get(data.array(), 0, Math.min(sourceBuffer.remaining(), data.remaining()));
BlobStorage.getOrOpen(tag.getUUID("blob")).read(data, 0);
}
} catch(Exception e) {
System.out.println("Error message: " + e.getMessage());

View File

@@ -54,7 +54,7 @@ public class HardDriveDevice extends AbstractBlockStorageDevice<ByteBufferBlockD
if (useAsync) {
return BlobStorage.getOrOpenAsync(blobHandle)
.thenApplyAsync(buffer -> {
.thenApplyAsync(channel -> {
try {
boolean debug = false;
try {
@@ -66,21 +66,21 @@ public class HardDriveDevice extends AbstractBlockStorageDevice<ByteBufferBlockD
}
if (debug) {
LOGGER.debug("Using buffer for blob: {}", blobHandle);
LOGGER.debug("Mapping buffer for blob: {}", blobHandle);
}
buffer.limit(size);
final MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, 0, size);
return ByteBufferBlockDevice.wrap(buffer, readonly);
} catch (final Exception e) {
LOGGER.error("Failed to create block device for blob: " + blobHandle, e);
throw new CompletionException("Failed to create block device", e);
} catch (final IOException e) {
LOGGER.error("Failed to map buffer for blob: " + blobHandle, e);
throw new CompletionException(e);
}
}, WORKERS);
} else {
return CompletableFuture.supplyAsync(() -> {
try {
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
buffer.limit(size);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, 0, size);
return ByteBufferBlockDevice.wrap(buffer, readonly);
} catch (final IOException e) {
LOGGER.error("Failed to create block device", e);

View File

@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.UUID;
import java.io.RandomAccessFile;
public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDevice, ItemDevice {
private static final Logger LOGGER = LogManager.getLogger();
@@ -114,34 +113,16 @@ public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDe
}
try {
// Ensure we have a valid blob handle
blobHandle = BlobStorage.validateHandle(blobHandle);
try {
// Try to get or open the blob
final MappedByteBuffer buffer = BlobStorage.getOrOpen(blobHandle);
if (buffer != null) {
buffer.limit(size);
device = new ByteBufferMemory(size, buffer);
return true;
}
} catch (final Exception e) {
// If we get here, the async operation failed, likely due to executor shutdown
LOGGER.warn("Async blob operation failed, falling back to direct file access: {}", e.getMessage());
// Fallback to using a direct byte buffer since we can't access the blob file directly
LOGGER.warn("Using fallback memory allocation for blob: {}", blobHandle);
final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocateDirect(size);
device = new ByteBufferMemory(size, buffer);
return true;
}
} catch (final Exception e) {
LOGGER.error("Failed to allocate memory device", e);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
device = new ByteBufferMemory(size, buffer);
} catch (final IOException e) {
LOGGER.error(e);
return false;
}
// If we get here, something went wrong
closeDevice();
return false;
return true;
}
private void closeDevice() {
@@ -150,25 +131,11 @@ public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDe
}
try {
// Close the device first
device.close();
// Release the memory mapping
if (blobHandle != null) {
try {
BlobStorage.close(blobHandle);
} catch (Exception e) {
LOGGER.warn("Failed to close blob storage", e);
}
blobHandle = null;
}
} catch (final Exception e) {
LOGGER.error("Error closing memory device", e);
} finally {
device = null;
// Suggest garbage collection to help with memory cleanup
System.gc();
LOGGER.error(e);
}
device = null;
}
}

View File

@@ -16,11 +16,9 @@ import org.apache.logging.log4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -39,9 +37,9 @@ public final class BlobStorage {
///////////////////////////////////////////////////////////////////
private static final LevelResource BLOBS_FOLDER_NAME = new LevelResource(API.MOD_ID + "-blobs");
private static final Map<UUID, MappedByteBuffer> BLOBS = new ConcurrentHashMap<>();
private static final Map<UUID, CompletableFuture<MappedByteBuffer>> PENDING_OPERATIONS = new ConcurrentHashMap<>();
private static final int MAX_MEMORY_MAPPED_SIZE = 64 * 1024 * 1024; // 64MB max size per blob
private static final Map<UUID, FileChannel> BLOBS = new ConcurrentHashMap<>();
private static final Map<UUID, CompletableFuture<FileChannel>> PENDING_OPERATIONS = new ConcurrentHashMap<>();
private static volatile Path dataDirectory; // Directory blobs get saved to.
///////////////////////////////////////////////////////////////////
@@ -79,7 +77,7 @@ public final class BlobStorage {
// Close all open handles if the directory changes
close();
dataDirectory = newDataDir;
try {
// Create directories synchronously since this is called during server startup
// when the config system might not be fully initialized yet
@@ -95,38 +93,22 @@ public final class BlobStorage {
* Closes all currently open blobs.
*/
public static void close() {
// Cancel all pending operations with minimal blocking
for (final Map.Entry<UUID, CompletableFuture<MappedByteBuffer>> entry : PENDING_OPERATIONS.entrySet()) {
try {
entry.getValue().cancel(true);
} catch (final Exception e) {
LOGGER.trace("Error cancelling pending operation", e);
}
// Cancel all pending operations
for (final CompletableFuture<FileChannel> future : PENDING_OPERATIONS.values()) {
future.cancel(true);
}
PENDING_OPERATIONS.clear();
// Clean up all mapped buffers
Map<UUID, MappedByteBuffer> buffersToClean = new HashMap<>(BLOBS);
BLOBS.clear();
for (final MappedByteBuffer buffer : buffersToClean.values()) {
if (buffer != null) {
try {
// Clean up the mapped buffer
((java.nio.Buffer) buffer).clear();
} catch (final Exception e) {
LOGGER.trace("Error cleaning up mapped buffer", e);
}
// Close all open channels
for (final FileChannel blob : BLOBS.values()) {
try {
blob.close();
} catch (final IOException e) {
LOGGER.error("Error closing blob channel", e);
}
}
// Force cleanup of mapped buffers
System.gc();
System.runFinalization();
// Clear any interrupted status that might have been set by cancellation
Thread.interrupted();
BLOBS.clear();
// Safely check debug config if available
boolean debug = false;
try {
@@ -134,13 +116,10 @@ public final class BlobStorage {
} catch (IllegalStateException ignored) {
// Config system might be shutting down, continue with debug disabled
}
if (debug) {
LOGGER.info("Closed all blob storage resources");
}
// Force a GC to help clean up any lingering file handles
System.gc();
}
/**
@@ -174,56 +153,31 @@ public final class BlobStorage {
* @param handle the handle to obtain the file channel for.
* @return a CompletableFuture that will complete with the file channel.
*/
public static CompletableFuture<MappedByteBuffer> getOrOpenAsync(final UUID handle) {
public static CompletableFuture<FileChannel> getOrOpenAsync(final UUID handle) {
// Check if already open
MappedByteBuffer buffer = BLOBS.get(handle);
if (buffer != null && buffer.capacity() > 0) {
return CompletableFuture.completedFuture(buffer);
} else if (buffer != null) {
// Remove stale reference
BLOBS.remove(handle);
final FileChannel existingChannel = BLOBS.get(handle);
if (existingChannel != null && existingChannel.isOpen()) {
return CompletableFuture.completedFuture(existingChannel);
}
// Check if there's already a pending operation
return PENDING_OPERATIONS.computeIfAbsent(handle, h -> {
return AsyncUtils.runAsync(() -> {
if (dataDirectory == null) {
throw new CompletionException("Data directory not initialized. Server not ready?", null);
}
final Path path = dataDirectory.resolve(h.toString());
try {
// Ensure parent directory exists
Files.createDirectories(path.getParent());
// Use memory-mapped files instead of direct file channel access
try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "rw");
FileChannel channel = file.getChannel()) {
// Map the file into memory
MappedByteBuffer mappedBuffer = channel.map(
FileChannel.MapMode.READ_WRITE,
0,
Math.min(channel.size(), MAX_MEMORY_MAPPED_SIZE)
);
// Store the mapped buffer
BLOBS.put(h, mappedBuffer);
return mappedBuffer;
} catch (final IOException e) {
LOGGER.error("Failed to open or map blob: " + h, e);
throw new CompletionException("Failed to open or map blob: " + h, e);
} finally {
PENDING_OPERATIONS.remove(h);
}
final Path path = dataDirectory.resolve(h.toString());
final FileChannel channel = new RandomAccessFile(path.toFile(), "rw").getChannel();
BLOBS.put(h, channel);
return channel;
} catch (final IOException e) {
LOGGER.error("Failed to open or map blob: " + h, e);
throw new CompletionException("Failed to open or map blob: " + h, e);
LOGGER.error("Failed to open blob: " + h, e);
throw new CompletionException("Failed to open blob: " + h, e);
} finally {
PENDING_OPERATIONS.remove(h);
}
}, "Open blob " + h);
});
}
/**
* Get or opens a file channel for the blob with the specified handle.
* <p>
@@ -235,7 +189,7 @@ public final class BlobStorage {
* @deprecated Use {@link #getOrOpenAsync(UUID)} for non-blocking access. This method will be removed in 1.21.1.
*/
@Deprecated(since = "1.21.1", forRemoval = true)
public static synchronized MappedByteBuffer getOrOpen(final UUID handle) throws IOException {
public static synchronized FileChannel getOrOpen(final UUID handle) throws IOException {
try {
return getOrOpenAsync(handle).join();
} catch (final CompletionException e) {
@@ -253,18 +207,31 @@ public final class BlobStorage {
* @return a CompletableFuture that completes when the blob is closed.
*/
public static CompletableFuture<Void> closeAsync(final UUID handle) {
return CompletableFuture.runAsync(() -> {
final MappedByteBuffer buffer = BLOBS.remove(handle);
if (buffer != null) {
try {
((java.nio.Buffer) buffer).clear();
} catch (final Exception e) {
LOGGER.trace("Error cleaning up mapped buffer for blob: " + handle, e);
// Check debug state before async operation to avoid config access issues
boolean debug = false;
try {
debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get();
} catch (IllegalStateException ignored) {
// Config system might be shutting down, continue with debug disabled
}
final boolean finalDebug = debug;
return AsyncUtils.runAsync(() -> {
try {
final FileChannel blob = BLOBS.remove(handle);
if (blob != null) {
blob.close();
if (finalDebug) {
LOGGER.debug("Closed blob: {}", handle);
}
}
} catch (final IOException e) {
LOGGER.error("Error closing blob: " + handle, e);
throw new CompletionException(e);
}
}, AsyncUtils.getAsyncExecutor());
}, "Close blob " + handle);
}
/**
* Closes the blob with the specified handle.
*
@@ -272,14 +239,11 @@ public final class BlobStorage {
* @deprecated Use {@link #closeAsync(UUID)} for non-blocking operation. This method will be removed in 1.21.1.
*/
@Deprecated(since = "1.21.1", forRemoval = true)
public static void close(final UUID handle) {
final MappedByteBuffer buffer = BLOBS.remove(handle);
if (buffer != null) {
try {
((java.nio.Buffer) buffer).clear();
} catch (final Exception e) {
LOGGER.trace("Error cleaning up mapped buffer for blob: " + handle, e);
}
public static synchronized void close(final UUID handle) {
try {
closeAsync(handle).join();
} catch (final CompletionException e) {
LOGGER.error("Error in close operation for blob: " + handle, e);
}
}
@@ -290,16 +254,30 @@ public final class BlobStorage {
* @return a CompletableFuture that completes when the blob is deleted.
*/
public static CompletableFuture<Void> deleteAsync(final UUID handle) {
return closeAsync(handle).thenRunAsync(() -> {
// Check debug state before async operation to avoid config access issues
boolean debug = false;
try {
debug = AsyncConfig.SERVER != null && AsyncConfig.SERVER.enableSuperDebug.get();
} catch (IllegalStateException ignored) {
// Config system might be shutting down, continue with debug disabled
}
final boolean finalDebug = debug;
return AsyncUtils.runAsync(() -> {
final Path path = getBlobPath(handle);
try {
Files.deleteIfExists(path);
final boolean deleted = Files.deleteIfExists(path);
if (deleted && finalDebug) {
LOGGER.debug("Deleted blob file: {}", path);
}
} catch (final IOException e) {
LOGGER.error("Failed to delete blob: " + handle, e);
LOGGER.error("Error deleting blob file: " + path, e);
throw new CompletionException(e);
}
}, AsyncUtils.getAsyncExecutor());
return null;
}, "Deleting blob " + handle);
}
/**
* Deletes the blob with the specified handle.
*
@@ -324,25 +302,6 @@ public final class BlobStorage {
@SubscribeEvent
public static void handleServerStopped(final ServerStoppedEvent event) {
try {
// Close all open resources when server stops
close();
} catch (Exception e) {
// Don't let exceptions during shutdown prevent server from stopping
LOGGER.error("Error during blob storage shutdown:", e);
} finally {
// Always clear the data directory reference
dataDirectory = null;
// Force a GC to help clean up any lingering file handles
System.gc();
// Give GC a chance to run
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
BlobStorage.close();
}
}