Tier-Based Business-Critical Applications - Scalability Interrupted | Towards a Linearly-Scalable Architecture | Space Based Architecture - Scalability as Open Road | The SBA Value Proposition
In many application domains today, especially in financial services, the number of clients, the depth of services provided, and the data volumes are all growing simultaneously; in parallel, middle-office analytics applications are moving towards near-real-time processing. As a result, application workload is growing exponentially. One GigaSpaces customer is expecting to grow from 100K trades to 80 million trades – in only two years!
In order to understand the scalability problem, we must first define the term: scalability is the ability to grow an application to meet growing demand, without changing its code, and without sacrificing the data affinity and service levels your users demand.
We identify two situations in which scalability is interrupted:
For most contemporary applications, particularly transactional applications in a low-latency environment, these barriers are inevitable. But this is not the only possible case. Theoretically, an application can achieve linear scalability – the ability to grow as much as needed, at a fixed price per capacity unit – in which case it would never face scalability barriers.
Consider two typical business-critical applications – front-office applications and back/middle-office analytics applications. Clearly, neither type has linear scalability, because scaling becomes progressively more difficult and expensive as the application grows.
Interestingly, these two very different business-critical applications have striking similarities: they are both stateful, and both use a messaging tier for coordination, a data tier for storage of state information, and a business tier for the actual processing – in other words, they are both founded on the tier-based architecture.
More interesting still, both types of applications encounter similar scalability problems: cluster nodes get overloaded by inefficient clustering; different clustering models for each tier cause unnecessary ping-pong inside the tiers; unknown scaling ratios between system components cause unexpected bottlenecks when the system scales; growing messaging volumes can overload the processing components; the network becomes the bottleneck; the inability to virtualize the tiers causes coordination problems; and different high-availability (H/A) models for each tier make it difficult to guarantee recovery from partial failure.
Each of these problems can cause a scalability crash barrier. To avoid hitting a crash barrier, application administrators are forced to apply temporary fixes – complex, resource-consuming coordination and clustering mechanisms – and scale up each tier just to accommodate the additional overhead. As the system scales, these fixes need to be applied again and again, making scalability progressively more expensive.
The application's administrators find themselves caught between a rock and a hard place: if they apply the scalability fixes, the application grows more and more complex, until it hits a marginal cost barrier; but if they don't apply these fixes, the application quickly hits a scalability crash barrier, and must be replaced.
This dilemma is inherent in tier-based applications: because the system is divided into separate tiers, it increases in complexity as it scales, requires more and more overhead just to manage this complexity, and makes it more and more costly to increase capacity.
Therefore, tier-based applications cannot be linearly scalable. This follows from the well-known Amdahl's Law, which implies that if all processors in a system spend some of their time on overhead – as is the case in all tier-based systems – the speed improvement yielded by additional processors quickly hits an upper bound (for 10% overhead, the maximum improvement is 10x, no matter how many processors are added).
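The 10x ceiling for 10% overhead falls straight out of Amdahl's formula. The following plain-Java sketch (ours, purely illustrative) computes the bound:

```java
public class Amdahl {
    // Speedup for n processors when a fraction s of the work is serial overhead:
    // speedup(n) = 1 / (s + (1 - s) / n); as n grows, this approaches 1 / s.
    static double speedup(double s, int n) {
        return 1.0 / (s + (1.0 - s) / n);
    }

    public static void main(String[] args) {
        double s = 0.10; // 10% overhead
        System.out.printf("100 CPUs:  %.2fx%n", speedup(s, 100));
        System.out.printf("1000 CPUs: %.2fx%n", speedup(s, 1000));
        // The ceiling is 1 / 0.10 = 10x, no matter how many CPUs are added.
    }
}
```

No matter how many processors are thrown at the problem, the speedup never reaches 10x – which is why eliminating the overhead itself, rather than adding hardware, is the only way out.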
If scalability is a road, and applications are cars driving on the road, non-linear scalability is a dead end. To avoid ending up as a wreck, tier-based business-critical applications must become linearly scalable – and the only way to do this is a change of architecture.
The key to a linearly-scalable architecture is to expand the application by adding self-sufficient units. This way, as the application grows, there is no complex coordination that consumes resources and leads to scalability barriers.
This is a clear departure from the tier-based architecture: instead of running each application tier in a separate cluster of computing resources, all the tiers are compressed into a single unit, which is duplicated when the application is scaled. The middleware problem simply evaporates when each component becomes self-sufficient.
But in a stateful environment, how can you manage a workflow when each machine is completely self-sufficient? How can machines share state information between them? The secret is to collocate all steps of the business process – putting them on the same machine, in the same JVM. This requires developing a processing unit – a mini-application which can perform the entire business process on its own and return a result.
The processing unit manages its own workflow, providing messaging and data storage facilities to its collocated service instances. This saves the need to contact external resources, and means that all process information can be stored in local memory, reducing latency to a minimum.
To ensure reliability, state data and partial results can be persisted to a database, or replicated to another, identical processing unit. Thus, the processing unit can be made as reliable as needed: transient data can be stored in memory only; very sensitive data can be persisted to a remote database. This improves on the tier-based model, which forces all processing components to pay the high price of persistency – even if this type of reliability is not really required.
In the processing unit model, user requests that enter the system are distributed between the processing units using content-based routing – if the request has value A it goes to Processing Unit 1; if it has value B it goes to Processing Unit 2. This type of routing is very simple and inexpensive, but most importantly, it does not increase in complexity as the application scales.
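To illustrate why this routing stays cheap, here is a minimal, hypothetical sketch in plain Java (the class and method names are ours – this is not the GigaSpaces API):

```java
import java.util.Map;

public class ContentRouter {
    // Hypothetical routing table: each content value is pinned to one
    // processing unit. Routing cost is a single map lookup, independent of
    // how many units exist -- which is why this scheme does not grow more
    // complex as the application scales.
    private final Map<String, Integer> routes;

    ContentRouter(Map<String, Integer> routes) { this.routes = routes; }

    int route(String contentValue) {
        Integer unit = routes.get(contentValue);
        if (unit == null) throw new IllegalArgumentException("no route for " + contentValue);
        return unit;
    }

    public static void main(String[] args) {
        ContentRouter r = new ContentRouter(Map.of("A", 1, "B", 2));
        System.out.println("value A -> Processing Unit " + r.route("A"));
        System.out.println("value B -> Processing Unit " + r.route("B"));
    }
}
```

Adding a third processing unit means adding one map entry; nothing about the routing logic itself changes.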
The concept of a processing unit makes linear scalability possible, but this is not enough. A critical requirement is that as the distributed application scales, it should look and behave like a single server, whether viewed by designers, developers, administrators, or clients. Everyone involved should see the system as one coherent unit: scaling should be simplified to the point of transparency.
Space-Based Architecture (SBA) is a way to implement processing units in your application, transforming scalability from dead end to open road.
At the core of an SBA processing unit is the space – a middleware infrastructure, collocated with the business process services, which provides in-memory messaging and data storage facilities for the processing unit. Like a conveyor belt in a production line, the space helps coordinate the business process, and allows business services to deposit their partial results on a shared work area, which can be accessed by other services in turn.
The space's messaging facility allows direct in-memory access to specific messages – this saves each business process service the overhead of connecting to an external messaging server and screening out unneeded messages from the queue.
When the space acts as data store, the services can save results at in-memory speeds, because they are collocated with the space on the same physical machine.
The space has a built-in clustering model, which allows it to share data and messages with other spaces on the network; this allows processing units to guarantee reliability by replicating state data between them. The space can also persist its data to a database. All this occurs in the background, without affecting the latency of the processing units.
This unified clustering model allows clients of the application to execute requests against a generic cluster proxy – the cluster matches the request at the object level, and routes it to the appropriate space. This routing is done in a distributed manner, without requiring a central server.
A central part of SBA is a deployment framework that takes care of running the processing unit on physical machines. The framework consists of SLA-driven containers that run on all the computers in the deployment scope.
Via the user interface of the deployment framework, it is easy to deploy as many processing units as needed – with one click. A single deployment descriptor defines a generic scaling policy for all possible deployment sizes, so deployment does not grow more complex as the application scales. What's more, the framework responds dynamically to application workload.
Because of the flexibility of the deployment framework, SBA permits a combination of scaling-up (within a multi-core machine) and scaling-out, without breaking the SOA model.
SBA guarantees linear scalability, while ensuring simplicity – the application's scale becomes transparent to all involved, as if it were a single server:
SBA removes the scalability dead-end facing today's tier-based applications, and guarantees improved scalability in four dimensions:
This example shows how SBA would be implemented for a simple distributed application: a trading system that accepts buy and sell orders from clients. This application is transactional (there are two sequential steps in the business process), and requires very low latency, so that buy and sell orders can be executed in real time.
In order to make this example as simple as possible, we were forced to sacrifice some realism in portraying the business process of a trading application. This section shows the real business process for a trading application, and the simplified business process implemented in this example.
Our sample SBA application comprises three components:
It should be emphasized that the matching and routing operations are defined as independent, generic services – they are not hard-wired into the processing unit, and can easily be replaced with or supplemented by other services. They can also be independently discovered and managed, just like in a regular SOA. However, because these services are collocated, together with the space which facilitates coordination, they can cooperate within the processing unit to accomplish a structured workflow.
Latency as Low as Possible, Reliability as High as Necessary | FIFO Ordering Enables Virtual Messaging
The simplified business process is executed as follows (to see the differences between the simplified business process used in this example and a typical business process of a real trading application, see the Business Process section):
The workflow is illustrated in the diagram below.
It is easy to see that the entire business process takes place inside the processing unit (in the diagram, Processing Unit 2 handles the order, because the shares are Microsoft). The processing unit is not dependent on any other component, so it can perform its entire process on a single machine, at in-memory speeds.
The clients are not aware of the clustering at all: they submit an order through the Spring Framework, and view the result as they always would, through the exchange or a legacy database. This means that even if this application were to scale from 2 processing units to 200, the client application and the entire process would remain the same.
Because the services are collocated, communicate using the space's in-memory messaging and data store, and do not reference any external resources, the processing unit responds with the lowest possible latency – that of in-memory access.
The processing unit's data can be made as reliable as necessary, by either replicating it to another processing unit or persisting it to a database. But because the application is not forced to save all its data to a database, there is no need to suffer a latency penalty, incurred by disk access or network calls, for information that is transient in nature, or that needs to be highly available rather than persisted to hard disk.
This example uses the space's ability to record the order in which objects (representing trades in this case) are written to the space. When the matching service takes trades from the virtual 'messaging queue', it is in fact simply reading them from the space – because of the FIFO ordering, the space can return trades in the exact order they were submitted.
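The virtual messaging queue can be pictured with a toy stand-in in plain Java: writes preserve submission order, and a reader gets trades back in exactly that order (an illustration of the FIFO idea only, not the space's actual API):

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FifoSpace {
    // A toy stand-in for the space's FIFO mode: objects written to the space
    // are handed back to readers in exactly the order they were submitted.
    private final Queue<String> trades = new ArrayDeque<>();

    synchronized void write(String trade) { trades.add(trade); }
    synchronized String take() { return trades.poll(); }

    public static void main(String[] args) {
        FifoSpace space = new FifoSpace();
        space.write("buy 100 MSFT");
        space.write("sell 50 MSFT");
        // The matching service 'subscribes' simply by taking in FIFO order:
        System.out.println(space.take()); // the first trade submitted
        System.out.println(space.take()); // the second trade submitted
    }
}
```

There is no queue manager and no broker here – ordering alone turns the shared data store into a virtual messaging queue.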
The application has two data classes: Order and Settlement. This section shows and explains these classes, and discusses the data lifecycle.
This class represents a buy or sell order submitted by a customer.
As you can see, this is a Plain Old Java Object (POJO). Five data fields are declared using native types (with four additional static fields):
@SpaceClass is an annotation that says this class's data should be represented in the space.
The @SpaceProperty annotation specifies that the amount field should be represented in the space. The null-value argument provides a specific value that corresponds to null (required because the field is of a native type).
The annotation is followed by two methods, getAmount() and setAmount() – the first is used when this field is written to the space (the space 'gets' the field value from the POJO); the second is used when this field is read from the space (the space 'sets' the field value into the POJO).
The same structure is repeated for the other fields, type, price, productId and isNew, so that they are represented in the space as well.
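A minimal sketch of the Order class as described above. The GigaSpaces annotations (@SpaceClass on the class, @SpaceProperty with a null-value on each getter) are shown as comments so the sketch compiles without the GigaSpaces jar; the exact field types, null-values, and static constants are our assumptions:

```java
// @SpaceClass  -- marks this class's data for representation in the space
public class Order {
    // Illustrative static constants (the original class has four static fields):
    public static final int TYPE_BUY = 0;
    public static final int TYPE_SELL = 1;

    private int amount;
    private int type;
    private double price;
    private String productId;
    private boolean isNew;

    public Order() {} // no-arg constructor, needed for a POJO written to the space

    // @SpaceProperty(nullValue = "-1")  -- assumed null-value; required because
    // the field is of a native type and cannot itself hold null.
    // getAmount() is used when the field is written to the space;
    // setAmount() is used when the field is read back from it.
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    public int getType() { return type; }
    public void setType(int type) { this.type = type; }

    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }

    public boolean getIsNew() { return isNew; }
    public void setIsNew(boolean isNew) { this.isNew = isNew; }
}
```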
This object represents a deal executed between a buyer and a seller.
Like Order, this is a POJO with four fields; the @SpaceClass annotation says the class should be represented in the space.
The Settlement class has a constructor (not shown), which accepts one sell order and one buy order object, asserts that they match, and sets the amount, date, and price of the settlement to those of the sell order.
Like in the Order object, the @SpaceProperty annotation and the get/setAmount() methods specify that the amount field should be represented in the space. There are similar annotations for the other data fields.
In an SBA application, it is useful to examine the part played by different elements of the data model in the application workflow, and the relation between these elements:
The following sections show how SBA allows the application to match these requirements precisely – each object is only stored for the duration required, and is only made accessible to the required parties, reducing the application's overhead to a minimum.
The application imports Spring Framework context support, and org.springmodules.javaspaces.gigaspaces.GigaSpacesTemplate, the Spring abstraction of a GigaSpaces space. These two classes allow the application to connect to and work with the space, via a Spring DAO.
The MarketClient class declares an ORDER_LEASE_TIME. This is a kind of timeout: how long (in ms) the order should remain in the space of the processing unit, before expiring. The space automatically cleans orders when their lease expires.
It also declares GigaSpacesTemplate space, a field that can accept the Spring abstraction of the space.
Further in the code, the client defines getSpace(), a simple method that references the Spring application context file (at path resources/client.xml), and obtains a GigaSpacesTemplate bean.
In this example there are two spaces (one for each of the two processing units; one for Google shares and one for Microsoft shares). When getSpace() is called, the Spring Framework hands the relevant space to the application. Finally, the Order object is written to the space.
The customer enters order details using the command line (code not shown), and the application generates an Order object, setting the order details provided and setting isNew to true:
The order is then executed:
The application obtains a GigaSpacesTemplate bean by calling the getSpace() method, and writes the order to the space, by passing a POJO and a lease time to the write() method exposed by the bean.
It is important to realize that the code above remains exactly the same if the application scales up, if the cluster topology changes, or even if GigaSpaces is replaced by a different vendor – the clustering model is completely transparent to the client, which always sees the application as one coherent unit.
As we saw above, the client application obtains a GigaSpacesTemplate bean and 'writes' the order to it. But what does this mean in practice? How is the order actually delivered to the correct processing unit – the one that handles the specific shares specified in the order (either Google or Microsoft)?
Behind the scenes, the spaces in the two processing units form a partitioned cluster – the orders flowing into the system, as well as the settlements generated by the system, are divided between the spaces in the cluster according to a partitioning scheme.
In this example, requests are partitioned according to the productId – orders for Google shares go to the space of Processing Unit 1; orders for Microsoft shares go to the space of Processing Unit 2.
The cluster of spaces (two spaces in this case) exposes a clustered proxy, which allows clients to see the cluster as one coherent unit. This proxy accepts client requests, and routes them to the relevant space, based on the clustering mode – in this case, according to the partitioning scheme.
The GigaSpacesTemplate bean we saw earlier is merely a Spring abstraction of the cluster space proxy. When the client calls the write() method on the template, passing the Order object, the object is passed to the write() method of the cluster proxy. The proxy recognizes the productId of the request, and automatically routes it to the appropriate space.
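How the proxy preserves the single-server illusion can be sketched in plain Java – a toy model, not the actual GigaSpaces proxy (the order is represented here as a String array whose first element is the routing field):

```java
import java.util.List;

public class ClusterProxySketch {
    // The contract a client sees: one space, one write() method.
    interface Space { void write(String[] order); }

    // A toy partition member that just records the last entry it received.
    static class PartitionSpace implements Space {
        String[] last;
        public void write(String[] order) { last = order; }
    }

    // The cluster proxy implements the same Space interface, so the client
    // cannot tell it is talking to a cluster: it inspects the routing field
    // (the productId, element 0 of our toy order) and forwards the write.
    static class ClusterProxy implements Space {
        final List<PartitionSpace> partitions;
        ClusterProxy(List<PartitionSpace> partitions) { this.partitions = partitions; }
        public void write(String[] order) {
            int index = order[0].equals("GOOG") ? 0 : 1; // toy partitioning scheme
            partitions.get(index).write(order);
        }
    }

    public static void main(String[] args) {
        PartitionSpace pu1 = new PartitionSpace(), pu2 = new PartitionSpace();
        Space space = new ClusterProxy(List.of(pu1, pu2));
        space.write(new String[] {"GOOG", "buy", "100"});
        space.write(new String[] {"MSFT", "sell", "50"});
        System.out.println("PU 1 got an order for: " + pu1.last[0]);
        System.out.println("PU 2 got an order for: " + pu2.last[0]);
    }
}
```

The client code is identical whether there are two partitions or two hundred – the routing decision lives entirely behind the shared interface.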
When the application is deployed, using a service deployment framework such as the GigaSpaces Service Grid, the administrator can specify how many processing units to deploy and how they should be clustered. The Service Grid offers several basic clustering patterns (partitioned, partitioned with backup, synchronous replication, asynchronous replication), and allows users to define custom patterns.
The Space Service | The Matching Service (Business Process Step A) | The Routing Service (Business Process Step B)
The processing unit comprises three services: a space, a matching service, and a routing service.
The space is a uniform service that is injected by the deployment framework when the processing unit is deployed. It is tasked with providing messaging and data storage facilities for the business services (in this example, the matching service and the routing service).
The component which actually provides these middleware capabilities is net.jini.space.JavaSpace. This interface exposes the JavaSpaces API: read(), write(), take(), and notify(). The space service, shown below, is a super-class that enables the space to be presented to the application as a Spring template.
The matching service occupies a thread and constantly repeats the following operation: it reads a FIFO-ordered Order object from the space, tries to find a matching order, and, if it finds one, generates a settlement.
This is exactly as if the service subscribes to a messaging queue containing orders waiting to be processed. The nice thing is, there is no messaging server – only Order objects waiting in the space, on the matching service's local machine. This is a simple demonstration of how the space can facilitate implicit messaging between software components.
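The matching loop can be sketched in plain Java – a toy model in which a local queue stands in for the space and unmatched orders simply wait (illustrative only; the names and the matching rule are ours, not the application's):

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class MatchingSketch {
    // Toy order and settlement types; a real system would match on product,
    // price, and amount, not just side and amount.
    record Order(String side, int amount) {}
    record Settlement(int amount) {}

    // Unmatched orders wait here, standing in for Order objects left in the space.
    private final Queue<Order> pending = new ArrayDeque<>();

    // One iteration of the matching loop: take the next FIFO-ordered order,
    // look for an opposite-side counterpart, and generate a settlement if found.
    Settlement process(Order incoming) {
        for (Order waiting : pending) {
            boolean opposite = !waiting.side().equals(incoming.side());
            if (opposite && waiting.amount() == incoming.amount()) {
                pending.remove(waiting);
                return new Settlement(incoming.amount());
            }
        }
        pending.add(incoming); // no match yet -- leave the order in the 'space'
        return null;
    }

    public static void main(String[] args) {
        MatchingSketch m = new MatchingSketch();
        System.out.println(m.process(new Order("buy", 100)));  // no match yet
        System.out.println(m.process(new Order("sell", 100))); // a settlement
    }
}
```

Everything in the loop – the pending orders, the match check, the resulting settlement – lives in local memory, which is the whole point of collocating the service with the space.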
The matching service defines two lifecycle methods (not shown), invoked by the deployment framework, which perform user-defined maintenance operations when the processing unit is initialized and shut down:
The routing service asks to be notified by the space whenever a settlement is written to it. When a settlement appears in the space, the service receives a notification and takes it from the space.
Again, this is exactly as if the service subscribes to a messaging queue, this time a queue containing settlements. And again, there is no messaging server – there are only Settlement objects waiting in the space.
The routing service then writes the settlement to the exchange or to a persistent store.
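The notification pattern can be sketched in plain Java with a simple observer – a toy stand-in for the Jini notify() mechanism, not the actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NotifySketch {
    // A toy space that supports write-notifications: listeners register once,
    // and are called back whenever a settlement is written.
    static class Space {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        void notifyOnWrite(Consumer<String> listener) { listeners.add(listener); }
        void write(String settlement) {
            for (Consumer<String> l : listeners) l.accept(settlement);
        }
    }

    public static void main(String[] args) {
        Space space = new Space();
        // The routing service registers once, then reacts to each settlement,
        // forwarding it to the exchange or a persistent store.
        space.notifyOnWrite(s -> System.out.println("routing to exchange: " + s));
        space.write("settlement: 100 MSFT shares");
    }
}
```

The service never polls; it is driven entirely by writes to the space, which keeps the thread idle until there is real work to do.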
This service imports a number of Jini and Spring classes that enable it to register for notifications on a space:
The service also imports application-specific classes (not shown).
Like the matching service, the routing service uses the startup() and shutdown() methods for thread management; these methods are invoked by the deployment framework when the processing unit is started and stopped, respectively.
Supporting Increased Trading Volume | Supporting More Products (Same Business Process) | Supporting More Products (Different Business Process)
The trading application we presented supports two shares, with one machine processing the trades for each share. From this starting point, the application might be asked to scale in two dimensions: the volume of trades might increase in one or both shares, and more products might be added. The new products might have the same business process as the old, or a completely different business process (which requires developing a new processing unit alongside the existing one).
Suppose that the maximum capacity of each existing machine is 1000 trades per second, that the current trading volume in Microsoft shares is 800, and that this volume is expected to triple from 800 to 2400 trades per second.
Initially, the application does not need to do anything, because there is still capacity to utilize on the Microsoft machine. Only when that machine reaches full utilization does the application use a combination of scaling up and scaling out to increase capacity.
It is possible to scale up the machine running Processing Unit 2 (which is dedicated to Microsoft shares), by adding two more CPUs – Processing Unit 2 can then handle the additional load.
Alternatively, two more machines can be added – as the load increases, the deployment framework will automatically launch Processing Units 3 and 4 on these machines, and these units will process the additional Microsoft trades. It is also possible to combine scaling up and out – for example, adding one CPU to the Microsoft machine and adding one new machine.
The decision of how to scale depends solely on economics, not on technical considerations. The application remains perfectly optimized whether it scales by adding more threads or by adding more machines – and neither option breaks the SOA model.
Most importantly, however the application scales, scaling is linear: each additional CPU or machine contributes 1000 more trades per second.
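Because scaling is linear, capacity planning reduces to simple division. A plain-Java sketch of the arithmetic above (ours, purely illustrative):

```java
public class CapacityPlan {
    // With linear scalability, the number of required units is just the target
    // volume divided by per-unit capacity, rounded up -- no overhead term.
    static int unitsNeeded(int targetTradesPerSec, int capacityPerUnit) {
        return (targetTradesPerSec + capacityPerUnit - 1) / capacityPerUnit;
    }

    public static void main(String[] args) {
        int current = unitsNeeded(800, 1000);  // today's 800 trades/sec fit in 1 unit
        int future = unitsNeeded(2400, 1000);  // tripled volume needs 3 units
        System.out.println("additional units to deploy: " + (future - current));
    }
}
```

In a tier-based system this calculation would need an overhead term that grows with the deployment size; here each unit contributes its full 1000 trades per second regardless of how many units already exist.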
This is true linear scalability because:
Using the second definition of linear scalability (See The Scalability Revolution, SBA Concept Paper, pg. 5):
Suppose that there is a need to support trading in IBM shares, alongside Google and Microsoft. There are three ways to scale up:
As shown above, capacity increases linearly as more machines or CPUs are added.
Suppose there is a need to support trading in derivatives. This has a different business process from trading of shares – the application needs to get an input of the derivatives base and perform computations on it.
In this case, a new type of processing unit needs to be developed. This new processing unit might have a computation service, a different matching algorithm, etc. (The development of a new processing unit does not affect the design of the existing processing unit, which can remain exactly the same.)
The new processing unit is defined in the deployment framework, and deployed in one of two ways:
As shown above, capacity increases linearly (for each product type) as more machines or CPUs are added.
This example shows a very simple implementation of SBA. You are invited to read about the following additional options and considerations, which would be the next things to consider in developing a similar application: