Parallel_Queue_Architecture
Design and Implementation of Parallel Queues using ParallelStore
Introduction

Overview

Tungsten Replicator uses parallel stores to implement pipeline stages in which multiple tasks read and apply transactions concurrently. The most significant application of this technique is known as parallel apply, which speeds up replication by executing slave updates on different shards in parallel. Parallel stores split serialized transactions into multiple output queues, which is why we sometimes call them parallel queues.

This wiki page summarizes the current parallel store implementations. It is not end-user documentation but rather describes the algorithms behind the code as it currently stands and illustrates the trade-offs between different parallel store approaches. The goal is to provide orientation when reading the code, designing tests, and developing new applications for parallel stores.

Caveats

The Tungsten solution to parallel replication has developed incrementally over a period of a couple of years. Our understanding of the problem and solutions has evolved considerably during this time. This means that while the overall model and general algorithms are fairly consistent, some of the interfaces and naming for individual classes are not. There will consequently need to be some clean-up over time to rationalize naming and restructure interfaces. Also, readers can expect some additional iteration as we gain a better understanding of the solutions and modify implementations in response to performance testing and production usage of parallel replication.

Additional References

For more information on the overall approach to parallel replication in Tungsten, see the following:

What We Mean by a Tungsten Parallel Store

Tungsten Replicator uses a pipeline-based execution model, where pipelines are event flows that extract from a source at one end and apply to a sink at the other end. Pipelines consist of one or more stages, which implement extract-filter-apply loops to transfer events.
Stores sit between stages and buffer events as they pass through the pipeline. Stores therefore act as queues within the pipeline and may be either persistent (like the THL) or non-persistent in-memory buffers.

A Tungsten parallel store is a type of Store that demultiplexes a serialized stream of events into multiple queues. Parallel stores implement an additional ParallelStore interface that provides control methods to enable safe shutdown.

The following diagram shows how parallel stores work in general. A single stage task delivers serialized events to the store. The store demultiplexes the events into parallel queues that feed multiple tasks in the next stage.
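The demultiplexing step described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class DemuxSketch, its Event type, and the hash-based queueFor rule are hypothetical names invented for this example and are not the Tungsten Store or ParallelStore interfaces. The sketch assumes that each event carries a shard ID and that all events for one shard must land in the same queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the Tungsten ParallelStore API.
class DemuxSketch {
    // Assumed event shape: a sequence number plus the shard it belongs to.
    static final class Event {
        final long seqno;
        final String shardId;

        Event(long seqno, String shardId) {
            this.seqno = seqno;
            this.shardId = shardId;
        }
    }

    // Assumed partitioning rule: hash the shard ID onto a queue number.
    static int queueFor(String shardId, int queueCount) {
        return Math.floorMod(shardId.hashCode(), queueCount);
    }

    // Split one serialized list of events into one list per queue.
    // Order is preserved within each output queue.
    static List<List<Event>> demultiplex(List<Event> serialized, int queueCount) {
        List<List<Event>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (Event e : serialized) {
            queues.get(queueFor(e.shardId, queueCount)).add(e);
        }
        return queues;
    }

    public static void main(String[] args) {
        String[] shards = {"sales", "hr", "sales", "ops", "hr"};
        List<Event> in = new ArrayList<>();
        for (int i = 0; i < shards.length; i++) {
            in.add(new Event(i, shards[i]));
        }
        List<List<Event>> out = demultiplex(in, 3);
        for (int q = 0; q < out.size(); q++) {
            StringBuilder sb = new StringBuilder("queue " + q + ":");
            for (Event e : out.get(q)) {
                sb.append(' ').append(e.seqno).append('/').append(e.shardId);
            }
            System.out.println(sb);
        }
    }
}
```

Because the queue is chosen from the shard ID alone, events for a given shard keep their original relative order, which is the property parallel apply relies on.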
Parallel threads are referred to as channels in user documentation. So far, channels are implemented as queues within the parallel store that serve corresponding tasks in the stage that extracts from the store. Extractors on the parallel store therefore have an additional method, documented in the ParallelExtractor interface, to set the task ID so that each task extracts from the correct queue.

ParallelStore implementations must deal with a number of practical issues that make them a little harder to write than one might think at first.
There are many ways to implement this model in practice. The following sections describe the current implementations.

In-Memory Parallel Store

ParallelQueueStore

Architecture

This implementation works by splitting applied events into a separate queue for each channel. One task thread applies new events. A task thread in the succeeding stage extracts from each individual queue. The general flow is illustrated in the following diagram.
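As a rough illustration of this flow, the following Java sketch uses one bounded blocking queue per channel, a single applying thread, and one extracting thread per channel. It is a minimal sketch under stated assumptions, not the ParallelQueueStore code: the class InMemoryQueueSketch, its put, take, and stop methods, the STOP marker, and the hash-based channel assignment are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; not the ParallelQueueStore implementation.
class InMemoryQueueSketch {
    static final String STOP = "STOP"; // assumed end-of-stream marker

    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    InMemoryQueueSketch(int channels, int maxSize) {
        for (int i = 0; i < channels; i++) {
            queues.add(new ArrayBlockingQueue<>(maxSize));
        }
    }

    // Called by the single applying task; blocks while the target queue is full.
    void put(String shardId, String event) throws InterruptedException {
        int channel = Math.floorMod(shardId.hashCode(), queues.size());
        queues.get(channel).put(event);
    }

    // Called by the extracting task with the given ID; blocks while its queue is empty.
    String take(int taskId) throws InterruptedException {
        return queues.get(taskId).take();
    }

    // Put a stop marker on every queue so that each extracting task can exit.
    void stop() throws InterruptedException {
        for (BlockingQueue<String> q : queues) {
            q.put(STOP);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InMemoryQueueSketch store = new InMemoryQueueSketch(2, 10);
        List<Thread> tasks = new ArrayList<>();
        for (int id = 0; id < 2; id++) {
            final int taskId = id;
            Thread t = new Thread(() -> {
                try {
                    String ev;
                    while (!(ev = store.take(taskId)).equals(STOP)) {
                        System.out.println("task " + taskId + " applied " + ev);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            tasks.add(t);
        }
        // The main thread plays the role of the single applying task.
        store.put("sales", "txn-1");
        store.put("hr", "txn-2");
        store.put("sales", "txn-3");
        store.stop();
        for (Thread t : tasks) {
            t.join();
        }
    }
}
```

Note that all queue work happens on the producer and consumer threads themselves; the sketch needs no helper threads inside the store.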
Implementation Notes

The implementation partitions events into channels using a partitioner, as described in the general discussion of parallel stores. All processing, including putting events into queues and fetching them out again, occurs in the task threads, which means that no additional threads are required within the store itself.

Queues are implemented using Java blocking queues, that is, implementations of java.util.concurrent.BlockingQueue. Each queue has a maximum size (maxSize property) and blocks writers when that size is reached. Blocking queues have the advantage that queue methods synchronize on the queue, which ensures that data are properly visible across queue reader and writer threads in accordance with the visibility guarantees provided by Java locks.

Advantages

The ParallelQueueStore has a number of advantages.
Disadvantages

On the other hand there are problems with the in-memory design that make it difficult to use on some workloads. The main reason is that even on systems with "nice" shard partitioning patterns across channels, events tend to show up in groups in different channels, so that at any given instant it is common to have many events on some channels and few or none on the rest. This leads in turn to the following problems.
Systems that process large numbers of transactions per second tend to see these problems in spades. On a system that processes 1000 TPS, 60 seconds of processing is 60K transactions, which in turn could be 200GB of memory in the Java heap. This kind of math quickly leads to very large memory allocations to prevent serialization.

Disk-Based Parallel Store

THLParallelQueue

Architecture

The THLParallelQueue implementation works by reading events from the THL using a thread for each channel and putting them into a queue that can be read by extract task threads. The apply task applies events to the store, which allows the implementation to scan for serialization points. The general flow is illustrated in the following diagram.
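The per-channel reading pattern can be sketched as follows. This is an illustration under stated assumptions rather than the THLParallelQueue code: the class LogReaderSketch and its methods are hypothetical, an in-memory list stands in for the on-disk THL, and the channel is assumed to be a hash of the shard ID. Each reader scans the whole log and keeps only the events that belong to its own channel. Synchronization with the apply task and handling of serialization points are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration only; not the THLParallelQueue implementation.
class LogReaderSketch {
    static final class Event {
        final long seqno;
        final String shardId;

        Event(long seqno, String shardId) {
            this.seqno = seqno;
            this.shardId = shardId;
        }
    }

    // Stand-in for the on-disk log: an append-only list shared by all readers.
    private final List<Event> log = new CopyOnWriteArrayList<>();
    private final int channels;

    LogReaderSketch(int channels) {
        this.channels = channels;
    }

    // Assumed partitioning rule: hash the shard ID onto a channel number.
    static int channelFor(String shardId, int channels) {
        return Math.floorMod(shardId.hashCode(), channels);
    }

    void append(Event e) {
        log.add(e);
    }

    // One pass of a channel's read thread: scan the log from position 'from',
    // queue the events that belong to this channel, and skip the rest.
    // Returns the position at which the next scan should resume.
    int scan(int channel, int from, BlockingQueue<Event> out) throws InterruptedException {
        int pos = from;
        while (pos < log.size()) {
            Event e = log.get(pos++);
            if (channelFor(e.shardId, channels) == channel) {
                out.put(e); // blocks while the read queue is full
            }
        }
        return pos;
    }

    public static void main(String[] args) throws InterruptedException {
        LogReaderSketch thl = new LogReaderSketch(2);
        String[] shards = {"sales", "hr", "sales", "ops"};
        for (int i = 0; i < shards.length; i++) {
            thl.append(new Event(i, shards[i]));
        }
        List<Thread> readers = new ArrayList<>();
        for (int ch = 0; ch < 2; ch++) {
            final int channel = ch;
            Thread t = new Thread(() -> {
                BlockingQueue<Event> queue = new ArrayBlockingQueue<>(8);
                try {
                    thl.scan(channel, 0, queue);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("channel " + channel + " read " + queue.size() + " events");
            });
            t.start();
            readers.add(t);
        }
        for (Thread t : readers) {
            t.join();
        }
    }
}
```

In this sketch every reader touches every event, so total read work grows with the number of channels. In exchange, the in-memory queues can stay short because the log itself holds the backlog.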
Note that this approach spawns an extra thread for each channel to read the THL and put events that belong to the channel in the corresponding read queue.

Implementation Notes

The implementation uses a partitioner to assign channels. This happens in two places. First, the apply task thread uses a partitioner to check the channel and look for serialization points, which it marks in a queue. Second, channel read threads call the partitioner to decide whether to accept events when scanning the THL.

The channel read threads are implemented by class THLParallelReadTask. The task implementation includes a read loop that connects to the THL and does what amounts to a table scan, using a predicate to discard events that do not belong to the channel.

The THLParallelQueue class handles synchronization between threads through a call-back to a global sync counter. The counter increments each time the apply task from the preceding stage applies a new event. This prevents threads from advancing ahead of the apply task that scans the THL. It also enforces serialization points, in which one and only one read thread is allowed to advance while the others wait.

Since we read events from disk, the actual in-memory queues are quite short. We use Java blocking queues for this purpose for reasons already cited in the discussion of in-memory parallel queues.

Advantages

The THLParallelQueue implementation has the following benefits.
Disadvantages

On the other hand there are several potential drawbacks to this approach.
Log File per Channel Alternative

Design

Another approach, currently unimplemented, is to split the THL into multiple logs. In this case the new logs would implement what amount to on-disk queues, from which extract tasks would read directly. Here is an illustration.
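Since this design is unimplemented, the following Java sketch is purely speculative. It shows one way a log-per-channel layout might look: a writer appends each event to a file chosen by channel, and each extract task reads only its own file. The class ChannelLogSketch, the channel-N.log file naming, the hash-based channel assignment, and the one-event-per-line text format are all assumptions made for the example and say nothing about the real THL format.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; the log-per-channel design is unimplemented.
class ChannelLogSketch {
    private final Path dir;
    private final int channels;

    ChannelLogSketch(Path dir, int channels) {
        this.dir = dir;
        this.channels = channels;
    }

    // Assumed file naming: one text log per channel inside 'dir'.
    private Path logFor(int channel) {
        return dir.resolve("channel-" + channel + ".log");
    }

    // Append a single-line event to the log of the channel that owns the shard.
    void append(String shardId, String event) throws IOException {
        int channel = Math.floorMod(shardId.hashCode(), channels);
        Files.write(logFor(channel),
                (event + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Read every event currently stored for one channel, in write order.
    List<String> read(int channel) throws IOException {
        Path p = logFor(channel);
        if (!Files.exists(p)) {
            return Collections.emptyList();
        }
        return Files.readAllLines(p, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("channel-logs");
        ChannelLogSketch logs = new ChannelLogSketch(dir, 2);
        logs.append("sales", "txn-1");
        logs.append("hr", "txn-2");
        logs.append("sales", "txn-3");
        for (int ch = 0; ch < 2; ch++) {
            System.out.println("channel " + ch + ": " + logs.read(ch));
        }
    }
}
```

Unlike a scan of a single shared log, each reader here touches only the events for its own channel, at the cost of writing every event a second time into a channel log.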
Implementation Notes

This approach looks very much like an in-memory queue; in fact, if the interface were designed to look like a Java BlockingQueue, the code would look almost identical to the in-memory queue implementation, which would simplify maintenance. This implementation requires the following extensions to the current THL. Other than that it looks fairly straightforward to implement.
Potential Advantages

Some of the potential advantages of log-per-channel include the following:
Disadvantages

Here are the potential drawbacks.