From fddcee9949b1bc4735e9d06f85c31bcd7ce2f4e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20N=C3=BCcke?= Date: Mon, 19 Jul 2021 23:43:31 +0200 Subject: [PATCH] Completely reworked blob storage. No longer care for potential desync between blob storage and world data on crashes. Use blobs as memory mapped byte buffers for drastically reduced memory usage. --- .../java/li/cil/oc2/common/CommonSetup.java | 2 +- .../java/li/cil/oc2/common/Constants.java | 2 + .../item/AbstractBlockDeviceVMDevice.java | 109 ++----- .../bus/device/item/HardDriveVMDevice.java | 30 +- .../HardDriveVMDeviceWithInitialData.java | 75 +++++ .../common/bus/device/item/MemoryDevice.java | 71 +++-- .../device/item/SparseHardDriveVMDevice.java | 132 -------- ...iveWithExternalDataItemDeviceProvider.java | 4 +- .../oc2/common/serialization/BlobStorage.java | 287 +++--------------- .../tileentity/DiskDriveTileEntity.java | 94 +++--- 10 files changed, 240 insertions(+), 566 deletions(-) create mode 100644 src/main/java/li/cil/oc2/common/bus/device/item/HardDriveVMDeviceWithInitialData.java delete mode 100644 src/main/java/li/cil/oc2/common/bus/device/item/SparseHardDriveVMDevice.java diff --git a/src/main/java/li/cil/oc2/common/CommonSetup.java b/src/main/java/li/cil/oc2/common/CommonSetup.java index c934c7fa..5ef0e195 100644 --- a/src/main/java/li/cil/oc2/common/CommonSetup.java +++ b/src/main/java/li/cil/oc2/common/CommonSetup.java @@ -37,7 +37,7 @@ public final class CommonSetup { } private static void handleServerStopped(final FMLServerStoppedEvent event) { - BlobStorage.synchronize(); + BlobStorage.close(); Allocator.resetAndCheckLeaks(); FileSystems.reset(); } diff --git a/src/main/java/li/cil/oc2/common/Constants.java b/src/main/java/li/cil/oc2/common/Constants.java index f3e881dc..540294c1 100644 --- a/src/main/java/li/cil/oc2/common/Constants.java +++ b/src/main/java/li/cil/oc2/common/Constants.java @@ -10,6 +10,8 @@ public final class Constants { public static final int MEGABYTE = 1024 * KILOBYTE; public static final int GIGABYTE = 1024 * MEGABYTE; + public static int PAGE_SIZE = 4 * 1024; + public static final int CPU_FREQUENCY = 25_000_000; public static final int SECONDS_TO_TICKS = 20; diff --git a/src/main/java/li/cil/oc2/common/bus/device/item/AbstractBlockDeviceVMDevice.java b/src/main/java/li/cil/oc2/common/bus/device/item/AbstractBlockDeviceVMDevice.java index 29f36944..3832b2b4 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/item/AbstractBlockDeviceVMDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/item/AbstractBlockDeviceVMDevice.java @@ -1,11 +1,10 @@ package li.cil.oc2.common.bus.device.item; -import com.google.common.eventbus.Subscribe; import li.cil.oc2.api.bus.device.ItemDevice; import li.cil.oc2.api.bus.device.vm.VMDevice; import li.cil.oc2.api.bus.device.vm.VMDeviceLoadResult; import li.cil.oc2.api.bus.device.vm.context.VMContext; -import li.cil.oc2.api.bus.device.vm.event.VMResumingRunningEvent; +import li.cil.oc2.common.Constants; import li.cil.oc2.common.bus.device.util.IdentityProxy; import li.cil.oc2.common.bus.device.util.OptionalAddress; import li.cil.oc2.common.bus.device.util.OptionalInterrupt; @@ -16,15 +15,18 @@ import li.cil.oc2.common.util.NBTTagIds; import li.cil.sedna.api.device.BlockDevice; import li.cil.sedna.device.virtio.VirtIOBlockDevice; import net.minecraft.nbt.CompoundNBT; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Optional; import java.util.UUID; -@SuppressWarnings("UnstableApiUsage") public abstract class AbstractBlockDeviceVMDevice extends IdentityProxy implements VMDevice, ItemDevice { + private static final Logger LOGGER = LogManager.getLogger(); + private static final String DEVICE_TAG_NAME = "device"; private static final String ADDRESS_TAG_NAME = "address"; private static final String INTERRUPT_TAG_NAME = "interrupt"; @@ -32,8 +34,6 @@ public abstract class AbstractBlockDeviceVMDevice blobHandle = BlobStorage.validateHandle(blobHandle)); - } if (blobHandle != null) { nbt.putUUID(BLOB_HANDLE_TAG_NAME, blobHandle); } @@ -131,7 +122,10 @@ public abstract class AbstractBlockDeviceVMDevice optional = getSerializationStream(data); - optional.ifPresent(stream -> { - blobHandle = BlobStorage.validateHandle(blobHandle); - jobHandle = BlobStorage.submitSave(blobHandle, stream); - }); - if (!optional.isPresent()) { - BlobStorage.freeHandle(blobHandle); - blobHandle = null; - } - } - } - - public void deserializeData() { - if (data != null && blobHandle != null) { - try { - jobHandle = BlobStorage.submitLoad(blobHandle, getDeserializationStream(data)); - } catch (final UnsupportedOperationException ignored) { - // If logic producing the block data implementation changed between saves we - // can potentially end up in a state where we now have read only block data - // with some previously persisted data. A call to getDeserializationStream - // will, indirectly, lead to this exception. So we just ignore it. - } - } - } - /////////////////////////////////////////////////////////////// - protected abstract int getSize(); - - protected abstract TBlock createBlockDevice(); - - protected abstract Optional getSerializationStream(TBlock device); - - protected abstract OutputStream getDeserializationStream(TBlock device); + protected abstract TBlock createBlockDevice() throws IOException; protected void handleDataAccess() { } @@ -212,24 +169,22 @@ public abstract class AbstractBlockDeviceVMDevice { +public class HardDriveVMDevice extends AbstractBlockDeviceVMDevice { private final int size; private final boolean readonly; private final ThrottledSoundEmitter soundEmitter; @@ -30,23 +32,11 @@ public final class HardDriveVMDevice extends AbstractBlockDeviceVMDevice getSerializationStream(final ByteBufferBlockDevice device) { - return Optional.of(device.getInputStream()); - } - - @Override - protected OutputStream getDeserializationStream(final ByteBufferBlockDevice device) { - return device.getOutputStream(); + protected ByteBufferBlockDevice createBlockDevice() throws IOException { + blobHandle = BlobStorage.validateHandle(blobHandle); + final FileChannel channel = BlobStorage.getOrOpen(blobHandle); + final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size); + return ByteBufferBlockDevice.wrap(buffer, readonly); } @Override diff --git a/src/main/java/li/cil/oc2/common/bus/device/item/HardDriveVMDeviceWithInitialData.java b/src/main/java/li/cil/oc2/common/bus/device/item/HardDriveVMDeviceWithInitialData.java new file mode 100644 index 00000000..481887d5 --- /dev/null +++ b/src/main/java/li/cil/oc2/common/bus/device/item/HardDriveVMDeviceWithInitialData.java @@ -0,0 +1,75 @@ +package li.cil.oc2.common.bus.device.item; + +import com.google.common.eventbus.Subscribe; +import com.google.common.io.ByteStreams; +import li.cil.oc2.api.bus.device.vm.event.VMResumingRunningEvent; +import li.cil.oc2.common.util.Location; +import li.cil.sedna.api.device.BlockDevice; +import li.cil.sedna.device.block.ByteBufferBlockDevice; +import net.minecraft.item.ItemStack; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +@SuppressWarnings("UnstableApiUsage") +public final class HardDriveVMDeviceWithInitialData extends HardDriveVMDevice { + private static final Logger LOGGER = LogManager.getLogger(); + private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> { + final Thread thread = new Thread(r, "Hard Drive Initializer"); + thread.setDaemon(false); + return thread; + }); + + private final BlockDevice base; + private Future copyJob; + + /////////////////////////////////////////////////////////////////// + + public HardDriveVMDeviceWithInitialData(final ItemStack identity, final BlockDevice base, final boolean readonly, final Supplier> location) { + super(identity, (int) base.getCapacity(), readonly, location); + this.base = base; + } + + @Subscribe + public void handleResumingRunningEvent(final VMResumingRunningEvent event) { + if (copyJob != null) { + try { + copyJob.get(); + } catch (final Throwable e) { + LOGGER.error(e); + } finally { + copyJob = null; + } + } + } + + /////////////////////////////////////////////////////////////////// + + @Override + protected ByteBufferBlockDevice createBlockDevice() throws IOException { + final boolean isInitializing = blobHandle == null; + final ByteBufferBlockDevice device = super.createBlockDevice(); + if (isInitializing) { + copyJob = CompletableFuture.runAsync(() -> { + try { + try (final InputStream input = base.getInputStream(0); + final OutputStream output = device.getOutputStream(0)) { + ByteStreams.copy(input, output); + } + } catch (final IOException e) { + LOGGER.error(e); + } + }, WORKERS); + } + return device; + } +} diff --git a/src/main/java/li/cil/oc2/common/bus/device/item/MemoryDevice.java b/src/main/java/li/cil/oc2/common/bus/device/item/MemoryDevice.java index 52fb7bca..dd146479 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/item/MemoryDevice.java +++ b/src/main/java/li/cil/oc2/common/bus/device/item/MemoryDevice.java @@ -1,33 +1,35 @@ package li.cil.oc2.common.bus.device.item; -import com.google.common.eventbus.Subscribe; import li.cil.oc2.api.bus.device.ItemDevice; import li.cil.oc2.api.bus.device.vm.VMDevice; import li.cil.oc2.api.bus.device.vm.VMDeviceLoadResult; import li.cil.oc2.api.bus.device.vm.context.VMContext; -import li.cil.oc2.api.bus.device.vm.event.VMResumingRunningEvent; +import li.cil.oc2.common.Constants; import li.cil.oc2.common.bus.device.util.IdentityProxy; import li.cil.oc2.common.bus.device.util.OptionalAddress; import li.cil.oc2.common.serialization.BlobStorage; import li.cil.oc2.common.util.NBTTagIds; import li.cil.sedna.api.device.PhysicalMemory; -import li.cil.sedna.device.memory.Memory; -import li.cil.sedna.memory.PhysicalMemoryInputStream; -import li.cil.sedna.memory.PhysicalMemoryOutputStream; +import li.cil.sedna.device.memory.ByteBufferMemory; import net.minecraft.item.ItemStack; import net.minecraft.nbt.CompoundNBT; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.UUID; -@SuppressWarnings("UnstableApiUsage") public final class MemoryDevice extends IdentityProxy implements VMDevice, ItemDevice { + private static final Logger LOGGER = LogManager.getLogger(); + private static final String BLOB_HANDLE_TAG_NAME = "blob"; private static final String ADDRESS_TAG_NAME = "address"; /////////////////////////////////////////////////////////////// private final int size; - private BlobStorage.JobHandle jobHandle; private PhysicalMemory device; /////////////////////////////////////////////////////////////// @@ -54,8 +56,6 @@ public final class MemoryDevice extends IdentityProxy implements VMDe return VMDeviceLoadResult.fail(); } - loadPersistedState(); - context.getEventBus().register(this); return VMDeviceLoadResult.success(); @@ -66,32 +66,37 @@ public final class MemoryDevice extends IdentityProxy implements VMDe suspend(); // Memory is volatile, so free up our persisted blob when device is unloaded. - BlobStorage.freeHandle(blobHandle); - blobHandle = null; - jobHandle = null; + if (blobHandle != null) { + BlobStorage.delete(blobHandle); + blobHandle = null; + } address.clear(); } @Override public void suspend() { - device = null; - } + if (device != null) { + try { + device.close(); + } catch (final Exception e) { + LOGGER.error(e); + } + } - @Subscribe - public void handleResumingRunningEvent(final VMResumingRunningEvent event) { - awaitStorageOperation(); + if (blobHandle != null) { + BlobStorage.close(blobHandle); + } + + device = null; } @Override public CompoundNBT serializeNBT() { final CompoundNBT tag = new CompoundNBT(); - if (device != null) { - blobHandle = BlobStorage.validateHandle(blobHandle); + if (blobHandle != null) { tag.putUUID(BLOB_HANDLE_TAG_NAME, blobHandle); - - jobHandle = BlobStorage.submitSave(blobHandle, new PhysicalMemoryInputStream(device)); } if (address.isPresent()) { tag.putLong(ADDRESS_TAG_NAME, address.getAsLong()); @@ -113,25 +118,19 @@ public final class MemoryDevice extends IdentityProxy implements VMDe /////////////////////////////////////////////////////////////// private boolean allocateDevice(final VMContext context) { - if (!context.getMemoryAllocator().claimMemory(size)) { + if (!context.getMemoryAllocator().claimMemory(Constants.PAGE_SIZE)) { return false; } - device = Memory.create(size); + try { + blobHandle = BlobStorage.validateHandle(blobHandle); + final FileChannel channel = BlobStorage.getOrOpen(blobHandle); + final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size); + device = new ByteBufferMemory(buffer); + } catch (final IOException e) { + return false; + } return true; } - - private void loadPersistedState() { - if (blobHandle != null) { - jobHandle = BlobStorage.submitLoad(blobHandle, new PhysicalMemoryOutputStream(device)); - } - } - - private void awaitStorageOperation() { - if (jobHandle != null) { - jobHandle.await(); - jobHandle = null; - } - } } diff --git a/src/main/java/li/cil/oc2/common/bus/device/item/SparseHardDriveVMDevice.java b/src/main/java/li/cil/oc2/common/bus/device/item/SparseHardDriveVMDevice.java deleted file mode 100644 index 1ec03c0b..00000000 --- a/src/main/java/li/cil/oc2/common/bus/device/item/SparseHardDriveVMDevice.java +++ /dev/null @@ -1,132 +0,0 @@ -package li.cil.oc2.common.bus.device.item; - -import li.cil.ceres.BinarySerialization; -import li.cil.oc2.common.util.Location; -import li.cil.oc2.common.util.SoundEvents; -import li.cil.oc2.common.util.ThrottledSoundEmitter; -import li.cil.sedna.api.device.BlockDevice; -import li.cil.sedna.device.block.SparseBlockDevice; -import li.cil.sedna.utils.ByteBufferInputStream; -import net.minecraft.item.ItemStack; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Optional; -import java.util.function.Supplier; - -public final class SparseHardDriveVMDevice extends AbstractBlockDeviceVMDevice { - private final BlockDevice base; - private final boolean readonly; - private final ThrottledSoundEmitter soundEmitter; - - /////////////////////////////////////////////////////////////////// - - public SparseHardDriveVMDevice(final ItemStack identity, final BlockDevice base, final boolean readonly, final Supplier> location) { - super(identity); - this.base = base; - this.readonly = readonly; - this.soundEmitter = new ThrottledSoundEmitter(location, SoundEvents.HDD_ACCESS.get()) - .withMinInterval(Duration.ofSeconds(1)); - } - - /////////////////////////////////////////////////////////////////// - - @Override - protected int getSize() { - return (int) base.getCapacity(); - } - - @Override - protected SparseBlockDevice createBlockDevice() { - return new SparseBlockDevice(base, readonly); - } - - @Override - protected Optional getSerializationStream(final SparseBlockDevice device) { - if (device.getBlockCount() == 0) { - return Optional.empty(); - } - - return Optional.of(new SerializationStream(device)); - } - - @Override - protected OutputStream getDeserializationStream(final SparseBlockDevice device) { - return new DeserializationStream(device); - } - - @Override - protected void handleDataAccess() { - soundEmitter.play(); - } - - /////////////////////////////////////////////////////////////////// - - private static final class SerializationStream extends InputStream { - private final SparseBlockDevice device; - private ByteBufferInputStream stream; - - private SerializationStream(final SparseBlockDevice device) { - this.device = device; - } - - @Override - public int read() { - ensureSerialized(); - return stream.read(); - } - - @Override - public int read(final byte[] b, final int off, final int len) { - ensureSerialized(); - return stream.read(b, off, len); - } - - @Override - public long skip(final long n) throws IOException { - return stream.skip(n); - } - - @Override - public int available() throws IOException { - return stream.available(); - } - - private void ensureSerialized() { - if (stream != null) { - return; - } - - stream = new ByteBufferInputStream(BinarySerialization.serialize(device)); - } - } - - private static final class DeserializationStream extends OutputStream { - private final SparseBlockDevice device; - private final ByteArrayOutputStream stream; - - public DeserializationStream(final SparseBlockDevice device) { - this.device = device; - stream = new ByteArrayOutputStream(); - } - - @Override - public void write(final int b) { - stream.write(b); - } - - @Override - public void write(final byte[] b, final int off, final int len) { - stream.write(b, off, len); - } - - @Override - public void close() { - BinarySerialization.deserialize(ByteBuffer.wrap(stream.toByteArray()), device); - } - } -} diff --git a/src/main/java/li/cil/oc2/common/bus/device/provider/item/HardDriveWithExternalDataItemDeviceProvider.java b/src/main/java/li/cil/oc2/common/bus/device/provider/item/HardDriveWithExternalDataItemDeviceProvider.java index 6ccf1db8..e65bdb10 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/provider/item/HardDriveWithExternalDataItemDeviceProvider.java +++ b/src/main/java/li/cil/oc2/common/bus/device/provider/item/HardDriveWithExternalDataItemDeviceProvider.java @@ -7,7 +7,7 @@ import li.cil.oc2.api.bus.device.data.BlockDeviceData; import li.cil.oc2.api.bus.device.provider.ItemDeviceQuery; import li.cil.oc2.common.Config; import li.cil.oc2.common.Constants; -import li.cil.oc2.common.bus.device.item.SparseHardDriveVMDevice; +import li.cil.oc2.common.bus.device.item.HardDriveVMDeviceWithInitialData; import li.cil.oc2.common.bus.device.provider.util.AbstractItemDeviceProvider; import li.cil.oc2.common.item.HardDriveWithExternalDataItem; import li.cil.oc2.common.util.LocationSupplierUtils; @@ -31,7 +31,7 @@ public final class HardDriveWithExternalDataItemDeviceProvider extends AbstractI return Optional.empty(); } - return Optional.of(new SparseHardDriveVMDevice(stack, data.getBlockDevice(), false, LocationSupplierUtils.of(query))); + return Optional.of(new HardDriveVMDeviceWithInitialData(stack, data.getBlockDevice(), false, LocationSupplierUtils.of(query))); } @Override diff --git a/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java b/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java index 5aff7f11..2a4b0bff 100644 --- a/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java +++ b/src/main/java/li/cil/oc2/common/serialization/BlobStorage.java @@ -1,8 +1,6 @@ package li.cil.oc2.common.serialization; -import com.google.common.collect.HashMultimap; import li.cil.oc2.api.API; -import li.cil.oc2.common.Constants; import net.minecraft.server.MinecraftServer; import net.minecraft.world.storage.FolderName; import org.apache.logging.log4j.LogManager; @@ -10,20 +8,13 @@ import org.apache.logging.log4j.Logger; import javax.annotation.Nullable; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; /** * This class facilitates storing binary chunks of data in an efficient, parallelized fashion. @@ -34,80 +25,12 @@ public final class BlobStorage { /////////////////////////////////////////////////////////////////// private static final FolderName BLOBS_FOLDER_NAME = new FolderName(API.MOD_ID + "-blobs"); - - private static final HashMultimap> WRITE_HANDLES = HashMultimap.create(); - private static final HashMultimap> READ_HANDLES = HashMultimap.create(); - private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> { - final Thread thread = new Thread(r, "Blob Storage I/O"); - thread.setDaemon(false); - return thread; - }); + private static final Map BLOBS = new HashMap<>(); private static Path dataDirectory; // Directory blobs get saved to. - static { - Runtime.getRuntime().addShutdownHook(new Thread(BlobStorage::synchronize)); - } - /////////////////////////////////////////////////////////////////// - /** - * Represents a handle to a serialization or deserialization job submitted using - * {@link #submitSave(UUID, InputStream)} or {@link #submitLoad(UUID, OutputStream)}. - *

- * This can be used to guarantee the serialization job has been completed before making changes to - * the to-be-serialized data/the deserialization job has been completed before accessing the to-be- - * deserialized data. - */ - public static final class JobHandle { - private final Set> futures; - - private JobHandle(final Future future) { - this.futures = new HashSet<>(); - futures.add(future); - } - - private JobHandle(final Set> futures) { - this.futures = futures; - } - - /** - * Blocks until the jobs described by this job handle are complete. - */ - public void await() { - for (final Future future : futures) { - try { - future.get(); - } catch (final Throwable e) { - LOGGER.error("Scheduled blob storage job threw an exception.", e); - } - } - futures.clear(); - } - - /** - * Combines a list of job handles into a single job handle. - *

- * This can make awaiting multiple job handles more convenient by simply rolling them - * into one job handle. - *

- * For convenience the list of job handles may contain {@code null} entries. - * - * @param jobHandles the job handles to merge into a single job handle. - * @return a job handle that can be used to await all passed job handles. - */ - public static JobHandle combine(final JobHandle... jobHandles) { - final HashSet> futures = new HashSet<>(); - for (final JobHandle jobHandle : jobHandles) { - if (jobHandle == null) { - continue; - } - futures.addAll(jobHandle.futures); - } - return new JobHandle(futures); - } - } - /** * Sets the currently running server. *

@@ -118,7 +41,6 @@ public final class BlobStorage { * @param server the currently active server. */ public static void setServer(final MinecraftServer server) { - synchronize(); dataDirectory = server.getWorldPath(BLOBS_FOLDER_NAME); try { Files.createDirectories(dataDirectory); @@ -128,19 +50,24 @@ public final class BlobStorage { } /** - * Blocks until all currently in-flight serialization or deserialization jobs are completed and - * deletes all blobs marked for deletion. + * Closes all currently open blobs. */ - public static void synchronize() { - awaitCompletion(READ_HANDLES); - awaitCompletion(WRITE_HANDLES); + public static void close() { + for (final FileChannel blob : BLOBS.values()) { + try { + blob.close(); + } catch (final IOException e) { + LOGGER.error(e); + } + } + + BLOBS.clear(); } /** * Allocates a new handle for a blob to store. *

- * Use this in a call to {@link #submitSave(UUID, InputStream)} to write data into blob storage. - * Then in a call to {@link #submitLoad(UUID, OutputStream)} to read from blob storage. + * Use this in a call to {@link #getOrOpen(UUID)} to open the blob storage. * * @return a new handle. */ @@ -148,27 +75,6 @@ public final class BlobStorage { return UUID.randomUUID(); } - /** - * Frees a handle, marking the data associated with it for deletion. - *

- * Deletions are enacted upon the next call to {@link #synchronize()}, which is called when - * the server is shut down. - *

- * Validating this handle or submitting a serialization or deserialization for this handle - * will "revive" the handle and the data associated with it will not be deleted with - * the next call to {@link #synchronize()}. - * - * @param handle the handle to free. - */ - public static void freeHandle(@Nullable final UUID handle) { - if (handle != null) { - awaitCompletion(READ_HANDLES, handle); - awaitCompletion(WRITE_HANDLES, handle); - - submitJob(WRITE_HANDLES, handle, () -> delete(handle)); - } - } - /** * Validates a blob handle, returning a new one if it is {@code null} or invalid. * @@ -184,93 +90,50 @@ public final class BlobStorage { } /** - * Submits data to be saved to blob storage, identified by the specified handle. + * Get or opens a file channel for the blob with the specified handle. *

- * The given input stream should support batch reading operations for best performance. - * A buffer will be used for copying, so it is not necessary to pass a buffered stream. - *

- * The returned {@link JobHandle} should be waited on before any changes are made to - * the to-be-written data after this call returns. - *

- * The given input stream will be accessed from a worker thread. + * The returned file channel supports random access. * - * @param handle the handle identifying the stored blob data. - * @param dataAccess a stream used to read the data to be written to blob storage. - * @return a job handle that can be used to wait for the data to be completely written. + * @param handle the handle to obtain the file channel for. + * @return the file channel for the requested blob. + * @throws IOException if opening the blob fails. */ - public static JobHandle submitSave(final UUID handle, final InputStream dataAccess) { - // No other job must access a handle when it is being written to. - awaitCompletion(READ_HANDLES, handle); - awaitCompletion(WRITE_HANDLES, handle); + public static FileChannel getOrOpen(final UUID handle) throws IOException { + FileChannel blob = BLOBS.get(handle); + if (blob != null && blob.isOpen()) { + return blob; + } - return submitJob(WRITE_HANDLES, handle, () -> save(handle, dataAccess)); + final Path path = dataDirectory.resolve(handle.toString()); + blob = new RandomAccessFile(path.toFile(), "rw").getChannel(); + BLOBS.put(handle, blob); + return blob; } /** - * Submits data to be saved to blob storage, identified by the specified handle. - *

- * The given input stream should support batch reading operations for best performance. - * A buffer will be used for copying, so it is not necessary to pass a buffered stream. - *

- * The returned {@link JobHandle} should be waited on before any changes are made to - * the to-be-written data after this call returns. - *

- * The given input stream will be accessed from a worker thread. + * Closes the blob with the specified handle. * - * @param handle the handle identifying the stored blob data. - * @param dataAccess a stream used to read the data to be written to blob storage. - * @return a job handle that can be used to wait for the data to be completely written. + * @param handle the handle of the blob to close. */ - public static JobHandle submitLoad(final UUID handle, final OutputStream dataAccess) { - // Multiple jobs may read from a handle at a time but none may write to it when reading from it. - awaitCompletion(WRITE_HANDLES, handle); - - return submitJob(READ_HANDLES, handle, () -> load(handle, dataAccess)); - } - - /////////////////////////////////////////////////////////////////// - - private static void save(final UUID handle, final InputStream input) { + public static void close(final UUID handle) { try { - final Path path = dataDirectory.resolve(handle.toString()); - try (final OutputStream output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) { - final GZIPOutputStream zipped = new GZIPOutputStream(output); - copyData(input, zipped); - zipped.finish(); + final FileChannel blob = BLOBS.remove(handle); + if (blob != null) { + blob.close(); } - } catch (final Throwable e) { + } catch (final IOException e) { LOGGER.error(e); - } finally { - try { - input.close(); - } catch (final Throwable e) { - LOGGER.error(e); - } } } - private static void load(final UUID handle, final OutputStream output) { - try { - final Path path = dataDirectory.resolve(handle.toString()); - if (!Files.exists(path)) { - return; - } - try (final InputStream input = Files.newInputStream(path, StandardOpenOption.READ)) { - final GZIPInputStream zipped = new GZIPInputStream(input); - copyData(zipped, output); - } - } catch (final Throwable e) { - LOGGER.error(e); - } finally { - try { - output.close(); - } catch (final Throwable e) { - LOGGER.error(e); - } - } - } + /** + * Deletes the blob with the specified handle. + * + * @param handle the handle of the blob to delete. + */ + public static void delete(final UUID handle) { + close(handle); - private static void delete(final UUID handle) { try { final Path path = dataDirectory.resolve(handle.toString()); Files.deleteIfExists(path); @@ -278,64 +141,4 @@ public final class BlobStorage { LOGGER.error(e); } } - - private static void copyData(final InputStream in, final OutputStream out) throws IOException { - final byte[] buffer = new byte[8 * Constants.KILOBYTE]; - int count; - while ((count = in.read(buffer)) > 0) { - out.write(buffer, 0, count); - } - } - - private static JobHandle submitJob(final HashMultimap> map, final UUID handle, final Runnable runnable) { - final CompletableFuture future = CompletableFuture.runAsync(runnable, WORKERS); - map.put(handle, future); - return new JobHandle(future.thenAccept(unused -> completeJob(map, handle, future))); - } - - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") // I know what I'm doing - famous last words. - private static void completeJob(final HashMultimap> handles, final UUID handle, final Future future) { - synchronized (handles) { - handles.remove(handle, future); - } - } - - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") // I know what I'm doing - famous last words. - private static void awaitCompletion(final HashMultimap> handles, final UUID handle) { - // Jobs remove themselves from the list on completion, so we must synchronize access to - // the map of pending jobs, but must release the lock when waiting for the job so as not - // to create a deadlock. - final Set> futures; - synchronized (handles) { - futures = handles.removeAll(handle); - } - - for (final Future future : futures) { - await(future); - } - } - - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") // I know what I'm doing - famous last words. - private static void awaitCompletion(final HashMultimap> handles) { - // Jobs remove themselves from the list on completion, so we must synchronize access to - // the map of pending jobs, but must release the lock when waiting for the job so as not - // to create a deadlock. - final Set> futures; - synchronized (handles) { - futures = new HashSet<>(handles.values()); - handles.clear(); - } - - for (final Future future : futures) { - await(future); - } - } - - private static void await(final Future future) { - try { - future.get(); - } catch (final Throwable e) { - LOGGER.error("Scheduled blob storage job threw an exception.", e); - } - } } diff --git a/src/main/java/li/cil/oc2/common/tileentity/DiskDriveTileEntity.java b/src/main/java/li/cil/oc2/common/tileentity/DiskDriveTileEntity.java index cbc37271..a26c76fd 100644 --- a/src/main/java/li/cil/oc2/common/tileentity/DiskDriveTileEntity.java +++ b/src/main/java/li/cil/oc2/common/tileentity/DiskDriveTileEntity.java @@ -1,8 +1,6 @@ package li.cil.oc2.common.tileentity; import li.cil.oc2.api.bus.device.vm.VMDevice; -import li.cil.oc2.api.bus.device.vm.VMDeviceLoadResult; -import li.cil.oc2.api.bus.device.vm.context.VMContext; import li.cil.oc2.common.Config; import li.cil.oc2.common.Constants; import li.cil.oc2.common.block.DiskDriveBlock; @@ -12,6 +10,7 @@ import li.cil.oc2.common.container.TypedItemStackHandler; import li.cil.oc2.common.item.FloppyItem; import li.cil.oc2.common.network.Network; import li.cil.oc2.common.network.message.DiskDriveFloppyMessage; +import li.cil.oc2.common.serialization.BlobStorage; import li.cil.oc2.common.tags.ItemTags; import li.cil.oc2.common.util.ItemStackUtils; import li.cil.oc2.common.util.LocationSupplierUtils; @@ -30,12 +29,12 @@ import net.minecraftforge.api.distmarker.OnlyIn; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.time.Duration; -import java.util.Optional; public final class DiskDriveTileEntity extends AbstractTileEntity { private static final Logger LOGGER = LogManager.getLogger(); @@ -149,6 +148,7 @@ public final class DiskDriveTileEntity extends AbstractTileEntity { } @Override + @Nonnull public ItemStack getStackInSlot(final int slot) { final ItemStack stack = getStackInSlotRaw(slot); exportDeviceDataToItemStack(stack); @@ -156,6 +156,7 @@ public final class DiskDriveTileEntity extends AbstractTileEntity { } @Override + @Nonnull public ItemStack extractItem(final int slot, final int amount, final boolean simulate) { if (slot == 0 && !simulate && amount > 0) { exportDeviceDataToItemStack(getStackInSlotRaw(0)); @@ -197,8 +198,6 @@ public final class DiskDriveTileEntity extends AbstractTileEntity { return; } - device.serializeData(); - final CompoundNBT tag = new CompoundNBT(); device.exportToItemStack(tag); ItemStackUtils.getOrCreateModDataTag(stack).put(DATA_TAG_NAME, tag); @@ -206,46 +205,48 @@ public final class DiskDriveTileEntity extends AbstractTileEntity { } private final class DiskDriveVMDevice extends AbstractBlockDeviceVMDevice { - private VMContext context; - public DiskDriveVMDevice() { super(DiskDriveTileEntity.this); } public void updateBlockDevice(final CompoundNBT tag) { - blobHandle = null; + if (device == null) { + return; + } + + if (blobHandle != null) { + BlobStorage.close(blobHandle); + blobHandle = null; + } + importFromItemStack(tag); - if (data != null) { - data = createBlockDevice(); - deserializeData(); - - assert device != null; - try { - device.setBlock(data); - } catch (final IOException e) { - LOGGER.error(e); - } + try { + device.setBlock(createBlockDevice()); + } catch (final IOException e) { + LOGGER.error(e); } } public void removeBlockDevice() { - updateBlockDevice(new CompoundNBT()); + if (device == null) { + return; + } + + if (blobHandle != null) { + BlobStorage.close(blobHandle); + blobHandle = null; + } + + try { + device.setBlock(EMPTY_BLOCK_DEVICE); + } catch (final IOException e) { + LOGGER.error(e); + } } @Override - public VMDeviceLoadResult load(final VMContext context) { - this.context = context; - return super.load(context); - } - - @Override - protected int getSize() { - return Config.maxFloppySize; - } - - @Override - protected BlockDevice createBlockDevice() { + protected BlockDevice createBlockDevice() throws IOException { final ItemStack stack = itemHandler.getStackInSlotRaw(0); if (stack.isEmpty() || !(stack.getItem() instanceof FloppyItem)) { return EMPTY_BLOCK_DEVICE; @@ -257,29 +258,10 @@ public final class DiskDriveTileEntity extends AbstractTileEntity { return EMPTY_BLOCK_DEVICE; } - return ByteBufferBlockDevice.create(capacity, false); - } - - @Override - protected Optional getSerializationStream(final BlockDevice device) { - if (context != null) { - context.joinWorkerThread(); - } - - if (device.isReadonly()) { - return Optional.empty(); - } else { - return Optional.of(device.getInputStream()); - } - } - - @Override - protected OutputStream getDeserializationStream(final BlockDevice device) { - if (context != null) { - context.joinWorkerThread(); - } - - return device.getOutputStream(); + blobHandle = BlobStorage.validateHandle(blobHandle); + final FileChannel channel = BlobStorage.getOrOpen(blobHandle); + final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity); + return ByteBufferBlockDevice.wrap(buffer, false); } @Override