RESTful API V2
SeaTunnel has a monitoring API that can be used to query status and statistics of running jobs, as well as recent completed jobs. The monitoring API is a RESTful API that accepts HTTP requests and responds with JSON data.
Overviewâ
The v2 version of the api uses jetty support. It is the same as the interface specification of v1 version
, you can specify the port and context-path by modifying the configuration items in seatunnel.yaml
,
you can configure enable-dynamic-port
to enable dynamic ports (the default port is accumulated starting from port
), and the default is closed,
If enable-dynamic-port is true, We will use the unused port in the range within the range of port
and port
+ port-range
, default range is 100
seatunnel:
engine:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
port-range: 100
Context-path can also be configured as follows:
seatunnel:
engine:
http:
enable-http: true
port: 8080
context-path: /seatunnel
API referenceâ
Returns an overview over the Zeta engine cluster.â
GET
/overview?tag1=value1&tag2=value2
(Returns an overview over the Zeta engine cluster.)
Parametersâ
name type data type description tag_name optional string the tags filter, you can add tag filter to get those matched worker count, and slot on those workers
Responsesâ
{
"projectVersion":"2.3.5-SNAPSHOT",
"gitCommitAbbrev":"DeadD0d0",
"totalSlot":"0",
"unassignedSlot":"0",
"works":"1",
"runningJobs":"0",
"finishedJobs":"0",
"failedJobs":"0",
"cancelledJobs":"0"
}
Notes:
- If you use
dynamic-slot
, thetotalSlot
andunassignedSlot
always be0
. when you set it to fix slot number, it will return the correct total and unassigned slot number - If the url has tag filter, the
works
,totalSlot
andunassignedSlot
will return the result on the matched worker. but the job related metric will always return the cluster level information.
Returns An Overview And State Of All Jobsâ
GET
/running-jobs
(Returns an overview over all jobs and their current state.)
Parametersâ
Responsesâ
[
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"envOptions": {
},
"createTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false,
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
}
}
]
Return Details Of A Jobâ
GET
/job-info/:jobId
(Return details of a job. )
Parametersâ
name type data type description jobId required long job id
Responsesâ
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
},
"finishedTime": "",
"errorMsg": null,
"envOptions": {
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false
}
jobId
, jobName
, jobStatus
, createTime
, jobDag
, metrics
always be returned.
envOptions
, pluginJarsUrls
, isStartWithSavePoint
will return when job is running.
finishedTime
, errorMsg
will return when job is finished.
When we can't get the job info, the response will be:
{
"jobId" : ""
}
Return Details Of A Jobâ
This API has been deprecated, please use /job-info/:jobId instead
GET
/running-job/:jobId
(Return details of a job. )
Parametersâ
name type data type description jobId required long job id
Responsesâ
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"metrics": {
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
"SourceReceivedBytesPerSeconds": "",
"SinkWriteCount": "",
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
"TableSourceReceivedQPS": {},
"TableSinkWriteCount": {},
"TableSinkWriteQPS": {},
"TableSinkWriteBytes": {},
"TableSinkWriteBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
"envOptions": {
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false
}
jobId
, jobName
, jobStatus
, createTime
, jobDag
, metrics
always be returned.
envOptions
, pluginJarsUrls
, isStartWithSavePoint
will return when job is running.
finishedTime
, errorMsg
will return when job is finished.
When we can't get the job info, the response will be:
{
"jobId" : ""
}
Return All Finished Jobs Infoâ
GET
/finished-jobs/:state
(Return all finished Jobs Info.)
Parametersâ
name type data type description state optional string finished job status. FINISHED
,CANCELED
,FAILED
,UNKNOWABLE
Responsesâ
[
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"errorMsg": null,
"createTime": "",
"finishTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"metrics": ""
}
]
Returns System Monitoring Informationâ
GET
/system-monitoring-information
(Returns system monitoring information.)
Parametersâ
Responsesâ
[
{
"processors":"8",
"physical.memory.total":"16.0G",
"physical.memory.free":"16.3M",
"swap.space.total":"0",
"swap.space.free":"0",
"heap.memory.used":"135.7M",
"heap.memory.free":"440.8M",
"heap.memory.total":"576.5M",
"heap.memory.max":"3.6G",
"heap.memory.used/total":"23.54%",
"heap.memory.used/max":"3.73%",
"minor.gc.count":"6",
"minor.gc.time":"110ms",
"major.gc.count":"2",
"major.gc.time":"73ms",
"load.process":"24.78%",
"load.system":"60.00%",
"load.systemAverage":"2.07",
"thread.count":"117",
"thread.peakCount":"118",
"cluster.timeDiff":"0",
"event.q.size":"0",
"executor.q.async.size":"0",
"executor.q.client.size":"0",
"executor.q.client.query.size":"0",
"executor.q.client.blocking.size":"0",
"executor.q.query.size":"0",
"executor.q.scheduled.size":"0",
"executor.q.io.size":"0",
"executor.q.system.size":"0",
"executor.q.operations.size":"0",
"executor.q.priorityOperation.size":"0",
"operations.completed.count":"10",
"executor.q.mapLoad.size":"0",
"executor.q.mapLoadAllKeys.size":"0",
"executor.q.cluster.size":"0",
"executor.q.response.size":"0",
"operations.running.count":"0",
"operations.pending.invocations.percentage":"0.00%",
"operations.pending.invocations.count":"0",
"proxy.count":"8",
"clientEndpoint.count":"0",
"connection.active.count":"2",
"client.connection.count":"0",
"connection.count":"0"
}
]
Submit A Jobâ
POST
/submit-job
(Returns jobId and jobName if job submitted successfully.)
Parametersâ
name type data type description jobId optional string job id jobName optional string job name isStartWithSavePoint optional string if job is started with save point
Bodyâ
{
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
}
Responsesâ
{
"jobId": 733584788375666689,
"jobName": "rest_api_test"
}
Batch Submit Jobsâ
POST
/submit-jobs
(Returns jobId and jobName if the job is successfully submitted.)
Parameters (add in the params
field in the request body)â
Parameter Name Required Type Description jobId optional string job id jobName optional string job name isStartWithSavePoint optional string if the job is started with save point
Request Bodyâ
[
{
"params":{
"jobId":"123456",
"jobName":"SeaTunnel-01"
},
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 1000,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
},
{
"params":{
"jobId":"1234567",
"jobName":"SeaTunnel-02"
},
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 1000,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
}
]
Responseâ
[
{
"jobId": "123456",
"jobName": "SeaTunnel-01"
},{
"jobId": "1234567",
"jobName": "SeaTunnel-02"
}
]
Stop A Jobâ
POST
/stop-job
(Returns jobId if job stoped successfully.)
Batch Stop Jobsâ
POST
/stop-jobs
(Returns jobId if the job is successfully stopped.)
Encrypt Configâ
POST
/encrypt-config
(Returns the encrypted config if config is encrypted successfully.)
Bodyâ
{
"env": {
"parallelism": 1,
"shade.identifier":"base64"
},
"source": [
{
"plugin_name": "MySQL-CDC",
"schema" : {
"fields": {
"name": "string",
"age": "int"
}
},
"result_table_name": "fake",
"parallelism": 1,
"hostname": "127.0.0.1",
"username": "seatunnel",
"password": "seatunnel_password",
"table-name": "inventory_vwyw0n"
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Clickhouse",
"host": "localhost:8123",
"database": "default",
"table": "fake_all",
"username": "seatunnel",
"password": "seatunnel_password"
}
]
}
Responsesâ
{
"env": {
"parallelism": 1,
"shade.identifier": "base64"
},
"source": [
{
"plugin_name": "MySQL-CDC",
"schema": {
"fields": {
"name": "string",
"age": "int"
}
},
"result_table_name": "fake",
"parallelism": 1,
"hostname": "127.0.0.1",
"username": "c2VhdHVubmVs",
"password": "c2VhdHVubmVsX3Bhc3N3b3Jk",
"table-name": "inventory_vwyw0n"
}
],
"transform": [],
"sink": [
{
"plugin_name": "Clickhouse",
"host": "localhost:8123",
"database": "default",
"table": "fake_all",
"username": "c2VhdHVubmVs",
"password": "c2VhdHVubmVsX3Bhc3N3b3Jk"
}
]
}
Update the tags of running nodeâ
POST
/update-tags
Because the update can only target a specific node, the current node's `ip:port` needs to be used for the update
(If the update is successful, return a success message)
update node tagsâ
Bodyâ
If the request parameter is a Map
object, it indicates that the tags of the current node need to be updated
{
"tag1": "dev_1",
"tag2": "dev_2"
}
Responsesâ
{
"status": "success",
"message": "update node tags done."
}
remove node tagsâ
Bodyâ
If the parameter is an empty Map
object, it means that the tags of the current node need to be cleared
{}
Responsesâ
{
"status": "success",
"message": "update node tags done."
}
Request parameter exceptionâ
- If the parameter body is empty
Responsesâ
{
"status": "fail",
"message": "Request body is empty."
}
- If the parameter is not a
Map
object
Responsesâ
{
"status": "fail",
"message": "Invalid JSON format in request body."
}
Get Logs from All Nodesâ
GET
/logs/:jobId
(Returns a list of logs.)
Request Parametersâ
Parameters (to be added in the params
field of the request body)â
Parameter Name Required Type Description jobId optional string job id
If jobId
is empty, the request will return logs from all nodes. Otherwise, it will return the list of logs for the specified jobId
from all nodes.
Responseâ
Returns a list of logs from the requested nodes along with their content.
Return List of All Log Filesâ
If you want to view the log list first, you can retrieve it via a GET
request: http://localhost:8080/logs?format=json
[
{
"node": "localhost:8080",
"logLink": "http://localhost:8080/logs/job-899485770241277953.log",
"logName": "job-899485770241277953.log"
},
{
"node": "localhost:8080",
"logLink": "http://localhost:8080/logs/job-899470314109468673.log",
"logName": "job-899470314109468673.log"
}
]
Supported formats are json
and html
, with html
as the default.
Examplesâ
Retrieve logs for jobId
733584788375666689
across all nodes: http://localhost:8080/logs/733584788375666689
Retrieve the list of logs from all nodes: http://localhost:8080/logs
Retrieve the list of logs in JSON format: http://localhost:8080/logs?format=json
Retrieve the content of a specific log file: http://localhost:8080/logs/job-898380162133917698.log