6.5840 2023 Lecture 10: ZooKeeper Case Study

Reading: "ZooKeeper: wait-free coordination for internet-scale systems",
Patrick Hunt, Mahadev Konar, Flavio P. Junqueira, Benjamin Reed.
2010 USENIX Annual Technical Conference.

announcements:
  mid-term next thursday, in class, in person
  project proposal due next friday

today's lecture considers ZooKeeper from two angles:
  * a simpler way to structure fault-tolerant services.
  * high performance in a real-life service built on Raft-like replication.

if we wanted to make a fault-tolerant service like the MR coordinator,
  we'd be tempted to replicate it with Raft, and that would be OK!
  [diagram: Raft-replicated MR coordinator, workers]
but building directly on Raft is complex
  using a Raft library helps, but it's still complex
  a replicated state machine usually affects the application's structure
is there a simpler way?

you can think of state machine replication (Raft) as replicating the
  computation; the state is replicated as a side-effect.
can we replicate state without replicating computation?
  yes: use fault-tolerant storage, for example ZooKeeper
  [ZK x3, MR coord, workers]
  often easier to program than a replicated state machine

what might the MR coord store in ZK?
  coord IP addr, set of jobs, status of tasks, set of workers, assignments
  update the data in ZK on each change

what if the MR coord fails?
  we weren't replicating it on a backup coord server
  but we don't need one!
  just pick any computer, run the MR coord s/w on it,
    and have it read its state from ZK.
  the new coord can pick up where the failed one left off.
  this makes the most sense in a cloud, where it's easy to
    allocate a replacement server.

people often use ZK as a "configuration service"
  election, current server IP address, list of workers, &c

challenges
  update multi-item state in a way that's safe even if the coord crashes
  elect the MR coord (one at a time!)
  a new coord needs to be able to recover/repair the state
  what if the old coord doesn't realize it's been replaced?
    can it still read/write state in ZK?
    can it affect other entities incorrectly? e.g. tell workers to do things?
  performance!

ZooKeeper data model (Figure 1)
  the state: a file-system-like tree of znodes
  file names, file content, directories, path names
  directories help different apps avoid interfering
  each znode has a version number
  types of znodes:
    regular
    ephemeral
    sequential: name + seqno

Operations on znodes (Section 2.2)
  create(path, data, flags)
    exclusive -- fails if path already exists
  exists(path, watch)
    watch=true asks for a notification if path is later created/deleted
  getData(path, watch) -> data, version
  setData(path, data, version)
    if znode.version = version, then update
  getChildren(path, watch)
  sync()
    sync then read ensures writes before the sync are visible
      to the same client's read
    the client could instead submit a write

the ZooKeeper API is well tuned for concurrency and synchronization:
  + exclusive file creation; exactly one concurrent create returns success
  + getData()/setData(x, version) supports mini-transactions
  + sessions help cope with client failure (e.g. release locks)
  + sequential files create order among multiple clients
  + watches avoid polling

Example: MapReduce coordinator election
  this is the paper's Simple Lock in Section 2.4
  ticker():
    while true:
      if create("/mr/c", ephemeral=true)
        act as coordinator...
      else if exists("/mr/c", watch=true)
        wait for watch event
  note:
    exclusive create
      if multiple clients concurrently attempt, only one will succeed
    ephemeral znode
      a coordinator failure automatically lets a new coordinator be elected
    watch
      potential replacement coordinators can wait w/o polling
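  a minimal Go sketch of this election loop, written against a hypothetical
    client interface whose methods mirror the operations above (the ZK type,
    CreateEphemeral, ExistsW, and ErrNodeExists are illustrative names, not
    a real client library's API):

    package zkelect

    import "errors"

    // ErrNodeExists stands in for the error a real client returns when an
    // exclusive create finds the znode already present.
    var ErrNodeExists = errors.New("znode already exists")

    // ZK is a hypothetical client interface; its methods mirror the
    // operations listed above, not any particular client library.
    type ZK interface {
        // CreateEphemeral does an exclusive create of an ephemeral znode.
        CreateEphemeral(path string, data []byte) error
        // ExistsW checks path and sets a one-shot watch that fires when
        // the znode is later created or deleted.
        ExistsW(path string) (exists bool, watch <-chan struct{}, err error)
    }

    // RunForCoordinator is the ticker() loop above: try to grab /mr/c;
    // if someone else holds it, wait for the watch and try again.
    func RunForCoordinator(zk ZK, me string, actAsCoordinator func()) error {
        for {
            err := zk.CreateEphemeral("/mr/c", []byte(me))
            if err == nil {
                // exclusive create succeeded: act as coordinator until
                // we give up or our session expires (which deletes /mr/c).
                actAsCoordinator()
                return nil
            }
            if !errors.Is(err, ErrNodeExists) {
                return err
            }
            // someone else is coordinator; wait for /mr/c to change.
            exists, watch, werr := zk.ExistsW("/mr/c")
            if werr != nil {
                return werr
            }
            if exists {
                <-watch // fires when /mr/c is deleted
            }
            // /mr/c is (probably) gone; loop and try to create it again.
        }
    }

  the sketch assumes actAsCoordinator only returns when this node should
    stop being coordinator (e.g. its ZK session is gone).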
what do we want to happen if the elected coordinator fails?
  * we want to elect a replacement, so the /mr/c lock file should be deleted
  * we must cope with a crash in the middle of updating state in ZK
  * we must cope with the possibility that the coordinator *didn't* fail!
even though /mr/c looks like a lock, the possibility of failure makes
  the situation very different from e.g. Go's sync.Mutex

what does ZK do?
  client failure -> client stops sending keep-alive messages to ZK
  no keep-alives -> ZK times out the session
  session timeout -> ZK automatically deletes the client's ephemeral files
  now a new coordinator can elect itself

what if the coordinator crashes while updating state in ZK?
  maybe store all the data in a single ZK file
    individual setData() calls are atomic (all or nothing vs failure)
  what if there are multiple znodes containing state data?
    e.g. a znode per MR task
    maybe use the paper's "ready" file scheme

what if the coordinator is alive and thinks it is still coordinator?
  but ZK has decided it is dead and deleted its ephemeral /mr/c file?
  a new coordinator will likely be elected.
  will two computers think they are the coordinator?
    this could happen.
  can the old coordinator modify state in ZK?
    this cannot happen!
    when ZK times out a client's session, two things happen atomically:
      ZK deletes the client's ephemeral nodes.
      ZK stops listening to the session.
    so the old coordinator can no longer send requests to ZK!

this is an important pattern in distributed systems:
  a single entity decides if various computers are alive or dead
  sometimes called a failure detector
  it may not be correct, e.g. if the network drops messages
  but everyone obeys its opinions
  agreement is more important than being right, to avoid split brain

what if the coordinator interacts with entities other than ZK?
  that don't know about the coordinator's ZK session state?
  e.g. the coordinator talking to MapReduce workers.
  idea: a worker could ask ZK for the identity of the current MR coordinator,
    and ignore requests from a non-current coordinator.
    not ideal: likely to put too much load on ZK.
  idea: each new coordinator gets an increasing "epoch" number,
    from a file in ZK (see below).
    the coordinator sends its epoch in each message to workers.
    workers remember the highest epoch they have seen.
    workers reject messages with epochs smaller than the highest seen,
      so they'll ignore a superseded coordinator once they see a newer one.
    often called a "fence".
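  the worker side of the fence is just a comparison against the highest
    epoch seen so far; a minimal Go sketch (TaskRequest, Worker, and the
    field names are illustrative, not from the labs or the paper):

    package zkfence

    import "sync"

    // TaskRequest is an illustrative coordinator->worker message; the only
    // field fencing cares about is Epoch (allocated from a znode, see below).
    type TaskRequest struct {
        Epoch int64
        Task  string
    }

    // Worker remembers the highest coordinator epoch it has ever seen.
    type Worker struct {
        mu          sync.Mutex
        highestSeen int64
    }

    // HandleRequest rejects messages from a superseded coordinator, i.e. one
    // whose epoch is smaller than the highest epoch this worker has seen.
    func (w *Worker) HandleRequest(req TaskRequest) bool {
        w.mu.Lock()
        defer w.mu.Unlock()
        if req.Epoch < w.highestSeen {
            return false // stale coordinator: fence it off by ignoring it
        }
        w.highestSeen = req.Epoch
        // ... carry out req.Task here ...
        return true
    }

  the check and the update of highestSeen must be atomic (hence the mutex),
    otherwise two concurrent requests could race.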
Example: allocate a unique epoch number
  the next epoch number is stored in a znode
  while true:
    e, version := getData("/epoch")
    if setData("/epoch", e + 1, version):
      break
  this is an atomic read-modify-write
  think about what happens if two clients execute this at the same time
  lots of variants, e.g. test-and-set for VMware-FT

we can use ZK for fault-tolerant communication as well as state storage
  a worker advertises by creating an ephemeral znode
    fault-tolerant: worker crash -> znode disappears
  MR clients submitting jobs
    fault-tolerant: if the client crashes, the job request is still there!
    a re-started client can check in ZK to see if/when the job is done.

how is ZK designed for good performance?
  the emphasis is on handling many reading clients
  1) many ZK followers; clients are spread over them for parallelism
     a client sends all of its operations to its ZK follower
     the ZK follower executes reads locally, from its replica of the ZK data,
       to avoid loading the ZK leader
     the ZK follower forwards writes to the ZK leader
  2) watch, not poll
     and the ZK follower (not the ZK leader) does the work
  3) clients of ZK launch async operations
     i.e. send a request; the completion notification arrives separately
       (unlike RPC)
     a client can launch many writes without waiting
     ZK processes them efficiently in a batch; fewer msgs, disk writes
     the client library numbers them; ZK executes them in that order
     e.g. to update a bunch of znodes and then create a "ready" znode

a read may not see the latest completed writes!
  since the client's follower may be behind (not in the write's majority)

why is it ok for reads to see stale data?
  ZK has separate consistency guarantees for reads and writes
  writes: "A-linearizability"
    like linearizability, but each client's async writes are ordered, not concurrent
    the ZK leader picks an order for writes; all followers execute them in that order
  reads: "client FIFO order"
    a client's operations fit into the overall write order in client issue order

client FIFO order example:
  suppose a client issues these asynchronous operations: W1 W2 R3 W4
  the client's ZK server (a follower) receives them
    and forwards the writes to the ZK leader
  the follower sees a stream of writes from the ZK leader:
    other clients' writes, plus its own
    W W W1 ...
  the follower must wait for W2 to appear before executing R3,
    to obey the client FIFO rule
    W W W W1 W W2 W R3 ...
  thus, R3 is guaranteed to see the effects of W1 and W2 (but not W4)

an example of how the read/write guarantees play out
  suppose we have configuration data in ZK
    that the coordinator writes, but that many other ZK clients need to read,
    and the data consists of multiple znodes (files)
  paper suggests:
    coordinator deletes the "ready" znode
    coordinator updates the configuration znodes
    coordinator creates the "ready" znode
    clients wait until the "ready" znode exists before reading
  but what if a client sees "ready" just before the coordinator deletes it?
    Writer:                 Reader:
                            exists("ready", watch=true)
                            read f1
    delete("ready")
    write f1
    write f2                read f2
    create("ready")
    the reader could see the old f1 and the new f2 -- an inconsistent mix.
    but ZK delivers the watch notification for the delete("ready") before it
      reveals any write that follows the delete, so the reader finds out in
      time that it must re-read.

Efficiency depends on how clients use ZK!
  what's wrong with the Simple Lock? (page 6)
    suppose 100s of clients are waiting for the lock?
    every release (or holder crash) wakes all of them up,
      only for all but one to go back to waiting -- the herd effect
  better: Locks without Herd Effect
    1. create a "sequential" file
    2. list files
    3. if no lower-numbered file, the lock is acquired!
    4. if exists(next-lower-numbered, watch=true)
    5.   wait for watch event...
    6. goto 2
  Q: could a lower-numbered file be created between steps 2 and 3?
  Q: can the watch fire before it is the client's turn?
  A: yes
    lock-10 <- current lock holder
    lock-11 <- next one
    lock-12 <- my request
    if the client that created lock-11 dies before it gets the lock,
      the watch will fire, but it isn't my turn yet.
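  a minimal Go sketch of this loop, again against a hypothetical client
    interface (CreateSeqEphemeral, Children, ExistsW are illustrative names,
    not a real client library's API); the key point is that each client
    watches only the next-lower-numbered znode and re-lists whenever that
    watch fires:

    package zklock

    import "sort"

    // ZK is a hypothetical client interface mirroring the operations above.
    type ZK interface {
        // CreateSeqEphemeral creates e.g. /mr/lock/lock-0000000042 and
        // returns the path that was actually created.
        CreateSeqEphemeral(prefix string, data []byte) (string, error)
        // Children lists the znode names under dir.
        Children(dir string) ([]string, error)
        // ExistsW checks path and sets a one-shot watch on it.
        ExistsW(path string) (exists bool, watch <-chan struct{}, err error)
    }

    // Acquire implements the loop above: create a sequential ephemeral
    // znode, then watch only the next-lower-numbered znode, re-listing
    // each time that watch fires (its owner may have died before its turn).
    func Acquire(zk ZK, dir string) (myPath string, err error) {
        myPath, err = zk.CreateSeqEphemeral(dir+"/lock-", nil)
        if err != nil {
            return "", err
        }
        for {
            names, err := zk.Children(dir)
            if err != nil {
                return "", err
            }
            // ZooKeeper zero-pads sequence numbers, so a string sort
            // orders the znodes by sequence number.
            sort.Strings(names)
            prev := "" // znode immediately below mine, if any
            for _, n := range names {
                p := dir + "/" + n
                if p == myPath {
                    break
                }
                prev = p
            }
            if prev == "" {
                return myPath, nil // no lower-numbered znode: lock acquired
            }
            exists, watch, werr := zk.ExistsW(prev)
            if werr != nil {
                return "", werr
            }
            if exists {
                <-watch // fires when prev is deleted
            }
            // goto step 2: re-list, since it may still not be our turn.
        }
    }

  releasing the lock is just deleting myPath; if the holder crashes, its
    ephemeral znode disappears and only the next waiter's watch fires --
    no herd.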
Some implementation details aimed at high performance:
  Data must fit in memory, so reads are fast (no need to read the disk).
    So you can't store huge quantities of data in ZooKeeper.
  Writes (log entries) must be written to disk, and waited for,
    so committed updates aren't lost in a crash or power failure.
    Hurts latency; batching can help throughput.
  Periodically, complete snapshots are written to disk.
    A fuzzy snapshot technique allows snapshotting concurrently with
    write operations.

How is the performance?
  Figure 5 -- throughput.
  Overall, ZK can handle 10s of thousands of operations / second.
    Is this a lot? Enough?
  Why do the lines go up as they move to the right?
  Why does the x=0 performance go down as the number of servers increases?
  Why does the "3 servers" line change to be worst at 100% reads?
  What might limit it to 20,000? Why not 200,000?
    Each op is a 1000-byte write...

What about latency?
  Table 2 / Section 5.2 implies 1.2 milliseconds,
    for a single worker (client) waiting after each write request.
  Where might the 1.2 milliseconds come from?
    Disk writes? Communication? Computation?
    (How can it be this fast, given mechanical disk rotation times?)

What about recovery time?
  Figure 8
  Follower failure -> just a decrease in total throughput.
  Leader failure -> a pause for timeout and election.
    Visually, on the order of a few seconds.

ZooKeeper is very widely used.
  see ZooKeeper's Wikipedia page for a list of projects that use it
  often used as a kind of fault-tolerant name service
    what's the current coordinator's IP address? what workers exist?
  can be used to simplify an overall fault-tolerance strategy
    store all state in ZK, e.g. the MR queue of jobs, status of tasks
    then the service's servers needn't themselves replicate

References:
  https://zookeeper.apache.org/doc/r3.4.8/api/org/apache/zookeeper/ZooKeeper.html
  ZAB: http://dl.acm.org/citation.cfm?id=2056409
  https://zookeeper.apache.org/
  https://cs.brown.edu/~mph/Herlihy91/p124-herlihy.pdf
    (wait-free, universal objects, etc.)