并发编程工具 - AtomicReferenceFieldUpdater、并发修改同一对象的不同属性会有问题吗?
并发修改同一对象的不同属性会有问题吗?当调用Tomcat等Web服务器时,会交给Tomcat的某一线程处理请求,为了提高接口的性能缩短返回时间,将能并行的任务放到Runnable/Callable中,知道Callable能返回Future进行有返回值的结果处理,但是那样的程序写出来会比较繁琐。所有很多时候直接用Runnable + 线程安全的容器存储(比如ConcurrentHashMap)每个线
目录
二、使用AtomicReferenceFieldUpdater
并发修改同一对象的不同属性会有问题吗?
当调用Tomcat等Web服务器时,会交给Tomcat的某一线程处理请求,为了提高接口的性能缩短返回时间,将能并行的任务放到Runnable/Callable中,知道Callable能返回Future进行有返回值的结果处理,但是那样的程序写出来会比较繁琐。所有很多时候直接用Runnable + 线程安全的容器存储(比如ConcurrentHashMap)每个线程池的值;当然很多时候我们返回的可能就是一个对象,比如:商品明细接口返回的对象中,即包含价格信息,也包含商品信息,促销信息等,在底层的时候肯定会从多个微服务的系统中获取数据(并且基本算是IO型任务,主要用于等待网络结果),那么可能就会出现这样的代码。
public void setBaseCustomerInfo(final OrderBO orderBo) {
final List<Callable<Boolean>> tasks = new ArrayList<>();
// 每个线程执行完任务后,会往对象的不同属性中设置值
tasks.add(() -> getCustomerInfo(orderBo));
tasks.add(() -> getBusinessAccountInfo(orderBo));
tasks.add(() -> getReceiveAddressInfo(orderBo));
tasks.add(() -> getReceiveCustomerInfo(orderBo));
tasks.add(() -> getAgencyInfo(orderBo));
tasks.add(() -> getSaleOrgInfo(orderBo));
tasks.add(() -> getSaleChannelInfo(orderBo));
tasks.add(() -> getProductGroupInfo(orderBo));
tasks.add(() -> getFactoryInfo(orderBo));
tasks.add(() -> getDistributionCenter(orderBo));
tasks.add(() -> getCreditRange(orderBo));
tasks.add(() -> getSaleManager(orderBo));
tasks.add(() -> getShippingCondition(orderBo));
List<Future<Boolean>> futureList = SimpleThreadPool.executeAll(BASE_CUSTOMER_INFO_POOL_NAME, 60000, tasks);
for (Future<Boolean> future : futureList) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
Throwable throwable = (e.getCause() != null ? e.getCause() : e);
String errorMsg = throwable.getMessage();
log.error("取客户基本信息失败:" + errorMsg, throwable);
throw new BusinessException(errorMsg, e.getCause());
}
}
}
个人认为这样的设置值的时候存在并发问题(欢迎讨论)?现在相当于是多线程在处理一个没有同步锁的对象,由于多线程的高速缓存问题,那么可能出现下面的情况:
所以线程都获取了主内存中的空(所以属性都为空)对象,线程1为对象设置了属性name【age属性为空】;当线程2将空对象的age设置了值【name属性为空】直接刷到主内存中,则将线程1设置的name属性给刷没了,产生了脏数据。
自己未能重现自己的猜想,为一个对象设置了50个字段,并且使用多线程进行修改不同的属性,但是没有得到想要的效果,demo如下(和其他小伙伴讨论,觉得并发量不够【使用到的线程池工具可以参见:并发编程工具 - 线程池的使用和自己的封装】):
/**
* 测试多线程修改一个对象的结果
*
* @author kevin
* @date 2020/11/2 23:02
* @since 1.0.0
*/
@Slf4j
public class TestMutilModify {
public static void main(String[] args) throws InterruptedException {
User user = new User();
initThreadPool();
ArrayList<Callable<Object>> taskList = new ArrayList<>(15);
// CountDownLatch countDownLatch = new CountDownLatch(15);
taskList.add(() -> {
TestMutilModify.sleep(200);
user.setValue11("11");
log.info("Thread:" + Thread.currentThread().getName());
// countDownLatch.countDown();
return null;
});
// 省略中间的Callable设置
taskList.add(() -> {
TestMutilModify.sleep(200);
user.setValue15("15");
log.info("Thread:" + Thread.currentThread().getName());
// countDownLatch.countDown();
return null;
});
List<Future<Object>> futures = SimpleThreadPool.executeAll(ThreadPoolEnum.ORDER.name(), taskList);
// Thread.sleep(3000);
futures.forEach(future -> {
try {
future.get();
} catch (InterruptedException e) {
// e.printStackTrace();
} catch (ExecutionException e) {
// e.printStackTrace();
}
});
// countDownLatch.await();
log.info("user = {}", user.toString());
}
private static void initThreadPool() {
ThreadPoolImpl threadPool = new ThreadPoolImpl();
List<ThreadPoolEntity> threadPoolEntities = threadPool.appendThreadPool();
SimpleThreadPool.putThreadPool(threadPoolEntities.get(0));
}
public static void sleep(long timeout) {
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
解决多个线程修改同一对象的不同属性的方法
一、使用volitile修饰需要修改的字段
由于该对象使用的地方比较多,其他地方赋值的时候并不是并发的,并且不是所有字段都在并发时赋值;那么自己想项目上是使用一个新的对象的将需要并发赋值的属性使用volatile进行修饰【自己的理论依据是Happen-before的顺序性、volatile规则、传递性,详细可以参见:并发理论基础 - JMM模型和Happens-Before规则】。即项目进来后先拷贝原有值到对象中【之前可能就有某些属性有值】,多线程处理线程安全的新对象,最后在拷贝回去,上面的demo修改后如下:
public void setBaseCustomerInfo(final OrderBO orderBo) {
final List<Callable<Boolean>> tasks = new ArrayList<>();
final OrderSafeBO orderSafeBO = new OrderSafeBO();
BeanUtil.copyProperties(orderBo, CopyOptions.create().setIgnoreNullValue(true));
tasks.add(() -> getCustomerInfo(orderSafeBO));
tasks.add(() -> getBusinessAccountInfo(orderSafeBO));
tasks.add(() -> getReceiveAddressInfo(orderSafeBO));
tasks.add(() -> getReceiveCustomerInfo(orderSafeBO));
tasks.add(() -> getAgencyInfo(orderSafeBO));
tasks.add(() -> getSaleOrgInfo(orderSafeBO));
tasks.add(() -> getSaleChannelInfo(orderSafeBO));
tasks.add(() -> getProductGroupInfo(orderSafeBO));
tasks.add(() -> getFactoryInfo(orderSafeBO));
tasks.add(() -> getDistributionCenter(orderSafeBO));
tasks.add(() -> getCreditRange(orderSafeBO));
tasks.add(() -> getSaleManager(orderSafeBO));
tasks.add(() -> getShippingCondition(orderSafeBO));
List<Future<Boolean>> futureList = SimpleThreadPool.executeAll(BASE_CUSTOMER_INFO_POOL_NAME, 60000, tasks);
for (Future<Boolean> future : futureList) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
// 省略异常处理
}
}
BeanUtil.copyProperties(orderSafeBO, orderBo, CopyOptions.create().setIgnoreNullValue(true));
}
二、使用AtomicReferenceFieldUpdater
这样的话原代码不变,只是在更新属性的方法中,如下:
// 属性更新器可以设置为全局的
private static final AtomicReferenceFieldUpdater ORDERBO_NAME_UPDATER = AtomicReferenceFieldUpdater.newUpdater(OrderBO.class, String.class,"name");
private static final AtomicReferenceFieldUpdater ORDERBO_NAME2_UPDATER = AtomicReferenceFieldUpdater.newUpdater(OrderBO.class, String.class,"name2");
private Boolean getCustomerInfo(final OrderBO orderBo) {
// 如果:远程rpc获取到了数据
ORDERBO_NAME_UPDATER.compareAndSet(orderBo, "获取到的远程 name字段 值");
ORDERBO_NAME2_UPDATER.compareAndSet(orderBo, "获取到的远程 name2字段 值");
return Boolean.TRUE;
}
如果字段类型为Long或者Integer则可以使用AtomicLongFieldUpdater、AtomicIntgegerFieldUpdater,只是如下(构造中是否传入具体类型)的区别:
private static final AtomicReferenceFieldUpdater ORDERBO_INTEGER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(OrderBO.class, Integer.class,"integerField");
private static final AtomicIntegerFieldUpdater ORDERBO_INTEGER2_UPDATER = AtomicIntegerFieldUpdater.newUpdater(OrderBO.class, "integerField");
我们看看底层的实现原理: 反射 + CAS; 构造器中使用反射初始化了对象的类和属性字段
private static final class AtomicReferenceFieldUpdaterImpl<T,V> extends AtomicReferenceFieldUpdater<T,V> {
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private final long offset;
/**
* if field is protected, the subclass constructing updater, else
* the same as tclass
*/
private final Class<?> cclass;
/** class holding the field */
private final Class<T> tclass;
/** field value type */
private final Class<V> vclass;
AtomicReferenceFieldUpdaterImpl(final Class<T> tclass,
final Class<V> vclass,
final String fieldName,
final Class<?> caller) {
final Field field;
final Class<?> fieldClass;
final int modifiers;
try {
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
sun.reflect.misc.ReflectUtil.ensureMemberAccess(
caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) &&
((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
fieldClass = field.getType();
} // 省略异常
this.cclass = (Modifier.isProtected(modifiers) &&
tclass.isAssignableFrom(caller) &&
!isSamePackage(tclass, caller))
? caller : tclass;
this.tclass = tclass;
this.vclass = vclass;
this.offset = U.objectFieldOffset(field);
}
}
我们调用其 设置方法如下:
public final boolean compareAndSet(T obj, V expect, V update) {
accessCheck(obj);
valueCheck(update);
return U.compareAndSwapObject(obj, offset, expect, update);
}
// 最后会调用Unsafe的cas方法, 对象本身是Object,如下
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
更多推荐
所有评论(0)