Integrating Celeborn
Overview
The core components of Celeborn, i.e. Master, Worker, and Client, are all engine-agnostic. Developers can integrate Celeborn with various engines or applications by using or extending Celeborn's Client, as the officially supported plugins for Spark/Flink/MapReduce do.
This article briefly walks through an example of integrating Celeborn into a simple distributed application using the Celeborn Client.
Background
Say we have a distributed application that has two phases:
- Write phase, in which parallel tasks write data to some data service; each record is classified into a logical id, say a partition id.
- Read phase, in which parallel tasks read data from the data service; each task reads the data of a specified partition id.
Suppose the application has a failover mechanism, so it's acceptable that, when some data is lost, the application reruns the affected tasks.
Say the developers of this application are searching for a suitable data service and come across this article.
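Before bringing in Celeborn, the two phases can be modeled with a small in-memory sketch. InMemoryDataService, TwoPhaseApp, and the hash-based partitionOf classifier are hypothetical stand-ins for illustration only; in the steps below, the Celeborn cluster takes the place of the data service.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the data service: keeps records grouped by partition id.
class InMemoryDataService {
    private final Map<Integer, List<String>> partitions = new HashMap<>();

    // Write phase: append a record to the partition it was classified into.
    synchronized void write(int partitionId, String record) {
        partitions.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(record);
    }

    // Read phase: return a copy of every record stored under one partition id.
    synchronized List<String> read(int partitionId) {
        return new ArrayList<>(partitions.getOrDefault(partitionId, Collections.emptyList()));
    }
}

class TwoPhaseApp {
    // Hypothetical classifier: hash a record into [0, numPartitions).
    static int partitionOf(String record, int numPartitions) {
        return Math.floorMod(record.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        InMemoryDataService service = new InMemoryDataService();
        // Write phase: sequential here; the real application uses parallel tasks.
        for (String record : new String[] {"a", "b", "c", "d", "e"}) {
            service.write(partitionOf(record, numPartitions), record);
        }
        // Read phase: each reader handles exactly one partition id.
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + service.read(p));
        }
    }
}
```

Running main prints partition 0: [c], partition 1: [a, d], and partition 2: [b, e], because the hash code of a one-character string is its character code.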
Step One: Setup Celeborn Cluster
First, you need a running Celeborn cluster. Refer to QuickStart to set up a simple single-node cluster, or to Deploy to set up a multi-node cluster, either standalone or on K8s.
Step Two: Create LifecycleManager
As described in Client, the Client is split into LifecycleManager, which is a singleton throughout an application, and ShuffleClient, which can have multiple instances.
Step two is to create a LifecycleManager instance using the following API:
class LifecycleManager(val appUniqueId: String, val conf: CelebornConf)
- appUniqueId is the application id. The Celeborn cluster stores, serves, and cleans up data at the granularity of (application id, shuffle id).
- conf is an object of CelebornConf. The only required configuration is the address of the Celeborn Master. For a thorough description of configs, refer to Configuration.
The following example Java code creates a LifecycleManager instance:
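import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;

CelebornConf celebornConf = new CelebornConf();
celebornConf.set("celeborn.master.endpoints", "<Master IP>:<Master Port>");
LifecycleManager lm = new LifecycleManager("myApp", celebornConf);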
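When the cluster runs several Masters, our understanding is that celeborn.master.endpoints takes a comma-separated list of host:port pairs; check Configuration for your version. The hypothetical helper below only builds that string. MasterEndpoints, the host names, and port 9097 are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

class MasterEndpoints {
    // Builds a value for "celeborn.master.endpoints" from Master hosts sharing one port.
    // Assumption: multiple Masters are listed as comma-separated host:port pairs.
    static String join(List<String> hosts, int port) {
        StringJoiner joiner = new StringJoiner(",");
        for (String host : hosts) {
            joiner.add(host + ":" + port);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Hypothetical host names and port.
        String endpoints = join(Arrays.asList("clb-1", "clb-2", "clb-3"), 9097);
        System.out.println(endpoints);
        // With the Celeborn client on the classpath, this would be applied as:
        // celebornConf.set("celeborn.master.endpoints", endpoints);
    }
}
```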
The LifecycleManager object automatically starts the necessary services after creation, so there is no need to call other APIs to initialize it. After creating it, you can get the LifecycleManager's address, which is needed to create a ShuffleClient:
String host = lm.getHost();
int port = lm.getPort();
Step Three: Create ShuffleClient
With the LifecycleManager's host and port, you can create a ShuffleClient using the following API:
public static ShuffleClient get(
String appUniqueId,
String host,
int port,
CelebornConf conf,
UserIdentifier userIdentifier)
- appUniqueId is the application id, same as above.
- host is the host of LifecycleManager.
- port is the port of LifecycleManager.
- conf is an object of CelebornConf; it is safe to pass an empty object.
- userIdentifier specifies the user identity; it is safe to pass null.
You can create a ShuffleClient object with the following code:
ShuffleClient shuffleClient =
ShuffleClient.get(
"myApp",
host, // LifecycleManager host obtained in Step Two
port, // LifecycleManager port obtained in Step Two
new CelebornConf(),
null);
This method returns a singleton ShuffleClientImpl object. Using it this way is recommended because ShuffleClientImpl maintains state and reuses resources across all shuffles. For this to work, you must ensure that the LifecycleManager's host and port are reachable.
In practice, one ShuffleClient instance is created in each Spark Executor process, or in each Flink TaskManager process.
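The per-process singleton behavior described above follows the classic lazily initialized singleton pattern. The sketch below shows that pattern with hypothetical StatefulClient and ClientHolder classes to illustrate why repeated get calls share one client and its resources; it is not Celeborn's actual ShuffleClient.get implementation.

```java
// Hypothetical stand-in for a client that holds connections and per-shuffle state.
class StatefulClient {
    final String appUniqueId;

    StatefulClient(String appUniqueId) {
        this.appUniqueId = appUniqueId;
    }
}

class ClientHolder {
    // volatile makes the fully constructed instance visible to all threads.
    private static volatile StatefulClient instance;

    // Creates the client on first use; every later call returns the same object,
    // even if a different appUniqueId is passed.
    static StatefulClient get(String appUniqueId) {
        StatefulClient local = instance;
        if (local == null) {
            synchronized (ClientHolder.class) {
                local = instance;
                if (local == null) {
                    local = new StatefulClient(appUniqueId);
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        StatefulClient first = ClientHolder.get("myApp");
        StatefulClient second = ClientHolder.get("myApp");
        System.out.println(first == second);
    }
}
```

Double-checked locking keeps the common path (client already created) free of synchronization, while the synchronized block ensures only one instance is ever constructed.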
Step Four: Push Data
You can then push data with the ShuffleClient's pushData method, as follows:
int bytesWritten =
shuffleClient.pushData(
shuffleId,
mapId,
attemptId,
partitionId,
data,
0,
length,
numMappers,
numPartitions);
Each call to pushData passes a byte array containing data for a single partition id. In addition to the shuffleId, mapId, and attemptId that the data belongs to, the caller must also specify the number of mappers and the number of partitions, which are needed for Lazy Register.
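Because each pushData call carries data for one partition id, a map task typically buffers its records per partition before pushing. In the sketch below, PartitionBuffers, the hash classifier, and the length-prefixed record format are assumptions chosen for illustration; the record encoding is the application's own choice, and the actual pushData call appears only as a comment.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

class PartitionBuffers {
    // Groups records by partition id and serializes each group into one byte array.
    // Hypothetical record format: 4-byte big-endian length, then UTF-8 bytes.
    static Map<Integer, byte[]> group(String[] records, int numPartitions) {
        Map<Integer, ByteArrayOutputStream> buffers = new TreeMap<>();
        for (String record : records) {
            int partitionId = Math.floorMod(record.hashCode(), numPartitions);
            ByteArrayOutputStream buf =
                buffers.computeIfAbsent(partitionId, k -> new ByteArrayOutputStream());
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            buf.write(ByteBuffer.allocate(4).putInt(bytes.length).array(), 0, 4);
            buf.write(bytes, 0, bytes.length);
        }
        Map<Integer, byte[]> result = new TreeMap<>();
        for (Map.Entry<Integer, ByteArrayOutputStream> e : buffers.entrySet()) {
            result.put(e.getKey(), e.getValue().toByteArray());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, byte[]> byPartition = group(new String[] {"a", "b", "c", "d"}, 2);
        for (Map.Entry<Integer, byte[]> e : byPartition.entrySet()) {
            byte[] data = e.getValue();
            // With a real ShuffleClient, each partition becomes one pushData call:
            // shuffleClient.pushData(shuffleId, mapId, attemptId, e.getKey(),
            //     data, 0, data.length, numMappers, numPartitions);
            System.out.println("partition " + e.getKey() + ": " + data.length + " bytes");
        }
    }
}
```

Batching per partition keeps the number of pushData calls proportional to the number of non-empty partitions rather than the number of records.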
After the map task finishes, it should call ShuffleClient.mapperEnd to tell the LifecycleManager that the map task has finished pushing its data:
public abstract void mapperEnd(
int shuffleId,
int mapId,
int attemptId,
int numMappers)
- shuffleId: shuffle id of the current task
- mapId: map id of the current task
- attemptId: attempt id of the current task
- numMappers: number of map ids in this shuffle
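The read phase must not start until every map task has called mapperEnd, and how the application waits for that is up to the application. Here is a minimal sketch that assumes map tasks run as threads in one process (a real distributed application would track completion in its scheduler) and uses a CountDownLatch. WritePhaseBarrier and runWritePhase are hypothetical names, and the Celeborn calls appear only as comments.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class WritePhaseBarrier {
    // Runs numMappers map tasks and blocks until every one of them has finished.
    // Returns how many tasks completed successfully; the read phase should start
    // only when this equals numMappers.
    static int runWritePhase(int numMappers) {
        CountDownLatch allDone = new CountDownLatch(numMappers);
        AtomicInteger succeeded = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int mapId = 0; mapId < numMappers; mapId++) {
                final int id = mapId;
                pool.execute(() -> {
                    try {
                        // Hypothetical map task body: pushData calls for map `id`, then
                        // shuffleClient.mapperEnd(shuffleId, id, attemptId, numMappers);
                        succeeded.incrementAndGet();
                    } finally {
                        allDone.countDown(); // counted even if the task fails
                    }
                });
            }
            allDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map tasks", e);
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        int numMappers = 8;
        int finished = runWritePhase(numMappers);
        System.out.println(finished == numMappers ? "start read phase" : "rerun failed map tasks");
    }
}
```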
Step Five: Read Data
After all map tasks have successfully called mapperEnd, you can start reading the data of a partition id using the readPartition API, as in the following code:
InputStream inputStream = shuffleClient.readPartition(
shuffleId,
partitionId,
attemptId,
startMapIndex,
endMapIndex);
int b = inputStream.read(); // next byte (0-255), or -1 at end of stream
For simplicity, to read all data of the partition, you can pass 0 and Integer.MAX_VALUE as startMapIndex and endMapIndex. This method creates an InputStream over the data and guarantees that no data is lost and nothing is read twice; otherwise, an exception is thrown.
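To our understanding, the InputStream yields the bytes that map tasks pushed to the partition, so decoding them is the application's job. Assuming each record was written as a 4-byte big-endian length followed by UTF-8 bytes (a hypothetical format), the stream could be consumed as below. PartitionReader is an illustrative name, and a ByteArrayInputStream stands in for the stream returned by readPartition.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class PartitionReader {
    // Decodes length-prefixed UTF-8 records (hypothetical format) until the stream
    // ends, then closes the stream.
    static List<String> readRecords(InputStream inputStream) {
        List<String> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(inputStream)) {
            int first;
            // read() returns -1 only at a clean end of stream, i.e. between records.
            while ((first = in.read()) != -1) {
                // A stream that ends inside a record throws EOFException here.
                int length = (first << 24)
                    | (in.readUnsignedByte() << 16)
                    | (in.readUnsignedByte() << 8)
                    | in.readUnsignedByte();
                byte[] bytes = new byte[length];
                in.readFully(bytes);
                records.add(new String(bytes, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        // Stand-in for shuffleClient.readPartition(...): two records, "a" and "c".
        byte[] data = {0, 0, 0, 1, 'a', 0, 0, 0, 1, 'c'};
        System.out.println(readRecords(new ByteArrayInputStream(data)));
    }
}
```

Treating a mid-record end of stream as an error, rather than silently stopping, matches the expectation that a partition is read either completely or not at all.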
Step Six: Clean Up
After the shuffle finishes, you can call LifecycleManager.unregisterShuffle to clean up the resources related to the shuffle:
lm.unregisterShuffle(0);
It is safe not to call unregisterShuffle, because the Celeborn Master detects that an application has finished through heartbeat timeout and then cleans up its resources in the cluster by itself.
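If the application does call unregisterShuffle, a try/finally block ensures that the call runs even when a phase fails. ShuffleScope and runShuffle are hypothetical names. The unregister callback stands in for lm.unregisterShuffle, which could presumably be passed as the method reference lm::unregisterShuffle.

```java
import java.util.function.IntConsumer;

class ShuffleScope {
    // Runs the body for one shuffle id, then always releases that shuffle,
    // whether the body returns normally or throws.
    static void runShuffle(int shuffleId, IntConsumer body, IntConsumer unregister) {
        try {
            body.accept(shuffleId);
        } finally {
            unregister.accept(shuffleId);
        }
    }

    public static void main(String[] args) {
        runShuffle(
            0,
            id -> System.out.println("write and read phases for shuffle " + id),
            // Stand-in for lm::unregisterShuffle (hypothetical wiring).
            id -> System.out.println("unregister shuffle " + id));
    }
}
```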