有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

如何提取一个函数,在Java 8中,我可以将函数作为这些链式lambda的参数传递?

我在一段代码中有一个代码模式,使用卡夫卡流不断重复,我做一个映射,然后按键分组,然后减少。看起来是这样的:

KTable<ProjectKey, EventConfigurationIdsWithDeletedState> eventConfigurationsByProjectTable = eventConfigurationStream
        .map((key, value) -> {
            Map<String, Boolean> eventConfigurationUpdates = new HashMap<>();
            eventConfigurationUpdates.put(key.getEventConfigurationId(), value != null);
            ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
            EventConfigurationIdsWithDeletedState eventConfigurationIdsWithDeletedState = EventConfigurationIdsWithDeletedState.newBuilder().setEventConfigurations(eventConfigurationUpdates).build();
            return KeyValue.pair(projectKey, eventConfigurationIdsWithDeletedState);
        })
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            Map<String, Boolean> newEventConfigurations = newValue.getEventConfigurations();
            Map<String, Boolean> aggEventConfigurations = aggValue.getEventConfigurations();
            Map.Entry<String, Boolean> newEntry = newEventConfigurations.entrySet().iterator().next();
            if (newEntry.getValue())
                aggEventConfigurations.putAll(newEventConfigurations);
            else
                aggEventConfigurations.remove(newEntry.getKey());
            if (aggEventConfigurations.size() == 0)
                return null;
            return aggValue;
        });

(eventConfigurationStream的类型为KStream<EventConfigurationKey, EventConfiguration>

另一个遵循这种模式的例子。注意这里也有一个过滤器,但情况并非总是如此:

KTable<ProjectKey, NotificationSettingsTransition> globalNotificationSettingsPerProjectTable = notificationSettingTable.toStream()
        .filter((key, value) -> {
            return key.getEventConfigurationId() == null;
        })
        .map((key, value) -> {
            ProjectKey projectKey = ProjectKey.newBuilder().setId(key.getProjectId()).build();
            Map<String, NotificationSetting> notificationSettingsMap = new HashMap<>();
            notificationSettingsMap.put(getAsCompoundKeyString(key), value);
            NotificationSettingsTransition notificationSettingTransition = NotificationSettingsTransition
                    .newBuilder()
                    .setNotificationSettingCompoundKeyLastUpdate(getAsCompoundKey(key))
                    .setNotificationSettingLastUpdate(value)
                    .setEventConfigurationIds(new ArrayList<>())
                    .setNotificationSettingsMap(notificationSettingsMap)
                    .build();

            return KeyValue.pair(projectKey, notificationSettingTransition);
        })
        .groupByKey()
        .reduce((aggValue, newValue) -> {
            Map<String, NotificationSetting> notificationSettingMap = aggValue.getNotificationSettingsMap();
            String compoundKeyAsString = getAsString(newValue.getNotificationSettingCompoundKeyLastUpdate());
            if (newValue.getNotificationSettingLastUpdate() != null)
                notificationSettingMap.put(compoundKeyAsString, newValue.getNotificationSettingLastUpdate());
            else
                notificationSettingMap.remove(compoundKeyAsString);
            aggValue.setNotificationSettingCompoundKeyLastUpdate(newValue.getNotificationSettingCompoundKeyLastUpdate());
            aggValue.setNotificationSettingLastUpdate(newValue.getNotificationSettingLastUpdate());
            aggValue.setNotificationSettingsMap(notificationSettingMap);
            return aggValue;
        });

(notificationSettingsTable的类型为KTable<NotificationSettingKey, NotificationSetting> notificationSettingTable,但也会立即转换为KStream。)

我如何将其提取到一个函数中,在这个函数中,我传递一个map代码和reduce代码的函数,但不必重复.map().groupByKey().reduce()的模式?尽管返回类型不同,取决于map函数中的代码,并且应该保持类型化。理想情况下,在Java 8中,但更高版本是可能的。我想我有一个很好的主意,当map代码中的KeyValuePair的内部类型不会改变时,如何做到这一点,但不确定现在该怎么做


共 (1) 个答案

  1. # 1 楼答案

    可以对函数进行参数化,以接受两个泛型函数,在调用函数时将推断(或明确设置,如果不可能)

    对于map的输入,需要一个BiFunction<K, V, T>,对于reduce需要一个BiFunction<U, U, U>,其中:

    • Kmap函数中key的类型
    • Vmap函数中value的类型
    • Tmap函数的返回类型
    • U是聚合器的类型,reduce函数的值和返回类型

    查看^{}^{},可以获得更详细的类型信息来进一步约束函数

    这将使您的自定义函数如下所示:

    <K, V, T, U> U mapGroupReduce(final KStream<K, V> stream, final BiFunction<K, V, T> mapper, final BiFunction<U, U, U> reducer) {
        return stream.map(mapper).groupByKey().reduce(reducer);
    }
    

    你可以这样称呼它:

    mapGroupReduce(yourStream,
        (key, value) -> new KeyValue(k, v)),
        (acc, value) -> acc);
    

    在您的情况下,您需要使用以下命令,而不是使用BiFunctions:

    • KeyValueMapper<K, V, KeyValue<T, U>>用于映射程序
    • Reducer<U>用于减速器

    然而,这真的比每次只写stream.map(M).groupByKey().reduce(R)好得多吗?更详细的版本更明确,并且考虑到映射器和减缩器的相对大小,您并没有真正节省那么多