@ -40,6 +40,10 @@ import java.time.format.DateTimeFormatter;
import java.util.* ;
import java.util.concurrent.* ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.concurrent.locks.Condition ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.Supplier ;
import feign.RetryableException ;
import org.slf4j.Logger ;
@ -126,6 +130,15 @@ public class JobMainTask {
final PatrolDeviceStateMapper patrolDeviceStateMapper ;
private final Lock controlLock = new ReentrantLock ( ) ;
private final Condition pauseCondition = controlLock . newCondition ( ) ;
/ / 控制标志
private volatile boolean stop = false ;
private volatile boolean pause = false ;
/ / 用于状态查询线程
private final ScheduledExecutorService statusUpdater = Executors . newSingleThreadScheduledExecutor ( ) ;
@Autowired
public JobMainTask (
ShaoXinBigModel shaoXinBigModel ,
@ -147,6 +160,57 @@ public class JobMainTask {
threadPool = new ThreadPoolExecutor ( corePoolSize , maximumPoolSize , keepAliveTime , TimeUnit . SECONDS , new LinkedBlockingQueue < > ( 1000 ) ) ;
}
/ / 等待恢复
private void waitIfPaused ( ) {
controlLock . lock ( ) ;
try {
while ( pause & & ! stop ) {
pauseCondition . await ( ) ; / / 所有线程阻塞在此
}
} catch ( InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ; / / 不中断会引发潜在问题
} finally {
controlLock . unlock ( ) ;
}
}
/ / 外部触发恢复
public void resumeIfPaused ( ) {
controlLock . lock ( ) ;
try {
if ( pause ) {
pause = false ;
pauseCondition . signalAll ( ) ; / / 唤醒所有暂停线程
}
} finally {
controlLock . unlock ( ) ;
}
}
/ / 用于外部强制停止所有线程
public void stopAll ( ) {
stop = true ;
resumeIfPaused ( ) ; / / 确保唤醒阻塞线程让它们能感知 stop = true
}
/ / 模拟查询状态
private int queryPatrolTaskStatus ( final String taskPatrolId ) {
int result = 0 ;
String taskPatrolledId = taskPatrolId . split ( StringUtils . UNDERLINE ) [ 1 ] + "_" + taskPatrolId . split ( StringUtils . UNDERLINE ) [ 2 ] ;
PatrolTaskStatus patrolTaskStatus = taskExecClient . selectPatrolTaskStatusByTaskPatrolledId ( taskPatrolledId ) ;
if ( patrolTaskStatus = = null ) {
return result ;
}
try {
result = Integer . parseInt ( patrolTaskStatus . getPatrolStatus ( ) ) ;
} catch ( Exception e ) {
result = - 1 ;
}
return result ;
}
private void startThread (
final PatrolTaskInfo patrolTaskInfo ,
final PatrolPresetPos patrolPresetPos ,
@ -418,42 +482,43 @@ public class JobMainTask {
myDelay ( 2000 ) ;
}
boolean taskHalted = false ;
String taskPatrolledId = taskPatrolId . split ( StringUtils . UNDERLINE ) [ 1 ] + "_" + taskPatrolId . split ( StringUtils . UNDERLINE ) [ 2 ] ;
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus ( ) ;
patrolTaskStatus . setTaskPatrolledId ( taskPatrolledId ) ;
List < PatrolTaskStatus > list = taskExecClient . selectPatrolTaskStatusList ( patrolTaskStatus ) ;
if ( ! list . isEmpty ( ) ) {
if ( "3" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 暂停
while ( true ) {
log . info ( "-----------------------task pause: {}" , taskPatrolledId ) ;
try {
Thread . sleep ( 1000 ) ;
} catch ( InterruptedException e ) {
}
list = taskExecClient . selectPatrolTaskStatusList ( patrolTaskStatus ) ;
if ( ! list . isEmpty ( ) ) {
log . info ( "-----------------------TaskState: {}" , list . get ( 0 ) . getTaskState ( ) ) ;
if ( "4" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 终止
log . info ( "-----------------------task terminate: {}" , taskPatrolledId ) ;
taskHalted = true ;
break ;
} else if ( "2" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 正在执行
log . info ( "-----------------------task resume: {}" , taskPatrolledId ) ;
break ;
}
} else {
log . info ( "-----------------------selectPatrolTaskStatusList is empty" ) ;
}
}
} else if ( "4" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 终止
log . info ( "-----------------------task terminate: {}" , taskPatrolledId ) ;
taskHalted = true ;
}
}
if ( ! taskHalted ) {
/ / boolean taskHalted = false ;
/ / String taskPatrolledId = taskPatrolId . split ( StringUtils . UNDERLINE ) [ 1 ] + "_" + taskPatrolId . split ( StringUtils . UNDERLINE ) [ 2 ] ;
/ / PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus ( ) ;
/ / patrolTaskStatus . setTaskPatrolledId ( taskPatrolledId ) ;
/ / List < PatrolTaskStatus > list = taskExecClient . selectPatrolTaskStatusList ( patrolTaskStatus ) ;
/ / if ( ! list . isEmpty ( ) ) {
/ / if ( "3" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 暂停
/ / while ( true ) {
/ / log . info ( "-----------------------task pause: {}" , taskPatrolledId ) ;
/ / try {
/ / Thread . sleep ( 1000 ) ;
/ / } catch ( InterruptedException e ) {
/ / }
/ /
/ / list = taskExecClient . selectPatrolTaskStatusList ( patrolTaskStatus ) ;
/ / if ( ! list . isEmpty ( ) ) {
/ / log . info ( "-----------------------TaskState: {}" , list . get ( 0 ) . getTaskState ( ) ) ;
/ / if ( "4" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 终止
/ / log . info ( "-----------------------task terminate: {}" , taskPatrolledId ) ;
/ / taskHalted = true ;
/ / break ;
/ / } else if ( "2" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 正在执行
/ / log . info ( "-----------------------task resume: {}" , taskPatrolledId ) ;
/ / break ;
/ / }
/ / } else {
/ / log . info ( "-----------------------selectPatrolTaskStatusList is empty" ) ;
/ / }
/ / }
/ / } else if ( "4" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 终止
/ / log . info ( "-----------------------task terminate: {}" , taskPatrolledId ) ;
/ / taskHalted = true ;
/ / }
/ / }
/ /
/ / if ( ! taskHalted )
{
StringBuffer fileTypes = new StringBuffer ( ) ;
StringBuffer filePaths = new StringBuffer ( ) ;
PatrolPresetAction patrolPresetAction = PatrolPresetAction . builder ( ) . isEnable ( "1" ) . presetPosId ( presetPos . getPresetPosId ( ) ) . build ( ) ;
@ -1008,8 +1073,18 @@ public class JobMainTask {
/ / patrolTaskInfo . setDevNo ( task . getDevNo ( ) ) ;
/ / taskExecClient . selectTaskInfoList ( patrolTaskInfo ) ;
taskExecClient . updatePatrolTaskStatus ( patrolTask ) ;
/ / threadPool . submit ( ( ) - > prePointExecImmediate ( patrolTaskExecRecord , taskInfos ) ) ;
prePointExecImmediate ( patrolTaskExecRecord , taskInfos ) ;
threadPool . submit ( ( ) - > prePointExecImmediate ( patrolTaskExecRecord , taskInfos ) ) ;
}
@PostMapping ( { "/resumeExecTask" } )
public void resumeExecTask ( final String taskId ) {
log . info ( "resumeExecTask taskId: {}" , taskId ) ;
StatusMonitor monitor = TaskStatusManager . get ( taskId ) ;
if ( monitor ! = null ) {
synchronized ( monitor ) {
monitor . notifyAll ( ) ;
}
}
}
private List < PatrolTaskInfo > getInfosByRecord ( final PatrolTaskExecRecord record ) {
@ -1223,29 +1298,48 @@ public class JobMainTask {
}
CompletableFuture . allOf ( futures . toArray ( new CompletableFuture [ 0 ] ) ) . join ( ) ;
log . info ( "CompletableFuture Break Join!" ) ;
TaskStatusManager . remove ( patrolTaskExecRecord . getOldTaskPatrolId ( ) ) ;
log . info ( "CompletableFuture Break Join: taskPatrolId: {}" , patrolTaskExecRecord . getOldTaskPatrolId ( ) ) ;
}
private void handlePrePointBatch ( final int threadCnt , final PatrolTaskExecRecord patrolTaskExecRecord , final List < PatrolTaskInfo > batch ) {
asyncTaskPatrolPointCnt . getAndAdd ( batch . size ( ) ) ;
log . info ( "handlePrePointBatch threadCnt: {}, asyncTaskPatrolPointCnt: {}, batch size: {}, devNo: {}, taskId: {}" ,
threadCnt , asyncTaskPatrolPointCnt . get ( ) , batch . size ( ) , patrolTaskExecRecord . getDevNo ( ) , patrolTaskExecRecord . getTaskId ( ) ) ;
final String taskPatrolId = patrolTaskExecRecord . getOldTaskPatrolId ( ) ;
StatusMonitor monitor = TaskStatusManager . get ( patrolTaskExecRecord . getOldTaskPatrolId ( ) ) ;
if ( monitor = = null ) {
monitor = TaskStatusManager . register ( taskPatrolId , ( ) - > {
PatrolTaskStatus patrolTaskStatus = taskExecClient . selectPatrolTaskStatusByTaskPatrolledId ( taskPatrolId ) ;
int taskState ;
try {
taskState = Integer . parseInt ( patrolTaskStatus . getTaskState ( ) ) ;
} catch ( Exception e ) {
taskState = - 1 ;
}
return taskState ;
} ) ;
}
for ( final PatrolTaskInfo taskInfo : batch ) {
log . info ( "handlePrePointBatch taskPatrolId: {}, patrolPointId: {}, lineId: {}" , patrolTaskExecRecord . getTaskPatrolId ( ) , taskInfo . getDeviceId ( ) , taskInfo . getLineId ( ) ) ;
prePointExec ( patrolTaskExecRecord , taskInfo , batch . size ( ) ) ;
String taskPatrolledId = patrolTaskExecRecord . getTaskPatrolId ( ) . split ( StringUtils . UNDERLINE ) [ 1 ] + "_" + patrolTaskExecRecord . getTaskPatrolId ( ) . split ( StringUtils . UNDERLINE ) [ 2 ] ;
PatrolTaskStatus patrolTaskStatus = new PatrolTaskStatus ( ) ;
patrolTaskStatus . setTaskPatrolledId ( taskPatrolledId ) ;
List < PatrolTaskStatus > list = taskExecClient . selectPatrolTaskStatusList ( patrolTaskStatus ) ;
if ( ! list . isEmpty ( ) ) {
log . info ( "-----------------------handlePrePointBatch 2 TaskState: {}" , list . get ( 0 ) . getTaskState ( ) ) ;
if ( "4" . equals ( list . get ( 0 ) . getTaskState ( ) ) ) { / / 终止
log . info ( "-----------------------handlePrePointBatch 2 task terminate: {}" , taskPatrolledId ) ;
return ;
int status = monitor . getStatus ( ) ;
if ( status = = Integer . parseInt ( TaskStatus . HALTED . getCode ( ) ) ) {
log . info ( "task terminate taskPatrolId: {}" , taskPatrolId ) ;
break ;
} else if ( status = = Integer . parseInt ( TaskStatus . PAUSED . getCode ( ) ) ) {
synchronized ( monitor ) {
try {
log . info ( "task pause taskPatrolId: {}" , taskPatrolId ) ;
monitor . wait ( ) ; / / 等待外部唤醒
} catch ( InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ;
}
}
} else {
log . info ( "-----------------------handlePrePointBatch 2 selectPatrolTaskStatusList is empty" ) ;
} else if ( status = = - 1 | | status = = 0 ) {
log . info ( "task status error state taskPatrolId: {}" , taskPatrolId ) ;
}
prePointExec ( patrolTaskExecRecord , taskInfo , batch . size ( ) ) ;
}
}