
The PlasmaFS Protocol

PlasmaFS uses SunRPC for all TCP connections. There is nothing that is language-dependent.

There are a number of servers involved:

  • Main namenode server: All namenode operations can be done here. This server implements the public RPC programs Coordination, Filesystem, and Dn_admin, and the internal RPC programs Elect and Nameslave. (Here, "public" means clients outside PlasmaFS may call it, and "internal" means the RPC programs should only be called by other PlasmaFS server processes.)
  • Main datanode server: Clients read and write blocks here. This server implements the public RPC program Datanode, and the internal RPC program Datanode_ctrl.
  • I/O processes of datanode: These are internal worker processes controlled by the main datanode server. They implement the internal program Datanode_io.
Usually clients communicate only with the main namenode server and as many main datanode servers as necessary.

Right now, nothing prevents clients from calling internal RPC programs.

Coordination: Finding the coordinator

The first step of a client is to find the coordinator. The client may know one or several namenode ports, but only one of them is the right one to send namenode queries to. At cluster startup, the namenodes elect a coordinator. Only the coordinator can actually respond to client queries. However, all namenode servers create the namenode socket, and RPC requests can be sent to them. The non-coordinators return errors if they get a query that only the coordinator is able to answer.

The program Coordination (reachable via the main namenode port) includes functions to find the coordinator, such as find_coordinator and is_coordinator. All namenode servers can respond to these queries.

From a client's perspective it is unpredictable which host becomes the coordinator. Usually, clients just call find_coordinator to get the information. Long-running clients may call is_coordinator every time they establish a new TCP connection to a namenode to validate whether this is still the coordinator.
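
The discovery step can be sketched as follows. This is only a minimal Python illustration, not PlasmaFS client code: the FakeNamenode class stands in for a SunRPC connection, the addresses and port numbers are made up, and the assumption that find_coordinator returns a "host:port" string is not taken from the protocol definition.

```python
from typing import Dict, Iterable, Optional


class FakeNamenode:
    """Hypothetical stand-in for an RPC connection to one namenode."""

    def __init__(self, addr: str, coordinator: Optional[str]):
        self.addr = addr
        self._coordinator = coordinator  # None simulates an unreachable node

    def find_coordinator(self) -> str:
        if self._coordinator is None:
            raise ConnectionError(f"{self.addr} unreachable")
        return self._coordinator

    def is_coordinator(self) -> bool:
        return self._coordinator == self.addr


def locate_coordinator(cluster: Dict[str, FakeNamenode],
                       known: Iterable[str]) -> str:
    """Ask each known namenode in turn until one names the coordinator."""
    for addr in known:
        try:
            return cluster[addr].find_coordinator()
        except ConnectionError:
            continue  # try the next known namenode
    raise RuntimeError("no namenode answered")


cluster = {
    "nn1:2730": FakeNamenode("nn1:2730", None),        # currently down
    "nn2:2730": FakeNamenode("nn2:2730", "nn3:2730"),  # not the coordinator
    "nn3:2730": FakeNamenode("nn3:2730", "nn3:2730"),  # the coordinator
}
coord = locate_coordinator(cluster, ["nn1:2730", "nn2:2730"])
# A long-running client would repeat this check on every new connection.
still_valid = cluster[coord].is_coordinator()
```

Any namenode can answer the discovery call, so the client only needs one reachable address from its list.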

The Filesystem program

The Filesystem program allows the client to:

  • get and set metadata about files (the so-called inodes)
  • get and set where the blocks of the files are (but there is no function to actually read or write blocks)
  • navigate in the directory tree, and to manipulate it

Impersonation

Usually, the first step of the client is to impersonate a certain user and group. This is done by passing the authentication ticket to the RPC procedure impersonate.

Inodes

An inode is identified by a 64-bit integer (in XDR notation this is the hyper type). Inode IDs are only used once. This is important because it means that once you have an inode ID at hand, it is a permanent identifier for the same object. A client accessing the filesystem in parallel cannot change the inode ID so that it points to a different file.

Unlike the normal Unix filesystem interface, the PlasmaFS protocol returns the inode IDs to the user, and the user can also access files by this ID. This is even the primary method of doing so.

An inode can exist without a filename, but only for the lifetime of a transaction. When the transaction is committed, inodes without a filename are automatically deleted. This prevents PlasmaFS space from being permanently consumed without a visible entry in the directory tree.

There are two kinds of objects connected with an inode:

  • Metadata: The inodeinfo struct contains information such as owner, EOF position, access rights, etc. This struct resembles what is traditionally stored in a Unix inode, but is not exactly identical. For example, the field with the number of filename links is missing.
  • Content data: The blocklist array, consisting of blockinfo elements, describes where the data blocks are stored.
The Filesystem program includes functions for accessing and modifying both kinds of data.

As an important addition to what is traditionally stored in an inode, PlasmaFS includes a sequence number in inodeinfo. The sequence number is automatically increased when the content data is changed. We will later see that the modification of content data always implies that new data blocks are allocated, so every change of the file contents is reflected as a change in the blocklist, i.e. it is not possible to change content data without notifying the namenode. This allows the namenode to maintain the sequence number. The sequence number can be thought of as a version number of the contents, and may e.g. be useful for quickly checking whether the content has changed compared to some previous point in time.
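
As an illustration of such a quick check, a client-side cache could key its entries on the sequence number. The sketch below is an assumption-laden example: the InodeInfo class and its field names seqno and eof are hypothetical and only mirror the idea of the inodeinfo struct.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class InodeInfo:
    seqno: int  # hypothetical name for the sequence number field
    eof: int


class ContentCache:
    """Caches file contents and reuses them while the sequence number matches."""

    def __init__(self) -> None:
        self._entries: Dict[int, Tuple[int, bytes]] = {}

    def put(self, inode: int, info: InodeInfo, data: bytes) -> None:
        self._entries[inode] = (info.seqno, data)

    def get(self, inode: int, info: InodeInfo) -> Optional[bytes]:
        entry = self._entries.get(inode)
        if entry is not None and entry[0] == info.seqno:
            return entry[1]  # content unchanged since it was cached
        return None          # unknown inode, or content was modified


cache = ContentCache()
cache.put(4711, InodeInfo(seqno=3, eof=5), b"hello")
hit = cache.get(4711, InodeInfo(seqno=3, eof=5))
miss = cache.get(4711, InodeInfo(seqno=4, eof=5))
```

The client still has to fetch the current inodeinfo from the namenode, but it can skip reading the blocks when the sequence number is unchanged.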

Transactions

All metadata and data accesses are done in a transactional way. (There are a few exceptions, but these are quite dangerous.) This means the client has to open a transaction first (begin_transaction), and has to finish the transaction after doing the operations (either via commit_transaction or abort_transaction). A transaction immediately sees what other transactions have committed, i.e. we generally have a "read committed" isolation level. (An important exception is explained below.)

The client may open several transactions simultaneously on the same TCP connection. A transaction ID is used to identify the transactions.

When the TCP connection terminates, all pending transactions are implicitly aborted. This protects server resources. (Note that there are some myths that SunRPC does not allow controlling the connections on which messages are sent. This is nonsense. The only caveat is that some SunRPC implementations have automatic connection control on some higher API levels, and of course this has to be turned off in order to call PlasmaFS procedures.)

From the client's perspective the transactions look very much like SQL transactions. There are some subtle differences, though:

  • The way competing accesses to the same piece of data are handled. Generally, PlasmaFS uses a pessimistic concurrency scheme: locks are acquired before an operation is tried. When the locks cannot be acquired immediately, PlasmaFS aborts the operation and returns the error ECONFLICT. It does not wait until the locks are free again. The rationale is that the namenode is a scarce resource, and we don't want to waste it on organizing waiting. In the future we might add better lock management, but this will most likely be done in a separate server.
  • The transactions extend to the whole cluster because the accesses to the datanode servers are also covered by transactions. This is done by handing out tickets to the client which the client has to present to the datanode server when reading or writing blocks. There is some communication hidden from the client between the namenode and the datanodes, where the namenode tells the datanodes when transactions start and finish.
There is no explicit limit on the length of transactions. However, all transactional state is kept in RAM, which imposes an implicit limit.

Clients make best use of transactions when they use them only for short sequences of operations. The reason is not only limited memory, but also the danger of getting ECONFLICT. The best way for clients to handle ECONFLICT is to abort the current transaction and to restart it from the beginning.
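
The abort-and-restart pattern might look like the following sketch. Everything here is illustrative: FakeNamenode is an in-memory stand-in that fails a configurable number of link calls, the Conflict exception models ECONFLICT, and run_with_retry is a hypothetical helper, not part of any PlasmaFS client library.

```python
import itertools


class Conflict(Exception):
    """Models the ECONFLICT error code."""


class FakeNamenode:
    """In-memory stand-in; the first `conflicts` link calls fail."""

    def __init__(self, conflicts: int):
        self.conflicts = conflicts
        self.pending = {}    # tid -> uncommitted path/inode pairs
        self.committed = {}  # path -> inode
        self.aborts = 0

    def begin_transaction(self, tid):
        self.pending[tid] = {}

    def link(self, tid, path, inode):
        if self.conflicts > 0:
            self.conflicts -= 1
            raise Conflict(path)
        self.pending[tid][path] = inode

    def commit_transaction(self, tid):
        self.committed.update(self.pending.pop(tid))

    def abort_transaction(self, tid):
        self.pending.pop(tid, None)
        self.aborts += 1


def run_with_retry(nn, body, max_attempts=5):
    """Run body(nn, tid) in a transaction; on conflict, abort and start over."""
    tids = itertools.count(1)
    for _ in range(max_attempts):
        tid = next(tids)
        nn.begin_transaction(tid)
        try:
            body(nn, tid)
        except Conflict:
            nn.abort_transaction(tid)  # give up all work done so far
            continue                   # restart from the beginning
        nn.commit_transaction(tid)
        return tid
    raise Conflict("gave up after repeated conflicts")


nn = FakeNamenode(conflicts=2)
final_tid = run_with_retry(nn, lambda n, tid: n.link(tid, "/dir/filename", 42))
```

Keeping the transaction body short makes each retry cheap, which is the reason for preferring short transactions.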

Note that the PlasmaFS transactions do not translate directly to PostgreSQL transactions. Actually, PlasmaFS keeps a number of PostgreSQL transactions continuously open, but these are only for reading data. During a commit, another PostgreSQL transaction is used for writing data. On PostgreSQL level, no conflicting updates can occur anymore.

Directories

PlasmaFS stores directories not as a special kind of file, but in the namenode database. This has the advantage of not having to deal with a special file format, and also the advantage of saving disk space for clusters with large block sizes.

There is a special PostgreSQL table storing the directory tree. In addition to this, each directory also has an inode. The inode is used for storing access rights. It is not possible to allocate data blocks for a directory inode.

It is possible that an inode has several file names. This works just like the hard links in Unix.

Unlike Unix, a directory does not have special entries for "." and "..". The list operation does not return these entries (and they do not exist in the database table). Nevertheless, one can use "." and ".." in file paths: when following paths, the namenode server checks specially for these path components, and interprets them in a compatible way.

There is a subtle difference, though. When counting the number of hard links of directories, PlasmaFS will always return 1. In the traditional Unix implementation, however, the number of links of a directory depends on the number of subdirectories (because ".." is considered a link). This rarely causes compatibility problems.

There are symbolic links, and the lookup operation follows them.

Operations

  • lookup: Find an inode by file path
  • link: Create a file path for an inode
  • unlink: Remove a file path for an inode
  • list: List the contents of a directory

Example 1: Create file

This transaction creates a new file by allocating a new inode, and then linking this inode to a filename:

tid = 1;                          /* transaction ID chosen by client */
begin_transaction(tid);
inode = allocate_inode(tid, ii);  /* ii: an inodeinfo struct */
link(tid, "/dir/filename", inode);
commit_transaction(tid);

Example 2: Rename file

There is a rename operation. For non-directories, one can easily get exactly the same effect by:

tid = 1;                          /* transaction ID chosen by client */
begin_transaction(tid);
inode = lookup(tid, -1, "/dir/oldname", true);
unlink(tid, "/dir/oldname");
link(tid, "/dir/newname", inode);
commit_transaction(tid);

There is a subtle point, though. The lookup operation acquires a lock for the argument filename, so no competing transaction can unlink it or replace it, so we can be sure the looked up inode really corresponds to the filename.

Block allocation

As already mentioned, the client needs to present tickets to the datanode server in order to get permission to read or write file blocks. The ticket system is controlled by the namenode. For the following, keep in mind that the namenode can control exactly which blocks are accessible to the client.

Block allocation happens when a client extends a file, or when it modifies a file. The latter is crucial, because we cannot allow clients to modify a file directly. Instead, clients have to allocate replacement blocks for the file region they want to modify, and have to write the replacement blocks in units of whole blocks to the datanode servers. For instance, if the client wants to modify a single byte in a file, it has to read the whole block, change the byte locally, allocate a replacement block, and write the modified block to the position of the replacement.
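
The local part of this read-modify-write cycle can be sketched as below. The sketch covers only the arithmetic and the in-memory patching; reading the old block, allocating the replacement, and writing it are left out. BLOCK_SIZE, locate, and patch_block are hypothetical names, and the tiny block size is chosen for readability only.

```python
BLOCK_SIZE = 16  # tiny value for illustration; real block sizes are far larger


def locate(file_pos: int):
    """Map a byte position in the file to (block index, offset in block)."""
    return divmod(file_pos, BLOCK_SIZE)


def patch_block(old_block: bytes, offset: int, new_bytes: bytes) -> bytes:
    """Return a full replacement block with new_bytes spliced in at offset."""
    if len(old_block) != BLOCK_SIZE:
        raise ValueError("writes must cover exactly one whole block")
    if offset < 0 or offset + len(new_bytes) > BLOCK_SIZE:
        raise ValueError("modification does not fit into this block")
    return old_block[:offset] + new_bytes + old_block[offset + len(new_bytes):]


# Change the single byte at file position 37.
block_index, offset = locate(37)
old_block = b"0123456789abcdef"   # as read from the datanode
replacement = patch_block(old_block, offset, b"X")
# `replacement` is what would be written to the newly allocated block
# that replaces block `block_index` of the file.
```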

This looks complicated, but is necessary. Why? If we allowed direct modification of the block, the data change would be immediately visible to all other transactions, even before commit. Well, one could say, then let that happen, and do not make such transactional guarantees. This is a bit short-sighted, however, because we do not implement transactions only to establish a nice logical view of the data, but also to make modifications safe.

The problem here is that we do not write the data block to only one datanode. Rather, it is the task of the client to write the same block to all datanodes that store replicas of the block. We have to ensure that these replicas are all identical. If we allowed direct modifications of existing blocks, we could not be sure whether these modifications are done on all datanodes in the same way: The client could simply crash in the middle of writing the replicas. In the transactional scheme, however, a crash is no longer a problem. The client would not reach the point where the changes are committed. When the transaction is aborted, the namenode simply gives up the (incompletely written) replacement blocks, and thereby restores a consistent view.

Note that we fully trust the client to write the same data to all replicas. Even worse, we fully trust the client to write the data blocks at all: after allocating blocks, the client may choose not to write data to the datanode, in which case the allocated blocks simply keep their old content (which may be data from other files). Once PlasmaFS gets access control (which is not yet implemented), this becomes an issue, because data can leak from files a user must not have access to. This will be fixed when we introduce access control to PlasmaFS.

Operations

  • allocate_blocks: Allocate new or replacement blocks for an inode
  • free_blocks: Free blocks of an inode
  • get_blocks: Get the blocks of an inode (i.e. get where the blocks are stored)

Example of block allocation

Here, we allocate 10 blocks at block position 0 of the file. If there are already blocks, the old blocks are implicitly deallocated, i.e. we get the effect of a replacement. The returned blockinfo structs b have everything we need to write the data blocks to the datanodes. (See below.)

tid = 1;
begin_transaction(tid);
inode = lookup(tid, -1, "/filename", false);
bl = allocate_blocks(tid, inode, 0, 10, true, []);
for (b in bl) {
  <write_data_to_datanode>(b);
};
commit_transaction(tid);

Reading allocated block data

It looks trivial to read the data blocks: Just get the array of blockinfo structs that say where the data blocks are stored, and contact the datanodes for getting the data.

There is one problem, though. A read operation may conflict with the deallocation of blocks, i.e. we must prevent a client from reading blocks that are deallocated at the same time. For example, we could have two transactions doing this:

<transaction 1>                         <transaction 2>
begin_transaction(tid1);
bl = get_blocks(tid1, inode, pos, len);
<read_data_from_datanode>(bl[0]);
<read_data_from_datanode>(bl[1]);       begin_transaction(tid2);
<read_data_from_datanode>(bl[2]);       free_blocks(tid2, inode, pos, len);
...                                     commit_transaction(tid2);
<read_data_from_datanode>(bl[k]);
...
<read_data_from_datanode>(bl[n-1]);
commit_transaction(tid1);

The second transaction deallocates the blocks of the inode while the first transaction still reads them, and even commits this modification. With the usual "read committed" isolation level, we would be faced with the problem that the blocks are no longer associated with the inode, and can even be reused by other transactions that allocate blocks.

PlasmaFS solves this by enforcing the stricter isolation level of "repeatable read" in this case. Once blocks are handed out to the client, they cannot be immediately deallocated. Instead, the blocks enter a special transitional state between "used" and "free". They are no longer in the blocklist of the inode, but they cannot be reclaimed immediately for other files. The blocks leave this special state when the last transaction finishes accessing these blocks.
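
The life cycle of such a block can be modeled as a small state machine. The following Python sketch is a toy model under stated assumptions, not the namenode's actual bookkeeping: the BlockTracker class, its method names, and the state name "transitional" are invented for illustration.

```python
class BlockTracker:
    """Toy model of block states: "used", "transitional", or "free"."""

    def __init__(self):
        self.state = {}    # block number -> state name
        self.readers = {}  # block number -> set of transaction IDs

    def allocate(self, block):
        self.state[block] = "used"
        self.readers[block] = set()

    def get_block(self, tid, block):
        """Hand the block out to transaction tid for reading."""
        if self.state.get(block) != "used":
            raise KeyError(f"block {block} is not in any blocklist")
        self.readers[block].add(tid)

    def free_block(self, block):
        """Deallocate; the block stays reserved while readers still exist."""
        self.state[block] = "transitional" if self.readers[block] else "free"

    def finish(self, tid):
        """Called when transaction tid commits or aborts."""
        for block, tids in self.readers.items():
            tids.discard(tid)
            if not tids and self.state[block] == "transitional":
                self.state[block] = "free"  # last reader is gone


tracker = BlockTracker()
tracker.allocate(7)
tracker.get_block(1, 7)              # transaction 1 starts reading block 7
tracker.free_block(7)                # transaction 2 frees it and commits
state_while_reading = tracker.state[7]
tracker.finish(1)                    # transaction 1 finishes
state_afterwards = tracker.state[7]
```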

What about EOF?

As you see, the client extends files block by block. Well, not every file has a length that is a multiple of the block size. How is that solved?

There is an EOF position in the inodeinfo struct. This is simply a 64-bit number. The convention is that clients update and respect this EOF position, i.e. when more data is appended to the file, the EOF position is moved to the position where the data logically ends, and readers discard the part of the last block that is beyond EOF.

Note that this is only a convention - it is not enforced. This means we can have files whose EOF position is unrelated to where the last block is (i.e. a position before or after this block). In such cases, clients should give EOF precedence, and treat non-existing blocks as null blocks.
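
A reader following this convention could assemble the file contents as in the sketch below. It assumes that the blocks have already been fetched into a dictionary keyed by block index; BLOCK_SIZE and read_file are hypothetical names, and the block size is kept tiny for readability.

```python
BLOCK_SIZE = 8  # tiny value for illustration only


def read_file(blocks: dict, eof: int) -> bytes:
    """Assemble file contents, giving the EOF position precedence.

    `blocks` maps block index -> block contents. Blocks missing below EOF
    are treated as null blocks; data beyond EOF is discarded.
    """
    out = bytearray()
    nblocks = -(-eof // BLOCK_SIZE)  # number of blocks needed, rounded up
    for index in range(nblocks):
        out += blocks.get(index, bytes(BLOCK_SIZE))
    return bytes(out[:eof])


blocks = {0: b"ABCDEFGH", 1: b"IJKLMNOP"}
inside_last_block = read_file(blocks, 11)  # EOF within block 1
before_last_block = read_file(blocks, 5)   # EOF before the last block
after_last_block = read_file(blocks, 20)   # EOF after the last block
```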

There is also some help for finding out what the last block is. In inodeinfo the field blocklimit is the "block EOF", i.e. the file has only blocks with index 0 to blocklimit-1. This field is automatically maintained with every block allocation or deallocation. (In the current implementation blocklimit may be larger than the actual block limit.)

The Datanode program

The Datanode program provides the operations for reading and writing blocks. The program supports several transport methods - right now an inline method (called DNCH_RPC) and a method where the block data are exchanged over POSIX shared memory (called DNCH_SHM). See the next section for how to use shared memory - let us first focus on DNCH_RPC.

The port the Datanode program listens on is returned in the field node of the blockinfo struct, so there is no additional communication overhead for finding this out.

For writing data, one has to call write as in

write(block, DNCH_RPC(data), ticket_id, ticket_verifier)

Here, block is the block number, and the ticket numbers come from the blockinfo struct (as returned by allocate_blocks). With the notation DNCH_RPC(data) it is meant that the DNCH_RPC branch of the union is selected, and data is the string field put into this branch. For writes, this string must be exactly as long as one block.

For reading data, one has to call read as in

r = read(block, DNCH_RPC, pos, len, ticket_id, ticket_verifier)

Again, block is the block number. Because read supports partial block reads, one can also pass a position and length within the block (pos and len, resp.). The ticket information comes from get_blocks.

The read operation returns a value like DNCH_RPC(data).

The Datanode program can handle multiple requests simultaneously. This means clients can send several requests in one go, without having to wait for responses. To save resources, clients should avoid creating several TCP connections to the same datanode.

The Datanode program tries to be as responsive to clients as possible. In particular, all requests sent to it are interpreted immediately, although this usually means they are only buffered until the I/O operation can actually be done. Right now, there is not even an upper limit for this buffering. Clients should try not to send too many requests to Datanode at once, i.e. before the responses arrive. A good scheme for a client is to limit the number of outstanding requests, e.g. to 10. It is likely that the protocol will be refined at some point, and a real limit will be set. Clients can then query this limit.
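
One way for a client to keep the number of outstanding requests bounded is a simple sliding window. The sketch below is transport-agnostic and hypothetical: send and receive are placeholders for the asynchronous RPC calls of whatever SunRPC library is in use, and FakeDatanode only counts requests.

```python
from collections import deque

MAX_OUTSTANDING = 10  # the client-side limit suggested in the text


def pipelined(requests, send, receive, limit=MAX_OUTSTANDING):
    """Send requests with at most `limit` unanswered; yield responses in order."""
    in_flight = deque()
    for req in requests:
        if len(in_flight) >= limit:
            yield receive(in_flight.popleft())  # wait for the oldest response
        in_flight.append(send(req))
    while in_flight:
        yield receive(in_flight.popleft())      # drain the remaining responses


class FakeDatanode:
    """Counts outstanding requests instead of doing real I/O."""

    def __init__(self):
        self.outstanding = 0
        self.peak = 0

    def send(self, block):
        self.outstanding += 1
        self.peak = max(self.peak, self.outstanding)
        return block  # the handle is just the block number here

    def receive(self, handle):
        self.outstanding -= 1
        return f"data-{handle}"


dn = FakeDatanode()
results = list(pipelined(range(25), dn.send, dn.receive))
```

All requests go over a single connection here, which matches the advice to avoid opening several TCP connections to the same datanode.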

Using Datanode with shared memory

In the interfaces of read and write it is easy to request shared memory transport. Just use DNCH_SHM instead of DNCH_RPC, and put a dn_channel_shm_obj struct into it. This struct has fields for naming the shared memory file, and for selecting the part of this file that is used for data exchange. The read call will then put the data it has read there, and write will expect to find the block to write there. Of course, the datanode server must have permission to access the shared memory file (it always requests read/write access).

There are two difficulties, though. First, the client is required to open a connection to the Unix Domain socket the datanode server provides - the TCP port will not work here. Second, it may be difficult for the client to manage the lifetime of the shared memory file. For both problems, the datanode server has special support RPCs.

The function udsocket_if_local checks whether it is called from the same node as the datanode server runs on, and if so, the path of the Unix Domain socket is returned. This "same node check" is done by comparing the socket addresses of both endpoints of the incoming call - if both addresses have the same IP address, it is concluded that this is only possible on the same node (i.e. the criterion is whether getsockname and getpeername return the same IP address). Clients should call udsocket_if_local on the usual TCP port of the datanode when they want to use the shared memory transport method, and if they get the name of the Unix Domain socket, they should switch to this connection.
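
The "same node check" itself is easy to reproduce with the standard socket API. The sketch below shows only the criterion named above (getsockname and getpeername returning the same IP address), evaluated on the accepting side of a loopback connection; peer_is_local is a hypothetical helper name and no PlasmaFS RPC is involved.

```python
import socket


def peer_is_local(conn: socket.socket) -> bool:
    """True if both endpoints of the connection have the same IP address."""
    return conn.getsockname()[0] == conn.getpeername()[0]


# Demonstration over the loopback interface.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listener.listen(1)
client = socket.create_connection(listener.getsockname())
server_side, _ = listener.accept()

local = peer_is_local(server_side)

for s in (client, server_side, listener):
    s.close()
```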

The function alloc_shm_if_local can be used to allocate shared memory so that the lifetime is bound to the current connection. This means this shared memory will be automatically deallocated when the client closes the connection to the datanode server. This is very practical, as there is no easy way to bind the lifetime of shared memory to the lifetime of another system resource (say, a process, or a file descriptor).

Note: Shared memory transport is not available on all operating systems. On OS X in particular, the system interfaces are not POSIX-compatible enough to provide this feature. In this case alloc_shm_if_local never returns a shared memory object.

Speed of shared memory transport

The overhead of the shared memory transport is quite low. In particular, copying the data blocks on the path to or from the disk can be avoided. This makes it possible to do local I/O at full disk speed.

However, one should also realize that the latency of the data path is higher than when directly writing to a file. To compensate for that it is suggested to submit several requests at once to the datanode server, and to keep the server busy.

For maximum performance one should avoid using shared memory buffers that are not page-aligned. Also, one should avoid writing directly to a buffer. It is better to get a file descriptor for the buffer and to write to it via the Unix write system call. The kernel can then play nice tricks with the page table to fully avoid data copying.

In comparison to accessing local files, there is of course also the overhead of the central namenode. For reading or writing large files this overhead can be neglected.

All in all, the performance of the shared memory transport is good enough that Plasma MapReduce stores all data files, even local ones, in the distributed filesystem.

The ticket system

The tickets are created by the coordinator, but are checked by the datanodes. This means there must be some additional communication between the coordinator and the datanodes.

Before going into detail, let us first explain why we need this. Of course, we need a way to restrict access to data, and the ticket system is a method to extend the scope of an authorization system to loosely attached subsystems like datanodes. The ticket system helps us to enforce the integrity condition that blocks can only be accessed from inside transactions. Clients could otherwise e.g. allocate blocks, commit the transaction, and write the blocks later. This is dangerous, however, because at that later moment there is no guarantee that the blocks are still associated with the file the client thinks they belong to. Also, it circumvents the security system.

In order to avoid that the coordinator has to notify the datanodes about the access permissions for each block separately, a cryptographic scheme is used to lower the overhead. At transaction start, the coordinator creates two numbers:

  • ticket_id: This is the user-visible ID of the ticket
  • ticket_secret: This is a random number
Both numbers are transmitted to each datanode. When the transaction is finished, the datanode servers are notified that the tickets are invalid. This communication path right now uses the normal datanode port, and the coordinator calls the special Datanode_ctrl program on it.

There is also a timeout for every ticket. It catches the rare and mostly hypothetical case that the datanode server is unreachable at the moment the coordinator wants to revoke a ticket, but is back up later.

The read and write calls do not take the ticket_secret as an argument, but a ticket_verifier. The verifier is a message authentication code (MAC) for the combination of ticket_id, block number, and the permission (read-only or read-write). This means the verifier is a cryptographic hash computed from ticket_id, block, permission, and ticket_secret. Clients cannot compute it because they do not know the secret.
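
The verifier scheme can be illustrated with a standard MAC construction. Note the assumptions: the text does not say which hash function or encoding PlasmaFS uses, so HMAC-SHA256, the 64-bit big-endian packing, and the function names make_verifier and datanode_accepts are all choices made for this sketch only.

```python
import hashlib
import hmac
import struct


def make_verifier(ticket_secret: int, ticket_id: int,
                  block: int, read_write: bool) -> bytes:
    """Coordinator side: compute the MAC handed to the client (assumed HMAC)."""
    key = struct.pack(">Q", ticket_secret)                   # assumed encoding
    msg = struct.pack(">QQ?", ticket_id, block, read_write)  # assumed encoding
    return hmac.new(key, msg, hashlib.sha256).digest()


def datanode_accepts(ticket_secret: int, ticket_id: int, block: int,
                     read_write: bool, verifier: bytes) -> bool:
    """Datanode side: recompute the MAC from the secret it was told."""
    expected = make_verifier(ticket_secret, ticket_id, block, read_write)
    return hmac.compare_digest(expected, verifier)


secret = 0x1234_5678_9ABC_DEF0          # known to coordinator and datanodes
v_read = make_verifier(secret, 17, 5, read_write=False)

ok_read = datanode_accepts(secret, 17, 5, False, v_read)
wrong_block = datanode_accepts(secret, 17, 6, False, v_read)
write_with_read_ticket = datanode_accepts(secret, 17, 5, True, v_read)
```

Because block number and permission are part of the authenticated message, a verifier issued for reading one block cannot be reused for another block or for writing.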

This web site is published by Informatikbüro Gerd Stolpmann