Hi from all of us here in Prague -- this is day 1 of EuroSys, and we'll be running the live blog as usual!
143 papers submitted (after removing ones that violated submission guidelines), 28 accepted, in line with the ~15% acceptance rate that EuroSys has had for a few years. Work published at EuroSys continues to be well-cited, competitive with SOSP/OSDI. Heavy/light PC this year (26/14 members), with an emphasis on diversity in topics, gender and geography. 3 reviews in the first round; 76 papers got to the second round (ca. 50%) and received another 4 reviews, followed by a rebuttal phase. All second-round papers were discussed in the PC meeting, with the discussion lead being someone who had not read the paper, and at least 5 attendees who had read it.
Geographic diversity of accepted papers: 21 from North America, 4 from Europe, 2 from Asia and 1 from Australia (poorer balance than in previous years). Best paper awards will be given at the end this time, allowing the PC to take into account reactions at the conference. (ms)
Session 1: Large scale distributed computation I
Zhengping Qian (Microsoft Research Asia), Yong He (South China University of Technology), Chunzhi Su, Zhuojie Wu, and Hongyu Zhu (Shanghai Jiaotong University), Taizhi Zhang (Peking University), Lidong Zhou (Microsoft Research Asia), Yuan Yu (Microsoft Research Silicon Valley), and Zheng Zhang (Microsoft Research Asia)
Natacha Crooks - This is a system designed for reliable computation over big streaming data in a cloud environment. Key motivating examples: 1) network infrastructure monitoring in a datacentre. Software agents in the datacentre collect various performance counters, which need to support queries in real time, such as a real-time heat map of latency between various racks. So you need a system that can do near-real-time computing, and which must also be scalable and easy to program.
The second scenario relates to online search ads. If you input a query into a search engine, a number of ads will be displayed. There's a complex model that matches the query to the ads. They are trying to improve the ad-matching model using the history of past user queries/clicks. This has to be done in real time (or near real time). The system must also be reliable, as failure causes loss of money. It must be highly resilient to load variance and scale dynamically. There's also the requirement that it should be easy to program. In summary, the key challenges for scalable near-real-time computing are scalability and reliability.
Traditional systems stem from database community:
- Streaming databases (StreamInsight), but implemented at too small a scale. E.g.: not possible to use simple node replication.
- TimeStream: declarative, has scalability, fault tolerance, elasticity; preserves the single-node model but transparently ships computation to the cloud.
Motivating example: continuous word count over an infinite stream of tweets. Important to note that grouping on an infinite stream is different from MapReduce: window operations must be introduced to enable aggregation. TimeStream introduces hash partitions to express how the stream can be partitioned so that it can run in parallel, and allows you to dynamically change the number of partitions to respond to load changes.
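The windowed, hash-partitioned word count described above can be sketched in a few lines (a toy single-process model; the function and parameter names are mine, not TimeStream's API):

```python
from collections import Counter, defaultdict

def hash_partition(words, n_partitions):
    """Route each word to a partition by hashing it; each partition
    could then be counted on a different node in parallel."""
    parts = defaultdict(list)
    for w in words:
        parts[hash(w) % n_partitions].append(w)
    return parts

def windowed_word_count(stream, window_size, n_partitions=4):
    """Aggregate an (in principle infinite) stream of tweets over
    consecutive fixed-size windows, since grouping over the whole
    stream at once is impossible."""
    window = []
    for tweet in stream:
        window.extend(tweet.split())
        if len(window) >= window_size:
            counts = Counter()
            for part in hash_partition(window, n_partitions).values():
                counts.update(part)  # merge per-partition counts
            yield counts
            window = []
```

Changing `n_partitions` between windows would be the analogue of TimeStream's dynamic repartitioning in response to load.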
TimeStream tries to automatically ship a sequential query written by the client to the cloud to run reliably.
In the distributed runtime, computation is modelled using a dataflow graph. Each vertex is a deterministic computation, which can be stateful. Each edge is an ordered data channel. The data are just opaque values to the runtime. When data arrives at a vertex, it triggers some computation and may produce output. The whole execution is deterministic.
Failures are handled by substituting a vertex with an identical computation. If a channel is overloaded, they replace the whole subgraph with an equivalent computation, which may include more partitions (substitution for dynamic partitioning). Substitution is used as a mechanism for both failure recovery and dynamic partitioning. This may cause loss of state, but TimeStream does resilient substitution, which means applying equivalent substitutions to sub-DAGs at runtime. As long as the input is the same, the system guarantees that the output is not affected (again, by relying on the fact that the system guarantees determinism).
After substitution they do stabilisation, which guarantees no loss and no duplication.
TimeStream does dependency tracking in order to do efficient substitution. For each output, they track the portion of input that the output depends on. The output-input data dependency is often bounded. They ensure that dependency tracking is lightweight and fine-grained. Dependency tracking is hidden from users inside the operator library, so users need not worry about it.
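A minimal illustration of bounded output-to-input dependency tracking (my own toy operator, not TimeStream code): each output of a sliding-window sum records the input range it depends on, so a substituted vertex only needs that bounded slice replayed rather than the entire input history.

```python
def sliding_sum(inputs, width):
    """Output i is the sum of inputs i-width+1 .. i; alongside each
    output, record the (bounded) input range it depends on."""
    outputs = []
    for i in range(width - 1, len(inputs)):
        dep = (i - width + 1, i)  # bounded dependency range
        outputs.append((sum(inputs[dep[0]:dep[1] + 1]), dep))
    return outputs

def recompute(inputs, dep):
    """Recover a lost output from only its tracked dependency range,
    instead of replaying everything."""
    lo, hi = dep
    return sum(inputs[lo:hi + 1])
```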
Evaluation: Abacus counting for Ads Click Prediction
This requires multiple nodes to compute an expensive filter computation in parallel, so they use hash partitions for the filter operation. This is to get rid of fraudulent queries (bot detection). A set of events is grouped as a single event in the computation, so they first look at how batch size affects performance. With larger batch sizes, latency goes up; with very small batch sizes, latency is high as well (revealing the dependency-checking overhead). For throughput, larger batch sizes are better.
They show comparable performance to Storm but with stronger guarantees. When a failure occurs, Storm's results are incorrect until the lost state is completely moved out of the aggregation operators; not so in TimeStream. They estimate that dependency tracking represents an approximately 10 percent overhead.
In conclusion, the key contribution is resilient substitution, which they argue is a unified mechanism to support fault tolerance and dynamic partitioning.
Q: Since computations are represented as a graph, have you thought of using Trinity (Microsoft) instead of your own system?
A: Our main goal is reliable stream processing. Trinity mostly focuses on performance, but doesn't do anything to improve fault tolerance.
Q (Peter Pietzuch): Did you consider streaming algorithms for which the output depends on the entire state of the stream?
A: Dependency tracking is guidance for when to start recomputing. When such an operator fails and needs to recompute, the checkpointing mechanism kicks in.
Q (?): You showed scalability up to 16 nodes; at what scale do you run the system in production?
A: Currently we push up to 32 nodes, because this is what our particular scenario requires. We haven't tried to run with more nodes. Currently working on setting…
Q (Malte Schwarzkopf): What's the bottleneck that requires you to scale to multiple nodes rather than 32 cores on a single node?
A: No particular reason; the computation needs to hold a large window, so the state doesn't fit in the memory of one node.
- 1st scenario: Software agents in the DC that store performance counters. Hence, they are interested in a system that can query this data in real time.
- 2nd scenario: Microsoft's advertising team: When a user enters a query we also want to display some ads. They want to improve the ads based on the previous user behaviour. Hence, looking for a system that can do near real-time queries.
- Challenges: near real-time computing, scalability, reliability (fault tolerance, resiliency to load changes).
- Extends Microsoft's StreamInsight by introducing a new HashPartition.
- Model the computation using a dataflow graph. Each vertex is a deterministic computation that can be stateful. The whole execution is deterministic.
- When a failure occurs they substitute the vertex with an identical vertex. If there is a channel overload then they replace the channel with another graph that partitions more.
- Resilient substitution = change vertices to equivalent subdags. Equivalent means that they're conducting the same computation.
- Observation: in streaming computation the output-input data dependency is often bounded. (dependency tracking).
- Lightweight, fine-grained dependency tracking is done at runtime to obtain minimized substitution-induced recomputation.
- Groups single events into a batch. These are then fed into the system.
- Claims that it scales linearly even though evaluation was only conducted on at most 16 nodes.
Q: Have you thought of using Microsoft's Trinity?
A: I don't know how it handles failures. I think they just focus on performance.
Q(Peter Pietzuch): Did you consider streaming algorithms where the computation depends on the entire history of input?
A: Yes. The dependency tracking is just a guidance. However, checkpointing would be used in your case.
Q (Malte Schwarzkopf): What is the bottleneck that requires you to scale to multiple machines as opposed to just using a powerful machine?
A: Sometimes the computation needs to hold a large window.
Qifa Ke, Michael Isard, and Yuan Yu (Microsoft Research Silicon Valley)
Natacha Crooks -
Distributed execution plans generated by the query compiler are used. A distributed execution plan is represented as a DAG representing the computation and dataflow of a data-parallel program. Each box is a vertex representing computation; the dataflow is represented by the edges.
First key problem in MapReduce: data partitioning, a basic operation to achieve data parallelism. This has two issues: data skew (e.g. popular keys), and the number of partitions; there's a tradeoff between better load balancing and more I/O overhead. Striking a good balance requires statistics about the reducers, but these are not available at compile time, which suggests the need for some form of dynamic partitioning at runtime.
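One way to picture statistics-driven partitioning (my sketch, not Optimus's actual rewriter): collect a key histogram from a runtime sample, then assign keys to reducers by observed load rather than by blind hashing.

```python
from collections import Counter

def plan_partitions(sample_keys, n_workers):
    """Greedy heaviest-key-first assignment of keys to workers based
    on a runtime histogram, so one popular key cannot silently swamp
    a single reducer alongside other heavy keys."""
    hist = Counter(sample_keys)
    loads = [0] * n_workers
    assignment = {}
    for key, cnt in hist.most_common():
        w = loads.index(min(loads))  # least-loaded worker so far
        assignment[key] = w
        loads[w] += cnt
    return assignment, loads
```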
Second key problem: matrix computation. The key challenge is that sparse matrices may produce intermediate dense matrices. But this is only known at runtime, not compile time, so there's a need to change/adapt algorithms at runtime.
Third key problem: iterative computation. The key problem is that the stopping condition isn't known in advance.
Fourth key problem: fault tolerance. Data is usually replicated, but for intermediate data replication is not used because it is very expensive, so re-execution is chosen instead. This has two issues: 1) if a vertex is compute-intensive, it is very expensive to re-execute; 2) critical chains: a long chain of vertices resides on the same machine due to data locality, so when trying to re-execute a vertex, many of its inputs turn out to be lost as well because they were on the same computer, creating a long chain. The key challenge here is how to identify and protect important intermediate results at runtime, and replicate those.
Fifth problem: query optimisation is currently done at compile time using data statistics available then, but they want to be able to do this at runtime as well. Thought: aren't there already many systems which do some version of this (FlumeJava, Nectar, amongst others)?
They present Optimus: dynamically rewrite the EPG based on data statistics collected at runtime and the compute resources available at runtime. The key goal is for it to be extensible. Statistics are collected in the data plane and sent to the graph rewriter, which rewrites queries dynamically.
The system is built on top of DryadLINQ. They add a rewriter module to the Dryad job manager, and provide user-defined rewrite logic and user-defined statistics. The query compiler was extended to generate code for the user-defined rewrite logic and statistics. This is shipped to the rewrite logic in the job manager, which rewrites the graph accordingly.
They minimise the overhead of collecting data statistics by piggybacking collection onto existing vertices. This is extensible, as the statistics estimator is defined at the language layer via user-defined functions.
This is all done in the data plane, which avoids saturating the control plane.
The graph rewriting module is specified as a set of primitives to query the EPG and modify the graph. The rewriting operation depends on the state of the vertex. If the vertex is inactive, everything can be rewritten. If it is running, it is killed and its partial results are discarded. If the vertex has already executed, then Optimus only allows redirection of its inputs/outputs.
Dynamic data (co-)partitioning: co-partitioning means a common parameter is used to partition multiple data sets, which is needed by operators that take multiple streams as input.
Co-range partitioning can be used to prepare the data for joins. But skew may be detected at runtime, and they can recover from that thanks to the graph rewriter module.
For matrix multiplication, there are multiple ways to achieve the result: different ways to partition the matrices and, for each, different algorithms to compute the result. Optimus's extensibility allows integrating matrix computation via a library. The library makes several runtime decisions: data partitioning (how to subdivide the matrices), data model (sparse or dense), and finally which algorithm to choose for the matrix operator.
They add a reliability enhancer for fault tolerance, using a replication graph to protect important data generated by a given node.
Optimus has significantly lower computation time because it achieves excellent cluster utilisation. Because of data skew, the others don't have nearly as high a utilisation.
For matrix multiplication, they use movie recommendation by collaborative filtering on the Netflix Challenge data. Optimus can choose the best way to compute the matrices and can change it dynamically at runtime. Outperforms MetaLynq.
Key contributions argued: a flexible/extensible framework to modify the EPG at runtime, enabling runtime optimisations that are difficult to achieve in other systems through a rich set of graph rewriters.
Q (?): In the previous talk we heard about a substitution mechanism. What's the link between the two?
A: These are two different systems: DryadLINQ is a batch system versus a streaming computation.
Q (follow-up): But are they not similar?
A: Main contribution is extensibility. The user specifies the rewrite logic.
Ionel Gog -
- optimize Execution Plan Graphs (EPG) at runtime.
- Problem 1: Data partitioning: it's difficult to partition without getting data skew => we need dynamic data partitioning.
- Problem 2: Matrix computation: at compile time we don't know the density of the intermediate matrices => dynamically choose data model and alternative algorithms.
- Problem 3: Iterative computation: We don't know the stopping condition.
- Problem 4: Fault tolerance: Intermediate results are expensive to regenerate when lost => How to identify and protect important intermediate results at runtime?
- Problem 5: EPG Optimization is usually done statically => Can we do it at runtime?
- Optimus - dynamically rewrite EPG based on data statistics collected at runtime.
- Modules of Optimus:
- Data Statistics Collector:
- piggy-back into existing vertices
- statistics collector defined by the user
- the aggregation of statistics is done in the data plane
- Graph Rewriting Module:
- 3 states (Inactive, running, completed). If inactive all rewriting can be applied, if running then rewriting has to consider intermediate data loss.
- Example of graph rewriting:
1) Dynamic Data Partitioning
- compute histogram at each partition, then merge histograms ...
2) Hybrid Join
- avoids data skew at runtime: detect a partition that is bigger than the others and divide it into smaller partitions.
3) Matrix Computation
- there are multiple ways of multiplying two matrices, and different ways of partitioning each matrix. Which one is used depends on the statistics of the input matrices.
Reliability enhancer for fault tolerance:
- Add an extra node (on a different machine) to the graph to which the output of a vertex is sent as well.
Sameer Agarwal (University of California, Berkeley), Barzan Mozafari (Massachusetts Institute of Technology), Aurojit Panda (University of California, Berkeley), Henry Milner (University of California, Berkeley), Samuel Madden (Massachusetts Institute of Technology), and Ion Stoica (University of California, Berkeley)
Natacha Crooks - We are presented with lots of data that must be processed in a timely fashion; online media websites are a good example, and decisions must be made on that data. Other examples include log processing for finding anomalies in a system. The key problem they try to address is the need to compute aggregate statistics over massive data sets. The goal is to support interactive analysis on these data sets.
BlinkDB would like the user to provide a query and a time bound, and the system returns a result with an error bound. If the user isn't happy, they can retry with a larger time bound.
Target workload: exploration is ad hoc. The only assumption they make is that the set of columns that appear in queries is stable over time (ak: the query schema is stable). Traces of Facebook data show that this assumption holds: there's a very small set of query templates that most queries map to.
The goal is systems where real-time latency is valued over high accuracy. There is no need for query parameters to be known in advance.
Overview of sampling in databases: you can uniformly sample the data, but this doesn't support certain types of queries due to anomalies introduced by the sample rate. Alternatively, you can use stratified samples, which add the requirement that all unique keys are represented in the sample.
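The difference is easy to see in a sketch (hypothetical rows and column names, not BlinkDB code): a stratified sample caps the rows kept per key, so rare keys in a long tail survive, whereas a uniform sample would likely miss them.

```python
import random

def stratified_sample(rows, key, cap):
    """Keep at most `cap` rows per distinct value of `key`; every
    unique key is guaranteed to be represented in the sample."""
    random.seed(0)  # deterministic for the example
    by_key = {}
    for row in rows:
        by_key.setdefault(row[key], []).append(row)
    sample = []
    for group in by_key.values():
        sample.extend(group if len(group) <= cap else random.sample(group, cap))
    return sample
```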
In BlinkDB, an offline module creates an optimal set of samples, which are either cached in memory or placed on disk. When a query comes in, a query plan gets created, and the sample selection module then tries to figure out the best sample with which to answer the query. A new query plan gets created accordingly, and this is executed on a parallel cluster computing framework; they currently support Hadoop and Spark. The result (with an error bound) is then returned.

There are two key challenges: 1) what should be the optimal set of samples to maintain in order to support ad-hoc exploratory queries; and 2) given a query, what should be the optimal sample type and size that can be processed to meet the constraints. They use the concept of query coverage: the probability that the attribute being filtered on will be in the stratified sample, with the query coverage then being the sum of these probabilities. For multiple predicates, a query which matches one of them is more likely to include the result than one which matches none, so they define a query coverage probability.
Not all stratified samples that are possible in theory have the same cost: the cost of sampling depends on the number of values of the attribute being stratified on. They formulate a MILP to represent this, maximising coverage for a given cost (the cost of all samples). They also include a sparsity function to model the density and uniformity of a column (ak: if highly uniform then a uniform sample might be good, but if there is a long tail, then the stratified model is needed). In order to determine the optimal sample size, they define an "error latency profile": the relationship between the relative error and the time taken to execute the query, as a function of the sample size. Most functions used in the system have closed forms, meaning the error is proportional to a known expression, so they can use linear cost models to determine the "optimal" point (via extrapolation: run queries on a small sample, and use that to extrapolate to larger sample sizes). For each possible sample, they generate an error latency profile, see which one gives the highest accuracy for a particular time bound, and use the sample with the lowest error.
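For closed-form aggregates like AVG, the standard error shrinks as 1/√n, which is what makes the extrapolation step work. A sketch with made-up numbers (my illustration, not figures from the paper):

```python
import math

def extrapolate_error(err_small, n_small, n_large):
    """Scale an error measured on a small pilot sample to a larger
    sample size, using the closed-form error proportional to
    1/sqrt(sample size)."""
    return err_small * math.sqrt(n_small / n_large)

def pick_sample_size(err_small, n_small, target_error):
    """Smallest sample size whose extrapolated error meets the target."""
    return math.ceil(n_small * (err_small / target_error) ** 2)
```

E.g. a 5% relative error measured on a 10,000-row pilot run implies roughly 250,000 rows are needed to reach a 1% error.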
Key contributions: they argue that approximate queries are an important means to achieve interactivity when processing large datasets, and that ad-hoc exploratory queries on an optimal set of multi-dimensional stratified samples converge to low error bounds significantly faster than non-sampled execution.
Q (Jacob Eriksson, UIC): You mentioned that you run small queries first; how can you estimate the error for larger samples?
A: We use the closed-form property to estimate the corresponding error (which is a function of the sample size).
Q (?): When processing data, what guarantees does BlinkDB give you; how do you tolerate data changes?
A: We don't support data that is updated repeatedly, since sampling is offline and we would need to refresh our samples very often. We do support bulk loading.
Ionel Gog -
- Need to compute aggregate statistics over large data sets
- it takes 1-2h to process 100TB on 1k machines if data stored on HDDs. 25-30 minutes if data stored in memory.
- propose to have small samples.
- users specify how long the query should run, and then the system returns a result with an error interval
- 90% of queries map to only 20% of templates (Facebook)
- 17,437 queries map to 108 query templates; 90% map to only 10% of the templates (Conviva)
- Offline-sampling: set of samples across few dimensions
- samples stored in memory or striped over HDDs
- What is the optimal set of samples?
- Create samples for each tuple of columns of a table.
- Query coverage: the guarantee that a sample contains the columns on which a query is trying to filter.
- Cost of the stratification: not all samples have the same cost.
- Tries to make sure that the cost of all samples is smaller than a user-defined value.
- What is the optimal sample type and size to meet accuracy/response time requirements?
- Decide based on Runtime vs Sample size values computed.
Q: You mentioned you're running these little queries. How can you approximate the error without first running on the entire data?
A: There is a statistical formula that estimates the error of the specific query we're going to run. (Looks like the speaker expected this question; he had a slide prepared to answer it.)
Q: How do you react to dynamic data?
A: We do not support data that is updated frequently because we would have to update our samples frequently.
Session 2: Security and Privacy
David Schultz and Barbara Liskov (MIT CSAIL)
Valentin Dalibard - Decentralised information flow control for databases
Information leaks are a problem. So they propose to tackle it.
Information flow control:
-track information as it flows
-control what can be released
IFDB: tracks information in the database (information flow control had been done before, but not in the context of a database)
3 key concepts: principals (the people), tags (the security accesses you have) and labels (sets of tags)
information can flow from A to B if the labels of A are a subset of the labels of B.
Declassification removes a tag from the process => unsafe but necessary in some cases. In IFDB, declassification needs to be explicit
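The flow rule and explicit declassification boil down to set operations; a minimal sketch (the tag names are invented for illustration, not from IFDB):

```python
def can_flow(src_label, dst_label):
    """Information may flow from A to B iff A's label (a set of
    secrecy tags) is a subset of B's label."""
    return src_label <= dst_label

def declassify(label, tag):
    """Remove a tag from a label; unsafe, hence explicit in IFDB."""
    return label - {tag}
```

So a process holding `{"alice_medical"}` can write to one labelled `{"alice_medical", "bob_medical"}`, but not to an unlabelled public channel without declassifying first.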
Writes: Tuples are written with exactly the label of the process
-Declassifying views: add WITH DECLASSIFYING to query
-Constraints: uniqueness and foreign keys
-Transactions: can abort, processes can label transaction
Based on PostgreSQL and a web server in PHP
They show that the system is useful by applying it to a location-recording database (e.g. where is Alice now? Where did she drive last week?). It does seem neat.
They managed to use it to find bugs in existing systems, e.g. in HotCRP anyone can download anyone's personal details.
The database is not the bottleneck; the web server is. But they were more concerned about the database, so evaluated it separately.
The database performs really well: a 1% drop in performance with 10 labels (which is more than is realistic) in the worst case
Valentin Dalibard - A malicious process can create symbolic link files and give them misleading names to fool an application into giving access to information. Basically, a malicious process creates a symbolic link to a secure file (e.g. passwords), then asks a process with higher security (e.g. a web server) to access that file. The web server doesn't notice it is a symbolic link and returns the secure file. The issue is that the access control system gave the web server access to that file, despite the fact that it was actually accessed on behalf of an application that didn't have the right level of security.
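On POSIX systems, one concrete defence against this specific attack is to refuse to follow symlinks at open time with `O_NOFOLLOW` (a narrow sketch of the attack and defence, not the paper's more general process-firewall mechanism):

```python
import os
import tempfile

def open_no_symlink(path):
    """Open a file but fail (OSError, typically ELOOP) if the final
    path component is a symbolic link, so a planted link cannot
    redirect a privileged process to a protected file."""
    fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
    return os.fdopen(fd, "rb")

# Demo: a 'secret' file and a malicious symlink pointing at it.
d = tempfile.mkdtemp()
secret = os.path.join(d, "secret")
link = os.path.join(d, "innocent_name")
with open(secret, "w") as f:
    f.write("password")
os.symlink(secret, link)

opened_real = open_no_symlink(secret).read() == b"password"
try:
    open_no_symlink(link)
    link_blocked = False
except OSError:
    link_blocked = True  # the symlink was rejected
```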
What is needed: to know which process is actually accessing the resource (to avoid resources being accessed on behalf of malicious processes)
Their solution: adding an additional check after access control to determine whether this particular system call has access to the resource or not. They call it the process firewall.
To do this, they do process introspection in which they figure out what the process is actually trying to access.
They identify different system calls by using the process context, like its stack, entry point, system call history....
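A toy illustration of a context-sensitive rule (entirely hypothetical entry points and paths; the real system works in the kernel on binary call stacks):

```python
def firewall_check(call_stack, resolved_path):
    """Allow or deny a system call based on process context: here,
    the entry point that (transitively) issued the call. A web
    server's request handler may only read under /var/www, no matter
    what filename it was tricked into opening."""
    entry_point = call_stack[0]
    if entry_point == "handle_request":
        return resolved_path.startswith("/var/www/")
    return True  # other contexts: fall back to normal access control
```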
They need to gather context efficiently because it can introduce a lot of overhead. So they designed some optimisations to:
I)Gather the context (using caches)
II)Check the rules that were designed (by grouping them)
Where do they get their rules? 3 ways:
-Can be manually specified
-Automatically generate rules from known vulnerabilities
-Automatically generate rules from runtime traces (which will have false positives); these mostly use the entry point
Hayawardh Vijayakumar (The Pennsylvania State University), Joshua Schiffman (Advanced Micro Devices), and Trent Jaeger (The Pennsylvania State University)
Srinath Setty, Benjamin Braun, Victor Vu, and Andrew J. Blumberg (UT Austin), Bryan Parno (Microsoft Research Redmond), and Michael Walfish (UT Austin)
Valentin Dalibard - The authors present Zaatar, a system that implements Probabilistically Checkable Proofs (PCPs). This allows one to check that the computation performed by a server is the one it was asked to do. This comes from recent cryptographic work. The system is not at all usable yet (up to 2,000,000x overhead) but is orders of magnitude faster than previously proposed solutions. One of the main cryptographic contributions is the ability to batch together many jobs with the same computation but different inputs.
Session 3: Replication
Sergio Almeida, Joao Leitao, and Luís Rodrigues (INESC-ID, Instituto Superior Tecnico, Universidade Tecnica de Lisboa)
Natacha Crooks -
Large-scale applications are deployed worldwide. These applications have a huge number of users creating a large amount of data, resulting in a high system load, and they use geo-replicated data storage systems. There are conflicting goals: high performance vs consistency. There exist several consistency models and multiple replication mechanisms. They are trying to provide causal+ consistency, which is causality plus convergence. They focus on chain replication, a simple replication mechanism that provides linearisability for a single object. When a client issues a put request, the request is forwarded to the head of the chain, which forwards it down the rest of the chain. Once it reaches the tail, the put operation returns to the client. Get requests should always be executed at the tail.
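The basic chain replication scheme just described can be sketched as follows (a toy in-memory model ignoring failures and asynchrony; the names are mine):

```python
class ChainNode:
    def __init__(self):
        self.store = {}
        self.next = None

def build_chain(n):
    """Link n nodes head-to-tail."""
    nodes = [ChainNode() for _ in range(n)]
    for a, b in zip(nodes, nodes[1:]):
        a.next = b
    return nodes

def put(head, key, value):
    """A put enters at the head and is forwarded node by node; it is
    only acknowledged once it reaches the tail."""
    node = head
    while node is not None:
        node.store[key] = value
        node = node.next

def get(tail, key):
    """Reads always go to the tail, which holds every acknowledged
    write -- hence linearisability, but also the tail bottleneck."""
    return tail.store.get(key)
```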
In their work, they provide a new system that offers causal+ consistency (single-site and in a geo-replicated scenario). It's a specialised version of chain replication which removes the tail bottleneck by distributing reads, and can relax the consistency requirements (replication factor) if necessary.
The client is a local library that manages metadata. The data servers are organised as one-hop DHTs. The client library communicates via an application proxy with the correct DC in the DHT.
There are three key principles: allow writes to return before reaching the tail; support reads on all nodes of the chain; and trade write efficiency for metadata efficiency.
They relax the number of nodes that have to process a write operation, controlled by a parameter K (how far you go down the chain before finishing and sending the reply back to the client). This reply is tagged with the id of the last node in the chain that applied it, and stored in the client metadata. They also allow read operations to be distributed across the nodes in the chain, but you can no longer read from just any node in the chain, as that might violate the client's causal history. The metadata is used to determine which indices of nodes contain sufficiently up-to-date data. When a write that was only replicated to K nodes eventually reaches the tail, it is marked stable and the client can forget all metadata associated with this object. (Own question: is this information broadcast to all clients ever? What are the overheads for high-write workloads and low values of K?) They trade metadata efficiency for write efficiency via this stabilisation procedure.
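The K-node write and metadata-guided read can be sketched like this (a simplification of ChainReaction where each node is a plain dict, and background propagation/stabilisation is not modelled):

```python
import random

def put_k(chain, key, value, k):
    """Apply the write to the first k nodes only and reply to the
    client; the reply carries the index of the last node known to
    have the write, which the client keeps as metadata."""
    for node in chain[:k]:
        node[key] = value
    return k - 1

def get_any(chain, key, last_idx):
    """Serve the read from any of the first last_idx+1 nodes, all of
    which have seen the client's last write -- preserving causality
    while spreading read load off the tail."""
    return random.choice(chain[:last_idx + 1]).get(key)
```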
When extending to a geo-replicated solution, they replicate the same architecture to all datacentres, so you have as many one-hop DHTs as datacentres. Read operations can be processed in a single datacentre. Write operations return when applied to K nodes in a single datacentre and are replicated asynchronously. Conflicting versions are merged using a last-writer-wins conflict resolution strategy.
The system is built on top of (by modifying) FAWN-KV.
1) Evaluating throughput in a single datacentre: for a 50/50 read/write workload, they don't get much of an improvement because they optimise for reads, but as the percentage of reads increases, the throughput improves a lot. For the geo-replicated scenario, throughput (for 50/50 read/write) is not as good as other systems, but as soon as the read percentage increases, performance outperforms the other systems.
Q (?): Is the difference between your system and eventual consistency the new variant of chain replication? What are the differences between you and Dynamo?
A: Dynamo doesn't use chain replication. Chain replication was initially designed to provide linearisability; it leverages the topology of the replicas to do operations more efficiently.
Ionel Gog -
- Applications have huge number of users => high system load.
- There are conflicting goals: high performance vs consistency.
- Write operation - send put request, the request goes through the chain until the tail.
- Read operation - always processed by the tail of the chain => bottleneck at the tail.
- ChainReaction - provides Causal+ consistency.
- removes the tail-bottleneck by distributing reads.
- relaxes consistency.
- Architecture: - Client library that manages metadata .
- Data servers organized in a one-hop DHT.
- Allows writes to return before reaching the tail of the chain.
- Supports reads on all nodes of the chain. Use information enclosed in the metadata s.t. App Proxy knows where to send the request.
- Writes metadata efficiently.
- To make everything Geo-replicated, we have to extend the metadata. Operations return when applied to K nodes in a DC and in the background the operations are copied to other DCs (async).
- Evaluation: implemented an optimized version of FAWN-KV, then compared ChainReaction with FAWN-KV and Apache Cassandra
- Runs over 10^6 objects of 1KB size (This looks small to me, it's only 1GB).
Q: I find it very difficult to see the advantages of your system compared with Dynamo; they have something very similar to what you have just described. What are the main differences?
A: Dynamo doesn't do chain replication at its core. Chain replication uses the topology of the replicas to be more efficient.
Q(Roxana Geambasu): Traditional models have a well defined definition based on history and so on. Can you provide a similar model for Causal+?
A: Causal+ is very similar to causal consistency.
Ricardo Padilha and Fernando Pedone (University of Lugano, Switzerland)
Natacha Crooks - There is a fundamental conflict between availability and consistency: a tension between ACID vs BASE, or SQL vs NoSQL. BASE says you should have scalability at the cost of strong consistency and availability; ACID says strong consistency simplifies system design. The problem is that the cloud offers unreliable commodity hardware, which means there's a very broad failure spectrum. Augustus offers a key-value store with short transactions, Byzantine fault tolerance, and strong consistency.
Clients can submit short transactions to partitioned storage. Operations include compare, read, read_range, write, insert, delete. Transactions execute a batch of operations at once (one round of communication). The criteria for execution are: 1) all the locks are acquired; 2) the compares are successful. Partitions are state-machine fault-tolerant replica groups that execute the transactions in the same order. They support both local and global transactions (which affect multiple partitions). Strong serializability is based on no-wait locks. Key locks: writes are exclusive and reads are shared. Partition locks: inserts and deletes are shared, read_ranges are shared. Lock acquisition is non-blocking.
They assume that both clients and replicas can be Byzantine, and that Byzantine nodes cannot subvert the cryptographic primitives.
Clients assemble transactions and submit them to all the partitions involved in the transaction. The protocol relies on the non-Byzantine behaviour of clients. Honest clients mediate recovery, and only when a conflict arises: when an honest client conflicts with an unfinished transaction, the unfinished transaction will be terminated. Local transactions commit immediately. Global read-only transactions commit without signatures; Byzantine clients can forge read-only commit certificates, and forgery may as a result cause non-serializable read-only Byzantine transactions. These transactions cannot make honest transactions non-serializable.
They achieve linear scalability with the number of partitions. Read-only multi-partition transactions get better performance than updates. They also have an evaluation based on a Retwis port, and on Apache Derby with an Augustus-based SQL storage engine.
Claims that Augustus achieves full scalability for all transaction types. Currently working on the performance of global update transactions under contention.
Q (Allen Clement): Claim achieves scalability for all types. How do you mix contention and global read transactions?
A: There is a problem of contention; you need to keep them apart. You add more partitions to increase the throughput. The compares guarantee that we have a consistent snapshot.
Q(?): Any transaction, especially the compare, can only execute inside a partition?
A: Yes. A transaction only commits if all the partitions vote to commit. The compares act as a way to guarantee a certain state.
Q(Cheng Li). Do you have any principles to partition data? Do you add redundant data?
A: When you generate the data, there was a 50/50 chance of the person that you were following being on the same partition as you. They choose arbitrary placements and make no assumption about the structure of the social graph.
Tim Kraska, Gene Pang, and Michael Franklin (UC Berkeley), Samuel Madden (MIT), and Alan Fekete (University of Sydney)
Natacha Crooks -
Big data in the data center is distributed over many machines, which brings you reliability, availability, etc. But even though data is distributed over different machines, entire datacenters can also fail. So we have unreliable datacenters, and the solution is to geo-replicate. There is also high network latency between datacenters, so it is expensive to communicate.
The objective of the project is to provide strong data consistency across datacenters, with low latency and no data loss. MDCC tries to provide read committed without lost updates, multi-record transactions, and only one RTT.
MDCC optimises for two key observations about workloads: either conflicts are rare (everyone updates their own data), or conflicts commute (the order of concurrent updates isn't important).
Two-phase commit is the standard method for distributed transactions, with a prepare phase and a commit phase. The coordinator makes the final decision on whether to commit or abort, and can commit only when all votes are yes. This means all nodes must respond for a commit, so any node failure may limit progress. The coordinator still has the power to change its mind, and the decision is stored at the coordinator, so the coordinator is a single point of failure.
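The classic 2PC structure being contrasted here can be sketched in a few lines (a generic textbook sketch, not MDCC's code). Note how the decision lives only in the coordinator's log:

```python
class Participant:
    """A 2PC participant: votes in the prepare phase, then applies
    whatever the coordinator decided."""
    def __init__(self, vote=True):
        self.vote = vote
        self.state = "init"

    def prepare(self, txn):
        self.state = "prepared"
        return self.vote

    def finish(self, txn, decision):
        self.state = decision

def two_phase_commit(coordinator_log, participants, txn):
    """Commit only if every participant votes yes; the final decision
    is stored only at the coordinator (its single point of failure)."""
    votes = [p.prepare(txn) for p in participants]   # phase 1
    decision = "commit" if all(votes) else "abort"
    coordinator_log.append(decision)                 # decision lives here
    for p in participants:
        p.finish(txn, decision)                      # phase 2
    return decision
```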
MDCC transactions: MDCC uses a fine-grained Paxos instance per record. The coordinator proposes options (potential updates) to the appropriate Paxos instances. They check all permutations of updates for write-write conflicts and violations of integrity constraints. Nodes tag the options as accepted or rejected. The coordinator cannot change the transaction outcome: once learned, options are never unlearned. The state is stored at the nodes, no longer at the coordinator; it would even be possible for the nodes to talk to each other and bypass the coordinator. Options enable read-committed isolation. So MDCC makes the commit decision in a distributed way, stores the commit decision in a distributed way, and can tolerate node failures and delays, whereas in 2PC node failures and delays can block the protocol.
MDCC optimises for the case when conflicts are rare. It uses Fast Paxos for transactions: clients can propose directly to the nodes and bypass the leader, reducing the number of round trips. Fast Paxos is sometimes slow because concurrent updates can prevent consensus, so the leader must step in to resolve conflicts, and consensus may take 2 additional message rounds (3 rounds rather than 1). But when conflicts are rare, transactions can bypass the leader, which means that transactions can commit in 1 round trip.
The second optimisation relies on commutative updates, which do not need to be totally ordered. In this case, MDCC uses Generalised Paxos: instances agree on command structures, which are sequences of commands rather than a single command (and derive appropriate partial orderings for them). So they can use Fast Paxos (fast commits) without worrying about the order. However, order matters when integrity constraints play a part; MDCC relies on quorum demarcation for this, adjusting the constraints to keep replicas consistent without coordination.
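The demarcation idea above can be illustrated with a toy example (my own simplified sketch, not MDCC's protocol): a global constraint such as "stock >= 0" is split into per-replica quotas, so commutative decrements can commit locally without cross-replica coordination.

```python
class DemarcatedCounter:
    """Toy demarcation sketch: split a global lower-bound constraint
    into per-replica quotas so commutative decrements never need
    cross-replica coordination."""

    def __init__(self, total, n_replicas):
        # Each replica may spend at most its share of the total;
        # the sum of quotas can never violate the global constraint.
        self.quotas = [total // n_replicas] * n_replicas

    def decrement(self, replica, amount):
        if self.quotas[replica] >= amount:
            self.quotas[replica] -= amount
            return True
        # Quota exhausted: committing this locally could violate the
        # global constraint, so coordination would be required.
        return False
```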
The write response time CDF performs similarly to a quorum write (K=4), and significantly outperforms Megastore* (because all transactions in a partition have to be serialized). They evaluate three different versions of MDCC: with Fast Paxos, with Generalised Paxos, and with commutative updates. For lower levels of conflict, MDCC still performs well, but for high conflict rates MDCC performs badly.
In conclusion, they claim a new commit protocol based on the observation that conflicts are rare or commute, providing 1-round-trip durable commits.
Session 4 Concurrency and Parallelism
Conversion: Multi-Version Concurrency Control for Main Memory Segments
Timothy Merrifield and Jakob Eriksson (University of Illinois at Chicago)
Natacha Crooks - It used to be that with each generation of processors performance would scale. But now you can't necessarily expect faster cores; you just get more of them. The problem is that multi-threaded programming is hard: you get race conditions, deadlocks and atomicity violations. Some of the proposed solutions include more disciplined shared-memory programming models with deterministic concurrency. Most approaches force threads to execute in isolation for a limited period of time, limiting interaction.
Conversion is an implementation of version-controlled memory. It provides kernel support for multi-versioning of shared memory segments, and only needed a few small changes to the kernel.
Each segment is backed by a repository. Conversion uses processes as threads; each thread operates on its own local copy. There's a checkout operation, which creates a new Conversion segment or maps in an existing one (anonymous or file-backed); a commit operation, which commits your local copy's changes to the repository; and finally an update operation.
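The checkout/commit/update semantics might look like this in a user-level sketch. This is a hypothetical Python mock-up of the svn-like model, not the actual kernel interface:

```python
class Repository:
    """The shared, versioned backing store for a Conversion segment."""
    def __init__(self):
        self.version = 0
        self.pages = {}  # page id -> contents

class LocalCopy:
    """A thread's private view of the segment (created by checkout)."""
    def __init__(self, repo):
        self.repo = repo
        self.base_version = repo.version
        self.pages = dict(repo.pages)
        self.dirty = set()

    def write(self, page, value):
        self.pages[page] = value
        self.dirty.add(page)

    def commit(self):
        # An out-of-date commit implies an update (merge) first.
        if self.base_version != self.repo.version:
            self.update()
        for page in self.dirty:
            self.repo.pages[page] = self.pages[page]
        self.repo.version += 1
        self.base_version = self.repo.version
        self.dirty.clear()

    def update(self):
        # Pull repository changes, keeping local uncommitted writes.
        for page, value in self.repo.pages.items():
            if page not in self.dirty:
                self.pages[page] = value
        self.base_version = self.repo.version
```

Note how a commit by one thread stays invisible to another until the latter explicitly calls update, which is what gives the isolation described above.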
Applications that can work with a slightly out-of-date local copy would get performance benefits from doing so. Concurrent data structures could be read even while they are constantly updated, e.g. snapshot isolation for long-running reads. And finally, deterministic concurrency systems: they want isolation between threads, and the isolation is created by having a local copy.
There were 5 steps to building Conversion: 1) Bulk copying. Keep a complete copy of the repository, and then commit the entire segment to memory. This works but is too slow and uses too much memory.
2) Use virtual memory.
The local copy is just a page table, which means that physical memory can be shared between local copies. The repository is now basically a copy of the page table. This is better because memory is shared between threads, but there's still the problem of copying an entire page table.
3) Use dirty lists.
When you get a copy-on-write page fault, keep the page in a dirty list, so that when you perform a commit you can commit only the modified pages to the repository. The problem is that when T2 wants to perform an update, there is no way to know what has been modified, so it needs to pull the entire page table.
4) Version list.
The repository becomes a list of dirty lists. When you get a copy-on-write page fault, update the dirty list; on commit, commit the dirty list into the repository. When T2 performs an update, it traverses the version list to check what the modifications are since it last called update. The problem is that this contains redundant entries.
5) Faster updates. Only keep one version of each page, so no more redundant entries.
The key requirement is that they want to do these operations concurrently. Updates do not acquire any locks; commits can mostly be done concurrently as long as they update disjoint sets of pages.
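The version-list design from steps 4 and 5 can be sketched as follows (a simplified illustration of the data structure, not the in-kernel implementation):

```python
class VersionedSegment:
    """Sketch of the version list: each commit appends the set of
    pages it modified, so an update only pulls pages changed since
    the caller's last known version."""

    def __init__(self):
        self.versions = []  # list of (version_no, {page: value})

    def commit(self, dirty_pages):
        self.versions.append((len(self.versions) + 1, dict(dirty_pages)))
        return len(self.versions)

    def update_since(self, last_seen):
        # Traverse only versions committed after last_seen, keeping
        # the newest value for each page -- no full-segment copy.
        changed = {}
        for vno, pages in self.versions:
            if vno > last_seen:
                changed.update(pages)
        return changed
```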
Conversion adds only nanoseconds of overhead to a copy-on-write page fault.
Case Study: Dthreads w/ Conversion
Dthreads is a deterministic threading library which avoids race conditions and non-deterministic deadlocks; it guarantees race-free execution. Dthreads processes communicate using shared memory. In the Dthreads memory model there's no operation analogous to update(). Dthreads' memory model uses fences, which ensure that everyone is "done" with the parallel phase; a token determines the order in which threads can communicate with one another. There's a lot of time spent waiting, because you don't want to commit anything to shared memory since it becomes immediately visible. With Conversion, threads are allowed to write to shared memory in parallel with thread execution, relying on the token to maintain determinism: if a thread owns the token, it just commits its changes. They argue that this is a simpler model to program with. The key difference is that with Conversion, when you commit to memory, others can't yet see the changes, so there's no need for others to wait, only when the token comes back and lock is called. There's the additional advantage that Conversion is in the kernel. One point to notice is that Conversion spends a lot of time waiting for the token, and more time on commit (because you have the update and potentially have to merge, which can be slow).
Valentin Dalibard - Instead of getting faster cores, we get more of them. But multithreading is hard.
Conversion is an implementation of version-controlled memory (svn-like) which they built as a Linux kernel module. Each thread operates on its own local copy, with functions like checkout, commit and update.
When doing a commit on an out-of-date copy, an implicit update (merge) is done.
Who needs conversion: Applications that can work with a slightly out-of-date local copy
Design: (5 incremental improvements)
Bulk Copying => not efficient enough
Paging (finer granularity) => still not efficient enough
Dirty list => still not efficient enough
Version list, to only get the changes since the last update instead of copying everything => still not efficient enough
Final optimization to avoid identical versions (I think)
They then do a case study: Dthreads
Dthreads are deterministic threads, e.g. a race condition will happen the same way every time.
For the very parallel workloads, Conversion performs just as well.
For the less parallel workloads with locks, Dthreads on top of Conversion is about 25% faster than normal Dthreads.
Whose Cache Line Is It Anyway? Operating System Support for Live Detection and Repair of False Sharing
Mihir Nanavati, Mark Spear, Nathan Taylor, Shriram Rajagopalan, Dutch T. Meyer, William Aiello, and Andrew Warfield (University of British Columbia)
Natacha Crooks - Page tables are great because they provide a transparent translation layer between physical and virtual address spaces. But pages are really large: the moment you want to do translation at sub-page granularity, there's no more hardware support. The objective is to build a byte-granularity, software-only remapping and isolation mechanism. The key point is that they do it in software only. The motivating example used is false sharing.
Architecture looks like a target system, with a control VM on top of Xen, but no specific reliance on virtualisation, could build straight into the hardware.
The objective is dynamic detection and mitigation of false sharing. The problem is that it's difficult to avoid threads writing to the same cache lines. One of the reasons is that C structures allocated with malloc may carry allocator metadata, so they end up straddling cache lines unintentionally. Currently there are two techniques to limit false sharing: modify access locations, or modify access frequency.
The idea is to split the page up into an isolated page (which contains the contended regions), while the remainder of the page is mapped to an underlay page. But how do you know which regions are contended?
Unless false sharing has a really high frequency, it tends not to affect performance. This allows them to use a low-frequency, sampling-based approach: start off with something which is really fast but imprecise, then narrow the focus as you go along. The first stage is performance counters, which say whether there is any contention in the system, but no more detail than that. After that they need to infer which regions/pages of memory are contended, so they log page accesses. Then finally, using instruction emulation, they find the byte ranges being accessed. Once this is done, they analyse the log and determine whether there is any contention.
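The staged funnel above might look like this in outline. All function names here are hypothetical stand-ins for the three stages (counters, page logs, instruction emulation); the 64-byte cache line size is an assumption:

```python
def detect_contended_lines(perf_counter_hot, page_access_log, emulate_access):
    """Sketch of the staged detection funnel: cheap, imprecise signals
    narrow the focus before expensive byte-level emulation is used."""
    if not perf_counter_hot():          # stage 1: coarse HW counters
        return []
    suspect_pages = page_access_log()   # stage 2: page-level logging
    contended = []
    for page in suspect_pages:
        # stage 3: emulation yields (thread, start, end) byte ranges.
        by_line = {}
        for thread, start, end in emulate_access(page):
            line = start // 64          # assumed 64-byte cache lines
            by_line.setdefault(line, set()).add(thread)
        # Distinct threads touching disjoint bytes of one cache line
        # is the false-sharing signature.
        for line, threads in sorted(by_line.items()):
            if len(threads) > 1:
                contended.append((page, line))
    return contended
```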
They use fault-driven redirection, with data faults as the trigger for everything they do: catch all accesses via data paths, avoid code trampolines, and amortise the page fault cost.
They can optimistically undo the remapping when possible, copying the isolated pages and underlay pages back to the original page.
Evaluation demonstrates that overhead is low for systems which have little false sharing.
Claimed contributions: low-overhead runtime detection, and byte-level false sharing repair.
They believe this system is good for performance optimisations; security enhancements remain future work.
Q: What happens in the case where the access is within a loop or function?
A: Can detect that it is a function; there are techniques to find call points, but they haven't implemented that.
Q: What security enhancements are you talking about?
A: Protect sub-page regions. (rest not heard)
Q: How much easier would it be with VMware, where you could do binary rewriting all the time?
A: Not sure it would make a significant difference. (rest not heard)
Adaptive Parallelism for Web Search
Myeongjae Jeon (Rice University), Yuxiong He (Microsoft Research), Sameh Elnikety (Microsoft Research), Alan L. Cox and Scott Rixner (Rice University)
Valentin Dalibard - Performance of search queries is dependent on: latency and quality.
To improve latency:
Early termination: only run the query on the highest-ranked pages, since the low-ranked ones are unlikely to be useful anyway. They also show how queries can be parallelised.
Change execution depending on system load: parallelise on light loads, and execute sequentially on heavy loads. Bing doesn't do so well with parallelisation; they have a formula to check whether a query should be parallelised and by how much (the number of parallel threads). That's the adaptive bit: basically minimise single-query execution time plus the latency impact on waiting queries. They have a nice graph showing how the optimal number of threads decreases with the load.
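The "minimise execution time plus impact on waiting queries" policy can be sketched as a simple cost minimisation. Both the speedup curve and the linear waiting-cost term here are my own hypothetical stand-ins for Bing's measured models:

```python
def pick_parallelism(exec_time, load, max_threads=8, per_thread_delay=10.0):
    """Choose the thread count minimising this query's latency plus
    the delay its extra threads impose on queued queries.

    exec_time: callable t -> latency with t threads (assumed model)
    load: rough number of queries waiting behind this one
    """
    def total_cost(threads):
        my_latency = exec_time(threads)
        # Each extra thread delays the waiting queries a little
        # (crude linear stand-in for the paper's measured impact).
        waiting_penalty = load * (threads - 1) * per_thread_delay
        return my_latency + waiting_penalty
    return min(range(1, max_threads + 1), key=total_cost)
```

Under an idealised perfect-speedup model this reproduces the shape of their graph: maximum parallelism when the system is idle, dropping towards sequential execution as load rises.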
Overall not very novel, but nice engineering.