Alternatives to ChunkStore Implementation


#1

The current ChunkStore writes/reads to/from fs directly in a blocking way. This makes it a bottleneck to the vault’s performance when handling large amount of incoming chunks in a short period.
The performance test and benchmark tests shows that the throughput is around 10 MB/s writing speed and 25 MB/s reading speed (0.03s to read an 1MB chunk and 0.083s write an 1MB chunk).

The purpose of this document is to investigate possible alternatives to ChunkStore so that the performance can be improved.

It is assumed that there will be only one vault on one machine.

Database

The main benefits of using a database are:

  • DB file can safely be moved from one machine to another (regardless of OS/endianness)
  • handles disk IO properly

However, due to the fact that the total_size of chunks to be stored could be huge, such database is not suitable te be executed in in-memory mode (though a benchmark from rusqlite + sqlite3 shows the throughput can achieve 0.022s writes an 1MB chunk).
When executed with db file mode, a benchmark of rusqlite + sqlite3 shows 0.2s writes an 1MB chunk, which is even slower than the current chunk_store.

Key-Value-Store (cask)

A key-value-store can be seen as a simplified version of data_base. Our chunk_store is one of this. However, the current implementation needs to access FS on each operation, makes it behave very similar to the database using a DB file (hence slow).
Another KVS – cask, groups entries into file. This makes it acting like an in-memory database using a page cache (default size is 2GB).
It has two working modes: sync or non-sync. Which toggles whether carries out file.sync_data() on each write operation.
A benchmark shows 0.077s write an 1MB chunk with sync and 0.018s writes an 1MB chunk without sync.

It needs to be noted that:

  • When the limit of 2GB reached, a new log_file will be created and used. On request of get an entry, the log file contains the target will be loaded. This means there is chance max 4GB need to be loaded.
  • The mutation of data is achieved by append entry. i.e. when delete an entry, a special Entry::Delete will be appended to the tail of the log_file; when modiy an entry, the new copy will be appended. This makes the real used disk space will be totally different to the statistics. And in the extreme case, unlimited log files will be generated by just repeatedly updating a single entry.

Background Threading ChunkStore

Detached Worker Threads

If the chunk_store is only used for immutable_data (no mutation), the put operation can be put into a detached worker thread.

pros:

  • simplest to implement
  • super fast regarding the burst throughput

cons:

  • can only be used for immutable_data
  • when get happens at the same with put, may need to ask addiontional copies to be fetched, due to the uncompleted worker thread
  • in the extreme case, a get request may return with failure if the put worker threads haven’t been completed on the majority of the DMs
  • mutliple threads accessing FS at the same time may take a longer total time to complete
  • when there is burst of chunks, there might be too many threads to be spawned.

Managed Worker Threads

The Joiner of each worker thread of put operation can be stored. And a later on get or mutate operation has to wait till that worker thread completed.
We may want something like HashMap<DataId, Joiner>, and a Joiner for completed thread shall be removed from this map.

pros:

  • can be used for immutable_data as well
  • super fast regarding the burst throughput of put

cons:

  • mutliple threads accessing FS at the same time will take a longer total time to complete
  • when there is burst of chunks, there might be too many threads to be spawned.
  • performance benchmark is dependent on usage pattern, i.e. get or mutate operation to non-stopped worker thread affects throughput.

The performance test and benchmark tests on the tryout implementation shows that the throughput is around 55 MB/s writing speed and 30 MB/s reading speed (0.03s to read an 1MB chunk and 0.017s write an 1MB chunk). The slow down on reading speed is because the get operation needs to wait for the completion of put operation.

pub struct ChunkStore {
    ...,
    workers: HashMap<Key, (Arc<AtomicBool>, thread::Joiner)>,
    keys: BTreeSet<Key>,
}

impl ChunkStore {
    fn clean_up_threads(&mut self, key: &Key) {
        let _ = self.workers.remove(&key);

        let mut completed_threads = Vec::new();
        for (key, &(ref atomic_completed, _)) in self.workers.iter() {
            if atomic_completed.load(Ordering::Relaxed) {
                completed_threads.push(key);
            }
        }
        for key in &completed_threads {
            let _ = self.workers.remove(key);
        }
    }

    pub fn put(&mut self, key: &Key, value: &Value) {
        self.clean_up_threads(key);

        let atomic_completed = Arc::new(AtomicBool::new(false));
        let atomic_completed_clone = atomic_completed.clone();
        let joiner = thread::named("background_put", move || {
            // serialise data and write the file.
            atomic_completed_clone.store(true, Ordering::Relaxed);
        });
        let _ = self.workers.insert(key, (atomic_completed, joiner));
        let _ = self.keys.insert(key);
    }
    
    pub fn get(&mut self, key: &Key) -> Value {
        self.clean_up_threads(key);
        // read file from FS
    }
    
    pub fn delete(&mut self, key: &Key) {
        self.clean_up_threads(key);
        self.keys.remove(key);
        // delete file from FS
    }
    
    pub fn has(&self, key: &Key) -> bool {
        self.keys.contains(key)
    }
    
    pub fn keys(&self) -> BTreeSet<Key> {
        self.keys
    }
}

Single thread background chunk_store

Since that will be I/O and not CPU bound, more than one worker threads access the file system at the same time gains nothing in total.
It is suggested to have one single chunk store thread that does the I/O and communicates with the event thread via channels.

pros:

  • super fast regarding the burst throughput of put
  • single access to the FS

cons:

  • the backgroud chunk_store could be very retard and requires a large cache to hold un-processed chunks
  • DM needs to expect the results from the background chunk_store via channel
  • many public function of getters (such as has, keys, used_space and max_space) now need to switch to report via channel
  • the mutate operations needs to be handled in proper order by the background chunk_store
enum CSOperations {
    Put(Key, Value),
    Get(Key),
    Has(Key),
    Delete(Key),
    GetKeys,
    GetUsedSpace,
    GetMaxSpace,
    Terminate
}

enum CSResponses {
    GetResponse(Value),
    Has(bool),
    Keys(BTreeSet<Key>),
    UsedSpace(u64),
    MaxSpace(u64),
}

pub struct ChunkStore {
    ...,
    cs_op_sender: mspc::Sender,
    cs_rsp_receiver: mspc::Receiver,
    _joiner: Joiner,
}

impl ChunkStore {
    pub fn new() -> Self {
        let (cs_op_sender, cs_op_receiver) = mpsc::channel::<CSOperations>();
        let (cs_rsp_sender, cs_rsp_receiver) = mpsc::channel::<CSResponses>();
        ChunkStore {
            ...,
            cs_op_sender: cs_op_sender,
            cs_rsp_receiver: cs_rsp_receiver,
            _joiner: thread::named("Background chunk_store",
                                       move || {
                                           let worker = Worker::new(cs_op_receiver, cs_rsp_sender);
                                           worker.run();
                                        }),
        }
    }
    
    pub fn handle_put(&self, key: Key, value: Value) {
        self.cs_op_sender(CSOperations::Put(key, value))
    }
    
    pub fn handle_get(&self, key: Key) -> Value {
        self.cs_op_sender(CSOperations::Get(key));
        loop {
            if let Ok(CSResponses::GetResponse(value)) = self.cs_rsp_receiver.try_recv() {
                return value;
            }
        }
    }
    
    pub fn handle_delete(&self, key: Key) {
        self.cs_op_sender(CSOperations::Delete(key))
    }
    
    fn has_chunk(&self, key: Key) -> bool {
        self.cs_op_sender(CSOperations::Has(key));
        loop {
            if let Ok(CSResponses::Has(result)) = self.cs_rsp_receiver.try_recv() {
                return result;
            }
        }
    }
    
    fn chunk_keys(&self) -> BTreeSet<Key> {
        self.cs_op_sender(CSOperations::GetKeys);
        loop {
            if let Ok(CSResponses::Keys(result)) = self.cs_rsp_receiver.try_recv() {
                return result;
            }
        }
    }
    
    fn get_used_space(&self) -> u64 {
        self.cs_op_sender(CSOperations::GetUsedSpace);
        loop {
            if let Ok(CSResponses::UsedSpace(result)) = self.cs_rsp_receiver.try_recv() {
                return result;
            }
        }
    }
    
    fn get_max_space(&self) -> u64 {
        self.cs_op_sender(CSOperations::GetMaxSpace);
        loop {
            if let Ok(CSResponses::MaxSpace(result)) = self.cs_rsp_receiver.try_recv() {
                return result;
            }
        }
    }
}

pub struct Worker {
    ...,
    cs_rsp_sender: mspc::Sender,
    cs_op_receiver: mspc::Receiver,
}

impl Worker {
    pub fn new(cs_op_receiver: mspc::Receiver, cs_rsp_sender: mspc::Sender) -> Self {
        ChunkStore {
            ...,
            cs_rsp_sender: cs_rsp_sender,
            cs_op_receiver: cs_op_receiver,
        }
    }
    
    pub fn run(&self) {
        loop {
            while let Ok(operation) = self.cs_op_receiver.try_recv() {
                match operation {
                    Put(key, value) => self.put(key, value),
                    Get(key) => self.cs_rsp_sender.send(CSResponses::GetResponse(self.get(key))),
                    Has(key) => self.cs_rsp_sender.send(CSResponses::Has(self.has(key))),
                    Delete(Key) => self.delete(key),
                    GetKeys => self.cs_rsp_sender.send(CSResponses::Keys(self.keys())),
                    GetUsedSpace => self.cs_rsp_sender.send(CSResponses::MaxSpace(self.used_space)),
                    GetMaxSpace => self.cs_rsp_sender.send(CSResponses::UsedSpace(self.max_space)),
                    Terminate => return,
                }
            }
        }
    }
}

Using a buffer

A buffer can be used as an optimisation to the current chunk_store. Simply by using an lru_cache, the performance of current chunk_store on read access can be improved a lot when a high cache-hit rate is expected.

The approach of single thread background chunk_store can be optimised further by using a buffer and single thread background put inside.

pros:

  • super fast regarding the burst throughput of put and get (with high cache-hit rate)
  • single background access to the disk
  • controllable resource usage (RAM and threading)

cons:

  • the buffer needs to be carefully maintained to retain the proper operation sequence
  • chunks might not get flushed to disk due to un-expected termination
enum Status {
    InMemory,
    Flushing,
    OnDisk,
}

struct Chunk {
    value: Value,
    time_stamp: Instant,
    status: Status,
}

impl Chunk {
    fn new(value: Value) -> Self {
        Chunk {
            value: value,
            time_stamp: Instant::now(),
            status: Status::InMemory,
        }
    }

    fn set_flushed(&mut self) {
        if self.status == Status::Flushing {
            self.status = Status::OnDisk;
        }
        self.time_stamp = Instant::now();
    }
    
    fn set_flushing(&mut self) {
        self.status = Status::Flushing;
        self.time_stamp = Instant::now();
    }
    
    fn get_value(&mut self) -> Value {
        self.time_stamp = Instant::now();
        self.value
    }
    
    fn is_expired(&self) -> bool {
        chunk.status == Status::OnDisk && self.time_stamp.elapsed > EXPIRY_DURATION
    }
    
    fn can_be_purged(&self) -> Option<Duration> {
        if chunk.status == Status::OnDisk {
            Some(self.time_stamp.elapsed)
        } else {
            None
        }
    }
}

enum CSOperations {
    ...,
    Flushed(Key),
}

pub struct Worker {
    ...,
    cs_op_sender: mspc::Sender,
    buffer: BTreeMap<Key, Chunk>,
    worker: Option<(Key, thread::Joiner)>,
}

impl Worker {
    pub fn run(&self) {
        ......
        Flushed(key) => self.handle_flushed(key),
    }
    
    fn handle_flushed(&mut self, key: Key) {
        if let Some(chunk) = self.buffer.get_mut(key) {
            chunk.set_flushed();
        }
        if let Some((work_key, _)) = self.worker {
            if work_key != key {
                return;
            }
        }
        self.worker = None;
        self.purge_and_flush();
    }
    
    fn purge_and_flush(&mut self) {
        // purge chunks that `chunk.status == Status::OnDisk && chunk.is_expired()`
        // if still too many, keeps purge `can_be_purged` chunks from oldest till buffer is not too large

        if self.worker.is_some() {
            return;
        }
        // pick an oldest InMemory chunk in the buffer
        chunk.set_flushing();
        let cs_op_sender_clone = self.cs_op_sender;
        let joiner = thread::named("background_put", move || {
            // serialise data and write the file.
            cs_op_sender_clone.send(CSOperations::Flushed(key));
        });
        self.worker = Some((key, joiner));
    }
    
    fn put(&mut self, key: Key, value: Value) {
        self.buffer.insert(key, Chunk::new(value));
        self.purge_and_flush();
    }
    
    fn get(&mut self, key: Key) -> Value {
        if Some(chunk) = self.buffer.get_mut(key) {
            chunk.get_value()
        } else {
            // read from disk
        }
    }
    
    fn delete(&mut self, key: Key) {
        if Some(chunk) = self.buffer.remove(key) {
            match chunk.status {
                OnDisk => // delete from disk,
                ImMemory => // do nothing,
                Flushing => // wait till join then delete from disk,
            }
        } else {
            // delete from disk
        }
    }
}

MixedUp

It is possible to having one vault owns multiple chunk_store, which uses different approaches depends on the data type.

For example, for immutable_data, it can use a immutable_chunk_store, which completes put operation in spawned detached worker loader.
For mutable_data, a mutable_chunk_store, which uses in-memory database or background chunk_store.

pros:

  • each specific data type has its most suitable approach to achieve the best performance with least bad effects

cons:

  • can have many duplicated code and a larger code base to maintain

Avoid serialisation of ImmutableData

The API of current chunk_store takes Value in as serialisable then carries out serialisation before writing it to file. A benchmark test shows that 0.015s serialise an 1MB chunk.
To make comparation fair, the above benchmark tests of rusqlite and cask all include serialisation of data. When get that part removed, the evaluation shows 0.0025s writs an 1MB data for cask without sync and 0.0017 writes an 1MB data for in-memory database.

If the immutable data (normally will be as large as 1MB size) will be the main burden to the chunk_store. It will be worth to consider make value as Data type, and use ImmutableData.value() directly to avoid serialisation.
A quick benchmark test on per-put threading chunk_store without serialising the immutable data shows 0.005s writes an 1MB data.

Using shorter file_name

The current file name for a stored chunk needs to retain the id and version info for a quick query. This results in a much longer file name (72 hex chars for immutable_data and 88 hex chars for mutable_data).
If the id and version info to be stored in a separate cache (in-memory data structrue or a data_base), then a much shorter name, say 8 hex chars, can be used as the file name of chunks.
The current chunk_store::keys() query can then be no longer a cost iteration in file system, but a quick in-memory query.
The map can be Vec<Key>, and file_name can be calculated as file_name = Sha3_224(serialise(key)).to_hex().trucate(16) (which has a collision rate of 1/2^64).
alphanumeric name (non-case-sensitive and skips I,O,U,Z) could be used to push down the collision rate to 1/2^80.

Summary

Note:

  • When construct an ImmutableData from Vec, a hash of sha256 currently will be executed to calculate the name. This turned out to be a more cost expensive operation than de-serialisation. To compare the performance, a fake operation (return empty Data::Structured) has been used which proves the similiar result (0.0008s) as to the im-memory database or cask without deserialisation. This indicates an ImmutableData constructor from name and value directly might be necessary.
  • The tests were exectued with Ubuntun 14.04 64bits. When executed the same benchmark tests on the same machine with Windows 8 64bits using curren chunk_store, the read performance shows an improvement to 0.01s read 1MB data. However the write performance doesn’t have any change.

#2

Slight concern about one store per machine because this may leave spare resources unusable where the is more than one disk/partition with suitable space.

Since most machines will be set up for other things, and then have a vault added, this is quite likely.


#3

This is a huge negative feature. As @happybeing says people will want and need to run more than one vault per machine. Surely you can store the data required in a customised location and thus 2 vaults running simply use different areas of the storage space on the machine.


#4

I’m not clear on the terminology here. Above a vault could either mean a node/process/wallet that had been allocated a disk location, or it could refer to the chunk store disk location.

To clarify, I’m not concerned here about how many vault processes can run on a machine, but how many locations on disk can be allocated for SAFE storage.

Say I have two disks or two partitions with suitable space, I want to be able to offer storage aggregated across both, but I don’t mind if that is one process or two, so long as it’s reasonably efficient either way.


#5

Yes, I think though this topic is more related to how to efficiently store the chunks locally. So not really where or how many process. The focus is on in memory stores, buffered writes/reads or key value store systems, perhaps just plain buffered / threaded write/read pipelines.


#6

Routing already controls that there is only one vault per local LAN, which is stricter than one vault per machine. This is fine with me because I understand this is needed for the network stability.

The problem is for local networks. Until now it was relatively easy to modify the code to bypass the control and @ustulation even gave me some hints for external reachability and for invitations. I just hope that the code change will remain easy with new chunk store implementation.

For reference, here are the 3 lines of code I currently add in Vault::with_config function to allow distinct chunk_store directories on a machine with several vaults.

            use rand::{self, Rng};
            use hex::ToHex;
            chunk_store_root.push(rand::thread_rng().gen_iter().take(8).collect::<Vec<u8>>().to_hex());

#7

There is code happening now that will affect this for the better.

  1. Secure name will ensure a node can be recognised by that name through the code and not have 3 names as we do now and even restart with that name.
  2. Node age will allow a node to restart with a name, but immediately be relocated (after it does what we ask it to, such as give us data it may have etc. (ofc data chains are needed for that))

This means chunk stores will eventualy be named with the node name and not deleted on drop/restart. I think this will give you what you wish for here, but automatically.


#8

Hey,

I’m the author of the cask crate, really happy that someone is looking at it! Though I must add that, as far as I know, it’s never been used in production and I haven’t done any kind of performance testing. With that said, I’d be willing to help in any way needed if you intend to use it. I need to write some tests which are lacking, and also expose some configuration options which are currently hardcoded (e.g. the maximum file size threshold is currently hardcoded to 2GB).


#9

Fantastic @andrebeat that is really cool to hear and if we can help each other then it’s a great thing. I checked out cask a while back and was pretty impressed actually, ofc early stage, but looks to have had a decent amount of thought and honesty in the README as well.

Anyway I just wanted to say hi and thanks :slight_smile:

Plus, I am so nice I never mentioned emacs, just hoping spaces, not tabs :wink:


#10

Really appreciate the work put into the OP. Makes me very happy to see this!

How does this assumption affect the tests / options? As far as I can see all the options are valid for multiple vaults per machine.

FWIW I prefer buffer based on info in OP. MixedUp is a great idea, but perhaps it’s best do the simpler thing first (eg buffer) and then find the next low-hanging-fruit for performance (probably not PUT IO) before implementing MixedUp.

Also to echo @happybeing I think a good additional feature for vaults (albeit off topic to this thread) would be to allow multiple directories for chunk store so that multiple hdd can be used without having to combine them (ie to avoid needing RAID etc).


#11

[quote=“ustulation, post:1, topic:584”] When the limit of 2GB reached, a new log_file will be
created and used. On request of get an entry, the log file contains the target will be loaded. This
means there is chance max 4GB need to be loaded. [/quote]

I’m not sure what you mean by this. The whole log doesn’t need to be loaded into memory, since we
keep the position of the entry in the log file in an in-memory index. get operations are
guaranteed to only take one disk seek. I’ve given a presentation on this if you’re interested you
can take a look here.

I should probably learn a bit more about the current chunk store implementation so that I can better
evaluate the alternatives you’ve given. That said I think if your design is simple enough the OS FS
cache should be able to do a good job, so I’d be wary of managing your own cache/buffer.

[quote=“happybeing, post:4, topic:584”] To clarify, I’m not concerned here about how many vault
processes can run on a machine, but how many locations on disk can be allocated for SAFE storage.
[/quote]

I think maybe we can sidestep this discussion for now by providing a simple enough interface for
whatever KV store ends up being used. This way, in the future we could provide a more advanced
backend that would allow this. That said, AFAIK I don’t think many databases provide this feature in
the first place.

[quote=“dirvine, post:9, topic:584”] Fantastic @andrebeat that is really cool to hear and if we can
help each other then it’s a great thing. I checked out cask a while back and was pretty impressed
actually, ofc early stage, but looks to have had a decent amount of thought and honesty in the
README as well. [/quote]

Thank you, but credit where credit is due the overall design is based on
bitcask. I should maybe even rename the project to avoid any
conflicts (?).

Haha, in my defense I use vim-emulation inside emacs, so hopefully I get the best of both worlds.
:rolling_eyes: Yes, spaces over tabs! :wink:


#12

The whole log doesn’t need to be loaded into memory, since we
keep the position of the entry in the log file in an in-memory index. get operations are
guaranteed to only take one disk seek.

Yes, you are right.
That sentence of This means there is chance max 4GB need to be loaded. is incorrect.
The log file contains the target won’t be loaded and there will be only one seek.