Mean Time to Failure (MTTF), Mean Time to Repair (MTTR), Mean Time Between Failures (MTBF) = MTTF + MTTR, Availability = MTTF / (MTTF + MTTR)
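A quick numeric sketch of these formulas; the MTTF/MTTR values below are made up:

```python
# Minimal sketch of the reliability formulas above; the numbers are hypothetical.
mttf_hours = 10_000   # mean time to failure (made-up value)
mttr_hours = 2        # mean time to repair (made-up value)

mtbf = mttf_hours + mttr_hours                      # MTBF = MTTF + MTTR
availability = mttf_hours / (mttf_hours + mttr_hours)

print(f"MTBF = {mtbf} h, availability = {availability:.4%}")
```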
Error Detection: timeout, parity, checksum, Cyclic Redundancy Check (CRC: treat data as polynomial coefficients)
Error Correction: retry, Error Correcting Codes (ECC: two dimensional parity bits), replication
Mean Time To First Data Loss (MTTDL): calculate from MTTF
Sequential (n devices): MTTDL_n = MTTDL_1 / n
Parallel (n devices): MTTDL_n = \sum_{i = 1}^{n} \frac{MTTF_1}{i}
k parity + n data devices: MTTDL = \sum_{i = n}^{n+k} \frac{MTTF_1}{i} (data is lost at the (k+1)-th failure)
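A small sketch of the three MTTDL cases above, assuming a hypothetical per-device MTTF and no repair:

```python
# Sketch of the MTTDL formulas above; MTTF_1 is a made-up per-device value.
MTTF_1 = 100_000  # hours

def mttdl_sequential(n):
    # all n devices are needed, so the first failure loses data
    return MTTF_1 / n

def mttdl_parallel(n):
    # data survives until every device has failed
    return sum(MTTF_1 / i for i in range(1, n + 1))

def mttdl_parity(n, k):
    # n data + k parity devices: data is lost at the (k+1)-th failure
    return sum(MTTF_1 / i for i in range(n, n + k + 1))

print(mttdl_sequential(4), mttdl_parallel(4), mttdl_parity(3, 1))
```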
RAID Levels:
RAID 0: Data striping without redundancy. Interleaves a file's data across multiple disks (no fault tolerance). Parallel reads and writes across multiple disks, but poor reliability.
RAID 1: Mirroring of independent disks, make two or more copies of the same data (tolerate 1 disk failure)
RAID 4: Data striping plus a dedicated parity disk. Maintain D_1 \oplus D_2 \oplus D_3 = D_p, so that D_2 \oplus D_3 \oplus D_p = D_1 if D_1 fails (D_p is the parity disk); see the XOR sketch after this list.
RAID 5: Data striping plus striped (rotating) parity; a good compromise choice
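The XOR sketch referenced above: the parity block is the XOR of the data blocks, so any single lost block can be rebuilt from the survivors (data values are arbitrary):

```python
# RAID-4/5 style parity: D_p = D_1 xor D_2 xor D_3; rebuild any one lost block.
d1, d2, d3 = 0b1010, 0b0110, 0b1100
dp = d1 ^ d2 ^ d3            # parity block

recovered_d1 = d2 ^ d3 ^ dp  # rebuild D_1 after its disk fails
assert recovered_d1 == d1
```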
MapReduce: map, compute, sort, reduce
Message Passing Interface (MPI): standardized communication protocol (lower level, used in HPC), hardware-specific code, tightly-coupled parallel tasks, good for iterative and graph computation, application-level failure handling, memory-based
MapReduce: simple programming model and messaging system, good for loosely-coupled, coarse-grained parallel tasks; failure handling is done by the framework, not the application; mostly disk-based; bad at graph computation and at simulations where communication is frequent
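A toy, single-process illustration of the map / sort / reduce phases using word count; a real MapReduce framework would distribute these phases across many nodes:

```python
# Word count as map -> shuffle/sort -> reduce, all in one process for illustration.
from collections import defaultdict

docs = ["the quick brown fox", "the lazy dog"]

# map: emit (word, 1) pairs
mapped = [(word, 1) for doc in docs for word in doc.split()]

# shuffle/sort: group values by key
groups = defaultdict(list)
for key, value in sorted(mapped):
    groups[key].append(value)

# reduce: sum the counts for each word
counts = {word: sum(values) for word, values in groups.items()}
print(counts)
```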
Hadoop Project: HDFS Fault Tolerance + MapReduce Programming Environment (Coarse-Grained Parallelism)
Dynamically scheduled: if a node fails, the manager node detects it (via heartbeat) and migrates its jobs to other nodes.
Stragglers: tasks that take a long time to execute due to bugs, flaky hardware, or poor partitioning; detect them and raise an error.
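A hedged sketch of how a manager node might flag failed nodes (missed heartbeats) and stragglers (long-running tasks); the thresholds and helper names are invented:

```python
# Toy detection logic driven by heartbeat timestamps and task start times.
import time

HEARTBEAT_TIMEOUT = 10.0     # seconds without a heartbeat => node presumed failed
STRAGGLER_THRESHOLD = 300.0  # seconds a task may run before being flagged

def find_failed_nodes(last_heartbeat, now=None):
    now = now or time.time()
    return [node for node, ts in last_heartbeat.items()
            if now - ts > HEARTBEAT_TIMEOUT]

def find_stragglers(task_start_times, now=None):
    now = now or time.time()
    return [task for task, start in task_start_times.items()
            if now - start > STRAGGLER_THRESHOLD]
```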
Advantages of clusters: can read/write large datasets, can dynamically schedule tasks, can use consumer-grade components, can have heterogeneous nodes, but with higher overhead
Google File System (GFS): large files (\geq 100 MB), large streaming reads (\geq 1 MB), large sequential writes, many concurrent appends (files used as producer-consumer queues; want atomic appends without synchronization overhead), high throughput is more important than low latency (although erasure codes / parity chunks would be better)
Apache Hadoop Distributed File System (HDFS): open-sourced version of GFS (widely deployed in companies)
Master Server: one master, but duplicated master data
Metadata: in RAM, mapping files to chunks in chunkservers
Migrates chunks between chunkservers (to balance disk-space usage and load across chunkservers); monitors heartbeats
Garbage collects orphaned chunks (when a file is deleted)
Writes an operation (write-update) log and checkpoints, replicated on multiple machines
On failure: replay the log from disk, ask chunkservers which chunks they hold (chunk locations), and take the maximum version number reported among all chunkservers
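A rough illustration of the master's in-RAM metadata and the location rebuild after a restart; the data structures and names below are invented for this sketch, not GFS's actual ones:

```python
# Invented structures: file -> chunk handles, chunk handle -> (version, locations).
file_to_chunks = {"/logs/web-00": ["chunk-001", "chunk-002"]}

def rebuild_chunk_locations(chunkserver_reports):
    """After a master restart (and replaying the op log for the namespace),
    re-learn chunk locations by asking chunkservers what they hold, keeping
    only replicas that report the maximum version seen for each chunk."""
    locations = {}
    for server, chunks in chunkserver_reports.items():
        for handle, version in chunks.items():
            entry = locations.setdefault(handle, {"version": version, "servers": []})
            if version > entry["version"]:
                entry["version"], entry["servers"] = version, []
            if version == entry["version"]:
                entry["servers"].append(server)
    return locations

# cs-b reports a stale version, so it is excluded from chunk-001's locations.
print(rebuild_chunk_locations({"cs-a": {"chunk-001": 7}, "cs-b": {"chunk-001": 6}}))
```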
Chunkservers and Clients:
64 MB chunks, each with checksums verified on every read/write and periodically, and a version number tied to the lease (checksum verification is sketched after this block)
read, write, atomic append requests, snapshot
No caching: streaming reads (read once) and append writes (write once) don't benefit from caching, and it simplifies the client (caching makes things messy)
Big chunk size: reduces the amount of metadata (since there is a lot of data), lowers load on the master (fewer requests) and network overhead
Small chunk size: less retry overhead on failure when replicating chunks, read and write operations can be spread across more chunkservers (reducing per-server load and increasing throughput), better disk utilization (less fragmentation), easier integrity checks.
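The checksum sketch referenced above; the storage layout and helper names are invented for illustration:

```python
# Minimal per-chunk checksum verification on read.
import hashlib

def store_chunk(storage, handle, data):
    storage[handle] = {"data": data, "checksum": hashlib.sha256(data).hexdigest()}

def read_chunk(storage, handle):
    entry = storage[handle]
    if hashlib.sha256(entry["data"]).hexdigest() != entry["checksum"]:
        raise IOError(f"chunk {handle} is corrupted; read from another replica")
    return entry["data"]

storage = {}
store_chunk(storage, "chunk-001", b"64 MB of data, abbreviated")
assert read_chunk(storage, "chunk-001") == b"64 MB of data, abbreviated"
```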
Consistency: GFS applications are designed to accommodate the relaxed consistency model (concurrent writes can overwrite each other and are ordered by a primary); record appends are at-least-once, so duplicates are possible
Resilient Distributed Datasets (RDDs): intermediate results during each update; they must be deterministic functions of the input (map, groupBy, filter, sample, flatMap)
Transformations (map, filter, sample, groupByKey, sortByKey, union, join, cross): lazily evaluated to save computation
Actions (count, sum, reduce, save, collect): trigger actual computation
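A small PySpark sketch of transformations vs. actions (assumes a local pyspark installation); transformations only record lineage, and the final action triggers execution:

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd-demo")

rdd = sc.parallelize(range(10))
evens_squared = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * x)  # lazy
total = evens_squared.reduce(lambda a, b: a + b)                       # action runs now

print(total)  # 0 + 4 + 16 + 36 + 64 = 120
sc.stop()
```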
Apache Spark Deployment:
master server: lineage tracking, scheduling, support for directed graphs of RDD operations, automatic pipelining of functions within a stage, partitioning/cache-aware scheduling to minimize shuffles
cluster manager: resource allocation (Mesos, YARN, K8S)
worker: Executors isolate concurrent tasks, Caches persist RDDs
Bulk Synchronous Parallel (BSP) model: any distributed system can be emulated as local network + message passing
Spark is bad for: fine-grained updates and shared state, non-batch workloads, datasets that don't fit in memory, nodes without disks, and workloads that need SIMD / GPU
Bounded stale state by N steps: instead of synchronizing at every step, and instead of no synchronization at all, bound the number of steps between synchronizations
Asynchronous vs. Bulk Synchronous Parallel (BSP): BSP is easier to implement correctly; asynchronous execution is easier to scale and faster per sample
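A toy sketch of the bounded-staleness rule (names and bound are invented): a worker may advance only while it is at most N steps ahead of the slowest worker, so a bound of 0 behaves like BSP and a very large bound like fully asynchronous execution:

```python
STALENESS_BOUND = 3  # N, chosen arbitrarily for the example

def may_proceed(my_step, all_worker_steps, bound=STALENESS_BOUND):
    # proceed only while within `bound` steps of the slowest worker
    return my_step - min(all_worker_steps) <= bound

print(may_proceed(6, [2, 4, 6]))  # False: 4 steps ahead, must wait for the slowest
print(may_proceed(4, [2, 4, 4]))  # True: only 2 steps ahead
```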
DNS-based Routing
Root name server: high TTL, iterative (instead of recursive) DNS query
High-level name server: large TTL, returns an NS record (a DNS record that contains the name of the authoritative name server within a domain or DNS zone)
Low-level name server: small TTL, chooses a specific caching server within its instance
Load balancer: round-robin, static partitioning, hash-based partitioning, consistent hashing (a key always hashes to the next server clockwise on a circle; problems: two servers may land very close together, or a server may leave; solution: virtual nodes)
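A sketch of consistent hashing with virtual nodes; the hash choice and node counts are arbitrary:

```python
# Each server is hashed onto the ring at several points (virtual nodes);
# a key is served by the next point clockwise.
import bisect, hashlib

def _h(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, servers, vnodes=100):
        # more virtual nodes per server smooths out load imbalance
        self.ring = sorted((_h(f"{s}#{i}"), s) for s in servers for i in range(vnodes))
        self.points = [p for p, _ in self.ring]

    def lookup(self, key):
        # walk clockwise to the next virtual node, wrapping around the circle
        i = bisect.bisect(self.points, _h(key)) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing(["cache-a", "cache-b", "cache-c"])
print(ring.lookup("user:42"))
```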
Records: key (optional), value, timestamp
Record: each message sent from any producer
PartitionKey: a field value the user specifies, used for partitioning records into partitions
Topic, Offset, and PartitionNumber:
Topic: a grouping of partitions handling the same type of data
Offset: queue position in each partition. For each partition, Kafka tracks an offset per ConsumerGroup, not per Consumer (since at most one Consumer in each ConsumerGroup reads a given Partition).
NumberOfPartitions: the number of partitions for a Topic is configurable by the user, since data within a partition is ordered but is not ordered across partitions
PartitionNumber: uniquely identifies a partition
ReplicationFactor: how many copies of the same partition to keep. Replicas are never read from and never written to. Kafka tolerates (ReplicationFactor - 1) failed brokers before losing data. (LinkedIn uses ReplicationFactor=2)
Replica: a replicated Partition. The ReplicaNumber for each Partition is set to the same number as the BrokerNumber. For each Partition, Kafka elects a Leader hosting the TrueReplica that the other Replicas sync to.
Retention: Records are removed based on age, or maxSize, or key.
ConsumerGroup: Consumers in the same ConsumerGroup do not share partitions (each will always read different records than the other Consumers in its group). Therefore the number of Consumers in a ConsumerGroup is less than or equal to NumberOfPartitions. Different applications can read the same records by using one ConsumerGroup per application.
Producer API: API to produce streams of records
Consumer API: API to consume streams of records
Broker: a Kafka server that runs in a Kafka cluster. Can host multiple Replicas, but only one Replica for each Partition.
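A hedged sketch using the third-party kafka-python client (one of several client libraries, not part of Kafka itself); the broker address, topic, key, and group names are placeholders:

```python
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
# records with the same key hash to the same partition, so they stay ordered
producer.send("clicks", key=b"user-42", value=b'{"page": "/home"}')
producer.flush()

# consumers sharing a group_id split the topic's partitions among themselves
consumer = KafkaConsumer("clicks", group_id="analytics",
                         bootstrap_servers="localhost:9092")
for record in consumer:
    print(record.partition, record.offset, record.key, record.value)
```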
Authentication, Confidentiality, Integrity, Availability - Tools:
nonce: send a nonce to the client and require the client to hash it
Secret-key cryptography (symmetric key): AES. Public-key cryptography (asymmetric key): RSA, ElGamal
Example Quorums under Byzantine Faults: with fail-stop we only need 2f+1 nodes. For liveness, the quorum size is at most N - f, so for a quorum size of N - f the overlap of two quorums is (N-f) + (N-f) - N. We want this overlap to be \geq f + 1 (so it always contains at least one correct node), giving N \geq 3f + 1.
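For example, with f = 1: N = 3f + 1 = 4, quorum size N - f = 3, and the overlap of two quorums is (N-f) + (N-f) - N = 2 = f + 1, so every intersection contains at least one correct node.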
Impossibility: No solution with fewer than 3f + 1 generals can cope with f traitors.
Algorithm: using a Replicated State Machine (RSM) with 3f+1 replicas
Client sends an opcode to the Primary
Primary broadcasts Pre-prepare<viewNumber, sequenceNumber, opcode> (and puts it in its log)
Replicas receive the broadcast and determine whether the Pre-prepare<> is valid; if so, they broadcast Prepare<replicaID, viewNumber, sequenceNumber, opcode> (and put it in the log), then wait until 2f+1 Prepare<> arrive from other replicas doing the same thing
Validity check: the viewNumber in the message matches the stored viewNumber, and the replica has not accepted another Pre-prepare<> with the same viewNumber and sequenceNumber but a different opcode
Invariant: if <opcode1, viewNumber, sequenceNumber, replicaID1> is in the log, then there is no <opcode2, viewNumber, sequenceNumber, replicaID2> with a different opcode in the log; a replica is prepared for an opcode once its log holds the matching Prepare<> and Pre-prepare<> messages
Once Replicas have received 2f+1 Prepare<>, they send Commit<replicaID, viewNumber, sequenceNumber, opcode> and wait until 2f+1 Commit<> arrive from other replicas doing the same thing
Once Replicas have received 2f+1 Commit<> (if we assume the viewNumber can change, then we only need f+1), they put it in the log and send the result to the client
Client waits for f+1 matching replies before commit
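A toy sketch of the 2f+1 matching-message check a replica performs in the prepare/commit phases; the message fields mirror the tuples above, while storage, signatures, and networking are omitted:

```python
# Count messages that match on (viewNumber, sequenceNumber, opcode).
from collections import Counter

F = 1                   # maximum Byzantine replicas tolerated (N = 3F + 1 = 4)
QUORUM = 2 * F + 1

def has_quorum(messages, view, seq):
    """True once 2f+1 messages agree on (view, seq, opcode)."""
    votes = Counter((m["view"], m["seq"], m["opcode"])
                    for m in messages if m["view"] == view and m["seq"] == seq)
    return any(count >= QUORUM for count in votes.values())

prepares = [{"replica": r, "view": 1, "seq": 7, "opcode": "put(x,1)"} for r in range(3)]
print(has_quorum(prepares, view=1, seq=7))  # True: 3 matching Prepare<> >= 2f+1
```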