6.824 2022 Lecture 9: Zookeeper Case Study

Reading: "ZooKeeper: wait-free coordination for internet-scale systems",
  Patrick Hunt, Mahadev Konar, Flavio P. Junqueira, Benjamin Reed.
  Proceedings of the 2010 USENIX Annual Technical Conference.

announcements:
  mid-term next thursday, in class, in person
  project proposal due next friday

today's lecture uses ZooKeeper in two ways:
  * a simpler way to structure fault-tolerant services.
  * high performance in a real-life service built on Raft-like replication.

if we wanted to make a fault-tolerant service like the MR coordinator,
  we'd be tempted to replicate with Raft, and that would be OK!
  but building directly on Raft is complex
    an existing Raft library would help, but it's still complex
    a replicated state machine usually affects application structure
  is there a simpler way?

you can think of state machine replication (Raft) as replicating the
  computation; the state is replicated as a side-effect.
how about replicating just the state?
  the state lives in a fault-tolerant storage system
  the computation reads/writes the fault-tolerant state
  like using a DB, or files -- easier than a state machine
  if the computation fails, the state is nevertheless preserved.
    can re-start the computation from that state.
    thus no need to replicate the computation.

what would it look like for MapReduce?
  [ZK, MR coord, workers, GFS]
  what might the MR coord store in ZK?
    coord IP addr, set of jobs, status of tasks, set of workers, assignments
  ZK often acts as a general-purpose "configuration service"
  what if the MR coord fails?
    we don't have a replica MR coord server!
    but we don't need one!
    just pick any server, run the MR coord s/w on it,
      and have it read its state from ZK.
    the new coord can pick up where the failed one left off.

challenges
  update multi-item state in a way that's safe even if the coord crashes
    a new coord needs to be able to recover/repair the state
  elect the MR coord (one at a time!)
  what if the old coord doesn't realize it's been replaced?
    can it still read/write state in ZK? or in other storage?
  read performance

Zookeeper data model (Figure 1)
  the state: a file-system-like tree of znodes
    file names, file content, directories, path names
  each znode has a version number
  types of znodes:
    regular
    ephemeral
    sequential: name + seqno

how do people use ZK znodes?
  znodes might contain e.g. the MR coordinator's IP address, workers'
    IP addresses, the MR job description, which MR tasks are assigned/completed
  the presence of a znode might indicate that an active MR coordinator
    exists, or that a worker is advertising its existence
  hierarchical names are convenient to allow different apps to share a
    single ZK server, and to organize each app's different kinds of data
    (one dir for workers to advertise existence, another dir for the set
    of jobs, &c)

Operations on znodes (Section 2.2)
  create(path, data, flags)
    exclusive -- only the first create indicates success
  delete(path, version)
    if znode.version = version, then delete
  exists(path, watch)
    watch=true means also send a notification if path is later created/deleted
  getData(path, watch)
  setData(path, data, version)
    if znode.version = version, then update
  getChildren(path, watch)
  sync()
    sync then read ensures that writes before the sync are visible to the
      same client's read
    the client could instead submit a write
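aside (not from the paper): a rough sketch of these znode operations in Go,
  assuming the third-party github.com/go-zookeeper/zk client library; the
  server address and znode paths are made-up examples.

    package main

    import (
        "fmt"
        "time"

        "github.com/go-zookeeper/zk"
    )

    func main() {
        // connect to a ZK ensemble (placeholder address)
        conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second)
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        acl := zk.WorldACL(zk.PermAll)

        // exclusive create: only the first concurrent creator succeeds
        _, err = conn.Create("/mr", []byte("job info"), 0, acl)
        fmt.Println("create /mr:", err)

        // ephemeral znode: ZK deletes it when this session ends
        _, err = conn.Create("/mr/c", []byte("coord addr"), zk.FlagEphemeral, acl)
        fmt.Println("create /mr/c:", err)

        // getData returns the content plus a Stat holding the version
        data, stat, err := conn.Get("/mr")
        if err != nil {
            panic(err)
        }
        fmt.Println("get /mr:", string(data), "version", stat.Version)

        // conditional setData: succeeds only if the version still matches
        _, err = conn.Set("/mr", []byte("new job info"), stat.Version)
        fmt.Println("set /mr:", err)

        // exists with a watch: an event arrives on the channel if /mr/c is
        // later created, deleted, or changed -- no need to poll
        _, _, watchCh, err := conn.ExistsW("/mr/c")
        if err != nil {
            panic(err)
        }
        go func() { fmt.Println("watch fired:", <-watchCh) }()
        time.Sleep(time.Second)
    }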
ZooKeeper API well tuned for concurrency and synchronization:
  + exclusive file creation; exactly one concurrent create returns success
  + getData()/setData(x, version) supports mini-transactions
  + sessions automate actions when clients fail (e.g. release lock on failure)
  + sequential files create order among multiple clients
  + watches avoid polling

Example: MapReduce coordinator election
  ticker():
    while true:
      if create("/mr/c", ephemeral=true)
        act as coordinator...
      else if exists("/mr/c", watch=true)
        wait for watch event
      else
        try again
  retire():
    delete("/mr/c")
  note:
    exclusive create
      if multiple clients concurrently attempt, only one will succeed
    ephemeral znode
      coordinator failure automatically lets a new coordinator be elected
    watch
      potential replacement coordinators can wait w/o polling

what if the elected service coordinator fails?
  note that even though this looks like an exclusive lock, the possibility
    of failure makes the situation very different from e.g. Go's sync.Mutex
  client failure -> the client stops sending keep-alive messages to ZK
  no keep-alives -> ZK times out the session
  session timeout -> ZK automatically deletes the client's ephemeral files

what if the coordinator crashes while updating state in ZK?
  particularly if there are multiple znodes containing state data
    e.g. a znode per MR task indicating pending/active/done, and where to
      find the output
  maybe structure the data so the coordinator never needs to update more
    than one znode
    individual setData() calls are atomic (all or nothing vs failure)
  maybe have a summary znode containing the names of the current state znodes
    create *new* znodes with the new state
    setData("sum", names of current state znodes)
    ZK guarantees order, so the "sum" update won't be visible unless all
      preceding writes are visible
    "single committing write"

what if the coordinator is alive and thinks it is still coordinator,
  but ZK has decided it is dead and deleted its ephemeral /mr/c file?
  a new coordinator will likely be elected.
  will two computers think they are the coordinator?
    this could happen.
  can the old coordinator modify state in ZK?
    this cannot happen!
    when ZK times out a client's session, two things happen atomically:
      ZK deletes the client's ephemeral nodes.
      ZK stops listening to the session.
    so the old coordinator can no longer send requests to ZK.
    once the old coordinator realizes the session is dead, it can create a
      new one, but now it knows it isn't the coordinator.
  if elected coordinators store state outside of ZK, e.g. in GFS,
    the new coordinator must tell GFS (or whatever) to ignore requests from
    the deposed coordinator, perhaps using epoch numbers.
    this is called "fencing".

what if you don't want to have a coordinator, but want multiple concurrent
  ZK clients to update shared state?

Example: add one to a number stored in a ZooKeeper znode
  while true:
    x, v := getData("f")
    if setData("f", x + 1, version=v):
      break
  this is a "mini-transaction"
    the effect is an atomic read-modify-write
  lots of variants, e.g. test-and-set for VMware-FT
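aside (not from the paper): a minimal Go sketch of this increment loop, again
  assuming the github.com/go-zookeeper/zk client; the package name, znode
  path, and decimal-string encoding are illustrative assumptions.

    package zkexample // hypothetical package name

    import (
        "strconv"

        "github.com/go-zookeeper/zk"
    )

    // increment adds one to the integer stored in the znode at path.
    // It retries its conditional Set until no other client's write has
    // slipped in between the Get and the Set.
    func increment(conn *zk.Conn, path string) error {
        for {
            data, stat, err := conn.Get(path)
            if err != nil {
                return err
            }
            x, err := strconv.Atoi(string(data))
            if err != nil {
                return err
            }
            // Set succeeds only if the znode's version is still stat.Version
            _, err = conn.Set(path, []byte(strconv.Itoa(x+1)), stat.Version)
            if err == nil {
                return nil // mini-transaction committed
            }
            if err != zk.ErrBadVersion {
                return err
            }
            // version mismatch: someone else wrote concurrently, so retry
        }
    }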
we can use ZK for fault-tolerant communication as well as state storage
  a worker advertises itself by creating an ephemeral znode
    fault-tolerant: worker crash -> the znode disappears
  MR clients submitting jobs
    fault-tolerant: if the client crashes, the job request is still there!
    a re-started client can check in ZK to see if/when the job is done.

how is the ZK client interface designed for good performance?
  the main focus is on read performance
  1) many ZK followers; clients are spread over them for parallelism
     a client sends all of its operations to its ZK follower
     the ZK follower executes reads locally, from its replica of the ZK data,
       to avoid loading the ZK leader
     the ZK follower forwards writes to the ZK leader
  2) watch, not poll
     and the ZK follower (not the ZK leader) does the work
  3) clients of ZK launch async operations
     i.e. send a request; the completion notification arrives separately,
       unlike RPC
     a client can launch many writes without waiting
     ZK processes them efficiently in a batch; fewer msgs, fewer disk writes
     the client library numbers them, and ZK executes them in that order
     e.g. to update a bunch of znodes and then create a "ready" znode

a read may not see the latest completed writes!
  since the client's follower may be a little behind (not in the write's majority)
  so reads aren't linearizable
  justified by performance/scalability
  a client can send a write (or a sync) to force a wait for recent writes

writes, however, execute one at a time, in log order ("zxid" order)
  you need this for mini-transactions like exclusive create()
  and writes appear in zxid order at all ZK followers and clients
  so if a ZK client sees the "ready" znode, it will see the updates that
    preceded it

what does ZK guarantee?
  a single order for all writes -- the ZK leader's log order. "zxid"
  all clients see writes appear in zxid order.
    including writes by other clients.
  a client's read sees all of that client's preceding writes.
    the client's ZK follower may need to wait until the client's latest
      write is committed by the ZK leader.
  a client is guaranteed to see a watch event before it sees values from
    writes after that event.

an example of how the read/write guarantees play out
  suppose we have configuration data in ZK that the coordinator writes,
    but that many other ZK clients need to read,
    and the data consists of a bunch of znodes
  the paper suggests:
    coordinator deletes the "ready" znode
    coordinator updates the configuration znodes
    coordinator creates the "ready" znode
    clients wait until the "ready" znode exists before reading
  but what if a client sees "ready" just before the coordinator deletes it?
    Write order:         Read order:
                         exists("ready", watch=true)
                         read f1
    delete("ready")
    write f1
    write f2
                         read f2
    create("ready")
    the watch guarantee above saves us: the client sees the watch event for
      the delete("ready") before it sees the value written to f2, so it
      knows its reads are stale and must be retried.

Efficiency depends on how clients use ZK!
  what's wrong with Simple Locks? (page 6)
    suppose 100s of clients are waiting for the lock?
  better: Locks without Herd Effect
    1. create a "sequential" file
    2. list files
    3. if no lower-numbered file, the lock is acquired!
    4. if exists(next-lower-numbered, watch=true)
    5.   wait for watch event...
    6. goto 2
  Q: could a lower-numbered file be created between steps 2 and 3?
  Q: can the watch fire before it is the client's turn?
  A: yes
     lock-10 <- current lock holder
     lock-11 <- next one
     lock-12 <- my request
     if the client that created lock-11 dies before it gets the lock, the
       watch will fire, but it isn't my turn yet -- hence the "goto 2".
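aside (not from the paper's text): a rough Go sketch of this herd-free lock
  recipe, assuming the github.com/go-zookeeper/zk client; the lock directory
  layout and the helper's name are illustrative assumptions.

    package zkexample // hypothetical package name

    import (
        "sort"

        "github.com/go-zookeeper/zk"
    )

    // acquire creates an ephemeral+sequential znode under dir (e.g.
    // "/lock/lock-0000000042") and then waits only on the next-lower-
    // numbered znode, so one release wakes one waiter, not the whole herd.
    // It returns the created znode's path; deleting it releases the lock.
    func acquire(conn *zk.Conn, dir string) (string, error) {
        acl := zk.WorldACL(zk.PermAll)
        me, err := conn.Create(dir+"/lock-", nil,
            zk.FlagEphemeral|zk.FlagSequence, acl)
        if err != nil {
            return "", err
        }
        for {
            children, _, err := conn.Children(dir)
            if err != nil {
                return "", err
            }
            sort.Strings(children) // sequence numbers are zero-padded
            prev := ""
            for _, c := range children {
                if dir+"/"+c == me {
                    break
                }
                prev = dir + "/" + c
            }
            if prev == "" {
                return me, nil // no lower-numbered znode: lock acquired
            }
            // watch only the immediate predecessor
            exists, _, watchCh, err := conn.ExistsW(prev)
            if err != nil {
                return "", err
            }
            if exists {
                <-watchCh // predecessor deleted (or changed)
            }
            // the watch may fire before it's our turn (the predecessor died
            // while still waiting), so list again -- the "goto 2"
        }
    }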
The ZK implementation aims at high performance:
  Data must fit in memory, so reads are fast (no need to read disk).
    So you can't store huge quantities of data in ZooKeeper.
  Writes (log entries) must be written to disk, and waited for.
    So committed updates aren't lost in a crash or power failure.
    This hurts latency; batching can help throughput.
  Periodically, complete snapshots are written to disk.
    A fuzzy snapshot technique lets the snapshot write proceed concurrently
    with request processing.

How is the performance?
  Figure 5 -- throughput.
  Why do the lines go up as they move to the right?
  Why does the x=0 performance go down as the number of servers increases?
  Why does the "3 servers" line change to be the worst at 100% reads?
  Is the x=0 throughput of 20,000 ops/second good or bad?
    What might determine the 20,000? Why not 200,000?
    Each op is a 1000-byte write...

What about latency?
  Table 2 / Section 5.2 says 1.2 milliseconds.
    For a single worker (client) waiting after each write request.
  Where might the 1.2 milliseconds come from?
    Disk writes? Communication? Computation?

What about recovery time?
  Figure 8
  Follower failure -> just a decrease in total throughput.
  Leader failure -> a pause for timeout and election.
    Visually, on the order of a second.

ZooKeeper is a successful design.
  see ZooKeeper's Wikipedia page for a list of projects that use it
  often used as a kind of fault-tolerant name service
    what's the current coordinator's IP address? what workers exist?
  can be used to simplify an overall fault-tolerance strategy
    store all state in ZK
      e.g. the MR queue of jobs, status of tasks
    then the service's servers needn't themselves replicate

References:
  https://zookeeper.apache.org/doc/r3.4.8/api/org/apache/zookeeper/ZooKeeper.html
  ZAB: http://dl.acm.org/citation.cfm?id=2056409
  https://zookeeper.apache.org/
  https://cs.brown.edu/~mph/Herlihy91/p124-herlihy.pdf
    (wait free, universal objects, etc.)