
Integrating Celeborn

Overview

The core components of Celeborn, i.e. Master, Worker, and Client, are all engine-agnostic. Developers can integrate Celeborn with various engines or applications by using or extending Celeborn's Client, as the officially supported plugins for Spark/Flink/MapReduce do.

This article walks through an example of integrating Celeborn into a simple distributed application using the Celeborn Client.

Background

Say we have a distributed application that has two phases:

  • A write phase, in which parallel tasks write data to some data service; each record is classified by a logical id, say a partition id.
  • A read phase, in which parallel tasks read data from the data service; each task reads data from a specified partition id.

Suppose the application has a failover mechanism, so it is acceptable for it to rerun tasks when some data is lost.

Say the developers of this application are searching for a suitable data service and happen upon this article.

Step One: Setup Celeborn Cluster

First, you need a running Celeborn cluster. Refer to QuickStart to set up a simple cluster on a single node, or Deploy to set up a multi-node cluster, standalone or on K8s.

Step Two: Create LifecycleManager

As described in Client, the Client is separated into LifecycleManager, which is a singleton throughout an application, and ShuffleClient, which can have multiple instances.

Step two is to create a LifecycleManager instance, using the following API:

class LifecycleManager(val appUniqueId: String, val conf: CelebornConf)
  • appUniqueId is the application id. The Celeborn cluster stores, serves, and cleans up data at the granularity of (application id, shuffle id).
  • conf is a CelebornConf object. The only required configuration is the address of the Celeborn Master. For a thorough description of the configs, refer to Configuration.

Example Java code to create a LifecycleManager instance is as follows:

CelebornConf celebornConf = new CelebornConf();
celebornConf.set("celeborn.master.endpoints", "<Master IP>:<Master Port>");

LifecycleManager lm = new LifecycleManager("myApp", celebornConf);

The LifecycleManager object automatically starts the necessary services after creation, so there is no need to call any other API to initialize it. After creating it, you can get the LifecycleManager's address, which is needed to create a ShuffleClient:

String host = lm.getHost();
int port = lm.getPort();

Step Three: Create ShuffleClient

With LifecycleManager's host and port, you can create ShuffleClient using the following API:

public static ShuffleClient get(
    String appUniqueId,
    String host,
    int port,
    CelebornConf conf,
    UserIdentifier userIdentifier)
  • appUniqueId is the application id, same as above.
  • host is the host of the LifecycleManager.
  • port is the port of the LifecycleManager.
  • conf is a CelebornConf object; it's safe to pass an empty one.
  • userIdentifier specifies the user identity; it's safe to pass null.

You can create a ShuffleClient object using the following code:

ShuffleClient shuffleClient =
    ShuffleClient.get(
        "myApp",
        <LifecycleManager Host>,
        <LifecycleManager Port>,
        new CelebornConf(),
        null);

This method returns a singleton ShuffleClientImpl object, and using it this way is recommended, as ShuffleClientImpl maintains state and reuses resources across all shuffles. For this to work, you have to ensure that the LifecycleManager's host and port are reachable.

In practice, one ShuffleClient instance is created in each Executor process of Spark, or in each TaskManager process of Flink.

Step Four: Push Data

You can then push data with ShuffleClient's pushData method, like the following:

int bytesWritten =
    shuffleClient.pushData(
        shuffleId,
        mapId,
        attemptId,
        partitionId,
        data,
        0,
        length,
        numMappers,
        numPartitions);

Each call to pushData passes a byte array containing data for the same partition id. In addition to specifying the shuffleId, mapId, and attemptId that the data belongs to, ShuffleClient must also specify the number of mappers and the number of partitions for Lazy Register.

After the map task finishes, ShuffleClient should call mapperEnd to tell the LifecycleManager that the map task has finished pushing its data:

public abstract void mapperEnd(
    int shuffleId,
    int mapId,
    int attemptId,
    int numMappers)
  • shuffleId shuffle id of the current task
  • mapId map id of the current task
  • attemptId attempt id of the current task
  • numMappers number of map ids in this shuffle
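
Putting pushData and mapperEnd together, a map task's write loop might look like the following sketch. It assumes records arrive as (partition id, bytes) pairs; the runMapTask method and its records parameter are illustrative, not part of Celeborn's API:

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.celeborn.client.ShuffleClient;

// Hypothetical map task: push each record to its partition, then signal completion.
void runMapTask(
    ShuffleClient shuffleClient,
    int shuffleId,
    int mapId,
    int attemptId,
    int numMappers,
    int numPartitions,
    List<Map.Entry<Integer, byte[]>> records) throws IOException {
  for (Map.Entry<Integer, byte[]> record : records) {
    byte[] data = record.getValue();
    shuffleClient.pushData(
        shuffleId,
        mapId,
        attemptId,
        record.getKey(), // partition id this record belongs to
        data,
        0,               // offset into the byte array
        data.length,
        numMappers,
        numPartitions);
  }
  // Tell LifecycleManager this map task has finished pushing its data.
  shuffleClient.mapperEnd(shuffleId, mapId, attemptId, numMappers);
}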

Step Five: Read Data

After all tasks have successfully called mapperEnd, you can start reading data from a partition id using the readPartition API, as in the following code:

InputStream inputStream = shuffleClient.readPartition(
    shuffleId,
    partitionId,
    attemptId,
    startMapIndex,
    endMapIndex);

int firstByte = inputStream.read();

To read the whole data of the partition, you can simply pass 0 as startMapIndex and Integer.MAX_VALUE as endMapIndex. This method creates an InputStream over the partition's data and guarantees no data loss and no duplicate reads; otherwise an exception is thrown.
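
To process an entire partition, keep reading until the stream is exhausted. The following sketch drains one partition into a byte array (an illustrative choice that assumes the partition fits in memory; the readWholePartition method is not part of Celeborn's API):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.celeborn.client.ShuffleClient;

// Illustrative read task: drain one whole partition into a byte array.
byte[] readWholePartition(
    ShuffleClient shuffleClient,
    int shuffleId,
    int partitionId,
    int attemptId) throws IOException {
  try (InputStream in = shuffleClient.readPartition(
      shuffleId,
      partitionId,
      attemptId,
      0,                     // startMapIndex: from the first map task
      Integer.MAX_VALUE)) {  // endMapIndex: through the last map task
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[64 * 1024];
    int n;
    while ((n = in.read(buffer)) != -1) {
      out.write(buffer, 0, n);
    }
    return out.toByteArray();
  }
}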

Step Six: Clean Up

After the shuffle finishes, you can call LifecycleManager.unregisterShuffle to clean up resources related to the shuffle:

lm.unregisterShuffle(0);

It's safe not to call unregisterShuffle, because the Celeborn Master recognizes application completion by heartbeat timeout and will clean up the application's resources in the cluster on its own.
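
Putting all the steps together, a minimal end-to-end walkthrough might look like the following sketch. It runs in a single process with one mapper and one partition; the master address, ids, and payload are placeholders, and the package names are assumed from Celeborn's client and common modules:

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;

public class CelebornExample {
  public static void main(String[] args) throws IOException {
    CelebornConf conf = new CelebornConf();
    conf.set("celeborn.master.endpoints", "<Master IP>:<Master Port>");

    // Step Two: create the application-wide LifecycleManager.
    LifecycleManager lm = new LifecycleManager("myApp", conf);

    // Step Three: create a ShuffleClient pointing at the LifecycleManager.
    ShuffleClient shuffleClient =
        ShuffleClient.get("myApp", lm.getHost(), lm.getPort(), new CelebornConf(), null);

    int shuffleId = 0, mapId = 0, attemptId = 0, partitionId = 0;
    int numMappers = 1, numPartitions = 1;

    // Step Four: the single mapper pushes one record, then signals completion.
    byte[] data = "hello, celeborn".getBytes(StandardCharsets.UTF_8);
    shuffleClient.pushData(
        shuffleId, mapId, attemptId, partitionId, data, 0, data.length,
        numMappers, numPartitions);
    shuffleClient.mapperEnd(shuffleId, mapId, attemptId, numMappers);

    // Step Five: read the whole partition back.
    try (InputStream in = shuffleClient.readPartition(
        shuffleId, partitionId, attemptId, 0, Integer.MAX_VALUE)) {
      int b;
      while ((b = in.read()) != -1) {
        System.out.print((char) b);
      }
    }

    // Step Six: clean up shuffle resources.
    lm.unregisterShuffle(shuffleId);
  }
}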