OceanBase distributed database

OceanBase runs on general-purpose server hardware and relies on local storage. The servers in a cluster are peers, and no special hardware is required. OceanBase adopts a Shared Nothing architecture for distributed processing, and the SQL execution engine in the database has distributed execution capability.

On each server, OceanBase runs a single-process program called observer as the running instance of the database, and uses local files to store data and transaction redo logs.

OceanBase cluster deployment requires the configuration of availability zones (Zones), each composed of several servers. An availability zone is a logical concept that represents a group of nodes within the cluster with similar hardware availability. Its meaning depends on the deployment mode. For example, when the entire cluster is deployed in a single data center (IDC), the nodes in an availability zone can belong to the same rack or switch. When the cluster is distributed across multiple data centers, each availability zone can correspond to a data center.

User data can have multiple replicas within the distributed cluster, both for fault tolerance and to spread read load. An availability zone holds at most one replica of a given piece of data, while different availability zones can each hold a replica of the same data. A consensus protocol keeps the replicas consistent.

OceanBase has built-in multi-tenancy features, where each tenant is an independent database for users, and a tenant can set its own distributed deployment mode at the tenant level. CPU, memory, and IO are isolated between tenants.

Internally, OceanBase’s database instance consists of different components that collaborate with each other. These components include the storage layer, replication layer, balancing layer, transaction layer, SQL layer, and access layer.

Storage Layer

The storage layer provides data storage and access at the granularity of a table or partition, with each partition corresponding to a tablet (shard) used to store data. Non-partitioned tables defined by users also correspond to a tablet.

A tablet is stored internally as a layered structure with four layers: MemTable, L0 SSTable, L1 SSTable, and Major SSTable. DML operations such as insert, update, and delete are first written to the MemTable. When the MemTable reaches a certain size, it is dumped to disk as an L0 SSTable. When the number of L0 SSTables reaches a threshold, multiple L0 SSTables are merged into an L1 SSTable. During the configured daily off-peak period, all MemTables, L0 SSTables, and L1 SSTables are merged into a Major SSTable.
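The flow through the four layers can be sketched in a few lines of Python. This is only an illustration of the idea, not OceanBase's implementation: the class name `Tablet`, the dictionary-based layers, and the two thresholds are all assumptions made for the example.

```python
class Tablet:
    """Toy four-layer tablet: MemTable -> L0 SSTables -> L1 SSTable -> Major SSTable."""

    def __init__(self, memtable_limit=2, l0_limit=2):
        self.memtable_limit = memtable_limit  # hypothetical threshold (row count)
        self.l0_limit = l0_limit              # hypothetical threshold (SSTable count)
        self.memtable = {}
        self.l0 = []      # list of dumped MemTables, oldest first
        self.l1 = {}
        self.major = {}

    def write(self, key, value):
        """Insert or update a row; a value of None marks a delete."""
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._dump_memtable()

    def _dump_memtable(self):
        # Dump the MemTable as a new L0 SSTable.
        self.l0.append(dict(self.memtable))
        self.memtable = {}
        # Too many L0 SSTables: merge them into L1, oldest first so newer rows win.
        if len(self.l0) >= self.l0_limit:
            for sstable in self.l0:
                self.l1.update(sstable)
            self.l0 = []

    def major_merge(self):
        """Merge every layer into the Major SSTable and drop deleted rows."""
        merged = dict(self.major)
        merged.update(self.l1)
        for sstable in self.l0:
            merged.update(sstable)
        merged.update(self.memtable)
        self.major = {k: v for k, v in merged.items() if v is not None}
        self.memtable, self.l0, self.l1 = {}, [], {}

    def read(self, key):
        # Search from the newest layer to the oldest one.
        for layer in [self.memtable] + self.l0[::-1] + [self.l1, self.major]:
            if key in layer:
                return layer[key]
        return None
```

Reads check the newest layer first, which is why an update or delete still sitting in the MemTable hides the older value stored in an SSTable.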

Internally, each SSTable is built from fixed-length 2 MB macroblocks, and each macroblock consists of multiple variable-length microblocks.

During the merge process, microblocks in the Major SSTable are converted using encoding methods. The data within a microblock is encoded column by column, using dictionary, run-length (RLE), constant, and delta encoding. After each column is encoded, inter-column encoding rules such as equality and substring are applied. Encoding greatly reduces the data size, and the column features extracted during encoding speed up subsequent queries.
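To make the column-wise encodings more concrete, here is a minimal Python sketch of three of them. The function names and output layouts are assumptions chosen for illustration, and the delta variant shown stores offsets from the column minimum, which is only one of several possible definitions. The actual on-disk formats in OceanBase are not described here.

```python
def dictionary_encode(column):
    """Replace each value with its index in a sorted dictionary of distinct values."""
    dictionary = sorted(set(column))
    index = {value: i for i, value in enumerate(dictionary)}
    return dictionary, [index[value] for value in column]


def rle_encode(column):
    """Collapse runs of equal adjacent values into (value, run_length) pairs."""
    runs = []
    for value in column:
        if runs and runs[-1][0] == value:
            runs[-1][1] += 1
        else:
            runs.append([value, 1])
    return [tuple(run) for run in runs]


def delta_encode(column):
    """Store the minimum once, then each value as a small offset from it."""
    base = min(column)
    return base, [value - base for value in column]
```

Dictionary encoding suits columns with few distinct values, run-length encoding suits columns where equal values sit next to each other, and delta encoding suits numeric columns whose values lie in a narrow range.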

After encoding, a user-specified general-purpose lossless compression algorithm can be applied to further improve the compression ratio.

Replication Layer

The replication layer uses log streams (LS, Log Stream) to synchronize state between multiple replicas. Each tablet corresponds to a specific log stream, and the redo logs generated by DML operations that write data to the tablet are persisted in that log stream. Multiple replicas of the log stream are distributed across different availability zones and are kept consistent by a consensus algorithm. One of the replicas is selected as the primary replica, while the others are secondary replicas. DML operations on the tablet and strongly consistent queries are performed only on the primary replica of the corresponding log stream.

In general, each tenant will have only one primary replica of a log stream on each machine, and there may be multiple secondary replicas of other log streams. The total number of log streams for a tenant depends on the configuration of the primary zone and locality.

The log stream uses a self-developed implementation of the Paxos protocol. The primary replica persists the redo logs on its local server and simultaneously sends them to the secondary replicas. After a secondary replica completes its own persistence, it acknowledges the primary replica. The primary replica confirms that the redo logs are successfully persisted once a majority of replicas have persisted them. The secondary replicas replay the redo logs in real time so that their state matches that of the primary replica.
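The majority rule in this paragraph can be illustrated with a small Python sketch. It is not a Paxos implementation: it models only the acknowledgement counting, and the names `LogStreamLeader`, `append`, and `on_ack` are hypothetical. As a simplification, the primary's own persistence is treated as complete at append time.

```python
def majority(replica_count):
    """Smallest number of replicas that forms a majority."""
    return replica_count // 2 + 1


class LogStreamLeader:
    """Tracks which replicas have persisted each redo log entry."""

    def __init__(self, replica_count):
        self.replica_count = replica_count
        self.acks = {}          # log id -> set of replicas that persisted it
        self.confirmed = set()  # log ids persisted by a majority

    def append(self, log_id):
        # Simplification: the primary's local persistence counts immediately.
        self.acks[log_id] = {"primary"}
        self._maybe_confirm(log_id)

    def on_ack(self, log_id, replica):
        # A secondary replica reports that it has persisted the entry.
        self.acks[log_id].add(replica)
        self._maybe_confirm(log_id)

    def _maybe_confirm(self, log_id):
        if len(self.acks[log_id]) >= majority(self.replica_count):
            self.confirmed.add(log_id)
```

With three replicas, one acknowledgement from a secondary replica is enough to confirm an entry. With five replicas, two are needed.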

Once elected, the primary replica of a log stream obtains a lease. A normally functioning primary replica keeps extending the lease through the election protocol before it expires. The primary replica performs its primary duties only while the lease is valid, so a primary replica that can no longer renew its lease stops acting as primary. This is what allows the system to handle failures.

The replication layer automatically handles server failures and keeps the database service continuously available. If fewer than half of the replicas, all of them secondary, encounter problems, the database service is unaffected because more than half of the replicas are still functioning properly. If the server hosting the primary replica encounters problems, its lease cannot be extended. After the lease expires, the secondary replicas elect a new primary replica through the election protocol and grant it a new lease, thereby restoring the database service.
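The lease behavior described above can be sketched as follows. This is an illustration under simplifying assumptions: a single shared `Lease` object stands in for the election protocol, time is passed in explicitly as a number, and all names are hypothetical.

```python
class Lease:
    """Toy lease for one log stream; the caller supplies the current time."""

    def __init__(self, duration):
        self.duration = duration  # hypothetical lease length
        self.holder = None
        self.expires_at = 0.0

    def is_primary(self, replica, now):
        # A replica may act as primary only while it holds a valid lease.
        return self.holder == replica and now < self.expires_at

    def renew(self, replica, now):
        # Only the current, still-valid holder can extend the lease.
        if not self.is_primary(replica, now):
            return False
        self.expires_at = now + self.duration
        return True

    def elect(self, replica, now):
        # Another replica can take over only after the old lease has expired.
        if self.holder not in (None, replica) and now < self.expires_at:
            return False
        self.holder = replica
        self.expires_at = now + self.duration
        return True
```

Because a new primary can be elected only after the old lease has expired, two replicas never hold a valid lease at the same moment in this model.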

Balance Layer

When a table is created or partitions are added, the system selects an appropriate log stream in which to create each tablet, based on balancing principles. Tablets can become unbalanced across machines when a tenant’s attributes change, when new machine resources are added, or after a long period of use. In these cases, the balance layer splits and merges log streams and moves log stream replicas to rebalance data and services across servers.

When a tenant is scaled out and acquires more server resources, the balance layer splits the tenant’s existing log streams, moving an appropriate number of tablets into new log streams, and then migrates the new log streams to the newly added servers to make full use of the expanded resources. When a tenant is scaled in, the balance layer migrates the log streams on the servers being removed to other servers and merges them with the existing log streams on those servers to reduce resource usage.

After the database has been in use for a long time, the continuous creation and deletion of tables and the growth of data can disrupt the original balance even when the number of servers has not changed. The most common scenario is that a user deletes a batch of tables that happened to be concentrated on certain machines. After the deletion, those machines hold fewer tablets, so tablets from other machines should be moved onto them. The balance layer periodically generates balance plans. On servers with excess tablets, it splits off temporary log streams that carry the tablets to be moved, migrates the temporary log streams to the destination servers, and then merges them with the log streams on the destination servers to restore balance.
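A balance plan of this kind can be approximated with a simple greedy sketch. The function below is an assumption made for illustration and is not the algorithm OceanBase uses: it balances only on tablet counts and repeatedly moves one tablet from the most loaded server to the least loaded one.

```python
from collections import Counter


def plan_moves(tablets_per_server):
    """Return (moves, final_counts); moves maps (source, destination) to a tablet count."""
    counts = dict(tablets_per_server)
    moves = Counter()
    if not counts:
        return moves, counts
    while True:
        source = max(counts, key=counts.get)
        destination = min(counts, key=counts.get)
        # Stop once no server holds two or more tablets above another.
        if counts[source] - counts[destination] <= 1:
            break
        counts[source] -= 1
        counts[destination] += 1
        moves[(source, destination)] += 1
    return moves, counts
```

In the terms used above, each `(source, destination)` entry corresponds to one temporary log stream that is split off on the source server, migrated, and merged on the destination server.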

Transaction Layer

The transaction layer ensures the atomicity of DML operations that span a single log stream or multiple log streams, and provides multi-version isolation between concurrent transactions.


The modifications a transaction makes within one log stream, even if they involve multiple tablets, are made atomic by the write-ahead log of that log stream. When a transaction involves multiple log streams, each log stream generates and persists its own write-ahead log, and the transaction layer ensures atomicity by using an optimized two-phase commit protocol at commit time.

For each transaction, the transaction layer selects one of the log streams that the transaction modifies to host the coordinator state machine. The coordinator communicates with all the log streams involved in the transaction and determines whether their write-ahead logs have been persisted. When all the log streams have completed persistence, the transaction enters the commit state. The coordinator then drives all the log streams to write the Commit log for this transaction, which records the final commit state of the transaction. When a secondary replica replays logs or the database restarts, the Commit log determines the state of the transaction on each log stream.

In the event of a crash and restart, if a transaction was not completed before the crash, there may be cases where the write-ahead log is written but the Commit log is not. Each log stream’s write-ahead log contains a list of all the log streams involved in the transaction. With this information, it is possible to determine which log stream is the coordinator and recover the coordinator’s state, and then proceed with the two-phase state machine until the transaction reaches its final Commit or Abort state.
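The commit and recovery flow can be sketched as a textbook two-phase commit in Python. It does not include OceanBase's optimizations, and every name here is hypothetical. In particular, the rule that the first listed log stream acts as coordinator is an assumption made only so that the recovery step has something concrete to show.

```python
class Participant:
    """One log stream taking part in a transaction."""

    def __init__(self, name, can_prepare=True):
        self.name = name
        self.can_prepare = can_prepare
        self.log = []  # stands in for the persisted log of this log stream

    def prepare(self, tx_id, participant_names):
        if not self.can_prepare:
            return False
        # The write-ahead record lists every log stream in the transaction.
        self.log.append(("PREPARE", tx_id, tuple(participant_names)))
        return True

    def finish(self, tx_id, decision):
        self.log.append((decision, tx_id))


def two_phase_commit(tx_id, participants):
    names = [p.name for p in participants]
    # Phase 1: every log stream persists its write-ahead record.
    prepared = [p.prepare(tx_id, names) for p in participants]
    decision = "COMMIT" if all(prepared) else "ABORT"
    # Phase 2: the coordinator drives every log stream to the final state.
    for p in participants:
        p.finish(tx_id, decision)
    return decision


def recover_coordinator(prepare_record):
    # Assumption for this sketch: the first listed log stream is the coordinator.
    _, _, participant_names = prepare_record
    return participant_names[0]
```

After a crash, a log stream that holds a `PREPARE` record but no final record can use the participant list to find the coordinator and resume the protocol from there.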


The GTS service generates continuously increasing timestamps within a tenant. It stays available by keeping multiple replicas, using the same mechanism as the log stream replica synchronization described above.

When a transaction is committed, it obtains a timestamp from GTS as the transaction’s commit version number and persists it in the write-ahead log of the log stream. All data modified within the transaction is marked with this commit version number.

At the beginning of each statement (for the Read Committed isolation level) or each transaction (for the Repeatable Read and Serializable isolation levels), a timestamp is obtained from GTS as the read version number for the statement or transaction. When reading data, any data whose commit version number is greater than the read version number is skipped. This provides a unified global data snapshot for read operations.
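The interaction between commit versions and read versions can be sketched in Python. The classes `GTS` and `VersionedStore` below are toy stand-ins written for this example: a simple in-process counter replaces the replicated timestamp service, and a dictionary replaces the storage layer.

```python
import itertools


class GTS:
    """Toy timestamp source that hands out strictly increasing integers."""

    def __init__(self):
        self._counter = itertools.count(1)

    def next(self):
        return next(self._counter)


class VersionedStore:
    """Keeps every committed version of each key."""

    def __init__(self, gts):
        self.gts = gts
        self.versions = {}  # key -> list of (commit_version, value)

    def commit(self, writes):
        # All rows modified by the transaction share one commit version.
        version = self.gts.next()
        for key, value in writes.items():
            self.versions.setdefault(key, []).append((version, value))
        return version

    def read(self, key, read_version):
        # Skip any version committed after the snapshot was taken.
        visible = [pair for pair in self.versions.get(key, [])
                   if pair[0] <= read_version]
        if not visible:
            return None
        return max(visible, key=lambda pair: pair[0])[1]
```

A reader that obtains its read version before a later commit keeps seeing the older value, while a reader that obtains a fresh read version afterwards sees the new one.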

SQL Layer

The SQL layer translates the user’s SQL requests into data access on one or multiple tablets.

Components of the SQL Layer

The SQL layer processes the execution flow of a request through the following components: Parser, Resolver, Transformer, Optimizer, Code Generator, and Executor.

Parser is responsible for lexical/syntactic parsing. It breaks down the user’s SQL into “tokens” and parses the entire request according to predefined grammar rules, transforming it into a syntax tree.

Resolver handles semantic parsing. It translates the tokens in the SQL request into corresponding objects (e.g., databases, tables, columns, indexes) based on the database metadata, generating a data structure called the Statement Tree.

Transformer performs logical rewriting. It rewrites the SQL into other equivalent forms based on internal rules or cost models, providing it to the optimizer for further optimization. The Transformer works by performing equivalent transformations on the original Statement Tree, resulting in another Statement Tree.

Optimizer generates the best execution plan for the SQL request. It considers the semantics of the SQL request, the data characteristics of the objects, their physical distribution, and other factors to solve problems such as access path selection, join order selection, join algorithm selection, and distributed plan generation, and ultimately produces an execution plan.

Code Generator converts the execution plan into executable code without making any optimization choices.

Executor initiates the execution process of the SQL.

In addition to the standard SQL flow, the SQL layer also has Plan Cache capability. It caches historical execution plans in memory so that subsequent executions can reuse them, avoiding repeated query optimization. When combined with the Fast-parser module, it uses only lexical analysis to directly parameterize the text string, obtaining the parameterized text and constant parameters, allowing the SQL to directly hit the Plan Cache and speeding up the execution of frequent SQL queries.
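The idea of parameterizing SQL text in order to hit a plan cache can be sketched as follows. This is a rough approximation, not the Fast-parser: a regular expression stands in for lexical analysis, it recognizes only quoted strings and plain numbers, and the names `parameterize` and `PlanCache` are hypothetical.

```python
import re

# Quoted string literals, or standalone integer/decimal literals.
_LITERAL = re.compile(r"'(?:[^']|'')*'|\b\d+(?:\.\d+)?\b")


def parameterize(sql):
    """Replace constants with '?' and return (parameterized_text, constants)."""
    params = []

    def replace(match):
        params.append(match.group(0))
        return "?"

    return _LITERAL.sub(replace, sql), params


class PlanCache:
    """Caches one plan per parameterized SQL text."""

    def __init__(self, optimize):
        self.optimize = optimize  # callable that builds a plan from SQL text
        self.plans = {}
        self.hits = 0

    def get_plan(self, sql):
        text, params = parameterize(sql)
        if text in self.plans:
            self.hits += 1
        else:
            self.plans[text] = self.optimize(text)
        return self.plans[text], params
```

Two statements that differ only in their constants map to the same parameterized text, so the second one reuses the cached plan and skips optimization.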

Multiple Plans

The execution plans in the SQL layer can be categorized into local, remote, and distributed. Local execution plans only access data on the local server. Remote execution plans access data on a server other than the local one. Distributed plans access data on multiple servers, and the execution plan is divided into multiple sub-plans executed on different servers.
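The three categories can be expressed as a small classification function. The function name and its inputs are assumptions for illustration. A real optimizer decides this from the partition locations of the tables that a statement touches.

```python
def classify_plan(local_server, servers_with_data):
    """Classify a plan by where the data it accesses lives."""
    servers = set(servers_with_data)
    if not servers:
        raise ValueError("the plan must access data on at least one server")
    if len(servers) > 1:
        return "distributed"  # data on several servers, split into sub-plans
    if servers == {local_server}:
        return "local"        # all data is on the server running the plan
    return "remote"           # all data is on one other server
```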

The parallel execution capability of the SQL layer decomposes the execution plan into multiple parts, which are executed by multiple threads. Through proper scheduling, parallel processing of the execution plan is achieved. Parallel execution fully utilizes the CPU and IO processing capabilities of the server, reducing the response time of individual queries. Parallel query technology can be used for both distributed execution plans and local execution plans.

Access Layer

obproxy is the access layer for the OceanBase database, responsible for forwarding user requests to the appropriate OceanBase instances for processing.

obproxy is an independent process instance deployed separately from the OceanBase database instances. It listens on a network port, is compatible with the MySQL network protocol, and supports direct connections to OceanBase using the MySQL driver.

obproxy can automatically discover the data distribution information of the OceanBase cluster. For each proxied SQL statement, obproxy tries to identify the data it will access and forwards the statement directly to the corresponding OceanBase instance on the server where the data resides.
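The routing decision can be sketched as a lookup in a location cache. This is a simplified illustration and not obproxy's code: the class `ProxyRouter`, the `(table, partition_id)` cache key, and the fallback server are all assumptions, and how the proxy extracts the table and partition from a statement is left out.

```python
class ProxyRouter:
    """Routes a statement to the server that holds the data it accesses."""

    def __init__(self, location_cache, default_server):
        # location_cache maps (table, partition_id) to a server address.
        self.location_cache = dict(location_cache)
        self.default_server = default_server

    def route(self, table, partition_id):
        # Fall back to a default server when the location is unknown.
        return self.location_cache.get((table, partition_id), self.default_server)

    def update_location(self, table, partition_id, server):
        # Called when the proxy learns that the data has moved.
        self.location_cache[(table, partition_id)] = server
```

The fallback reflects the word "tries" above: when the proxy cannot tell where the data lives, the statement can still be sent to some server, which then handles it.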

obproxy can be deployed in two ways: on each application server that needs to access the database, or on the same machine as OceanBase. In the first deployment method, the application directly connects to obproxy deployed on the same server, and all requests are sent by obproxy to the appropriate OceanBase server. In the second deployment method, a network load balancing service is used to aggregate multiple obproxy instances into a single entry address that serves the application.
