Franz`s blog

分布式调度视频转码实践

业务场景: 用户上传视频,通过转码服务器转换成不同码率之后再生成m3u8文件,最后通过hls技术在平台播放。

技术分析:

  1. 视频以及转码后的视频文件存储位置。考虑到拓展性,选择minio作为存储服务。
  2. 视频转码 ffmpeg
  3. 分布式任务调度 xxl-job,通过xxl-job调度转码服务器拉取minio内容并且转码后推送到minio

大体流程如下

2023-06-30_11-29

xxl-job搭建

参考xxl-job官方文档的快速入门部分

因为要通过调度中心触发任务,官方给的RESTful API的示例只能直接通过执行器ip的方式触发。通过调试发现调度中心触发任务是通过 http://调度中心地址/jobInfo/trigger的方式触发的。
找到对应的源码处为com.xxl.job.admin.controller.JobInfoController#triggerJob
我们无法直接调用这个接口,因为这个接口存在鉴权。

1
2
3
4
5
6
7
8
9
10
11
@RequestMapping("/trigger")
@ResponseBody
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
// force cover job param
if (executorParam == null) {
executorParam = "";
}

JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}

通过分析源码发现xxl-job的鉴权主要是通过com.xxl.job.admin.controller.annotation.PermissionLimit注解和com.xxl.job.admin.controller.interceptor.PermissionInterceptor拦截器
通过在对于的接口处加上@PermissionLimit(limit = false)即可避免接口的鉴权,但是这并不是安全的做法,推荐重新自定义这个接口的鉴权逻辑。

通过以上修改我们可以通过http://调度中心地址/jobInfo/trigger这个api触发任务。

执行器编写

执行器的目的是为了将mino中的视频转码为不同码率,并且转换为m3u8,所以执行器需要部署在转码服务器上。

通过xxl-job的BEAN模式(类形式)编写执行器

通过xxl-job的jobParam传递转码参数。

注:执行器代码结构并未优化请自行优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package io.github.franzli347.JobHandler;

import cn.hutool.core.io.FileUtil;
import cn.hutool.json.JSONUtil;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import io.github.franzli347.entity.Task;
import io.github.franzli347.utils.ResponseTaskUtil;
import io.minio.DownloadObjectArgs;
import io.minio.MinioClient;

import java.io.*;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class FfmpgeExecutor extends IJobHandler {

private MinioClient minioClient;

private static final String tmpPath = "/tmp/minio/";

private static final String logFormat = "===================== %s =====================";


@Override
public void init() throws Exception {
super.init();
}

@Override
public void execute() throws IOException, InterruptedException {
XxlJobHelper.log(logFormat.formatted("start cover task"));

String param = XxlJobHelper.getJobParam();

JobParam jobParam = JSONUtil.toBean(param, JobParam.class);
XxlJobHelper.log(param);

// 上报任务开始
Task task = new Task();
task.setId(jobParam.getTaskId());
task.setStatus(1);
task.setStartTime(LocalDateTime.now());
ResponseTaskUtil.responseTask(task);

String fileName = jobParam.getFileName();
String filePath = tmpPath + fileName;

if (!FileUtil.exist(tmpPath)) {
FileUtil.mkdir(tmpPath);
}

if (FileUtil.exist(filePath)) {
FileUtil.del(filePath);
}

minioClient = MinioClient.builder().endpoint(jobParam.getEndpoint()).credentials(jobParam.getAccessKey(), jobParam.getSecretKey()).build();

XxlJobHelper.log(logFormat.formatted("try pull resource from oss"));

try {
DownloadObjectArgs downloadObjectArgs = DownloadObjectArgs.builder().bucket(jobParam.getBucketName()).object(jobParam.getFileName()).filename(filePath).build();
minioClient.downloadObject(downloadObjectArgs);
} catch (Exception e) {
XxlJobHelper.log(logFormat.formatted("pull resource from oss failed"));
task.setStatus(2);
ResponseTaskUtil.responseTask(task);
throw new RuntimeException(e);
}

// 执行ffmpeg命令
XxlJobHelper.log(logFormat.formatted("try execute ffmpeg"));

String fileNameWithoutExt = fileName.split("\\.")[0];


ProcessBuilder builder = new ProcessBuilder("/bin/sh", "-c", """
ffmpeg -threads 4 -re -fflags +genpts -i "%s" -s:0 1920x1080 -ac 2 -vcodec libx264 -profile:v main -b:v:0 2000k -maxrate:0 2000k -bufsize:0 4000k -r 30 -ar 44100 -g 48 -c:a aac -b:a:0 128k -s:2 1280x720 -ac 2 -vcodec libx264 -profile:v main -b:v:1 1000k -maxrate:2 1000k -bufsize:2 2000k -r 30 -ar 44100 -g 48 -c:a aac -b:a:1 128k -s:4 720x480 -ac 2 -vcodec libx264 -profile:v main -b:v:2 600k -maxrate:4 600k -bufsize:4 1000k -r 30 -ar 44100 -g 48 -c:a aac -b:a:2 128k -map 0:v -map 0:a -map 0:v -map 0:a -map 0:v -map 0:a -f hls -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2" -hls_segment_type mpegts -start_number 10 -hls_time 10 -hls_list_size 0 -hls_start_number_source 1 -master_pl_name "%s.m3u8" -hls_segment_filename "%s_%%v-%%09d.ts" "%s_%%v.m3u8"
""".formatted(filePath, fileNameWithoutExt , tmpPath + fileNameWithoutExt, tmpPath + fileNameWithoutExt));

// 将标准输出和错误输出合并
builder.redirectErrorStream(true);

// 启动进程
Process process = builder.start();

// 获取输入流
InputStream inputStream = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

String line;

// 逐行读取输出
while ((line = reader.readLine()) != null) {
XxlJobHelper.log(line);
System.out.println(line);
}

// 等待进程执行完毕
int exitCode = process.waitFor();
XxlJobHelper.log(logFormat.formatted("脚本执行完毕,退出码:" + exitCode));


// 删除原始视频文件
FileUtil.del(filePath);

// 获取临时文件夹所有文件
File[] files = FileUtil.ls(tmpPath);

List<File> collect = Arrays.stream(files).filter(file -> file.getName().startsWith(fileNameWithoutExt)).toList();

// 上传文件到oss
for (File file : collect) {
XxlJobHelper.log(logFormat.formatted("try upload " + file.getName() + " to oss"));
try {
minioClient
.uploadObject(io.minio.UploadObjectArgs.builder()
.bucket(jobParam.getBucketName())
.object(file.getName())
.filename(file.getAbsolutePath()).build());
} catch (Exception e) {
XxlJobHelper.log(logFormat.formatted("upload file to oss failed"));
e.printStackTrace();
throw new RuntimeException(e);
}
}

for (File file : collect) {
FileUtil.del(file);
}

// 删除oss中原始视频文件
XxlJobHelper.log(logFormat.formatted("try delete " + fileName + " from oss"));
try {
minioClient.removeObject(io.minio.RemoveObjectArgs.builder().bucket(jobParam.getBucketName()).object(fileName).build());
} catch (Exception e) {
XxlJobHelper.log(logFormat.formatted("delete file from oss failed"));
throw new RuntimeException(e);
}

task.setTaskResult(Map.of(
"masterM3u8",jobParam.getEndpoint() + "/" + jobParam.getBucketName() + "/" + fileNameWithoutExt + ".m3u8"
));
task.setStatus(3);
ResponseTaskUtil.responseTask(task);
}

static class JobParam {

private String endpoint;

private String accessKey;

private String secretKey;

private String bucketName;

private String fileName;

private Integer taskId;


public JobParam() {
}

public Integer getTaskId() {
return taskId;
}

public void setTaskId(Integer taskId) {
this.taskId = taskId;
}

public JobParam(String endpoint, String accessKey, String secretKey, String bucketName, String fileName, Integer taskId) {
this.endpoint = endpoint;
this.accessKey = accessKey;
this.secretKey = secretKey;
this.bucketName = bucketName;
this.fileName = fileName;
this.taskId = taskId;
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public String getEndpoint() {
return endpoint;
}

public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public String getAccessKey() {
return accessKey;
}

public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}

public String getSecretKey() {
return secretKey;
}

public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}

public String getBucketName() {
return bucketName;
}

public void setBucketName(String bucketName) {
this.bucketName = bucketName;
}
}


}

上面代码中使用到的ffmpeg指令并为使用gpu加速,并且为了转码速度添加了-preset ultrafast参数,请根据实际环境更改ffmpeg的指令,或者通过更加推荐的jave2的方式在java中使用ffmpeg

上面用的是直接通过minio的java sdk操作的文件,但是ffmpeg好像可以直接通过minio的s3协议直接推送和拉取minio中的资源,反正我没成功。可以自行尝试。

服务后端

服务后端主要接口是生成minio的签名上传url和触发任务

签名上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@GetMapping("tempurl/{id}/{name}")
public ResponseEntity<ResponseResult> getTempUrl(@PathVariable String name, @PathVariable String id) {
if (StrUtil.isBlankIfStr(name)) {
name = UUID.fastUUID().toString();
}

OssSource ossSource = service.getById(id);
Object client = OssFactory.createOssClient(ossSource);

String url = null;

if (ossSource.getType().equals("minio")) {
MinioClient minioClient = (MinioClient) client;
try {
url = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs
.builder()
.bucket(ossSource.getBucketName())
.object(name)
.method(Method.PUT)
.expiry(60 * 60 * 24)
.build()
);
} catch (Exception e) {
e.printStackTrace();
}
}
Map<String, String> m = Map.of("url", url);
return ResponseEntity.ok(ResponseResult.success(m));
}

出于方便,代码都写在controller里了

任务触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class XxlJobutil {

@Value("xxl-job.admin.addresses")
private static String adminAddresses;

@Value("xxl-job.executor.jobId")
private static Integer JobId;

public static String triggerJob(String fileName){
adminAddresses = addSplit4url(adminAddresses);
String triggerUrl = adminAddresses + "jobinfo/trigger";
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("id", JobId);

JobParam jobParam = new JobParam();
jobParam.setEndpoint(adminAddresses);
jobParam.setAccessKey(ak);
jobParam.setSecretKey(ak);
jobParam.setBucketName(bk);
jobParam.setFileName(fileName);

paramMap.put("executorParam", JSONUtil.toJsonStr(jobParam));
paramMap.put("addressList", "");
return HttpUtil.post(triggerUrl, paramMap, 10000);
}

private static String addSplit4url(String url) {
if (url.endsWith("/")) {
return url;
} else {
return url + "/";
}
}

}


最终效果
2023-07-01_16-08

2023-07-01_16-09