有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java:如何每次只向UDF传递100条记录

我需要把记录传给一个调用API的UDF。因为我们想并行处理,所以使用了Spark,这也是开发UDF的原因。问题在于:该UDF一次只能接收100条记录,不能超过100条,也无法并行处理100条以上的记录。那么,如何确保每次只传递100条记录?请注意,我们不想对全部记录使用count()函数。

我在这里附上了UDF代码,它是一个返回结构体数组的GenericUDF。另外,如果我们每次通过batchsize变量传入100,那么假设共有198条记录,在不使用count()的情况下,我们无法知道最后一批只有98条。这种情况该如何处理?

各位,我有一个GenericUDF,它会调用一个API;但在调用之前,它会先凑满一批100条记录,然后才调用REST API。该UDF的参数是x1:string、x2:string、batchsize:integer(当前batchsize为100)。因此在UDF内部,除非已累积的记录数达到batchsize(100),否则调用不会发生,并且对每条记录都返回null。也就是说,直到第99条记录都返回null,到第100条记录时才会发起调用。所以现在的问题是:由于batchsize为100,调用只会在第100条记录处发生。假设文件中有198条记录,那么前100条记录会得到输出,而其余98条记录只会返回null,因为它们不会被处理。请大家帮帮忙:UDF每次只接收一条记录,但它会一直累积到第100条记录才处理。希望我把问题描述清楚了。

public class Standardize_Address extends GenericUDF { private static final Logger logger = LoggerFactory.getLogger(Standardize_Address.class); private int counter = 0; Client client = null; private Batch batch = new Batch(); public Standardize_Address() { client = new ClientBuilder().withUrl("https://ss-staging-public.beringmedia.com/street-address").build(); } // StringObjectInspector streeti; PrimitiveObjectInspector streeti; PrimitiveObjectInspector cityi; PrimitiveObjectInspector zipi; PrimitiveObjectInspector statei; PrimitiveObjectInspector batchsizei; private ArrayList ret; @Override public String getDisplayString(String[] argument) { return "My display string"; } @Override public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { System.out.println("under initialize"); if (args[0] == null) { throw new UDFArgumentTypeException(0, "NO Street is mentioned"); } if (args[1] == null) { throw new UDFArgumentTypeException(0, "No Zip is mentioned"); } if (args[2] == null) { throw new UDFArgumentTypeException(0, "No city is mentioned"); } if (args[3] == null) { throw new UDFArgumentTypeException(0, "No State is mentioned"); } if (args[4] == null) { throw new UDFArgumentTypeException(0, "No batch size is mentioned"); } /// streeti =args[0]; streeti = (PrimitiveObjectInspector)args[0]; // this.streetvalue = (StringObjectInspector) streeti; cityi = (PrimitiveObjectInspector)args[1]; zipi = (PrimitiveObjectInspector)args[2]; statei = (PrimitiveObjectInspector)args[3]; batchsizei = (PrimitiveObjectInspector)args[4]; ret = new ArrayList(); ArrayList structFieldNames = new ArrayList(); ArrayList structFieldObjectInspectors = new ArrayList(); structFieldNames.add("Street"); structFieldNames.add("city"); structFieldNames.add("zip"); structFieldNames.add("state"); structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); StructObjectInspector si2 = ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors); ListObjectInspector li2; li2 = ObjectInspectorFactory.getStandardListObjectInspector(si2); return li2; } @Override public Object evaluate(DeferredObject[] args) throws HiveException { ret.clear(); System.out.println("under evaluate"); // String street1 = streetvalue.getPrimitiveJavaObject(args[0].get()); Object oin = args[4].get(); System.out.println("under typecasting"); int batchsize = (Integer) batchsizei.getPrimitiveJavaObject(oin); System.out.println("batchsize"); Object oin1 = args[0].get(); String street1 = (String) streeti.getPrimitiveJavaObject(oin1); Object oin2 = args[1].get(); String zip1 = (String) zipi.getPrimitiveJavaObject(oin2); Object oin3 = args[2].get(); String city1 = (String) cityi.getPrimitiveJavaObject(oin3); Object oin4 = args[3].get(); String state1 = (String) statei.getPrimitiveJavaObject(oin4); logger.info("address passed, street=" + street1 + ",zip=" + zip1 + ",city=" + city1 + ",state=" + state1); counter++; try { System.out.println("under try"); Lookup lookup = new Lookup(); lookup.setStreet(street1); lookup.setCity(city1); lookup.setState(state1); lookup.setZipCode(zip1); lookup.setMaxCandidates(1); batch.add(lookup); } catch (BatchFullException ex) { logger.error(ex.getMessage(), ex); } catch (Exception e) { logger.error(e.getMessage(), e); } /* batch.add(lookup); */ if (counter == batchsize) { System.out.println("under if"); try { logger.info("batch input street " + batch.get(0).getStreet()); try { client.send(batch); } catch (Exception e) { logger.error(e.getMessage(), e); logger.warn("skipping current batch, 
continuing with the next batch"); batch.clear(); counter = 0; return null; } Vector<Lookup> lookups = batch.getAllLookups(); for (int i = 0; i < batch.size(); i++) { // ListObjectInspector candidates; ArrayList<Candidate> candidates = lookups.get(i).getResult(); if (candidates.isEmpty()) { logger.warn("Address " + i + " is invalid.\n"); continue; } logger.info("Address " + i + " is valid. (There is at least one candidate)"); for (Candidate candidate : candidates) { final Components components = candidate.getComponents(); final Metadata metadata = candidate.getMetadata(); logger.info("\nCandidate " + candidate.getCandidateIndex() + ":"); logger.info("Delivery line 1: " + candidate.getDeliveryLine1()); logger.info("Last line: " + candidate.getLastLine()); logger.info("ZIP Code: " + components.getZipCode() + "-" + components.getPlus4Code()); logger.info("County: " + metadata.getCountyName()); logger.info("Latitude: " + metadata.getLatitude()); logger.info("Longitude: " + metadata.getLongitude()); } Object[] e; e = new Object[4]; e[0] = new Text(candidates.get(i).getComponents().getStreetName()); e[1] = new Text(candidates.get(i).getComponents().getCityName()); e[2] = new Text(candidates.get(i).getComponents().getZipCode()); e[3] = new Text(candidates.get(i).getComponents().getState()); ret.add(e); } counter = 0; batch.clear(); } catch (Exception e) { logger.error(e.getMessage(), e); } return ret; } else { return null; } } }

共 (0) 个答案